In [114]:
import pandas as pd
import numpy as np
import os
import glob
import re
from typing import List, Callable
from enum import Enum

In [115]:
# Directory holding the output files written by the cpp_code.
# Only files whose names end with .out are read.
# NOTE: the name shadows the builtin dir(); it is kept because the loading cell reads it.
dir = "out"

In [116]:
# glob returns its matches in arbitrary order; sorting keeps the row order (and with it the
# index of the concatenated frame) identical between runs
all_files = sorted(glob.glob(os.path.join(dir, "*.out")))
if not all_files:
    # fail with a clear message instead of pandas' "No objects to concatenate"
    raise FileNotFoundError(f"no *.out files found in directory {dir!r}")
df = pd.concat((pd.read_csv(f, header=None) for f in all_files), ignore_index=True)
# seq_name: from the fasta or fastq file
# frame: [0, ..., k]
# repeat_representation: for example AG(GAC)_4 TGT
# score_type: run c++ main -h to see the available score types
# score: the score of the repeat_representation
# was_too_long: 1 if the input seq was longer than the configured max length, see c++ main -h
df.columns = ['seq_name', 'frame', 'repeat_representation', 'score_type', 'score', 'was_too_long']

In [117]:
# strip the labels ("frame", "score_type", "score", "seqlen too long") from the raw strings
# and convert what is left to its proper type
frame_text = df['frame'].str.extract(r'frame[:\s]+(\d+)', expand=False)
df['frame'] = frame_text.astype(int)

score_type_text = df["score_type"].str.extract(r'score_type[:\s]+(\w+)', expand=False)
df['score_type'] = score_type_text

score_text = df['score'].str.extract(r'score[:\s]+(\d+)', expand=False)
df['score'] = score_text.astype(int)

too_long_text = df['was_too_long'].str.extract(r'seqlen too long[:\s]+(\w+)', expand=False)
df['was_too_long'] = too_long_text.astype(int).astype(bool)


# creating new columns
def _strip_flanks(representation):
    """Return the part from the first "(" up to and including the last blank."""
    first_repeat = representation.index("(")
    after_last_blank = representation.rfind(" ") + 1  # TODO: maybe dont use the +1 and adapt the pattern in seq_conforms_with_category accordingly
    return representation[first_repeat:after_last_blank]


df["no_flanks"] = df["repeat_representation"].apply(_strip_flanks)


In [118]:
# these are currently not used
def color_negative_red(val):
    """Return the CSS for one cell: white on red if ``val`` is below 1, else black on white."""
    font_color, background_color = ('white', 'red') if val < 1 else ('black', 'white')
    return f'color: {font_color}; background-color: {background_color}'


def highlight_chars_with_A(text):
    """Return ``text`` with every 'A' wrapped in a white-on-red HTML span; other characters are kept."""
    highlighted_a = '<span style="color: white; background-color: red;">%s</span>' % 'A'
    return ''.join(highlighted_a if char == 'A' else char for char in text)
# df.style.applymap(color_negative_red, subset=["frame"])
# df['repeat_representation'] = df['repeat_representation'].apply(highlight_chars_with_A)


In [119]:
# every category lists the consecutive elements of one pattern:
# a (CAG) repeat, an interrupting triplet and another (CAG) repeat
categories = [["(CAG)", interruption, "(CAG)"]
              for interruption in ("TAG", "CAA", "CCG")]  # also maybe with numbers as minimum requirements

In [153]:
class Seq_has(Enum):
    """Which sides of an uncategorized stretch touch a categorized part of the sequence.

    seq_conforms_with_category passes one of these values to ``is_spurious`` together with
    every stretch that is not covered by any category.
    """
    NO_NEIGHBOUR = 0     # the stretch is the whole sequence
    LEFT_NEIGHBOUR = 1   # the stretch runs to the end of the sequence, categorized part on its left
    RIGHT_NEIGHBOUR = 2  # the stretch starts the sequence, categorized part on its right
    BOTH_NEIGHBOURS = 3  # categorized parts on both sides


