### **Imports**

In [19]:
from collections import deque
import regex as re
import heapq

# Graphs

## High-Level Description (Conceptual)

Goal: Provide a foundational graph structure to support the representation of directed relationships between abstract entities. This structure serves as a base for more advanced graph-based models used throughout the project, such as metabolic networks and sequence assembly.

Context and Motivation:Graphs are a central data structure in computational biology, data science, and computer science in general. A general-purpose graph class allows the reuse of core logic (vertex and edge management) across multiple use cases. By abstracting the graph operations, it becomes easier to construct and analyze specific types of networks (e.g., metabolic or genetic).

Approach:

### 1- Directed Graph Representation:

- The graph is internally represented as a Python dictionary.

- Each key is a node (vertex).

- Each value is a list of nodes to which the key has directed edges.

- This structure models directed graphs using adjacency lists, optimizing both space and lookup operations.

### 2- Dynamic Graph Construction:

- Vertices and edges can be added incrementally:

- add_vertex(v) ensures a node exists.

- add_edge(o, d) adds a directed edge from origin o to destination d.

- These methods prevent duplication and enforce graph consistency.

### 3- Data Import Flexibility:

- The class constructor can take a pre-existing dictionary representing a graph and automatically convert it into an internal structure.

- This is useful for building graphs from external data (e.g., reaction files, k-mer lists).

### 4- Auxiliary Search Function:

- A utility function is_in_tuple_list(tl, val) is included to check for the presence of a value in the first position of tuples within a list.

- This supports downstream graph-related algorithms, such as edge filtering or path reconstruction.

### 5- Extensibility:

- This graph class is deliberately minimal and modular, enabling further extension for:

- Weighted edges

- Bidirectional graphs

- Network traversal algorithms (DFS, BFS, etc.)

- Centrality measures or clustering

## Low-Level Description (Implementation)

### Graph Class:

- __init__(self, g=None)

Initializes the graph as an empty dictionary.

If a dictionary g is provided, iterates through it to insert all nodes and directed edges using internal methods.

- add_vertex(self, v)

Adds a vertex v to the graph if it is not already present.

Initializes its adjacency list as an empty list.

- add_edge(self, o, d)

Adds a directed edge from vertex o to vertex d.

Ensures both vertices exist in the graph (calling add_vertex if necessary).

Appends d to o’s adjacency list, unless already present.

### Utility Function:

- is_in_tuple_list(tl, val)

Receives a list of tuples tl and a value val.

Returns True if any tuple in the list has val as its first element; otherwise returns False.

Useful for filtering or checking connections in edge-based datasets.



In [20]:
def is_in_tuple_list(tl, val):
    """
    Checks whether a value exists as the first element in any tuple in a list.

    :param tl: List of tuples.
    :param val: Value to search for.
    :return: True if val is found as the first element in any tuple, False otherwise.
    """
    for (x, _) in tl:
        if val == x:
            return True
    return False


