In [None]:
# Install missing packages
%pip install matplotlib networkx plotly

# Import Required Libraries
from collections import defaultdict, namedtuple
import matplotlib.pyplot as plt
import networkx as nx
import plotly.graph_objects as go
import json
import os

# Sequitur Algorithm for Grammar Inference
This notebook demonstrates the Sequitur algorithm for grammar inference, including implementation, sample runs, and visualisation of grammar rules.

In [None]:
# Define Sequitur Algorithm Implementation
Rule = namedtuple('Rule', ['symbols'])


class Sequitur:
    """Sequitur-style grammar inference by repeated digram substitution.

    The input sequence becomes the body of rule 0. Whenever a digram (a pair
    of adjacent symbols) occurs twice in rule 0 without overlapping, a new
    rule is created for it and every non-overlapping occurrence in rule 0 is
    replaced by the string reference ``'R<id>'``.

    This is a simplified scheme: once ``process`` returns, no digram occurs
    twice (without overlap) in rule 0, but Sequitur's rule-utility constraint
    is not implemented, so a rule may end up referenced only once.

    Rule references are plain strings, so an input symbol that already looks
    like ``'R1'`` cannot be told apart from a reference to rule 1.

    Attributes:
        rules: dict mapping rule id -> Rule; id 0 is the start rule.
        digrams: dict mapping digram -> (0, index) of its first occurrence in
            rule 0, as recorded by the most recent scan.
        next_rule_id: id that the next created rule will receive.
    """

    def __init__(self):
        self.rules = {0: Rule([])}
        self.digrams = {}
        self.next_rule_id = 1

    def process(self, sequence):
        """Build the grammar for `sequence`, replacing any earlier result.

        Args:
            sequence: iterable of hashable symbols (a str is treated as a
                sequence of characters).
        """
        # Start from a clean state so one instance can be reused safely.
        self.rules = {0: Rule(list(sequence))}
        self.digrams = {}
        self.next_rule_id = 1

        symbols = self.rules[0].symbols  # mutated in place by _substitute
        i = 0
        while i < len(symbols) - 1:
            digram = tuple(symbols[i:i + 2])
            first_seen = self.digrams.get(digram)
            if first_seen is None:
                self.digrams[digram] = (0, i)
                i += 1
            elif first_seen[1] + 2 <= i:
                # Genuine repeat: factor it out, then rescan from the start
                # because every recorded position is now stale.
                self._substitute(i, digram)
                self.digrams.clear()
                i = 0
            else:
                # Overlaps its own first occurrence (e.g. the middle of
                # "AAA"); that is not a second occurrence.
                i += 1

    def _substitute(self, index, digram):
        """Create a rule for `digram` and apply it throughout rule 0.

        Args:
            index: position of the repeat that triggered the substitution.
                Kept for signature compatibility; the rewrite covers the
                whole of rule 0, not just this position.
            digram: the pair of symbols to factor out.
        """
        digram = tuple(digram)
        rule_id = self.next_rule_id
        self.next_rule_id += 1
        self.rules[rule_id] = Rule(list(digram))
        reference = f'R{rule_id}'

        # Replace digram with rule reference, left to right, skipping
        # overlapping matches.
        symbols = self.rules[0].symbols
        rewritten = []
        i = 0
        while i < len(symbols):
            if i + 1 < len(symbols) and (symbols[i], symbols[i + 1]) == digram:
                rewritten.append(reference)
                i += 2
            else:
                rewritten.append(symbols[i])
                i += 1
        # Slice-assign so existing references to the list stay valid.
        symbols[:] = rewritten

    def get_rules(self):
        """Return the dict of rule id -> Rule built by the last `process` call."""
        return self.rules

In [None]:
# Run Sequitur on Sample Input
sample_sequence = "ABABCBABABC"

# Infer the grammar and keep the rule table in `rules` for later cells.
sequitur = Sequitur()
sequitur.process(sample_sequence)
rules = sequitur.get_rules()

# One line per rule: its id followed by the symbols in its body.
print("Inferred Grammar Rules:")
for number, production in rules.items():
    print(f"Rule {number}: {production.symbols}")

