In [38]:
import numpy as np
from Bio import SeqIO
from collections import defaultdict
import random

1. Построение графа Де Брюина (5 баллов)

По заданному набору ридов в формате FASTQ и параметру k, который соответствует длине k-меров, построить граф Де Брюина, некоторый путь в котором соответствовал бы возможной подстроке в исходном геноме. Не забывайте про запоминание покрытия каждого k-мера, а так же про сами подстроки, которые соответствуют каждому ребру. В остальном граф полностью соответствует тому, что был описан в лекции.

In [39]:
def generate_random_seq(length):
    return ''.join(random.choices("ACGT", k=length))

def generate_reads(sequence, read_length):
    reads = []
    for i in range(0, len(sequence), read_length):
        chunk = sequence[i:i + read_length]
        if len(chunk) == read_length:
            reads.append(chunk)
    return reads

random.seed(42)
seq = generate_random_seq(1000)
reads = generate_reads(seq, 150)

In [40]:
class DBGraph:
    def __init__(self, reads, k):
        self.k = k
        self.kmers = []
        self.nodes = set()
        self.edges = {}  # {(prev, next): [kmer, coverage]}
        self.connections = defaultdict(list)
        self.in_degree = defaultdict(int)
        self.out_degree = defaultdict(int)
        self._create_graph(reads)

    def _create_graph(self, reads):
        for read in reads:
            self.kmers.extend([read[i:i+self.k] for i in range(len(read) - self.k + 1)])
        for kmer in self.kmers:
            prev, next = kmer[:-1], kmer[1:]
            self.nodes.update([prev, next])
            key = (prev, next)
            if key in self.edges:
                self.edges[key][1] += 1
            else:
                self.edges[key] = [kmer, 1]
                self.connections[prev].append(next)
                self.in_degree[next] += 1
                self.out_degree[prev] += 1

2. Сжатие графа (4 балла)

Научитесь производить сжатие графа Де Брюина. При сжатии не забывайте склеивать подстроки на ребрах и обновлять покрытие ребер. Пересчитывайте покрытие склеиваемых ребер как взвешенное среднее, где вес соответствует длине подстроки, соответствующей ребру.

In [41]:
class DBGraph:
    def __init__(self, reads, k):
        self.k = k
        self.kmers = []
        self.nodes = set()
        self.edges = {}  # {(prev, next): [kmer, coverage]}
        self.connections = defaultdict(list)
        self.in_degree = defaultdict(int)
        self.out_degree = defaultdict(int)
        self._create_graph(reads)

    def _create_graph(self, reads):
        for read in reads:
            self.kmers.extend([read[i:i+self.k] for i in range(len(read) - self.k + 1)])
        for kmer in self.kmers:
            prev, next = kmer[:-1], kmer[1:]
            self.nodes.update([prev, next])
            key = (prev, next)
            if key in self.edges:
                self.edges[key][1] += 1
            else:
                self.edges[key] = [kmer, 1]
                self.connections[prev].append(next)
                self.in_degree[next] += 1
                self.out_degree[prev] += 1

    def _path(self, start_edge):
        path = [start_edge]
        current = start_edge[1]
    
        while (self.in_degree[current] == 1 and self.out_degree[current] == 1):
            next_node = self.connections[current][0]
            next_edge = (current, next_node)
            if next_edge not in self.edges:
                break
            path.append(next_edge)
            current = next_node
    
        return path

    def compress(self):
        to_replace = []
        for node in self.connections:
            if self.out_degree[node] != 1:
                for next_node in self.connections[node]:
                    edge = (node, next_node)
                    path = self._path(edge)
                    if len(path) > 1:
                        to_replace.append(path)

        for path in to_replace:
            new_str = self.edges[path[0]][0]
            weighted_cov_sum = len(new_str) * self.edges[path[0]][1]
            total_len = len(new_str)

            for e in path[1:]:
                s, cov = self.edges[e]
                new_str += s[-1]
                weighted_cov_sum += len(s) * cov
                total_len += len(s)

                self.in_degree[e[1]] -= 1
                self.out_degree[e[0]] -= 1

            avg_cov = round(weighted_cov_sum / total_len)
            new_edge = (path[0][0], path[-1][1])
            self.edges[new_edge] = [new_str, avg_cov]
            self.connections[path[0][0]] = [path[-1][1]]
            self.in_degree[path[-1][1]] += 1
            self.out_degree[path[0][0]] += 1

            for e in path:
                if e != new_edge:
                    #print(len(self.edges))
                    self.edges.pop(e, None)
                    #print(len(self.edges))
                    if e[0] in self.connections:
                        self.connections[e[0]] = [v for v in self.connections[e[0]] if v != e[1]]