class Graph:
    """
    Class representing a directed, unweighted graph using adjacency lists.
    """

    def __init__(self, g=None):
        """
        Initializes the graph. If a dictionary `g` is provided, adds its vertices and edges.

        :param g: Optional dictionary representing the graph (e.g., {'A': ['B', 'C']}).
        """
        self.graph = {}
        if g:
            for v, neighbors in g.items():
                self.add_vertex(v)
                for d in neighbors:
                    self.add_edge(v, d)

    def add_vertex(self, v):
        """
        Adds a vertex to the graph if not already present.

        :param v: Vertex label.
        """
        if v not in self.graph:
            self.graph[v] = []

    def add_edge(self, o, d):
        """
        Adds a directed edge from vertex `o` to vertex `d`.

        :param o: Origin vertex.
        :param d: Destination vertex.
        """
        self.add_vertex(o)
        self.add_vertex(d)
        if d not in self.graph[o]:
            self.graph[o].append(d)

    def print_graph(self):
        """
        Prints the adjacency list representation of the graph.
        """
        for v in self.graph:
            print(v, "->", self.graph[v])

    def size(self):
        """
        Returns the number of vertices and edges in the graph.

        :return: Tuple (number of nodes, number of edges).
        """
        return len(self.get_nodes()), len(self.get_edges())

    def get_nodes(self):
        """
        Returns a list of all vertices in the graph.

        :return: List of vertex labels.
        """
        return list(self.graph.keys())

    def get_edges(self):
        """
        Returns a list of all edges in the graph as (origin, destination) tuples.

        :return: List of edge tuples.
        """
        edges = []
        for v in self.graph:
            for d in self.graph[v]:
                edges.append((v, d))
        return edges

    def get_successors(self, v):
        """
        Returns the outgoing neighbors (successors) of a vertex.

        :param v: Vertex label.
        :return: List of successors.
        """
        if v not in self.graph:
            raise KeyError(f"Node {v} not found.")
        
        return self.graph.get(v, [])

    def get_predecessors(self, v):
        """
        Returns the incoming neighbors (predecessors) of a vertex.

        :param v: Vertex label.
        :return: List of predecessors.
        """
        if v not in self.graph:
            raise KeyError(f"Node {v} not found.")
        
        return [u for u in self.graph if v in self.graph[u]]

    def get_adjacents(self, v):
        """
        Returns all adjacent vertices (union of predecessors and successors).

        :param v: Vertex label.
        :return: List of adjacent vertices.
        """
        return list(set(self.get_successors(v)) | set(self.get_predecessors(v)))

    def in_degree(self, v):
        """
        Returns the number of incoming edges for a vertex.

        :param v: Vertex label.
        :return: In-degree.
        """
        if v not in self.graph:
            raise KeyError(f"Node {v} not found.")
        
        return len(self.get_predecessors(v))

    def out_degree(self, v):
        """
        Returns the number of outgoing edges for a vertex.

        :param v: Vertex label.
        :return: Out-degree.
        """
        if v not in self.graph:
            raise KeyError(f"Node {v} not found.")
        
        return len(self.get_successors(v))

    def degree(self, v):
        """
        Returns the total degree (in + out) of a vertex.

        :param v: Vertex label.
        :return: Total degree.
        """
        
        return self.in_degree(v) + self.out_degree(v)

    def reachable_bfs(self, v):
        """
        Returns all vertices reachable from `v` using Breadth-First Search.

        :param v: Starting vertex.
        :return: List of reachable vertices.
        """
        queue = [v]
        visited = set()
        while queue:
            node = queue.pop(0)
            for neighbor in self.get_successors(node):
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
        return list(visited)

    def reachable_dfs(self, v):
        """
        Returns all vertices reachable from `v` using Depth-First Search.

        :param v: Starting vertex.
        :return: List of reachable vertices.
        """
        stack = [v]
        visited = set()
        while stack:
            node = stack.pop()
            for neighbor in reversed(self.get_successors(node)):
                if neighbor not in visited:
                    visited.add(neighbor)
                    stack.append(neighbor)
        return list(visited)

    def distance(self, s, d):
        """
        Computes the shortest distance between `s` and `d` using BFS.

        :param s: Source vertex.
        :param d: Destination vertex.
        :return: Distance (int) or None if unreachable.
        """
        if s == d:
            return 0
        queue = [(s, 0)]
        visited = set([s])
        while queue:
            node, dist = queue.pop(0)
            for neighbor in self.get_successors(node):
                if neighbor == d:
                    return dist + 1
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append((neighbor, dist + 1))
        return None

    def shortest_path(self, s, d):
        """
        Finds the shortest path from `s` to `d` using BFS.

        :param s: Source vertex.
        :param d: Destination vertex.
        :return: List of vertices forming the path, or None if no path.
        """
        if s == d:
            return [s]
        queue = [(s, [])]
        visited = set([s])
        while queue:
            node, path = queue.pop(0)
            for neighbor in self.get_successors(node):
                if neighbor == d:
                    return path + [node, neighbor]
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append((neighbor, path + [node]))
        return None

    def reachable_with_dist(self, s):
        """
        Returns all vertices reachable from `s` with their respective distances.

        :param s: Starting vertex.
        :return: List of tuples (vertex, distance).
        """
        res = []
        l = [(s, 0)]
        while l:
            node, dist = l.pop(0)
            if node != s:
                res.append((node, dist))
            for elem in self.get_successors(node):
                if not is_in_tuple_list(l, elem) and not is_in_tuple_list(res, elem):
                    l.append((elem, dist + 1))
        return res

    def node_has_cycle(self, v):
        """
        Checks if there's a cycle starting and ending at vertex `v`.

        :param v: Vertex label.
        :return: True if cycle found, else False.
        """
        queue = [v]
        visited = set([v])
        while queue:
            node = queue.pop(0)
            for neighbor in self.get_successors(node):
                if neighbor == v:
                    return True
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append(neighbor)
        return False

    def has_cycle(self):
        """
        Checks if the graph has any cycles.

        :return: True if the graph contains a cycle, else False.
        """
        return any(self.node_has_cycle(v) for v in self.graph)

    def visualize(self):
        """
        Generates a PNG visualization of the graph using Graphviz.
        The output image is saved and opened.
        """
        dot = graphviz.Digraph(comment='Graph', format='png')
        for v in self.graph:
            dot.node(v)
        for v in self.graph:
            for d in self.graph[v]:
                dot.edge(v, d)
        dot.render('output/unweighted_graph', view=True)


