In [1]:
import networkx as nx
from collections import defaultdict
import Levenshtein

In [35]:
def read_fasta(filepath):
    sequence = ""
    with open(filepath, "r") as f:
        for line in f:
            line = line.strip()
            if line.startswith(">"):
                continue 
            sequence += line  
    return sequence


def build_kmer_dict(sequence, k):
    # 创建k-mer字典
    kmer_dict = defaultdict(list)
    n = len(sequence)
    for i in range(n - k + 1):
        kmer = sequence[i:i + k]
        kmer_dict[kmer].append(i)
    return kmer_dict

def filter_kmer_dict(kmer_dict):
    # 去单kmer
    to_remove = []
    for kmer, positions in kmer_dict.items():
        if len(positions) < 2: 
            to_remove.append(kmer)
    for kmer in to_remove:
        del kmer_dict[kmer]

# dna_seq = "ATGCGATGACCTGATGATGCGATGTGACGTATGCGATGACCTGATGATGCGATG"
dna_seq = read_fasta("10^3.fasta")

k = 7
min_l = 300
max_l = 1000
sim_threshold = 0.7

# 1. 建立kmer字典
kmer_dict = build_kmer_dict(dna_seq, k)
print(kmer_dict)
filter_kmer_dict(kmer_dict)
print(kmer_dict)