In [42]:
reads = ["ACGTACG", "ACGTACT", "ACGTACC"]

graph = DBGraph(reads, k=3)

print("До сжатия:")
print("Узлы:", graph.nodes)
print("Рёбра:", graph.edges)
print("Связи:", graph.connections)
print("Входящие степени:", graph.in_degree)
print("Исходящие степени:", graph.out_degree)

graph.compress()

print("\nПосле сжатия:")
print("Узлы:", graph.nodes)
print("Рёбра:", graph.edges)
print("Связи:", graph.connections)
print("Входящие степени:", graph.in_degree)
print("Исходящие степени:", graph.out_degree)

До сжатия:
Узлы: {'CG', 'CC', 'TA', 'AC', 'CT', 'GT'}
Рёбра: {('AC', 'CG'): ['ACG', 4], ('CG', 'GT'): ['CGT', 3], ('GT', 'TA'): ['GTA', 3], ('TA', 'AC'): ['TAC', 3], ('AC', 'CT'): ['ACT', 1], ('AC', 'CC'): ['ACC', 1]}
Связи: defaultdict(<class 'list'>, {'AC': ['CG', 'CT', 'CC'], 'CG': ['GT'], 'GT': ['TA'], 'TA': ['AC']})
Входящие степени: defaultdict(<class 'int'>, {'CG': 1, 'GT': 1, 'TA': 1, 'AC': 1, 'CT': 1, 'CC': 1})
Исходящие степени: defaultdict(<class 'int'>, {'AC': 3, 'CG': 1, 'GT': 1, 'TA': 1})

После сжатия:
Узлы: {'CG', 'CC', 'TA', 'AC', 'CT', 'GT'}
Рёбра: {('AC', 'CT'): ['ACT', 1], ('AC', 'CC'): ['ACC', 1], ('AC', 'AC'): ['ACGTAC', 3]}
Связи: defaultdict(<class 'list'>, {'AC': ['AC'], 'CG': [], 'GT': [], 'TA': []})
Входящие степени: defaultdict(<class 'int'>, {'CG': 1, 'GT': 0, 'TA': 0, 'AC': 1, 'CT': 1, 'CC': 1})
Исходящие степени: defaultdict(<class 'int'>, {'AC': 4, 'CG': 0, 'GT': 0, 'TA': 0, 'CT': 0, 'CC': 0})


3. Удаление хвостов (3 балла)
   
Реализуйте возможность после построения графа удалять плохо покрытые хвосты. Чтобы определить, является ли хвост плохо покрытым, нужно посчитать распределение произведений покрытий на длину хвоста по всем хвостам и удалить те, что попадают в 30 процентов ближе к нулю распределения.