class WeightedGraph(Graph):
    """
    Class representing a directed, weighted graph. Inherits from Graph.
    """

    def __init__(self, g=None):
        """
        Initializes the weighted graph. Accepts a dictionary in the form:
        {'A': [('B', weight), ('C', weight)]}.

        :param g: Optional weighted graph representation.
        """
        self.graph = {}
        if g:
            for v, neighbors in g.items():
                self.add_vertex(v)
                for d, w in neighbors:
                    self.add_edge(v, d, w)

    def add_edge(self, o, d, w):
        """
        Adds a weighted edge from vertex `o` to vertex `d`.If there is already one, updates with new weight.

        :param o: Origin vertex.
        :param d: Destination vertex.
        :param w: Edge weight (numeric).
        """
        if not isinstance(w,int) :
            raise TypeError(f"Weight {w} not valid.")
        
        self.add_vertex(o)
        self.add_vertex(d)

        for i, (dest, weight) in enumerate(self.graph[o]):
            if dest == d:
                self.graph[o][i] = (d, w)
                return
            
        self.graph[o].append((d, w))

    def get_edges(self):
        """
        Returns a list of all edges with weights.

        :return: List of tuples (origin, destination, weight).
        """
        edges = []
        for v in self.graph:
            for d, w in self.graph[v]:
                edges.append((v, d, w))
        return edges

    def get_successors(self, v):
        """
        Returns the outgoing neighbors of a vertex, ignoring weights.

        :param v: Vertex label.
        :return: List of successor vertices.
        """
        if v not in self.graph:
            raise KeyError(f"Node {v} not found.")
        
        return [d for d, _ in self.graph.get(v, [])]

    def dijkstra(self, start):
        """
        Computes shortest paths from `start` to all other vertices using Dijkstra's algorithm.

        :param start: Starting vertex.
        :return: Dictionary mapping each vertex to its shortest distance from `start`.
        """
        distances = {v: float('inf') for v in self.graph}
        distances[start] = 0
        heap = [(0, start)]
        visited = set()

        while heap:
            dist_u, u = heapq.heappop(heap)
            if u in visited:
                continue
            visited.add(u)
            for v, weight in self.graph[u]:
                alt = dist_u + weight
                if alt < distances[v]:
                    distances[v] = alt
                    heapq.heappush(heap, (alt, v))
        return distances

    def visualize(self):
        """
        Generates a PNG visualization of the weighted graph using Graphviz.
        Each edge is labeled with its weight.
        """
        dot = graphviz.Digraph(comment='Weighted Graph', format='png')
        for v in self.graph:
            dot.node(v)
        for v in self.graph:
            for d, w in self.graph[v]:
                dot.edge(v, d, label=str(w))
        dot.render('output/weighted_graph', view=True)

In [21]:
print("=== Unweighted Graph ===")
g1 = {
    'A': ['B', 'C'],
    'B': ['D'],
    'C': ['D'],
    'D': []
}

G1 = Graph(g1)
G1.print_graph()
print("Nodes:", G1.get_nodes())
print("Edges:", G1.get_edges())
print("Size:", G1.size())
print("Successors of A:", G1.get_successors("A"))
print("Predecessors of B:", G1.get_predecessors("B"))
print("Adjacents of C:", G1.get_adjacents("C"))
print("Out-degree of B:", G1.out_degree("B"))
print("In-degree of C:", G1.in_degree("C"))
print("Degree of A:", G1.degree("A"))
print("Shortest path from A to D:", G1.shortest_path('A', 'D'))
print("Distance from A to D:", G1.distance("A", "D"))
print("Reachable from A (BFS):", G1.reachable_bfs('A'))
print("Reachable from A (DFS):", G1.reachable_dfs('A'))
print("Graph has cycle:", G1.has_cycle())
print()

=== Unweighted Graph ===
A -> ['B', 'C']
B -> ['D']
C -> ['D']
D -> []
Nodes: ['A', 'B', 'C', 'D']
Edges: [('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D')]
Size: (4, 4)
Successors of A: ['B', 'C']
Predecessors of B: ['A']
Adjacents of C: ['A', 'D']
Out-degree of B: 1
In-degree of C: 1
Degree of A: 2
Shortest path from A to D: ['A', 'B', 'D']
Distance from A to D: 2
Reachable from A (BFS): ['C', 'D', 'B']
Reachable from A (DFS): ['C', 'D', 'B']
Graph has cycle: False



In [22]:
print("=== Weighted Graph ===")
g2 = {
    'A': [('B', 1), ('C', 4)],
    'B': [('C', 2), ('D', 5)],
    'C': [('D', 1)],
    'D': []
}

G2 = WeightedGraph(g2)
G2.print_graph()
print("Nodes:", G2.get_nodes())
print("Edges:", G2.get_edges())
print("Size:", G2.size())
print("Successors of A:", G2.get_successors("A"))
print("Predecessors of B:", G2.get_predecessors("B"))
print("Adjacents of C:", G2.get_adjacents("C"))
print("Out-degree of B:", G2.out_degree("B"))
print("In-degree of C:", G2.in_degree("C"))
print("Degree of A:", G2.degree("A"))
print("Dijkstra distances from A:", G2.dijkstra('A'))
print("Reachable from A (DFS):", G2.reachable_dfs('A'))
print("Graph has cycle:", G2.has_cycle())

=== Weighted Graph ===
A -> [('B', 1), ('C', 4)]
B -> [('C', 2), ('D', 5)]
C -> [('D', 1)]
D -> []
Nodes: ['A', 'B', 'C', 'D']
Edges: [('A', 'B', 1), ('A', 'C', 4), ('B', 'C', 2), ('B', 'D', 5), ('C', 'D', 1)]
Size: (4, 5)
Successors of A: ['B', 'C']
Predecessors of B: []
Adjacents of C: ['D']
Out-degree of B: 2
In-degree of C: 0
Degree of A: 2
Dijkstra distances from A: {'A': 0, 'B': 1, 'C': 3, 'D': 4}
Reachable from A (DFS): ['C', 'D', 'B']
Graph has cycle: False


## Metabolic Networks