In [None]:
# Visualise Grammar Rules with NetworkX
# Node names of the rules themselves; every other node is a terminal symbol.
rule_nodes = {f'R{rule_id}' for rule_id in rules}

G = nx.DiGraph()
for rule_id, rule in rules.items():
    for symbol in rule.symbols:
        # One edge from the rule to each symbol in its body, whether that
        # symbol is a rule reference or a terminal. A symbol repeated in the
        # same body maps onto the same edge.
        G.add_edge(f'R{rule_id}', symbol)

# Colour rules and terminals differently so the hierarchy is readable.
node_colors = ['lightblue' if node in rule_nodes else 'lightgreen' for node in G.nodes()]

plt.figure(figsize=(8,6))
# Fixed seed so the layout is the same on every run of the notebook.
pos = nx.spring_layout(G, seed=42)
nx.draw(G, pos, with_labels=True, node_color=node_colors, edge_color='grey', font_size=10)
plt.title('Grammar Rule Graph')
plt.show()

In [None]:
# Test Sequitur with Different Inputs
def test_sequitur(sequence):
    """Run a fresh Sequitur instance on `sequence`, then print the input and every rule."""
    engine = Sequitur()
    engine.process(sequence)
    print(f"Input: {sequence}")
    for number, production in engine.get_rules().items():
        print(f"Rule {number}: {production.symbols}")


for example in ("ABCABCABC", "AABBAABB"):
    test_sequitur(example)

## Notebook Workflow Summary
1. **Import Required Libraries**: Loads Python libraries for grammar inference and visualisation.
2. **Define Sequitur Algorithm Implementation**: Implements the Sequitur algorithm for grammar inference.
3. **Run Sequitur on Sample Input**: Demonstrates grammar inference on a sample sequence.
4. **Visualise Grammar Rules**: Plots the inferred grammar rules as a directed graph.
5. **Test Sequitur with Different Inputs**: Allows testing the algorithm with other input sequences to observe grammar changes.

In [None]:
# Check and Visualise Alternatives from alternatives.json
# Load alternatives.json
with open('/workspaces/Sequitur/tests/synthetic/bee_movie/results/alternatives.json', 'r') as handle:
    alternatives = json.load(handle)

# Print summary statistics: the stored ambiguity count, then the length of
# each list (a missing key counts as an empty list).
print("Ambiguity count:", alternatives.get("ambiguity_count"))
for label, key in (
    ("Number of chains:", "chains"),
    ("Number of components:", "components"),
    ("Number of cycles:", "cycles"),
    ("Number of squares:", "squares"),
):
    print(label, len(alternatives.get(key, [])))

# Visualise chains as a graph: each chain contributes a path through its nodes.
G = nx.Graph()
for chain_nodes in alternatives.get("chains", []):
    nx.add_path(G, chain_nodes)

plt.figure(figsize=(10,6))
nx.draw(G, with_labels=True, node_color='lightblue', edge_color='grey')
plt.title("Ambiguity Chains Graph")
plt.show()

In [None]:
# Visualise all ambiguity components from alternatives.json
# Load alternatives.json
with open('/workspaces/Sequitur/tests/synthetic/bee_movie/results/alternatives.json', 'r') as handle:
    alternatives = json.load(handle)

# Build graph from components; components with a single member add nothing.
G = nx.Graph()
for members in alternatives.get("components", []):
    if len(members) <= 1:
        continue
    nx.add_path(G, members)

# Highlight cycles and squares.
# Every node listed in any cycle is a cycle node.
cycle_nodes = set().union(*alternatives.get("cycles", []))
# Only dict-shaped square entries that carry a "j" key contribute a node.
square_nodes = {
    square["j"]
    for square in alternatives.get("squares", [])
    if isinstance(square, dict) and "j" in square
}

# Draw graph with highlights; cycle membership takes precedence over squares.
plt.figure(figsize=(12,8))
node_colors = [
    'orange' if node in cycle_nodes
    else 'red' if node in square_nodes
    else 'lightblue'
    for node in G.nodes()
]
nx.draw(G, with_labels=True, node_color=node_colors, edge_color='grey', font_size=10)
plt.title("Ambiguity Components: Cycles (orange), Squares (red), Others (blue)")
plt.show()

