In [1]:
%run "print_functions.ipynb""

In [11]:

def parsegb(path):
    if path[-3:] == ".gz":
        gfl = (gzip.open(path, "rb").read()).decode("utf-8") 
        return list(SeqIO.parse(StringIO(gfl),"gb"))
    else:
        return list(SeqIO.parse(path,"gb"))
    
def extract_name_from_path(path):
    return ".".join(path.split("//")[-1].split(".")[:2])
    
def parsegff(path, return_name = False):
    name = extract_name_from_path(path)
    id_spec = None
    merge_strategy = "merge"
    if path[-3:] == ".gz":
        gfl = (gzip.open(path, "rb").read()).decode("utf-8")
        if return_name:
            return gffutils.create_db(gfl, ':memory:', from_string = True, id_spec = id_spec,
                                      merge_strategy = merge_strategy), name
        return gffutils.create_db(gfl, ':memory:', from_string = True, id_spec = id_spec,
                                  merge_strategy = merge_strategy)
    else:
        if return_name:
            return gffutils.create_db(path, ':memory:', id_spec = id_spec,
                                      merge_strategy = merge_strategy), name
        return gffutils.create_db(path, ':memory:', id_spec = id_spec,
                                  merge_strategy = merge_strategy)
    
def parsefasta(path, return_name = False):
    name = extract_name_from_path(path)
    if path[-3:] == ".gz":
        gfl = (gzip.open(path, "rb").read()).decode("utf-8") 
        if return_name:
            return list(SeqIO.parse(StringIO(gfl),"fasta")), name
        return list(SeqIO.parse(StringIO(gfl),"fasta"))
    else:
        if return_name:
            return list(SeqIO.parse(path,"fasta"))
        return list(SeqIO.parse(path,"fasta"))

#find strange sequences in gff file
def calculate_stats_gff(db):
    unusual_ones = []
    for elem in list(db.features_of_type("gene")):
        cds_cands = (list(db.children(elem)))
        cn = 0
        for elem in cds_cands:
            if elem.featuretype == "CDS":
                cn += 1
        
        if cn > 1:
            #interesting one
            unusual_ones.append((elem, cds_cands))
    return unusual_ones, db

#placeholders to be implemented
def has_gaps(list_of_children):
    return None

def get_start_codon():
    return None

#quick path replacement functions
def replace_path_gff_to_gbff(path, old_folder, new_folder):
    return path.replace(old_folder, new_folder).replace(".gff", ".gbff")
def replace_path_gbff_to_gff(path, old_folder, new_folder):
    return path.replace(old_folder, new_folder).replace(".gbff", ".gff")

#finds feature in fasta list
def find_by_id(flist, query):
    for f in flist:
        if f.id == query:
            return f
    return None

In [1]:
def check_overlapping_genes_one_frame(frame:"Frame object"):
    frame.genes.sort()
    frame.convert_to_computational()
    for i in range(1,len(frame.genes)):
        if frame.genes[i][0] <= frame.genes[i-1][1]:
            print(name, frame[i-1], frame[i])
            
def select_genes_by_frame(genes, frame, genes_type):
    if genes_type != "computational":
        print("WARNING: non-computational type")
    ans = []
    for g in genes:
        if g[0] % 3 == frame % 3:
            #ans.append(g)
            ans.append([g[0] - frame % 3, g[1]])
    return ans
            
def computational_split_string(annot:"Annotation object"):
    annot.convert_to_computational()
    ans = []
    for frame, ref_frame in ((0,len(annot.seq)),(1,-2),(2,-1)):
        frame_genes = select_genes_by_frame(annot.genes, frame, annot.genes_type)
        frame_seq = annot.seq[frame:ref_frame]
        ans.append(Frame(annot.name, frame_seq, frame, frame_genes, annot.genes_type))

    return ans #list of frame obgects

def check_same_annot(frame1, frame2):
    return (frame1.content in frame2.content) or (frame2.content in frame1.content)

def long_wrap(v):
    if len(v) > 100:
        return tqdm.tqdm(v)
    else:
        return v