### High-Level Description (Conceptual)

**Goal**: Model and analyze a metabolic network to understand the role of each metabolite and simulate biochemical propagation.

**Approach**:

1. **Input Data (`ecoli.txt`)**:
   * Each line describes a biochemical reaction.
   * Format: `ReactionID: Substrate1 + Substrate2 => Product1 + Product2`
   * Parsed into structured dictionaries containing substrates and products.

2. **Metabolic Graph Construction**:
   * Nodes represent **metabolites**.
   * Edges connect metabolites that co-occur in the same reaction.
   * Built using a custom graph class (`MN_Graph`).

3. **Centrality Analysis**:
   * Use `CentralityAnalyzer` to evaluate node importance.
   * Metrics:
     - **Degree Centrality**: number of direct connections.
     - **Closeness Centrality**: inverse of average distance to reachable nodes.
     - **Betweenness Centrality**: frequency of appearing on shortest paths.

4. **Metabolite Propagation**:
   * Start with a set of known metabolites.
   * Iteratively:
     - Identify **active reactions** (all substrates available).
     - Add newly produced products to the known set.
   * Continues until no new metabolites are produced.

5. **Output**:
   * Ranked lists of the most central metabolites.
   * Complete set of metabolites producible from the initial input.

---
### Low-Level Description (Implementation)

**Parsing & Graph Building**:

* `parse_reactions(filepath)`
  - Reads and parses `ecoli.txt` into a list of reaction dictionaries:
    ```python
    {
      "id": "R1",
      "substrates": ["A", "B"],
      "products": ["C", "D"]
    }
    ```

* `build_metabolite_graph(reactions)`
  - Constructs an undirected graph connecting metabolites that appear together in reactions.

**Centrality Analysis**:

* `CentralityAnalyzer` class:
  - `degree_centrality()`: Count of successors per node.
  - `closeness_centrality()`: Based on BFS distances.
  - `betweenness_centrality()`: Uses Brandes’ algorithm.
  - `top_nodes(centrality_dict, top_n)`: Ranks nodes by centrality value.

**Metabolite Propagation Simulation**:

* `get_active_reactions(metabolites, reactions)`
  - Filters reactions where all substrates are currently available.

* `get_produced_metabolites(active_reactions)`
  - Collects products from the currently active reactions.

* `compute_final_metabolites(initial_metabs, reactions)`
  - Iteratively expands the set of known metabolites until saturation.# Redes Metabólicas