def seq_conforms_with_category(row,
                               categories: List[List[str]],
                               is_spurious: Callable[[str, Seq_has], bool] = lambda seq, relation: False
                               ) -> pd.Series:
    """
    Check which categories occur in a row's ``no_flanks`` string and whether they explain all of it.

    Parameters:
    row: a row from the dataframe (anything that supports ``row["no_flanks"]``)
    categories: a list of lists of strings, each list of strings represents a category of tandem repeats.
        Elements starting with "(" are repeat units and are matched together with their "_<count> "
        suffix; all other elements go into the regular expression unchanged
    is_spurious: called as ``is_spurious(stretch, relation)`` for every stretch of the string that no
        category covers; returns True if that stretch can be ignored. The default treats no stretch
        as spurious

    Returns: a series with one 0/1 entry per category (1 if the category matched at least once). The
    last element is a boolean: True if every part of the string is either covered by a category
    match or judged spurious
    """
    seq = row["no_flanks"]  # if this is changed then i must adapt middles_are_spurious
    found_categories = [0] * len(categories)
    covered = [False] * len(seq)
    for i, category in enumerate(categories):
        # repeat units get their parentheses escaped and the "_<count> " suffix appended
        pattern = "".join(
            part.replace("(", r"\(").replace(")", r"\)") + r"_\d+\s" if part.startswith("(") else part
            for part in category)
        # wrapped in a lookahead so that overlapping matches are found as well
        for hit in re.finditer("(?=(" + pattern + "))", seq):
            start = hit.start()
            end = start + len(hit.group(1))  # the lookahead itself has zero width
            found_categories[i] = 1
            covered[start:end] = [True] * (end - start)

    mapped_string = "".join("1" if is_covered else "0" for is_covered in covered)
    uncovered_stretches = list(re.finditer("0+", mapped_string))
    if not uncovered_stretches:
        return pd.Series(found_categories + [True])  # seq is completely categorized

    all_spurious = True
    for stretch in uncovered_stretches:  # check all uncategorized parts
        at_start = stretch.start() == 0
        at_end = stretch.end() == len(seq)
        if at_start and at_end:
            relation = Seq_has.NO_NEIGHBOUR
        elif at_start:
            relation = Seq_has.RIGHT_NEIGHBOUR
        elif at_end:
            relation = Seq_has.LEFT_NEIGHBOUR
        else:
            relation = Seq_has.BOTH_NEIGHBOURS
        if not is_spurious(seq[stretch.start():stretch.end()], relation):
            all_spurious = False
            break  # one non-spurious stretch decides the result

    return pd.Series(found_categories + [all_spurious])


def is_spurious_by_max_repeat_len_and_min_distance(seq,
                                                   max_repeat_len: int,
                                                   min_distance: int,
                                                   relation_to_neighbour: Seq_has,
                                                   ) -> bool:
    """
    Decide whether a stretch of a repeat representation is spurious.

    The stretch is spurious (True) only if all of the following hold:
    - no repeat in it has a count above max_repeat_len
    - every spacer between two consecutive repeats is at least min_distance characters long
    - towards every neighbour named by relation_to_neighbour the spacer is at least min_distance long

    Parameters:
    seq: the string to check, repeats written as "(UNIT)_<count> "
    max_repeat_len: a repeat with a count above this makes the stretch not spurious
    min_distance: a spacer shorter than this makes the stretch not spurious
    relation_to_neighbour: if Seq_has.NO_NEIGHBOUR: then the seq is considered to have no neighbouring sequences
                           if Seq_has.LEFT_NEIGHBOUR: then the seq is considered to have a neighbour to the left that ends in a repeat
                           if Seq_has.RIGHT_NEIGHBOUR: then the seq is considered to have a neighbour to the right that starts with a repeat
                           if Seq_has.BOTH_NEIGHBOURS: then the seq is considered to have both neighbours

    Returns: True if the stretch is spurious, False otherwise
    """
    # repeat is too long
    if any(int(repeat.group(1)) > max_repeat_len
           for repeat in re.finditer(r"\(\w+\)_(\d+)\s", seq)):
        return False

    # repeats are too close: the "(" of the following repeat is only looked at, not consumed,
    # so that this repeat can start the next match and every consecutive pair is checked
    if any(len(spacer.group(1)) < min_distance
           for spacer in re.finditer(r"\(\w+\)_\d+\s(\w*)(?=\()", seq)):
        return False

    has_left_neighbour = relation_to_neighbour in (Seq_has.LEFT_NEIGHBOUR, Seq_has.BOTH_NEIGHBOURS)
    has_right_neighbour = relation_to_neighbour in (Seq_has.RIGHT_NEIGHBOUR, Seq_has.BOTH_NEIGHBOURS)

    if has_left_neighbour:
        # characters in front of the first repeat separate it from the left neighbour
        leading = re.match(r"(\w*)\(", seq)
        if leading and len(leading.group(1)) < min_distance:
            return False  # left neighbour is too close
    if has_right_neighbour:
        # characters after the last whitespace separate the last repeat from the right neighbour
        trailing = re.search(r"\s(\w*)$", seq)
        if trailing and len(trailing.group(1)) < min_distance:
            return False  # right neighbour is too close
    return True