def computational_assemble_frames(frames:"list of frame objects"):
    if len(frames) != 3:
        print("Warning: frames count != 3")
    #check that frames are from the same annotation
    flag = True
    for elem1 in frames:
        for elem2 in frames:
            flag = flag and check_same_annot(elem1, elem2) and (elem1.name == elem2.name)
    if not flag:
        print("Error: frames are not from the same annotation")
        
    #now we construct annotation
    an_genes = []
    an_content = ""
    an_name = frames[0].name
    an_genes_type = frames[0].genes_type
    for elem in frames:
        if elem.frame == 0:
            an_content = elem.content
        elem_genes = [[e[0] - elem.frame, e[1] - elem.frame] for e in elem.genes]
        an_genes += elem_genes
    return Annotation(an_name, an_content, an_genes, an_genes_type)

def extract_stats(paths_gff:"list of paths to .gff files", paths_fst:"list of paths to fasta files",feat_type = "CDS", 
                  drop_fusion_genes = True):
    #if two genes have the same start then it is a fusion gene
    
    full_annotations = []
    
    for path_gff, path_fst in (zip(paths_gff, paths_fst)):
        gff_dat, name = parsegff(path_gff, return_name=True)
        fst_dat = parsefasta(path_fst) #list of SeqRecord objectds
        #create FullAnnotation object
        feat_list = list(gff_dat.features_of_type(feat_type))
        full_annotation = FullAnnotation(name, feat_list, fst_dat)
        full_annotation.convert_to_computational()
        
        #for now we want to drop fusion genes to check the simpliest case
        if drop_fusion_genes:
            full_annotation.drop_fusion_genes()
        
        full_annotations.append(full_annotation)
    return full_annotations

def create_train_set(frame:"Frame object"):
    print(frame.name)
    amino_frame = frame.splice_to_amino()
    #now we annotate states
    possible_states = ["ncd", "cd", "start_c", "stop_c", "model_start", "model_end"]
    amino_frame.states = ["ncd" for i in range(len(amino_frame.content))]
    for gene in amino_frame.genes:
        for amin_pos in range(gene[0], gene[1]):
            if amino_frame.states[amin_pos] != "ncd":
                print("Train set ruined and it is your fault...", "Frame name is", frame)
                print("skipping this one")
                return None
            if amin_pos == gene[0]:
                amino_frame.states[amin_pos] = "start_c"
            elif amin_pos == gene[1] - 1:
                amino_frame.states[amin_pos] = "stop_c"
            else:
                amino_frame.states[amin_pos] = "cd"
    #amino_frame.convert_to_pomegranate()
    return amino_frame


In [3]:
class Annotation:
    def __init__(self, name, seq, genes, genes_type = "computational"):
        self.name = name
        self.seq = seq
        self.genes = genes
        #now supports only single stranded annotations
        self.strands = "single"
        #genes type can be either classical or computational. computational -> gene = seq[gene_start,gene_finish]
        #classical = human readable genes
        self.genes_type = genes_type
    def convert_to_computational(self):
        if self.genes_type != "computational":
            #print(self.genes[0])
            for gene in self.genes:
                gene[0] -= 1 #= [gene[0] - 1, gene[1]]
            #print(self.genes[0])
        self.genes_type = "computational"
    
    def convert_to_classical(self):
        if self.genes_type != "classical":
            for gene in self.genes:
                gene[0] += 1 #= [gene[0] + 1, gene[1]]
        self.genes_type = "classical"
        
        
    def get_torch_representation(self):
        #torch type means that genes are represented as (n,3) array 
        #with ones in genes on suitable frame and 0 otherwise
        #frame is relative to make learning possible
        #that means for ATGTGG coords of ones will be ((0,0),(1,1),(2,2),(3,0),(4,1),(5,2))
        
        new_genes = np.zeros((len(self.seq), 3))
        self.convert_to_computational()
        for gene in self.genes:
            #not sure about that cycle, check after convert
            c_frame = 0
            for i in range(gene[0], gene[1]):
                '''if sum(new_genes[i]) > 0:
                    print(new_genes[i], "old", c_frame, i)
                    new_genes[i][c_frame] = 1
                    print(new_genes[i], "new")
                else:'''
                new_genes[i][c_frame] = 1
                c_frame += 1
                c_frame  %= 3
        return (self.seq, new_genes)
        
    
    def append(self, gene):
        self.genes.append(gene)
        
        
    def drop_fusion_genes(self, print_dropped = True):
        self.genes.sort()
        if len(self.genes) == 0:
            return
        ans_feats = [self.genes[0]]
        for i in range(len(self.genes)):
            f = self.genes[i]
            if (not f[0] == ans_feats[-1][0]) and (not f[1] == ans_feats[-1][1]):
                ans_feats.append(f)
            elif print_dropped:
                print("Fusion gene:", f, self.name)
        self.genes = ans_feats