In [23]:
class MN_Graph(Graph):
    """
    Metabolite Network Graph extending a base Graph class with additional methods 
    for degree analysis, centrality, clustering, and network metrics.
    """

    def all_degrees(self, deg_type="inout"):
        """
        Compute the degree of each node based on the specified type.

        Parameters:
            deg_type (str): "in", "out", or "inout" (default) indicating degree type.

        Returns:
            dict: Mapping of node to degree count.
        """
        degs = {}
        for v in self.graph:
            if deg_type == "out" or deg_type == "inout":
                degs[v] = len(self.graph[v])
            else:
                degs[v] = 0
        if deg_type == "in" or deg_type == "inout":
            for v in self.graph:
                for d in self.graph[v]:
                    if deg_type == "in" or v not in self.graph[d]:
                        degs[d] = degs.get(d, 0) + 1
        return degs

    def highest_degrees(self, all_deg=None, deg_type="inout", top=10):
        """
        Return nodes with the highest degrees.

        Parameters:
            all_deg (dict, optional): Precomputed degree dict.
            deg_type (str): Degree type.
            top (int): Number of top nodes to return.

        Returns:
            list: Top nodes sorted by degree.
        """
        if all_deg is None:
            all_deg = self.all_degrees(deg_type)
        ord_deg = sorted(all_deg.items(), key=lambda x: x[1], reverse=True)
        return [x[0] for x in ord_deg[:top]]

    def mean_degree(self, deg_type="inout"):
        """
        Compute the mean degree of all nodes.

        Parameters:
            deg_type (str): Degree type.

        Returns:
            float: Mean degree.
        """
        degs = self.all_degrees(deg_type)
        return sum(degs.values()) / float(len(degs))

    def prob_degree(self, deg_type="inout"):
        """
        Calculate the degree distribution as a probability.

        Parameters:
            deg_type (str): Degree type.

        Returns:
            dict: Mapping of degree value to probability.
        """
        degs = self.all_degrees(deg_type)
        res = {}
        for k in degs:
            res[degs[k]] = res.get(degs[k], 0) + 1
        for k in res:
            res[k] /= float(len(degs))
        return res

    def mean_distances(self):
        """
        Compute mean shortest path length and density.

        Returns:
            tuple: (mean distance, network density)
        """
        tot = 0
        num_reachable = 0
        for k in self.get_nodes():
            distsk = self.reachable_with_dist(k)
            for _, dist in distsk:
                tot += dist
            num_reachable += len(distsk)
        meandist = float(tot) / num_reachable if num_reachable else 0
        n = len(self.get_nodes())
        density = float(num_reachable) / ((n - 1) * n) if n > 1 else 0
        return meandist, density

    def closeness_centrality(self, node):
        """
        Compute closeness centrality of a node.

        Parameters:
            node: Target node.

        Returns:
            float: Closeness centrality.
        """
        dist = self.reachable_with_dist(node)
        if len(dist) == 0:
            return 0.0
        s = sum(d[1] for d in dist)
        return len(dist) / s

    def highest_closeness(self, top=10):
        """
        Return nodes with the highest closeness centrality.

        Parameters:
            top (int): Number of top nodes.

        Returns:
            list: Top nodes.
        """
        cc = {k: self.closeness_centrality(k) for k in self.get_nodes()}
        ord_cl = sorted(cc.items(), key=lambda x: x[1], reverse=True)
        return [x[0] for x in ord_cl[:top]]

    def betweenness_centrality(self, node):
        """
        Estimate betweenness centrality using path enumeration.

        Parameters:
            node: Target node.

        Returns:
            float: Betweenness score.
        """
        total_sp = 0
        sps_with_node = 0
        for s in self.get_nodes():
            for t in self.get_nodes():
                if s != t and s != node and t != node:
                    sp = self.shortest_path(s, t)
                    if sp:
                        total_sp += 1
                        if node in sp:
                            sps_with_node += 1
        return sps_with_node / total_sp if total_sp > 0 else 0.0

    def clustering_coef(self, v):
        """
        Calculate local clustering coefficient for a node.

        Parameters:
            v: Node ID.

        Returns:
            float: Clustering coefficient.
        """
        adjs = self.get_adjacents(v)
        if len(adjs) <= 1:
            return 0.0
        ligs = 0
        for i in adjs:
            for j in adjs:
                if i != j and (j in self.get_successors(i) or i in self.get_successors(j)):
                    ligs += 1
        return float(ligs) / (len(adjs) * (len(adjs) - 1))

    def all_clustering_coefs(self):
        """
        Compute clustering coefficients for all nodes.

        Returns:
            dict: Node to coefficient mapping.
        """
        return {k: self.clustering_coef(k) for k in self.get_nodes()}

    def mean_clustering_coef(self):
        """
        Compute mean clustering coefficient.

        Returns:
            float: Mean coefficient.
        """
        ccs = self.all_clustering_coefs()
        return sum(ccs.values()) / float(len(ccs)) if ccs else 0.0

    def mean_clustering_perdegree(self, deg_type="inout"):
        """
        Average clustering coefficient grouped by node degree.

        Parameters:
            deg_type (str): Degree type.

        Returns:
            dict: Degree to average coefficient.
        """
        degs = self.all_degrees(deg_type)
        ccs = self.all_clustering_coefs()
        degs_k = {}
        for k in degs:
            degs_k.setdefault(degs[k], []).append(k)
        ck = {}
        for k in degs_k:
            tot = sum(ccs[v] for v in degs_k[k])
            ck[k] = tot / len(degs_k[k])
        return ck


class CentralityAnalyzer:
    """
    Analyzer class for computing centrality metrics on a graph.
    """

    def __init__(self, graph):
        """
        Initialize with a graph instance.
        """
        self.graph = graph

    def degree_centrality(self):
        """
        Calculate degree centrality for all nodes.

        Returns:
            dict: Node to centrality value.
        """
        return {node: len(self.graph.get_successors(node)) for node in self.graph.get_nodes()}

    def closeness_centrality(self):
        """
        Compute closeness centrality for all nodes.

        Returns:
            dict: Node to centrality value.
        """
        centrality = {}
        for node in self.graph.get_nodes():
            total_dist, reachable_count = self._bfs_total_distance_and_reach_count(node)
            centrality[node] = (reachable_count / total_dist) if total_dist > 0 else 0.0
        return centrality

    def _bfs_total_distance_and_reach_count(self, start):
        """
        Helper to perform BFS and compute distance metrics.

        Returns:
            tuple: (total distance, reachable node count)
        """
        visited = set()
        queue = deque([(start, 0)])
        total = 0
        reachable_count = 0

        while queue:
            node, dist = queue.popleft()
            if node not in visited:
                visited.add(node)
                if node != start:
                    total += dist
                    reachable_count += 1
                for neighbor in self.graph.get_successors(node):
                    if neighbor not in visited:
                        queue.append((neighbor, dist + 1))
        return total, reachable_count

    def betweenness_centrality(self):
        """
        Calculate betweenness centrality for all nodes using Brandes' algorithm.

        Returns:
            dict: Node to centrality value.
        """
        centrality = dict.fromkeys(self.graph.get_nodes(), 0.0)
        for s in self.graph.get_nodes():
            stack = []
            pred = {w: [] for w in self.graph.get_nodes()}
            sigma = dict.fromkeys(self.graph.get_nodes(), 0)
            dist = dict.fromkeys(self.graph.get_nodes(), -1)
            sigma[s] = 1
            dist[s] = 0
            queue = deque([s])

            while queue:
                v = queue.popleft()
                stack.append(v)
                for w in self.graph.get_successors(v):
                    if dist[w] < 0:
                        dist[w] = dist[v] + 1
                        queue.append(w)
                    if dist[w] == dist[v] + 1:
                        sigma[w] += sigma[v]
                        pred[w].append(v)

            delta = dict.fromkeys(self.graph.get_nodes(), 0)
            while stack:
                w = stack.pop()
                for v in pred[w]:
                    delta[v] += (sigma[v] / sigma[w]) * (1 + delta[w])
                if w != s:
                    centrality[w] += delta[w]
        return centrality

    def top_nodes(self, centrality_dict, top_n=5):
        """
        Return top N nodes by centrality score.

        Parameters:
            centrality_dict (dict): Centrality mapping.
            top_n (int): Number of top nodes.

        Returns:
            list: Top node-score pairs.
        """
        return heapq.nlargest(top_n, centrality_dict.items(), key=lambda x: x[1])