In [None]:
# Interactive ambiguity graph visualisation with Plotly
# Load alternatives.json
with open('/workspaces/Sequitur/tests/synthetic/bee_movie/results/alternatives.json', 'r') as handle:
    alternatives = json.load(handle)

# Load index_map.tsv and build node-to-sequence mapping.
# Each line is split on its first tab into an integer index and a sequence.
index_map = {}
with open('/workspaces/Sequitur/tests/synthetic/bee_movie/results/index_map.tsv', 'r') as handle:
    for record in handle:
        key, value = record.strip().split('\t', 1)
        index_map[int(key)] = value

def get_sequence(node):
    """Return the hover label for `node`.

    Integer nodes present in `index_map` are labelled with the first 40
    characters of their sequence and its total length; any other node is
    labelled with `str(node)`.
    """
    if not isinstance(node, int) or node not in index_map:
        return str(node)
    seq = index_map[node]
    return f"Node {node}: {seq[:40]}... ({len(seq)} bp)"

# Build graph from components; every member of a multi-node component is
# recorded as ambiguous.
G = nx.Graph()
ambiguous_nodes = set()
for members in alternatives.get("components", []):
    if len(members) > 1:
        nx.add_path(G, members)
        ambiguous_nodes.update(members)

# Collect all nodes (ambiguous and non-ambiguous).
# The full node set is taken to be range(N), with N one past the largest
# node currently in the graph.
all_nodes = set(G.nodes())
N = max(all_nodes) + 1 if all_nodes else 0
non_ambiguous_nodes = set(range(N)) - ambiguous_nodes

# Collapse non-ambiguous nodes into a single node by linking each of them
# to one shared hub.
collapsed_node = "Non-ambiguous"
for plain_node in non_ambiguous_nodes:
    G.add_edge(collapsed_node, plain_node)

# Prepare node attributes
pos = nx.spring_layout(G)
node_x, node_y, node_text, node_color, node_shape = [], [], [], [], []
for node in G.nodes():
    x, y = pos[node]
    if node == collapsed_node:
        label, colour, shape = "All non-ambiguous nodes", "grey", "square"
    else:
        label = get_sequence(node)
        colour = "lightblue" if node in ambiguous_nodes else "grey"
        shape = "circle"
    node_x.append(x)
    node_y.append(y)
    node_text.append(label)
    node_color.append(colour)
    node_shape.append(shape)

# Prepare edges: each edge is two points followed by None, which breaks the
# line between consecutive segments.
edge_x, edge_y = [], []
for u, v in G.edges():
    (x0, y0), (x1, y1) = pos[u], pos[v]
    edge_x.extend([x0, x1, None])
    edge_y.extend([y0, y1, None])

# Plotly graph
edge_trace = go.Scatter(
    x=edge_x, y=edge_y,
    line=dict(width=1, color="#888"),
    hoverinfo='none', mode='lines',
)
node_trace = go.Scatter(
    x=node_x, y=node_y, mode='markers',
    marker=dict(color=node_color, size=18, symbol=list(node_shape)),
    text=node_text, hoverinfo='text',
)

layout = go.Layout(
    title="Interactive Ambiguity Graph",
    showlegend=False, hovermode='closest',
    margin=dict(b=20,l=5,r=5,t=40),
    xaxis=dict(showgrid=False, zeroline=False),
    yaxis=dict(showgrid=False, zeroline=False),
)
fig = go.Figure(data=[edge_trace, node_trace], layout=layout)
fig.show()

In [None]:
# Interactive ambiguity graph with collapsed unambiguous paths
# Load alternatives.json
with open('/workspaces/Sequitur/tests/synthetic/bee_movie/results/alternatives.json', 'r') as f:
    alternatives = json.load(f)

# Load index_map.tsv and build node-to-sequence mapping.
# Each line is split on its first tab into an integer index and a sequence.
index_map = {}
with open('/workspaces/Sequitur/tests/synthetic/bee_movie/results/index_map.tsv', 'r') as f:
    for line in f:
        idx, seq = line.strip().split('\t', 1)
        index_map[int(idx)] = seq