class AminoFrame:
    def __init__(self, name:"'name to manually reconstruct all of these'", 
                 content:"cut seq of [YYY] aminoacids", frame:"could be either 0, +1 or +2", 
                 genes:"genes in amino coords", genes_type = "computational", states = None):
        self.name = name
        self.content = content
        if not frame in (0,1,2):
            print("Incorrect frame warning")
        self.frame = frame
        self.genes = genes
        self.genes_type = genes_type
        self.states = states
        
    #pomegranate format has model_start and model_end bullshit
    def convert_to_pomegranate(self):
        if self.genes_type != "pomegranate":
            self.content = ["AAA"] + self.content + ["AAA"]
            self.genes = [[e[0] + 1, e[1] + 1] for e  in self.genes]
            self.genes_type = "pomegranate"
            self.states = ["None-start"] + self.states + ["None-end"]
    def convert_to_computational(self):
        if self.genes_type != "computational":
            self.content = self.content[1:-1]
            self.genes = [[e[0] - 1, e[1] - 1] for e  in self.genes]
            self.genes_type = "computational"
            self.states = self.states[1:-1]
    
    def assemble_to_regular(self):
        self.convert_to_computational()
        regular_content = "".join(self.content)
        regular_genes = [[e[0] * 3, e[1] * 3 ] for e in self.genes]
        return Frame(self.name, regular_content, self.frame, regular_genes, self.genes_type)
    
    def create_genes_from_states(self,states):
        started = False
        current_start = 0
        if len(states) != len(self.content):
            print("Wrong states")
            return False
        for i, state in enumerate(states):
            if not started:
                if state == "start_c":
                    current_start = i
                if state in ("stop_c", "cd"):
                    print("Error")
            if started:
                if state == "stop_c":
                    self.genes.append([current_start, i])
                    started = False
                if state in ("ncd", "start_c"):
                    print("Error")
        return True
    
    def get_start_and_stop_sets(self):
        start_s = set()
        stop_s = set()
        for gene_c in self.genes:
            gene_seq = self.content[gene_c[0]: gene_c[1]]
            start_s.add(gene_seq[0])
            stop_s.add(gene_seq[1])
        return start_s, stop_s
    
    


#frame content does not guaranteed to be div by 3
class Frame(Annotation):
    def __init__(self, name:"'name to manually reconstruct all of these'", 
                 content:"cut seq", frame:"could be either 0, +1 or +2", genes, genes_type = "computational"):
        self.name = name
        self.content = content
        if not frame in (0,1,2):
            print("Incorrect frame warning")
        self.frame = frame
        self.genes = genes
        self.genes_type = genes_type
    
    def __repr__(self):
        return print_to_string(self.name, ", frame:", self.frame)
    
    def splice_to_amino(self):
        amino_content = []
        i = 0
        while i + 3 <= len(self.content):
            amino_content.append(self.content[i:i+3])
            i += 3
        amino_genes = [[e[0] // 3, e[1] // 3] for e  in self.genes]
        
        return AminoFrame(self.name, amino_content, self.frame, amino_genes, self.genes_type)
    
    
class FullAnnotation:
    def __init__(self, name, feat_list:"raw gff dat", seqs:"raw fasta dat", genes_type = "classical"):
        self.name = name
        self.genes_type = genes_type
        self.annots = dict(zip([rec.name for rec in seqs],[Annotation(name,rec.seq, [], genes_type) for rec in seqs]))
        for f in feat_list:
            self.annots[f.seqid].append([f.start,f.stop])
    
    def convert_to_computational(self):
        for annot_key in self.annots:
            self.annots[annot_key].convert_to_computational()
        self.genes_type = "computational"
            
    def convert_to_classical(self):
        for annot_key in self.annots:
            self.annots[annot_key].convert_to_classical()      
        self.genes_type = "classical"
        
    def drop_fusion_genes(self, print_dropped = True):
        for annot_key in self.annots:
            self.annots[annot_key].drop_fusion_genes(print_dropped=print_dropped)      
    
    def __repr__(self):
        return print_to_string(self.name, "genes_type:", self.genes_type, "with", len(self.annots), "annotation(s)")

In [8]:
a = "abcdefg"
a[0:3]

'abc'