# REACTION PARSER
def parse_reactions(file_path):
    """
    Parses reactions from file into structured dicts.

    Parameters:
        file_path (str): Path to the reaction file.

    Returns:
        list: List of reaction dicts with substrates and products.
    """
    reactions = []
    with open(file_path, 'r') as f:
        for line in f:
            if ':' not in line:
                continue
            parts = re.split(r':\s*', line.strip(), maxsplit=1)
            if len(parts) != 2:
                continue
            reaction_id, formula = parts
            match = re.search(r"^(.*?)\s*(<=>|=>)\s*(.*?)$", formula)
            if not match:
                continue
            substrates = [m.strip() for m in match.group(1).split('+')]
            products = [m.strip() for m in match.group(3).split('+')]
            reactions.append({
                'id': reaction_id,
                'substrates': substrates,
                'products': products
            })
    return reactions


def build_metabolite_graph(reactions):
    """
    Builds a graph where edges represent shared participation in a reaction.

    Parameters:
        reactions (list): List of parsed reactions.

    Returns:
        MN_Graph: Constructed metabolite graph.
    """
    g = MN_Graph()
    for r in reactions:
        metabolites = r['substrates'] + r['products']
        for i in range(len(metabolites)):
            for j in range(i + 1, len(metabolites)):
                g.add_edge(metabolites[i], metabolites[j])
    return g


# ASSESSMENT FUNCTIONS
def get_active_reactions(metabolites_set, reactions):
    """
    Get reactions whose substrates are all in the known metabolite set.

    Parameters:
        metabolites_set (set): Set of available metabolites.
        reactions (list): List of all reactions.

    Returns:
        list: Active reactions.
    """
    return [r for r in reactions if all(sub in metabolites_set for sub in r['substrates'])]

def get_produced_metabolites(active_reactions):
    """
    Get products produced by active reactions.

    Parameters:
        active_reactions (list): List of active reactions.

    Returns:
        set: Produced metabolites.
    """
    produced = set()
    for r in active_reactions:
        produced.update(r['products'])
    return produced

def compute_final_metabolites(initial_metabolites, reactions):
    """
    Iteratively compute the full set of metabolites that can be produced.

    Parameters:
        initial_metabolites (set): Starting set of metabolites.
        reactions (list): All reactions.

    Returns:
        set: Complete reachable metabolite set.
    """
    known_metabolites = set(initial_metabolites)
    while True:
        active = get_active_reactions(known_metabolites, reactions)
        new = get_produced_metabolites(active)
        if new.issubset(known_metabolites):
            break
        known_metabolites.update(new)
    return known_metabolites

In [24]:
# Define the path to the reactions file
file_path = "ecoli.txt"

# Parse the reaction data from the file
reactions = parse_reactions(file_path)
print("Reactions parsed:", len(reactions))


# --- Centrality Analysis Section ---

# Build a graph where metabolites are nodes and shared participation forms edges
g = build_metabolite_graph(reactions)

# Initialize the centrality analyzer with the graph
analyzer = CentralityAnalyzer(g)

# --- Degree Centrality ---
# Degree centrality is calculated as the number of direct successors (out-degree)
print("\n--- Degree Centrality ---")
for node, val in analyzer.top_nodes(analyzer.degree_centrality()):
    print(f"{node}: {val}")

# --- Closeness Centrality ---
# Closeness centrality is based on the average shortest distance to all reachable nodes
print("\n--- Closeness Centrality ---")
for node, val in analyzer.top_nodes(analyzer.closeness_centrality()):
    print(f"{node}: {val:.4f}")

# --- Betweenness Centrality ---
# Betweenness centrality quantifies how often a node appears on shortest paths
print("\n--- Betweenness Centrality ---")
for node, val in analyzer.top_nodes(analyzer.betweenness_centrality()):
    print(f"{node}: {val:.4f}")

# --- Metabolite Propagation Section ---

# Define the set of initial metabolites available at the start
initial_metabs = ["M_glc_DASH_D_c", "M_h2o_c", "M_nad_c", "M_atp_c"]

# Get active reactions based on the initial metabolites
active_reactions = get_active_reactions(initial_metabs, reactions)

# Get produced metabolites based on the active reactions
produced_metabolites = get_produced_metabolites(active_reactions)

# Compute all metabolites that can be reached via reactions starting from the initial set
final_metabs = compute_final_metabolites(initial_metabs, reactions)

# Output the results
print("\n--- Initial Metabolites ---")
print(initial_metabs)

print("\n--- Active Reactions ---")
print(active_reactions)

print("\n--- Final Reachable Metabolites ---")
print(sorted(final_metabs))