defaultdict(<class 'list'>, {'GTCTCTG': [0], 'TCTCTGT': [1, 954], 'CTCTGTC': [2], 'TCTGTCC': [3], 'CTGTCCA': [4], 'TGTCCAT': [5], 'GTCCATG': [6], 'TCCATGT': [7], 'CCATGTA': [8], 'CATGTAT': [9, 1444], 'ATGTATA': [10, 4277, 5565], 'TGTATAA': [11, 320, 4278, 5566], 'GTATAAG': [12, 5567], 'TATAAGC': [13], 'ATAAGCG': [14], 'TAAGCGT': [15], 'AAGCGTG': [16], 'AGCGTGT': [17], 'GCGTGTG': [18, 266, 1078], 'CGTGTGT': [19, 235, 1014], 'GTGTGTG': [20, 22, 24, 26, 1015], 'TGTGTGT': [21, 23, 25, 27, 1016], 'GTGTGTT': [28, 553, 1017], 'TGTGTTT': [29, 554], 'GTGTTTG': [30, 555], 'TGTTTGC': [31, 4042], 'GTTTGCA': [32], 'TTTGCAG': [33, 3960, 4887, 5338], 'TTGCAGG': [34, 5339], 'TGCAGGC': [35], 'GCAGGCA': [36, 2842], 'CAGGCAG': [37], 'AGGCAGG': [38], 'GGCAGGG': [39], 'GCAGGGG': [40], 'CAGGGGA': [41], 'AGGGGAA': [42, 3802], 'GGGGAAA': [43, 982, 3803], 'GGGAAAA': [44, 3804], 'GGAAAAA': [45, 470, 1223, 3805, 4915], 'GAAAAAA': [46, 1224, 1928, 3853], 'AAAAAAT': [47, 1260, 1518, 1929, 2181, 2980, 3855, 5134], 

In [36]:
def build_debruijn_graph(sequence, kmer_dict, k, min_l, max_l):
    G = nx.MultiDiGraph()
    
    for key in kmer_dict.keys():
        G.add_node(key, position=kmer_dict[key])

    n = len(sequence)
    for i in range(n - k + 1):
        current_kmer = sequence[i:i + k]
        if current_kmer not in kmer_dict:
            continue
        
        start_search = i + min_l
        end_search = i + max_l
        # 查询窗口
        if end_search + k > n:
            end_search = n - k

        for next_pos in range(start_search, end_search + 1):
            next_kmer = sequence[next_pos: next_pos + k]
            if next_kmer in kmer_dict:
                G.add_edge(
                    current_kmer, 
                    next_kmer, 
                    pos_start = i, 
                    pos_end   = next_pos, 
                    weight    = (next_pos - i)
                )
    return G

# 2. 建图
G = build_debruijn_graph(dna_seq, kmer_dict, k, min_l, max_l)

In [41]:
def levenshtein_similarity(seq1, seq2):
    # 计算Levenshtein距离相似度
    lev_distance = Levenshtein.distance(seq1, seq2)
    max_len = max(len(seq1), len(seq2))
    similarity = 1 - lev_distance / max_len
    return similarity


def remove_nested_intervals(intervals):
    # 去嵌
    intervals.sort(key=lambda x: (x[1], -x[2]))
    result = []
    max_end = float('-inf')

    for item in intervals:
        seq, start, end = item
        if end > max_end:
            result.append(item)
            max_end = end
        else:
            pass

    return result

def has_overlap(intervals):
    intervals = [(intervals[i], intervals[i+1]) for i in range(0, len(intervals), 2)]
    intervals.sort()
    
    # 区间是否重叠
    for i in range(1, len(intervals)):
        if intervals[i][0] < intervals[i-1][1]:
            return True
    return False

def merge_intervals(a, b):
    # 结果列表，用于存储合并后的区间
    result = b.copy()  # 先把b中的区间复制到结果中

    for interval_a in a:
        # 需要检查的区间
        merged = False
        for i, interval_b in enumerate(result):
            # 检查区间a与b是否有重叠或嵌套
            if (interval_a[0] <= interval_b[1] and interval_a[1] >= interval_b[0]):  # 有重叠
                # 合并区间
                new_interval = [min(interval_a[0], interval_b[0]), max(interval_a[1], interval_b[1])]
                result[i] = new_interval  # 用合并后的区间替换旧的区间
                merged = True
                break
            elif (interval_a[0] >= interval_b[0] and interval_a[1] <= interval_b[1]):  # 区间a嵌套于区间b
                merged = True
                break
        
        if not merged:
            # 如果没有重叠或嵌套，直接将区间a加入result中
            result.append(interval_a)
    
    return result

def check_duplication(G, sequence):
    # 建立字典
    group_num = 0
    duplication_dict = {}
    for edge in set(G.edges()):
        edge_group = G[edge[0]][edge[1]]
        if len(edge_group) > 1:
            duplication_dict[str(group_num)] = []
            # print("\n" + "from " + edge[0] + " to " + edge[1])
            for i in range(len(edge_group)):
                pos_start = edge_group[i]["pos_start"]
                pos_end = edge_group[i]["pos_end"] + 3
                seq = dna_seq[pos_start:pos_end]
                # print(seq)

                duplication_dict[str(group_num)].append([seq, pos_start, pos_end])
            group_num += 1

    # 去重去嵌，建立目录
    seq_directory = {}
    del_list = []
    for key in duplication_dict.keys():
        duplication_dict[key] = remove_nested_intervals(duplication_dict[key])
        if len(duplication_dict[key]) > 1:
            for edge in duplication_dict[key]:
                start_pos = edge[1]
                if str(start_pos) in seq_directory.keys():
                    seq_directory[str(start_pos)].append(key)
                else:
                    seq_directory[str(start_pos)] = [key]
        else:
            del_list.append(key)
    for key in del_list:
        duplication_dict.pop(key)
    # print(duplication_dict)
    # print(seq_directory)

    # 从头查重
    duplication_group = {}
    duplication_group_index = 0
    pos = 0
    while pos < len(sequence):
        if str(pos) in seq_directory.keys():
            # print(str(pos) + " " + str(seq_directory[str(pos)]))
            for edge_group_index in seq_directory[str(pos)]:
                kmer_group = duplication_dict[str(edge_group_index)]
                section = [num for sublist in kmer_group for num in sublist[1:]]
                # print(kmer_group)
                # print(section)

                list_duplication_deoverlap = []
                for j in range(int(len(section) / 2)):
                    for k in range(j + 1 ,int(len(section) / 2)):
                        a,b, c, d = section[2 * j], section[2 * j + 1], section[2 * k], section[2 * k + 1]

                        # 如果存在重叠
                        if max(a, c) < min(b, d):
                            a_n, b_n, c_n, d_n = min(a, c), min(b, c), max(b, c), max(b, d)
                            if levenshtein_similarity(sequence[a_n: b_n], sequence[c_n: d_n]) > sim_threshold:
                                list_duplication_deoverlap.append([a_n, b_n])
                                list_duplication_deoverlap.append([c_n, d_n])
                        # 如果不存在重叠
                        else:
                            list_duplication_deoverlap.append([a, b])
                            list_duplication_deoverlap.append([c, d])
                
                list_duplication_deoverlap = [list(tup) for tup in list(set(list(map(tuple, list_duplication_deoverlap))))]   

                if len(duplication_group.keys()) > 0:
                    for key in duplication_group.keys():
                        print(list_duplication_deoverlap)
                        duplication_group[key] = merge_intervals(list_duplication_deoverlap, duplication_group[key])

                else:
                    duplication_group[str(duplication_group_index)] = list_duplication_deoverlap
                    duplication_group_index += 1
            print(duplication_group)
        pos += 1
        # break
            

check_duplication(G, dna_seq)

[[954, 1573], [1, 488]]
[[954, 1829], [1, 854]]
[[1, 903], [954, 1285]]
[[954, 1795], [1, 390]]
[[1, 738], [954, 1578]]
[[1, 795], [954, 1638]]
[[1, 912], [954, 1370]]
[[1, 751], [954, 1651]]
[[1, 416], [954, 1941]]
[[1, 383], [954, 1480]]
[[1, 671], [954, 1841]]
[[954, 1951], [1, 340]]
[[1, 339], [954, 1950]]
[[1, 529], [954, 1933]]
[]
[[1, 800], [954, 1472]]
[[954, 1934], [1, 580]]
[[1, 911], [954, 1830]]
[[1, 401], [954, 1947]]
[[954, 1294], [1, 459]]
[[1, 771], [954, 1561]]
[[1, 368], [954, 1555]]
[[954, 1626], [1, 413]]
[[1, 434], [954, 1859]]
[[1, 491], [954, 1463]]
[[1, 834], [954, 1503]]
[[954, 1811], [1, 321]]
[[954, 1824], [1, 933]]
[[1, 530], [954, 1523]]
[[1, 579], [954, 1296]]
[[1, 367], [954, 1554]]
[[954, 1935], [1, 581]]
[[954, 1572], [1, 882]]
[[954, 1468], [1, 926]]
[[1, 540], [954, 1259]]
[[954, 1367], [1, 712]]
{'0': [[954, 1951], [1, 933]]}
[[9, 854], [1444, 1829]]
[[9, 646], [1444, 1984]]
[[1444, 2389], [9, 453]]
[[9, 460], [1444, 2182]]
[[1444, 2163], [9, 648]]
[

遍历序列，\\
    获得当前序列的位置，\\
    查找当前序列位置是否有新的kmer开头，\\
        如果有，检查kmer是否存在于维护中的重复组中，\\
            如果存在，就延长维护中的重复组，更新该组的最后标定kmer。\\
            如果不存在就创建一个维护中的重复组，创建最后标定kmer，\\
        如果没有，就跳过。
    然后检查所有维护中的重复组
        如果一个重复组延长到当前位置的话相似度会下降，则跳过，
        如果一个重复组延长到当前位置的话相似度会上升，则给这个重复组增加一个潜在结束长度
        检查维护中的重复组是否满足结束条件，如果满足就结束（结束条件是啥，设定如果测量长度超过重复组本身的长度就结束）
最后结束所有的维护中的重复组（）


重复组字典的一个键的结构：

key：[序号]
内容【[序列1 起 始],
[序列2 起 始],
[序列3 起 始],
[1 2 相似度，1 3 相似度， 2 3 相似度]】