In [155]:
max_repeat_len = 3 # 3 and below is considered spurious
min_distance = 6 # a spacer of 6 or more characters is considered spurious
sample_seqs = ["(CAG)_2 TAG(CAG)_12 ", # True
               "(CAG)_2 TAG(CAG)_12 CAA(CAG)_2 ", # True
               "(AAA)_2 NNNNNN(AAA)_2 ", # True
               "(AAA)_4 NNNNNN(AAA)_2 ", # False
               "(CAG)_2 TAG(CAG)_12 CAA(CAG)_2 NNNNN(TTT)_3 ", # False
               "(CAG)_2 TAG(CAG)_12 CAA(CAG)_2 NNNNNN(TTT)_3 ", # True
               "(TTT)_3 NNNNN(CAG)_2 TAG(CAG)_12 CAA(CAG)_2 ", # False
               "(TTT)_3 NNNNNN(CAG)_2 TAG(CAG)_12 CAA(CAG)_2 ", # True
               "(CAG)_2 TAG(CAG)_12 NNNNNN(AAA)_3 NNNNNN(CAG)_2 CAA(CAG)_2 NNNNNN(TTT)_3 ", # True
               "(CAG)_2 TAG(CAG)_12 NNNNNN(AAA)_3 NNNNN(CAG)_2 CAA(CAG)_2 NNNNNN(TTT)_3 ", # False
               "(CAG)_2 TAG(CAG)_12 NNNNN(AAA)_3 NNNNNN(CAG)_2 CAA(CAG)_2 NNNNNN(TTT)_3 ", # False
               "(CAG)_2 TAG(CAG)_12 NNNNNN(AAA)_4 NNNNNN(CAG)_2 CAA(CAG)_2 NNNNNN(TTT)_3 "] # False
# sample_seqs = ["(GCT)_2 CCTGGGTGTAGTGAGATGTCTCCAGCCAGGGCCAAG(CAG)_3 TAG(CAG)_4 TAG(CAG)_4 CAA(CAG)_5 CAA(CAG)_3 "]
# expected last element of the result ("completely categorized") for each entry of sample_seqs,
# so that a regression fails the cell instead of having to be spotted in the printed output
expected_completely_defined = [True, True, True, False, False, True,
                               False, True, True, False, False, False]
assert len(sample_seqs) == len(expected_completely_defined), "one expected value per sample is required"
for seq, expected in zip(sample_seqs, expected_completely_defined):
    result = list(seq_conforms_with_category({"no_flanks": seq},
                                             categories, lambda seq, neighbour: is_spurious_by_max_repeat_len_and_min_distance(seq, max_repeat_len, min_distance, neighbour)))
    print(result)
    assert result[-1] == expected, f"{seq!r}: expected {expected}, got {result[-1]}"


[1, 0, 0, True]
[1, 1, 0, True]
[0, 0, 0, True]
[0, 0, 0, False]
[1, 1, 0, False]
[1, 1, 0, True]
[1, 1, 0, False]
[1, 1, 0, True]
[1, 1, 0, True]
[1, 1, 0, False]
[1, 1, 0, False]
[1, 1, 0, False]


In [156]:
categories_as_col_names = ["".join(c) for c in categories]

# how to recognize spurious repeats? that are small and far away from the repeat of interest?
max_repeat_len = 3
min_distance = 6