def get_sequence(node):
    """Return the hover label for `node`.

    Integer nodes present in `index_map` are labelled with the first 40
    characters of their sequence and its total length; any other node is
    labelled with `str(node)`.
    """
    if isinstance(node, int) and node in index_map:
        seq = index_map[node]
        return f"Node {node}: {seq[:40]}... ({len(seq)} bp)"
    return str(node)

# Build the graph from multi-node components; their members are the
# ambiguous nodes.
G = nx.Graph()
ambiguous_nodes = set()
for component in alternatives.get("components", []):
    if len(component) > 1:
        nx.add_path(G, component)
        ambiguous_nodes.update(component)

# The full node set is taken to be range(N), with N one past the largest
# node currently in the graph.
all_nodes = set(G.nodes())
N = max(all_nodes) + 1 if all_nodes else 0
non_ambiguous_nodes = set(range(N)) - ambiguous_nodes

# Find unambiguous stretches (paths of >2 nodes not in ambiguous_nodes)
# NOTE(review): `neighbors` below is not derived from graph adjacency. It is
# every non-ambiguous index that has not been visited yet, so the walk chains
# all remaining non-ambiguous indices in ascending order. Confirm this is the
# intended notion of an "unambiguous stretch".
collapsed_edges = []
visited = set()
for node in non_ambiguous_nodes:
    if node in visited:
        continue
    # Try to find a linear path starting at node
    path = [node]
    current = node
    while True:
        neighbors = [n for n in range(N) if n not in ambiguous_nodes and n not in visited and n != current]
        if not neighbors:
            break
        next_node = neighbors[0]
        path.append(next_node)
        visited.add(current)
        current = next_node
    if len(path) > 2:
        # Represent the whole stretch by one edge between its two ends.
        collapsed_edges.append((path[0], path[-1], len(path)))
        for n in path:
            visited.add(n)
    else:
        visited.add(node)

# Add collapsed edges to graph (drawn as faded lines below)
for start, end, length in collapsed_edges:
    G.add_edge(start, end, collapsed=True, length=length)

# Prepare node attributes
node_x, node_y, node_text, node_color = [], [], [], []
pos = nx.spring_layout(G)
for node in G.nodes():
    x, y = pos[node]
    node_x.append(x)
    node_y.append(y)
    node_text.append(get_sequence(node))
    node_color.append("lightblue" if node in ambiguous_nodes else "grey")

# Prepare edge coordinates. A single Plotly line trace has one colour, so
# regular and collapsed edges go into separate coordinate lists and are drawn
# as separate traces; otherwise the fading would never be applied.
edge_x, edge_y = [], []
collapsed_x, collapsed_y = [], []
for u, v, attrs in G.edges(data=True):
    x0, y0 = pos[u]
    x1, y1 = pos[v]
    if attrs.get("collapsed"):
        target_x, target_y = collapsed_x, collapsed_y
    else:
        target_x, target_y = edge_x, edge_y
    # The trailing None breaks the line between consecutive segments.
    target_x.extend([x0, x1, None])
    target_y.extend([y0, y1, None])

# Plotly graph
edge_trace = go.Scatter(x=edge_x, y=edge_y, line=dict(width=1, color="#888"), hoverinfo='none', mode='lines')
collapsed_trace = go.Scatter(x=collapsed_x, y=collapsed_y,
    line=dict(width=1, color="rgba(150,150,150,0.3)"),  # faded for collapsed
    hoverinfo='none', mode='lines')
node_trace = go.Scatter(x=node_x, y=node_y, mode='markers',
    marker=dict(color=node_color, size=18, symbol="circle"),
    text=node_text, hoverinfo='text')

fig = go.Figure(data=[edge_trace, collapsed_trace, node_trace],
    layout=go.Layout(title="Ambiguity Graph (Collapsed Unambiguous Paths)",
        showlegend=False, hovermode='closest',
        margin=dict(b=20,l=5,r=5,t=40),
        xaxis=dict(showgrid=False, zeroline=False),
        yaxis=dict(showgrid=False, zeroline=False)))
fig.show()

In [None]:
from Bio import SeqIO