In [43]:
class DBGraph:
    def __init__(self, reads, k):
        self.k = k
        self.kmers = []
        self.nodes = set()
        self.edges = {}  # {(prev, next): [kmer, coverage]}
        self.connections = defaultdict(list)
        self.in_degree = defaultdict(int)
        self.out_degree = defaultdict(int)
        self._create_graph(reads)

    def _create_graph(self, reads):
        for read in reads:
            self.kmers.extend([read[i:i+self.k] for i in range(len(read) - self.k + 1)])
        for kmer in self.kmers:
            prev, next = kmer[:-1], kmer[1:]
            self.nodes.update([prev, next])
            key = (prev, next)
            if key in self.edges:
                self.edges[key][1] += 1
            else:
                self.edges[key] = [kmer, 1]
                self.connections[prev].append(next)
                self.in_degree[next] += 1
                self.out_degree[prev] += 1

    def _path(self, start_edge):
        path = [start_edge]
        current = start_edge[1]
    
        while (self.in_degree[current] == 1 and self.out_degree[current] == 1):
            next_node = self.connections[current][0]
            next_edge = (current, next_node)
            if next_edge not in self.edges:
                break
            path.append(next_edge)
            current = next_node
    
        return path

    def compress(self):
        to_replace = []
        for node in self.connections:
            if self.out_degree[node] != 1:
                for next_node in self.connections[node]:
                    edge = (node, next_node)
                    path = self._path(edge)
                    if len(path) > 1:
                        to_replace.append(path)

        for path in to_replace:
            new_str = self.edges[path[0]][0]
            weighted_cov_sum = len(new_str) * self.edges[path[0]][1]
            total_len = len(new_str)

            for e in path[1:]:
                s, cov = self.edges[e]
                new_str += s[-1]
                weighted_cov_sum += len(s) * cov
                total_len += len(s)

                self.in_degree[e[1]] -= 1
                self.out_degree[e[0]] -= 1

            avg_cov = round(weighted_cov_sum / total_len)
            new_edge = (path[0][0], path[-1][1])
            self.edges[new_edge] = [new_str, avg_cov]
            self.connections[path[0][0]] = [path[-1][1]]
            self.in_degree[path[-1][1]] += 1
            self.out_degree[path[0][0]] += 1

            for e in path:
                if e != new_edge:
                    #print(len(self.edges))
                    self.edges.pop(e, None)
                    #print(len(self.edges))
                    if e[0] in self.connections:
                        self.connections[e[0]] = [v for v in self.connections[e[0]] if v != e[1]]

    def delete_tails(self):
        tails = {}
        for u in self.nodes:
            if self.out_degree[u] > 1:
                for v in self.connections.get(u, []):
                    edge = (u, v)
                    path = self._path(edge)
                    if self.out_degree[path[-1][1]] == 0:
                        score = sum(self.edges[e][1] for e in path) * len(path)
                        tails[edge] = (path, score)

        if len(tails) > 1:
            scores = [score for _, score in tails.values()]
            threshold = np.percentile(scores, 30)
            for start_edge, (path, score) in tails.items():
                if score <= threshold:
                    for e in path:
                        self.edges.pop(e, None)
                        self.connections[e[0]] = [v for v in self.connections[e[0]] if v != e[1]]
                        self.in_degree[e[1]] -= 1
                        self.out_degree[e[0]] -= 1
    

In [44]:
reads = [
    "ACGTACGT",
    "ACGTACGT",
    "ACGTACGT",
    "ACGTACGT",
    "ACGTACGG",  # Хвост
    "ACGTACGA"   # Хвост
]

graph = DBGraph(reads, k=3)

print("До удаления хвостов:")
print("Рёбра:", graph.edges)
print("Связи:", graph.connections)
print("Входящие степени:", graph.in_degree)
print("Исходящие степени:", graph.out_degree)

graph.delete_tails()

print("\nПосле удаления хвостов:")
print("Рёбра:", graph.edges)
print("Связи:", graph.connections)
print("Входящие степени:", graph.in_degree)
print("Исходящие степени:", graph.out_degree)

До удаления хвостов:
Рёбра: {('AC', 'CG'): ['ACG', 12], ('CG', 'GT'): ['CGT', 10], ('GT', 'TA'): ['GTA', 6], ('TA', 'AC'): ['TAC', 6], ('CG', 'GG'): ['CGG', 1], ('CG', 'GA'): ['CGA', 1]}
Связи: defaultdict(<class 'list'>, {'AC': ['CG'], 'CG': ['GT', 'GG', 'GA'], 'GT': ['TA'], 'TA': ['AC']})
Входящие степени: defaultdict(<class 'int'>, {'CG': 1, 'GT': 1, 'TA': 1, 'AC': 1, 'GG': 1, 'GA': 1})
Исходящие степени: defaultdict(<class 'int'>, {'AC': 1, 'CG': 3, 'GT': 1, 'TA': 1})

После удаления хвостов:
Рёбра: {('AC', 'CG'): ['ACG', 12], ('CG', 'GT'): ['CGT', 10], ('GT', 'TA'): ['GTA', 6], ('TA', 'AC'): ['TAC', 6]}
Связи: defaultdict(<class 'list'>, {'AC': ['CG'], 'CG': ['GT'], 'GT': ['TA'], 'TA': ['AC']})
Входящие степени: defaultdict(<class 'int'>, {'CG': 1, 'GT': 1, 'TA': 1, 'AC': 1, 'GG': 0, 'GA': 0})
Исходящие степени: defaultdict(<class 'int'>, {'AC': 1, 'CG': 1, 'GT': 1, 'TA': 1, 'GA': 0, 'GG': 0})