def f(x):
    """Categorize one dataframe row, judging uncategorized stretches with the thresholds set above."""
    def spurious(seq, neighbour):
        return is_spurious_by_max_repeat_len_and_min_distance(seq, max_repeat_len, min_distance, neighbour)

    return seq_conforms_with_category(x, categories, spurious)


new_cols = df.apply(f, axis=1)
new_cols.columns = categories_as_col_names + ["completely_defined"]

# columns that df already has (e.g. when this cell is run again) are overwritten in place,
# only the remaining ones are appended
already_in_df = [column for column in new_cols.columns if column in df.columns]
for column in already_in_df:
    df[column] = new_cols[column]
    del new_cols[column]

df = pd.concat([df, new_cols], axis=1)


In [127]:
# show the full content of every column instead of truncating long strings
pd.set_option('display.max_colwidth', None)
# df[["no_flanks", "completely_defined", *categories_as_col_names]][(df["(CAG)TAG(CAG)"]==1) & (df["(CAG)CAA(CAG)"]==1)]
# rows in which both the TAG and the CAA category were found
df[
    (df["(CAG)TAG(CAG)"] == 1)
    & (df["(CAG)CAA(CAG)"] == 1)
]

Unnamed: 0,seq_name,frame,repeat_representation,score_type,score,was_too_long,no_flanks,(CAG)TAG(CAG),(CAG)CAA(CAG),(CAG)CCG(CAG),completely_defined
13,@SRR23922262.828599.1 828599 length=151,1,G(GCT)_2 CCTGGGTGTAGTGAGATGTCTCCAGCCAGGGCCAAG(CAG)_3 TAG(CAG)_4 TAG(CAG)_4 CAA(CAG)_5 CAA(CAG)_3 CAAAGGGTCTGTGTTGCTAAGAGGCTTTTGGTTTCTTTC,CAG,15,False,(GCT)_2 CCTGGGTGTAGTGAGATGTCTCCAGCCAGGGCCAAG(CAG)_3 TAG(CAG)_4 TAG(CAG)_4 CAA(CAG)_5 CAA(CAG)_3,1,1,0,False
235,@SRR23922262.15732653.2 15732653 length=151,0,CCTGGGTGTAGTGAGATGTCTCCAGCCAGGGCCAAG(CAG)_3 TAG(CAG)_4 TAG(CAG)_4 CAA(CAG)_5 CAA(CAG)_3 CAAAGGGTCTGTGTTGCTAAGAGGCTTTTGGTTTCTTTCCCTCCAC,CAG,15,False,(CAG)_3 TAG(CAG)_4 TAG(CAG)_4 CAA(CAG)_5 CAA(CAG)_3,1,1,0,True


In [157]:
# how many of the samples could be categorized completely
completely_defined_rows = df[df['completely_defined'] == 1]
print(f"from all samples ({len(df)}) {len(completely_defined_rows)} were categorizes completely")

from all samples (240) 47 were categorizes completely


In [158]:
# the first 20 representations that could not be categorized completely
not_completely_defined = df[df["completely_defined"] == 0]
not_completely_defined["no_flanks"][0:20]

0                                                      (CAG)_2 CCA(CAG)_4 CAA(CAG)_3 CCG(CAG)_4 CCA(CAG)_3 CCG(CAG)_4 CCA(CAG)_3 
1                                                                     (AAG)_2 CAGCTTGAG(CAG)_5 CAA(CAG)_3 CAA(CAG)_10 ACA(GAA)_2 
2                                                                                                     (CAG)_12 CATCAGCAT(CAG)_14 
7                                                                     (AAG)_2 CAGCTTGAG(CAG)_5 CAA(CAG)_3 CAA(CAG)_11 ACA(GAA)_2 
8                                                                                                   (CAC)_2 CAGCAACAGCAA(CAG)_19 
10                                             (CAG)_19 CATCACGGAAACTCTGGGCCC(CCT)_3 GGAGCATTTCCCCACCCACTGGAGGGCGGTAGCTCC(CAC)_2 
11                                                  (CAG)_5 CAA(CAG)_3 CAA(CAG)_10 ACA(GAA)_2 TGGACAGAAGATCACTCAGCCCTTGTG(CCT)_2 
12                                                                     (CAC)_2 CAGCAACAGCA