In [1]:
import os
import pickle
from ase.db import connect
from ase.visualize import view

from GAMERNet.rnet.networks.reaction_network import ReactionNetwork
from GAMERNet.rnet.networks.surface import Surface

# Load the pickled network description and rebuild the ReactionNetwork from it.
# SECURITY: pickle.load can execute arbitrary code while deserializing; only
# open files produced by a trusted run.
# NOTE(review): the path is relative, so it resolves against the kernel's
# working directory.
with open('../scripts/test_c1/rxn_net.pkl', 'rb') as pickle_file:
    content = pickle.load(pickle_file)  # dict of elementary reactions

rxn_net = ReactionNetwork().from_dict(content)
print(rxn_net)
# Keep a handle to the network's graph attribute.
graph = rxn_net.graph

ReactionNetwork(43 intermediates, 14 closed-shell molecules, 105 reactions)
Surface: Pd48(111)
Network Carbon cutoff: C1



# Fix shortest path detection

In [2]:
# Steps from the gas-phase sources '101101g' (printed as CO(g) in this cell's
# output) and '020101g' (presumably H2(g) — TODO confirm) to '141101g'
# (presumably MeOH(g) — TODO confirm).
# NOTE(review): CO2(g) is presumably '102101g', which is not among the sources
# here — confirm which starting species was intended.
x = rxn_net.get_shortest_path(['101101g', '020101g'], '141101g')
print(x)