Reactions parsed: 931

--- Degree Centrality ---
M_atp_c: 234
M_h2o_c: 211
M_h_c: 196
M_pi_c: 132
M_h_e: 98

--- Closeness Centrality ---
M_12ppd_DASH_S_e: 1.0000
M_h2o_c: 0.5682
M_h_c: 0.5672
M_atp_c: 0.5637
M_pi_c: 0.5192

--- Betweenness Centrality ---
M_h_c: 216934.6170
M_h2o_c: 130880.9425
M_atp_c: 73977.9868
M_pi_c: 41413.1577
M_h_e: 38528.6860

--- Initial Metabolites ---
['M_glc_DASH_D_c', 'M_h2o_c', 'M_nad_c', 'M_atp_c']

--- Active Reactions ---
[{'id': 'R_NTPP6', 'substrates': ['M_atp_c', 'M_h2o_c'], 'products': ['M_ppi_c', 'M_amp_c', 'M_h_c']}, {'id': 'R_ATPM', 'substrates': ['M_atp_c', 'M_h2o_c'], 'products': ['M_pi_c', 'M_adp_c', 'M_h_c']}, {'id': 'R_HEX1', 'substrates': ['M_glc_DASH_D_c', 'M_atp_c'], 'products': ['M_g6p_c', 'M_adp_c', 'M_h_c']}, {'id': 'R_NADDP', 'substrates': ['M_nad_c', 'M_h2o_c'], 'products': ['M_nmn_c', 'M_amp_c', 'M_h_c']}, {'id': 'R_NADK', 'substrates': ['M_nad_c', 'M_atp_c'], 'products': ['M_nadp_c', 'M_adp_c', 'M_h_c']}, {'id': 'R_ADNCYC', 'subst

## Genome Assembly

### High-Level Description (Conceptual)

**Goal**: Reconstruct the original DNA sequence from a list of overlapping fragments (k-mers).

**Approach**:

1. **De Bruijn Graph**:

   * Represent k-mers as edges.
   * Nodes are (k-1)-mers (prefixes/suffixes).
   * Find an **Eulerian path**: a path that visits every edge exactly once.
   * Rebuild the sequence by following this path.

2. **Overlap Graph**:

   * Represent k-mers as nodes.
   * Add an edge from node A to node B if the suffix of A matches the prefix of B (length k-1).
   * Find a **Hamiltonian path**: a path that visits every node exactly once.
   * Reconstruct the sequence by joining overlapping fragments along the path.

---

###  Low-Level Description (Implementation)

**De Bruijn Algorithm**:

* For each k-mer, add an edge from `prefix(k-mer)` to `suffix(k-mer)`.
* Ensure the graph is *nearly balanced* (1 start, 1 end node).
* Add a temporary edge from end to start.
* Use **Hierholzer’s algorithm** to find an Eulerian cycle.
* Remove the temporary edge to get the Eulerian path.
* Reconstruct the sequence by concatenating characters from each node.

**Overlap Graph Algorithm**:

* Label each fragment uniquely (e.g., `"ATG-1"`).
* For every pair of fragments, add an edge if `suffix(A) == prefix(B)`.
* Use **backtracking** to search for a Hamiltonian path.
* Rebuild the sequence using the first full fragment and last characters from the rest.


In [25]:
def prefix(seq): return seq[:-1]
def suffix(seq): return seq[1:]

class DeBruijnGraph(Graph):
    """
    Class representing a De Bruijn graph constructed from k-mers (fragments).
    Inherits from the base Graph class.
    """

    def __init__(self, frags):
        """
        Initializes a De Bruijn graph from a list of k-mers.

        Each k-mer contributes a directed edge from its prefix (first k-1 chars)
        to its suffix (last k-1 chars).

        :param frags: List of k-mer strings.
        """
        super().__init__()
        for seq in frags:
            self.add_edge(prefix(seq), suffix(seq))

    def check_nearly_balanced_graph(self):
        """
        Checks if the graph is nearly balanced (Eulerian path exists).

        A graph is nearly balanced if:
        - One vertex has out-degree = in-degree + 1 (start node)
        - One vertex has in-degree = out-degree + 1 (end node)
        - All other nodes have equal in-degree and out-degree

        :return: Tuple (start_node, end_node) if nearly balanced, else (None, None)
        """
        res = None, None  # (start, end)
        for n in self.graph:
            indeg = self.in_degree(n)
            outdeg = self.out_degree(n)
            if indeg - outdeg == 1:
                res = res[0], n  # candidate for end node
            elif outdeg - indeg == 1:
                res = n, res[1]  # candidate for start node
            elif indeg != outdeg:
                return None, None
        return res

    def eulerian_path(self):
        """
        Constructs a Eulerian path using Hierholzer's algorithm.

        Requires the graph to be nearly balanced. Adds a temporary edge to
        make the graph Eulerian, finds the cycle, then removes the temporary edge.

        :return: List of nodes in Eulerian path, or None if not found.
        """
        start, end = self.check_nearly_balanced_graph()
        if not start or not end:
            return None

        self.add_edge(end, start)  # temporary edge

        path = []
        stack = [start]
        edges = {u: list(vs) for u, vs in self.graph.items()}  # deep copy

        while stack:
            u = stack[-1]
            if edges.get(u):
                stack.append(edges[u].pop())
            else:
                path.append(stack.pop())

        path.reverse()

        # Identify and remove the temporary edge
        for i in range(len(path) - 1):
            if path[i] == end and path[i + 1] == start:
                return path[i + 1:] + path[1:i + 1]

        return None

    def seq_from_path(self, path):
        """
        Reconstructs the original sequence from the Eulerian path.

        Each node represents a (k-1)-mer, so the full sequence is constructed
        by appending the last character of each successive node.

        :param path: List of node strings forming a Eulerian path.
        :return: Reconstructed string sequence, or None if path is invalid.
        """
        if not path:
            return None
        return path[0] + ''.join(n[-1] for n in path[1:])


class OverlapGraph(Graph):
    """
    Class representing an overlap graph built from a set of DNA fragments.

    An edge exists from fragment A to fragment B if suffix(A) == prefix(B).
    Handles identical sequences by uniquely labeling them.
    """

    def __init__(self, frags):
        """
        Initializes the OverlapGraph from a list of fragments.

        :param frags: List of sequence fragments (strings).
        """
        super().__init__()
        self.frags = [f"{f}-{i}" for i, f in enumerate(frags, 1)]

        for f in self.frags:
            self.add_vertex(f)

        for f1 in self.frags:
            s1 = suffix(f1.split('-')[0])
            for f2 in self.frags:
                if prefix(f2.split('-')[0]) == s1:
                    self.add_edge(f1, f2)

    def search_hamiltonian_path(self):
        """
        Attempts to find a Hamiltonian path using backtracking.

        A Hamiltonian path visits each vertex exactly once.

        :return: List of vertex labels in path, or None if not found.
        """
        def bt(path):
            if len(path) == len(self.graph):
                return path
            for neighbor in self.graph[path[-1]]:
                if neighbor not in path:
                    res = bt(path + [neighbor])
                    if res:
                        return res
            return None

        for start in self.graph:
            res = bt([start])
            if res:
                return res
        return None

    def get_seq(self, node):
        """
        Extracts the original sequence from a node label.

        :param node: Node label (e.g., 'ATG-3').
        :return: Sequence part before the suffix (e.g., 'ATG').
        """
        return node.split('-')[0]

    def seq_from_path(self, path):
        """
        Reconstructs a DNA sequence from a Hamiltonian path in the overlap graph.

        The full sequence is built by appending the last character of each successive node.

        :param path: List of vertex labels forming a Hamiltonian path.
        :return: Reconstructed sequence, or None if path is invalid.
        """
        if not path:
            return None
        return self.get_seq(path[0]) + ''.join(self.get_seq(n)[-1] for n in path[1:])



In [26]:
def composition(k, seq):
    return sorted([seq[i:i+k] for i in range(len(seq)-k+1)])

def run_all(seq, k):
    print("Original sequence:", seq)
    frags = composition(k, seq)
    print("k-mers:", frags)

    # De Bruijn Graph Method
    print("\n--- De Bruijn ---")
    dbg = DeBruijnGraph(frags)
    dbg.print_graph()
    path = dbg.eulerian_path()
    if path:
        print("Eulerian path found:", path)
        print("Reconstructed sequence:", dbg.seq_from_path(path))
    else:
        print("Eulerian path not found.")

    # Overlap Graph Method with repetitions
    print("\n--- Overlap with repetitions ---")
    og = OverlapGraph(frags)
    og.print_graph()
    path = og.search_hamiltonian_path()
    if path:
        print("Hamiltonian path found:", path)
        print("Reconstructed sequence:", og.seq_from_path(path))
    else:
        print("Hamiltonian path not found.")

run_all('ATGCAATGGTCTG', 3)

Original sequence: ATGCAATGGTCTG
k-mers: ['AAT', 'ATG', 'ATG', 'CAA', 'CTG', 'GCA', 'GGT', 'GTC', 'TCT', 'TGC', 'TGG']

--- De Bruijn ---
AA -> ['AT']
AT -> ['TG']
TG -> ['GC', 'GG']
CA -> ['AA']
CT -> ['TG']
GC -> ['CA']
GG -> ['GT']
GT -> ['TC']
TC -> ['CT']
Eulerian path not found.

--- Overlap with repetitions ---
AAT-1 -> ['ATG-2', 'ATG-3']
ATG-2 -> ['TGC-10', 'TGG-11']
ATG-3 -> ['TGC-10', 'TGG-11']
CAA-4 -> ['AAT-1']
CTG-5 -> ['TGC-10', 'TGG-11']
GCA-6 -> ['CAA-4']
GGT-7 -> ['GTC-8']
GTC-8 -> ['TCT-9']
TCT-9 -> ['CTG-5']
TGC-10 -> ['GCA-6']
TGG-11 -> ['GGT-7']
Hamiltonian path found: ['ATG-2', 'TGC-10', 'GCA-6', 'CAA-4', 'AAT-1', 'ATG-3', 'TGG-11', 'GGT-7', 'GTC-8', 'TCT-9', 'CTG-5']
Reconstructed sequence: ATGCAATGGTCTG