def build_suffix_array(reads, min_suf_len=3):
    """Return the sorted list of tagged suffixes and prefixes of every read.

    Each read is tagged with its position in `reads`:

    * suffixes are emitted as ``<suffix>$<index>``;
    * prefixes are emitted as ``<prefix>^<index>``.

    Only affixes of at least `min_suf_len` characters are produced, so a read
    shorter than that contributes nothing.

    Args:
        reads: iterable of read strings.
        min_suf_len: minimum affix length to emit.

    Returns:
        A single list holding all suffix and prefix entries, sorted as strings.
    """
    affixes = []
    for read_no, sequence in enumerate(reads):
        # Suffix entries: drop characters from the front, keep the "$<index>" tag.
        suffix_form = sequence + '$' + str(read_no)
        cut = suffix_form.index('$')
        affixes.extend(suffix_form[start:] for start in range(cut - min_suf_len + 1))

        # Prefix entries: drop characters from the end, keep the "^<index>" tag.
        prefix_form = suffix_form.replace('$', '^')
        cut = prefix_form.index('^')
        tag = prefix_form[cut:]
        affixes.extend(prefix_form[:cut - dropped] + tag for dropped in range(cut - min_suf_len + 1))
    return sorted(affixes)

# Load reads from swap_test_1.fastq and swap_test_2.fastq
reads1, reads2 = [
    [str(record.seq) for record in SeqIO.parse(fastq_path, "fastq")]
    for fastq_path in (
        "../tests/fixtures/swap_test_1.fastq",
        "../tests/fixtures/swap_test_2.fastq",
    )
]
reads = reads1 + reads2

suf_arr = build_suffix_array(reads, min_suf_len=3)

# Write one affix per line.
with open("python_affix_array.txt", "w") as out:
    out.writelines(f"{affix}\n" for affix in suf_arr)


: 

In [None]:
# Load and visualise assembly graph from exported graph JSON
# Path to exported graph JSON (update as needed)
graph_json_path = '/workspaces/Sequitur/tests/synthetic/bee_movie/results/assembly_graph.json'
assert os.path.exists(graph_json_path), f"Graph JSON not found: {graph_json_path}"

with open(graph_json_path, 'r') as handle:
    graph_data = json.load(handle)

# Build NetworkX graph from JSON.
# Nodes keep their sequence (empty string when absent) plus any other fields.
G = nx.DiGraph()
for record in graph_data['nodes']:
    extras = {key: value for key, value in record.items() if key not in ['id', 'sequence']}
    G.add_node(record['id'], sequence=record.get('sequence', ''), **extras)
# Edges keep their weight (1 when absent) plus any other fields.
for record in graph_data['edges']:
    extras = {key: value for key, value in record.items() if key not in ['source', 'target', 'weight']}
    G.add_edge(record['source'], record['target'], weight=record.get('weight', 1), **extras)

# Prepare node attributes for Plotly
pos = nx.spring_layout(G)
node_x, node_y, node_text, node_color = [], [], [], []
for node in G.nodes():
    x, y = pos[node]
    seq = G.nodes[node].get('sequence', '')
    # Nodes without a sequence are labelled with their id only.
    if seq:
        label = f"Node {node}: {seq[:40]}... ({len(seq)} bp)"
    else:
        label = str(node)
    node_x.append(x)
    node_y.append(y)
    node_text.append(label)
    node_color.append('lightblue')

# Prepare edge attributes: two points per edge, then None to break the line.
edge_x, edge_y = [], []
for u, v in G.edges():
    (x0, y0), (x1, y1) = pos[u], pos[v]
    edge_x.extend([x0, x1, None])
    edge_y.extend([y0, y1, None])

# Plotly graph
edge_trace = go.Scatter(
    x=edge_x, y=edge_y,
    line=dict(width=1, color="#888"),
    hoverinfo='none', mode='lines',
)
node_trace = go.Scatter(
    x=node_x, y=node_y, mode='markers',
    marker=dict(color=node_color, size=18, symbol='circle'),
    text=node_text, hoverinfo='text',
)

layout = go.Layout(
    title="Assembly Graph (Interactive)",
    showlegend=False, hovermode='closest',
    margin=dict(b=20,l=5,r=5,t=40),
    xaxis=dict(showgrid=False, zeroline=False),
    yaxis=dict(showgrid=False, zeroline=False),
)
fig = go.Figure(data=[edge_trace, node_trace], layout=layout)
fig.show()