ITERATION 1
101101*
101101g(CO(g))+000000*(Pd48*)<->101101*(CO*)
{'010101*', '101101g', '101101*', '020101*', '020101g', '000000*'}
ITERATION 2
100101*
101101*(CO*)+000000*(Pd48*)<->100101*(C*)+001101*(O*)
{'001101*', '102101*', '010101*', '101101g', '111102*', '100101*', '111101*', '101101*', '020101*', '020101g', '000000*'}
ITERATION 3
001101*
101101*(CO*)+000000*(Pd48*)<->100101*(C*)+001101*(O*)
{'001101*', '102101*', '010101*', '101101g', '110101*', '111102*', '100101*', '111101*', '101101*', '020101*', '020101g', '000000*'}
ITERATION 4
111101*
111101*(CHO*)+000000*(Pd48*)<->101101*(CO*)+010101*(H*)
{'102101*', '113101*', '110101*', '112101*', '111101*', '001101*', '114101*', '020101*', '020101g', '002101*', '011101*', '010101*', '101101g', '012101*', '002101g', '111102*', '100101*', '104101*', '113102*', '101101*', '103101*', '000000*', '112102*'}
ITERATION 5
111102*
000000*(Pd48*)+111102*(CHO*)<->101101*(CO*)+010101*(H*)
{'102101*', '113101*', '110101*', '112101*', '111101*', '12

In [3]:
# Write the network drawing to HIGHLIGHT_test.png in the current directory,
# passing the second element of the shortest-path result as `highlight`.
rxn_net.write_dotgraph(".", 'HIGHLIGHT_test.png', highlight=x[1])

In [None]:
import networkx as nx
from collections import deque

def constrained_shortest_path(graph, source, target, intermediates_to_avoid):
    """Breadth-first search for a shortest node path that avoids given nodes.

    Args:
        graph: object exposing ``neighbors(node)`` and a ``nodes`` mapping of
            node -> attribute dict (networkx-style).
        source: node the search starts from.
        target: node the search stops at.
        intermediates_to_avoid: container of nodes that must not be stepped on.

    Returns:
        The list of nodes from ``source`` to ``target`` (both included), or
        ``None`` when no admissible path exists.

    NOTE(review): a neighbour whose ``'type'`` attribute is present and differs
    from ``'elementary_reaction'`` is never queued, so such a node can only be
    returned when it is the source itself — confirm this is intended.
    """
    seen = {source}
    # Each queue entry is the path walked so far; its last item is the node to expand.
    frontier = deque([[source]])

    while frontier:
        path = frontier.popleft()
        node = path[-1]

        if node == target:
            return path

        for candidate in graph.neighbors(node):
            attrs = graph.nodes[candidate]
            wrong_type = 'type' in attrs and attrs['type'] != 'elementary_reaction'
            if wrong_type or candidate in intermediates_to_avoid or candidate in seen:
                continue
            seen.add(candidate)
            frontier.append(path + [candidate])

    return None  # no admissible path


In [None]:
import networkx as nx
from collections import deque

# Toy network: intermediates A-D connected through reaction nodes R1-R3.
G = nx.Graph()
G.add_edges_from(
    [("A", "R1"), ("R1", "B"), ("B", "R2"), ("R2", "C"), ("B", "R3"), ("R3", "D")]
)

# Tag every node with its role in the network.
for node in "ABCD":
    G.nodes[node]['type'] = 'intermediate'
for node in ("R1", "R2", "R3"):
    G.nodes[node]['type'] = 'elementary_reaction'

def custom_shortest_path(rxn_net, graph, source, target):
    """Breadth-first search returning the reaction steps that lead to ``target``.

    NOTE(review): a second function with the same name is defined later in this
    cell and replaces this one when the cell runs.

    Args:
        rxn_net: object whose ``reactions[step]`` exposes ``reactants`` and
            ``products`` for every reaction node of ``graph``.
        graph: networkx-style graph; it is converted with ``to_undirected()``
            and every node must carry a ``'category'`` attribute. Categories
            ``'ads'``, ``'sur'`` and ``'gas'`` are treated as intermediates,
            anything else as a reaction node.
        source: start node.
        target: node to reach.

    Returns:
        List of the reaction nodes crossed on the way to ``target`` (empty when
        ``source == target``), or ``None`` if the target is not reached.

    NOTE(review): a reaction is only crossed once *all* of its reactants and
    products have already been expanded, which looks stricter than intended —
    confirm the rule before relying on the result.
    """
    intermediate_categories = ('ads', 'sur', 'gas')
    queue = deque([(source, [])])
    graph = graph.to_undirected()
    intermediates_visited = set()
    # Every node that has been queued; prevents a node from being queued twice.
    # (The original never filled this set and instead referenced the undefined
    # names `visited` / `neighbor`, raising NameError on the first queued step.)
    queued = {source}
    while queue:
        current_node, path_so_far = queue.popleft()

        # Check if the target node is reached
        if current_node == target:
            return path_so_far

        intermediates_visited.add(current_node)

        for step in graph.neighbors(current_node):
            if step in queued:
                continue

            if graph.nodes[step]['category'] in intermediate_categories:
                # Intermediates have no entry in rxn_net.reactions and do not
                # extend the reported path.
                new_path = path_so_far
            else:
                reaction = rxn_net.reactions[step]
                inters = list(reaction.reactants) + list(reaction.products)
                if any(inter not in intermediates_visited for inter in inters):
                    continue
                new_path = path_so_far + [step]

            queue.append((step, new_path))
            queued.add(step)

    return None  # Path not found

def custom_shortest_path(net, graph, source, target):
    """Breadth-first search returning the reaction nodes crossed to reach ``target``.

    Args:
        net: object whose ``reactions[...]`` entries expose ``index``,
            ``reactants`` and ``products``.
        graph: networkx-style graph exposing ``neighbors(node)`` and a ``nodes``
            mapping; nodes carry a ``'category'`` attribute (``'ads'``,
            ``'sur'``, ``'gas'`` mark intermediates) and a ``'type'`` attribute.
        source: start node.
        target: node to reach.

    Returns:
        List of neighbours of type ``'elementary_reaction'`` crossed on the way
        (empty when ``source == target``), or ``None`` when no path is found.

    NOTE(review): path membership is decided from ``'type'`` while the
    intermediate/reaction split uses ``'category'`` — confirm that graph nodes
    carry both attributes.
    """
    intermediate_categories = ('ads', 'sur', 'gas')
    visited = set()
    # Species treated as available from the start; the shortest-path output in
    # this notebook prints them as Pd48* and H*.
    visited_inters = {'000000*', '010101*'}
    queue = deque([(source, [])])

    while queue:
        current_node, path_so_far = queue.popleft()

        # Check if the target node is reached
        if current_node == target:
            return path_so_far

        visited.add(current_node)
        if graph.nodes[current_node]['category'] in intermediate_categories:
            visited_inters.add(current_node)

        for neighbor in graph.neighbors(current_node):
            # Skip if already visited
            if neighbor in visited:
                continue

            if graph.nodes[neighbor]['category'] not in intermediate_categories:
                # Use the `net` argument; the original read the notebook-global
                # `rxn_net` here and so ignored the network passed by the caller.
                index = net.reactions[neighbor].index
                reaction = net.reactions[index]
                inters = list(reaction.reactants) + list(reaction.products)
                # A reaction is only crossed when all its species are available.
                if not all(inter in visited_inters for inter in inters):
                    continue

            # Only reaction nodes are recorded in the returned path
            if graph.nodes[neighbor]['type'] == 'elementary_reaction':
                new_path = path_so_far + [neighbor]
            else:
                new_path = path_so_far

            queue.append((neighbor, new_path))
            visited.add(neighbor)

    return None  # Path not found

def shortest_path_sm(graph, source, target):
    """Unfinished draft of a shortest-path search.

    Only the building blocks are prepared (the reaction-node list and two
    predicates); no search is run yet, so the function always returns ``None``.
    """
    seen_intermediates = set()
    intermediate_categories = ('ads', 'sur', 'gas')

    # Keep every node that is not an intermediate.
    rxn_nodes = []
    for node in graph.nodes:
        if graph.nodes[node]['category'] not in intermediate_categories:
            rxn_nodes.append(node)

    def break_condition(node):
        # True when the target is among the members of `node`.
        return target in node

    def cc_condition(node):
        # True when every member of `node` has already been seen.
        return all(member in seen_intermediates for member in node)




In [None]:
# Graph attribute of the network, passed to custom_shortest_path in the next cell.
nx_graph = rxn_net.graph

In [None]:
# Run the draft search from '102101g' (presumably CO2(g) — TODO confirm) to
# '141101g' (presumably MeOH(g) — TODO confirm).
# NOTE(review): this rebinds `x`, which the write_dotgraph cell above indexes
# as x[1]; re-running that cell after this one uses a different object.
x = custom_shortest_path(rxn_net, nx_graph, '102101g', '141101g')
x

In [None]:
# Display the network's reactions container.
rxn_net.reactions

# Other

In [None]:
# Print the `smiles` attribute of every intermediate flagged as closed-shell.
for inter in rxn_net.intermediates.values():
    if inter.closed_shell:
        print(inter.smiles)

In [None]:
# Add an Eley-Rideal step built from '101101g' (printed elsewhere as CO(g)),
# '001101*' (printed as O*) and '102101*' (presumably CO2* — TODO confirm).
# NOTE(review): this presumably modifies rxn_net in place, so re-running the
# cell may add the step again — confirm.
rxn_net.add_eley_rideal('101101g', '001101*', '102101*')
print(rxn_net)

In [None]:
# Number of reactions currently held by the network.
print(len(rxn_net.reactions))

In [None]:
# Number of reactions currently held by the network (same statement as the
# preceding cell).
print(len(rxn_net.reactions))

In [None]:
# Numbered listing (starting at 1) of each reaction's code, components and type.
for counter, reaction in enumerate(rxn_net.reactions, start=1):
    print(counter, reaction.code, reaction.components, reaction.r_type)

In [None]:
# For every reaction, print its components and then the code of each
# intermediate inside each component.
for reaction in rxn_net.reactions:
    print(reaction.components)
    for component in reaction.components:
        for inter in component:
            print(inter.code)

In [None]:
# Number of intermediates stored in the network.
print(len(rxn_net.intermediates))

In [None]:
# Collect the `molecule` of every intermediate flagged as closed-shell.
# The flag is tested by truthiness, as in the SMILES-printing cell above.
closed_shell_atoms = [
    inter.molecule
    for inter in rxn_net.intermediates.values()
    if inter.closed_shell
]
print(len(closed_shell_atoms))

In [None]:
# Rebuild the list of closed-shell molecules; this recomputes the same list as
# the preceding cell.
closed_shell_atoms = []
for inter in rxn_net.intermediates.values():
    if inter.closed_shell == True:
        closed_shell_atoms.append(inter.molecule)
print(len(closed_shell_atoms))

In [None]:
# Count the "H" symbols in the first collected molecule (raises IndexError if
# the list is empty).
closed_shell_atoms[0].get_chemical_symbols().count("H")

In [None]:
# Open the ASE viewer on the collected closed-shell molecules.
view(closed_shell_atoms)

In [None]:
# Build a graph with gen_graph() and print every node together with its
# attribute dict, then the graph itself.
y = rxn_net.gen_graph()
# y.remove_node("")
for node in y.nodes(data=True):
    print(node)
print(y)

In [None]:
# Display the facet attribute of the network's surface.
rxn_net.surface.facet

In [None]:
# Write the network drawing to OLIV_test.png in the current directory.
# NOTE(review): del_surf=True presumably omits the bare-surface node — confirm.
rxn_net.write_dotgraph(".", 'OLIV_test.png', del_surf=True)

Look for intermediates with specified composition

In [None]:
# Query the network with the composition C:1, H:2, O:2.
rxn_net.search_inter_by_elements({'C':1, 'H':2, 'O':2})

Look for all elementary steps involving a specific intermediate

In [None]:
# Query the elementary steps for the intermediate code "222101".
# NOTE(review): the loaded network reports a carbon cutoff of C1; if the first
# digit of a code is the carbon count, "222101" lies outside it — confirm the
# code is the intended one.
rxn_net.search_ts(["222101"])

In [None]:
# Distinct r_type values found across the network's reactions.
types = []
for reaction in rxn_net.reactions:
    types.append(reaction.r_type)
print(set(types))

In [None]:
# Distinct phase values found across the network's intermediates.
# NOTE(review): reuses the name `types`, which the preceding cell fills with
# reaction types.
types = []
for inter in rxn_net.intermediates.values():
    types.append(inter.phase)
print(set(types))

# closed shell

In [None]:
def is_closed_shell_santi(self):
    """
    Check if a molecule CxHyOz is closed-shell or not.

    ``self`` must expose ``graph`` (networkx-style, one node per atom with the
    element symbol stored under ``"elem"``) and ``molecule`` (providing
    ``get_chemical_symbols()``).

    Hydrogen-free species are decided from the composition alone: Cx is
    open-shell, Oz is closed-shell only for O2, CxOz only for CO and CO2.
    Pure hydrogen is closed-shell only for H2. Every other species is decided
    on the graph: each atom may hold C:4, H:1, O:2 bonds, and missing bonds are
    filled by raising the bond order between two neighbouring unsaturated
    atoms (oxygens first). The molecule is closed-shell when every atom can be
    saturated this way.

    Compared with the first draft this version also covers HyOz species
    (e.g. water), which previously fell through every branch and returned None.

    NOTE(review): the pairing is greedy, so for larger conjugated skeletons the
    choice of partner may matter — verify before using beyond C1 species.

    Returns:
        bool: True if the molecule is judged closed-shell, False otherwise
        (including a species that contains no C, H or O atoms).
    """
    valence_electrons = {'C': 4, 'H': 1, 'O': 2}
    graph = self.graph.to_undirected()
    symbols = self.molecule.get_chemical_symbols()
    n_carbon, n_hydrogen, n_oxygen = (symbols.count(elem) for elem in ('C', 'H', 'O'))

    # Species without hydrogen: composition rules only.
    if n_hydrogen == 0:
        if n_carbon and n_oxygen:  # CxOz
            return n_carbon == 1 and n_oxygen in (1, 2)
        if n_oxygen:  # Oz
            return n_oxygen == 2
        return False  # Cx, or no C/H/O atoms at all
    if n_carbon == 0 and n_oxygen == 0:  # Hy
        return n_hydrogen == 2

    # Graph-based check. bond_count starts at the number of neighbours and is
    # raised whenever a bond order is increased; capacity is the valence limit.
    bond_count = {node: graph.degree(node) for node in graph.nodes()}
    capacity = {
        node: valence_electrons.get(graph.nodes[node]["elem"], 0)
        for node in graph.nodes()
    }

    def is_unsaturated(node):
        return bond_count[node] < capacity[node]

    unsat_nodes = [node for node in bond_count if is_unsaturated(node)]
    if not unsat_nodes:  # all atoms are saturated
        return True
    if len(unsat_nodes) == 1:  # only one unsaturated atom
        return False

    while unsat_nodes:
        unsat_oxygens = [node for node in unsat_nodes if graph.nodes[node]["elem"] == 'O']
        if unsat_oxygens:
            for oxygen in unsat_oxygens:
                if not is_unsaturated(oxygen):
                    continue  # already saturated earlier in this pass
                partners = [nb for nb in graph.neighbors(oxygen) if is_unsaturated(nb)]
                if not partners:
                    return False  # O neighbour is saturated already
                bond_count[oxygen] += 1
                bond_count[partners[0]] += 1
        else:
            # Start from the unsaturated atom that currently holds most bonds.
            anchor = max(unsat_nodes, key=bond_count.get)
            partners = [nb for nb in graph.neighbors(anchor) if is_unsaturated(nb)]
            if not partners:  # all neighbours are saturated
                return False
            bond_count[anchor] += 1
            bond_count[partners[0]] += 1
        unsat_nodes = [node for node in bond_count if is_unsaturated(node)]

    return True
                         
                            

                
        
        # # Getting the unsaturated nodes (if there are not unsaturated nodes, the molecule is closed-shell)
        # unsat_nodes = [node for node in graph.nodes() if graph.degree(node) < valence_electrons.get(graph.nodes[node]["elem"], 0)]

        # # If the graph only has Carbon as an element and not H or O, then it is open-shell
        # if not 'H' and 'O' in molecule.get_chemical_formula():
        #     print(f'System {molecule.get_chemical_formula()} is open-shell: only C atoms')
        #     return False 
        
        # # Specific case for O2
        # if not 'C' and 'H' in molecule.get_chemical_formula() and len(unsat_nodes) == 2:
        #     print(f'System {molecule.get_chemical_formula()} is closed-shell: Oxygen')
        #     return True 
        
        # # CO and CO2
        # if not 'H' in molecule.get_chemical_formula() and len(molecule.get_chemical_symbols()['C']) == 1 and len(molecule.get_chemical_symbols()['O']) in (1,2):
        #     print(f'System {molecule.get_chemical_formula()} is closed-shell: CO or CO2')
        #     return True
        
        # if unsat_nodes:
        #     # If the molecule has only one unsaturated node, then it is open-shell
        #     if len(unsat_nodes) == 1:
        #         print(f'System {molecule.get_chemical_formula()} is open-shell')
        #         return False 
        #     else:
        #         # Checking if there is one unsaturated node that does not have as neighbour another unsaturated node
        #         for node in unsat_nodes:
        #             # If the molecule has only one unsaturated node, then it is open-shell
        #             if not [n for n in graph.neighbors(node) if n in unsat_nodes]:
        #                 print(f'System {molecule.get_chemical_formula()} is open-shell: one node is unsaturated but does not have as neighbour another unsaturated node')
        #                 return False 
        #             else:
        #                 # Case for molecules where an unsaturated node is oxygen
        #                 if graph.nodes[node]["elem"] == 'O':
        #                     # Adding one bond order (valence electrons) to the oxygen node by adding it to the unsat_nodes list

In [None]:
# Try the draft check on the intermediate stored under key '121101'.
is_closed_shell_santi(rxn_net.intermediates['121101'])