In [1]:
import re

import numpy as np
import pandas as pd
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqFeature import FeatureLocation, ExactPosition, CompoundLocation
from Bio.SeqRecord import SeqRecord

from pyscripts.config import path2
from pyscripts.datasets import DatasetLoader
from pyscripts.genomeutil import is_regular, bac_translate
# Shared dataset loader; the export loop at the end of the notebook calls
# `dloader.load_genome(...)` on it to read each reference genome.
dloader = DatasetLoader()

In [2]:
from enum import IntEnum
class AbsFrame(IntEnum):
    """Bit flags for the six absolute reading frames of a sequence.

    Every frame owns one bit, so several frames can be OR-ed together into a
    single integer. ``P0``-``P2`` are the frames paired with strand ``+1`` and
    ``M0``-``M2`` the frames paired with strand ``-1`` in ``map2AbsFrame``.
    """
    NONE = 0            # no frame set
    P0   = 1 << 0
    P1   = 1 << 1
    P2   = 1 << 2
    M0   = 1 << 3
    M1   = 1 << 4
    M2   = 1 << 5
    ALL  = (1 << 6) - 1  # all six frame bits set (P0|P1|P2|M0|M1|M2)

# Absolute frames grouped by strand, each list ordered by frame offset 0, 1, 2.
plusAbs  = [AbsFrame[f'P{offset}'] for offset in range(3)]
minusAbs = [AbsFrame[f'M{offset}'] for offset in range(3)]
    
class RelFrame(IntEnum):
    """Bit flags for a reading frame expressed relative to a coding frame.

    ``FW*`` and ``RC*`` are the values stored in ``map2RelFrame`` for
    same-strand and opposite-strand pairs respectively; ``UNDET`` (0) marks
    positions where no relative frame has been assigned.
    """
    UNDET = 0
    FW0   = 1 << 0
    FW1   = 1 << 1
    FW2   = 1 << 2
    RC0   = 1 << 3
    RC1   = 1 << 4
    RC2   = 1 << 5

# (position % 3, strand) => absolute frame flag
map2AbsFrame = {
    (offset, strand): frame
    for strand, frames in (
        (+1, (AbsFrame.P0, AbsFrame.P1, AbsFrame.P2)),
        (-1, (AbsFrame.M0, AbsFrame.M1, AbsFrame.M2)),
    )
    for offset, frame in enumerate(frames)
}

def _build_map2RelFrame():
    """Build the (self_abs, coding_abs) => self_relative_to_coding table.

    With ``i`` the offset of the self frame and ``j`` the offset of the coding
    frame (0, 1 or 2):

    * the result is ``FW*`` when both frames lie on the same strand and
      ``RC*`` otherwise;
    * the numeric suffix is ``(i - j) % 3`` when the coding frame is a plus
      frame and ``(j - i) % 3`` when it is a minus frame.
    """
    plus_frames  = (AbsFrame.P0, AbsFrame.P1, AbsFrame.P2)
    minus_frames = (AbsFrame.M0, AbsFrame.M1, AbsFrame.M2)
    same_strand  = (RelFrame.FW0, RelFrame.FW1, RelFrame.FW2)
    other_strand = (RelFrame.RC0, RelFrame.RC1, RelFrame.RC2)

    table = {}
    for self_frames in (plus_frames, minus_frames):
        for i, self_abs in enumerate(self_frames):
            for j in range(3):
                for coding_frames in (plus_frames, minus_frames):
                    shift = (i - j) % 3 if coding_frames is plus_frames else (j - i) % 3
                    rel = same_strand if coding_frames is self_frames else other_strand
                    table[(self_abs, coding_frames[j])] = rel[shift]
    return table


map2RelFrame = _build_map2RelFrame()


In [3]:
def _relframe_per_base(coding_slice, self_absfr):
    """Label each base of ``coding_slice`` with the frame of ``self_absfr`` relative to it.

    Bases whose coding value is not exactly one ``AbsFrame`` member (no CDS, or
    several overlapping frames OR-ed together) stay ``RelFrame.UNDET``.
    """
    relframe = np.full_like(coding_slice, RelFrame.UNDET, dtype='int8')
    for coding_absfr in (plusAbs + minusAbs):
        relframe[coding_slice == coding_absfr] = map2RelFrame[(self_absfr, coding_absfr)]
    return relframe


def _relframe_per_codon(relframe, sign):
    """Collapse per-base labels to one label per codon, ordered in translation direction.

    The three bases of a codon are AND-ed, so a codon keeps a label only when
    all three bases carry the same one. ``[::sign]`` reverses the result for
    ``sign == -1`` so that it lines up with the translated sequence.
    """
    return (relframe[0::3] & relframe[1::3] & relframe[2::3])[::sign]


def iter_noncoding_reading_frames(record):
    """Yield the stop-free, non-coding reading-frame segments of ``record``.

    For each strand and each of its three frames the sequence is cut at every
    stop codon and at the start position of every CDS accepted by
    ``is_regular``. A stretch between two consecutive cut points is yielded
    unless it overlaps an annotated CDS in the same absolute frame.

    Parameters
    ----------
    record
        A record exposing ``seq``, ``features`` and ``annotations`` (used the
        way a Biopython ``SeqRecord`` is). A missing ``'topology'`` annotation
        is treated as a linear record.

    Yields
    ------
    (loc, transl, relframe)
        ``loc``      -- ``FeatureLocation`` of the segment, or a
                        ``CompoundLocation`` when the segment crosses the
                        origin of a circular record.
        ``transl``   -- ``bac_translate`` of the extracted nucleotides.
        ``relframe`` -- int8 array with one ``RelFrame`` value per codon, in
                        the same order as ``transl``.

    Notes
    -----
    For linear records the leading and trailing fragments of every frame
    (before the first and after the last cut point) are yielded trimmed to
    whole codons, without the same-frame CDS overlap check.
    """
    seq = str(record.seq)
    # Per-base bit mask of the absolute frames in which the base is coding.
    coding_absframe = np.full(len(seq), AbsFrame.NONE, dtype='int8')

    start_plus, start_minus = [], []
    # record all start codons of canonical ORFs
    # record absolute RF of all ORFs
    for cds in filter(lambda feat: feat.type=='CDS', record.features):
        loc = cds.location
        if is_regular(cds, record):
            for pt in loc.parts:
                # NOTE(review): the frame of every part is taken from its own
                # pt.start % 3 -- confirm this is intended for multi-part CDSs.
                coding_absframe[pt.start:pt.end] |= map2AbsFrame[(pt.start%3, pt.strand)]
            if loc.strand > 0:
                start_plus.append(loc.start if len(loc.parts)==1 else loc.parts[0].start)
            else:
                # NOTE(review): for multi-part minus-strand CDSs this takes the end
                # of the *second* part. Biopython stores complement(join(...)) parts
                # in reverse (biological) order, which would make parts[0].end the
                # start codon side -- confirm against the loader's output.
                start_minus.append(loc.end if len(loc.parts)==1 else loc.parts[1].end)
        else:
            # Irregular CDS: block every frame over its extent.
            for pt in loc.parts:
                coding_absframe[pt.start:pt.end] = AbsFrame.ALL

    # regex patterns of stop codons to record all of them
    # (TRA/TAR and TYA/YTA match the literal IUPAC ambiguity letters R and Y;
    #  the minus patterns are the reverse complements of the plus ones)
    ptrn_stop_plus  = re.compile('TAA|TAG|TGA|TRA|TAR')
    ptrn_stop_minus = re.compile('TTA|CTA|TCA|TYA|YTA')

    # split by (1) start codons of canonical ORFs; (2) all stop codons on the genome
    # Start positions are stored as zero-width spans (s == e).
    # ignore_index gives unique row labels, so the Series used as groupby key
    # below is aligned unambiguously.
    plus = pd.concat([
        pd.DataFrame({'s': start_plus, 'e': start_plus}, dtype=int),
        pd.DataFrame([
            m.span()
            for m in re.finditer(ptrn_stop_plus, seq)
        ], columns=['s','e'], dtype=int)
    ], ignore_index=True)

    minus = pd.concat([
        pd.DataFrame({'s': start_minus, 'e': start_minus}, dtype=int),
        pd.DataFrame([
            m.span()
            for m in re.finditer(ptrn_stop_minus, seq)
        ], columns=['s','e'], dtype=int)
    ], ignore_index=True)

    # find all non-stop and non-coding regions
    for sign, strand_pm in {+1: plus, -1: minus}.items():
        # frame -> (start of its first cut point, end of its last cut point)
        cross_pm = {}
        for self_absfr, pm_fr in strand_pm.groupby((strand_pm['s']%3).apply(lambda st: map2AbsFrame[(st, sign)])):
            pm_fr = pm_fr.sort_values(by=['s','e']).reset_index(drop=True)
            cross_pm[self_absfr] = (pm_fr.iloc[0, 0], pm_fr.iloc[-1, 1])

            # Candidate segment = end of one cut point .. start of the next one.
            for s, e in zip(pm_fr['e'].values, pm_fr['s'].values[1:]):
                if s == e or (coding_absframe[s:e] & self_absfr).any():
                    ## ignore in-frame overlap with any (potential) coding region
                    continue

                ## assign relframe to non-coding reading frames
                loc = FeatureLocation(ExactPosition(s), ExactPosition(e), sign)
                transl = bac_translate(loc.extract(seq))
                assert '*' not in transl

                self_relframe = _relframe_per_base(coding_absframe[s:e], self_absfr)
                self_relframe_transl = _relframe_per_codon(self_relframe, sign)
                yield loc, transl, self_relframe_transl
                assert (self_relframe_transl != RelFrame.FW0).all(), (s, e, loc, transl)

        # .get(): records without a topology annotation are handled as linear
        # instead of raising KeyError.
        if record.annotations.get('topology') == 'circular':
            ## Handling corner cases: ARFs that cross the boundary of a circular DNA record.
            ## Very troublesome...
            # A frame that runs off the end re-enters at the start in the frame
            # rotated by len(seq) % 3; pair_second[i] is the partner of pair_first[i].
            mod = len(seq) % 3
            pair_first  = plusAbs if sign>0 else minusAbs
            pair_second = pair_first[-mod:] + pair_first[:-mod]

            for self_absfr1, self_absfr2 in zip(pair_first, pair_second):
                # s1: end of the last cut point before the origin;
                # e2: start of the first cut point after the origin.
                s1, e2 = cross_pm[self_absfr1][1], cross_pm[self_absfr2][0]
                coding_absframe1, coding_absframe2 = coding_absframe[s1:], coding_absframe[:e2]
                if (coding_absframe1 & self_absfr1).any() or (coding_absframe2 & self_absfr2).any():
                    ## ignore in-frame overlap with any (potential) coding region
                    continue
                self_relframe = np.hstack([
                    _relframe_per_base(coding_absframe1, self_absfr1),
                    _relframe_per_base(coding_absframe2, self_absfr2),
                ])
                self_relframe_transl = _relframe_per_codon(self_relframe, sign)

                # Parts are listed in translation order ([::-1] on the minus strand).
                loc = CompoundLocation([
                    FeatureLocation(ExactPosition(s1), ExactPosition(len(seq)), sign),
                    FeatureLocation(ExactPosition(0), ExactPosition(e2), sign)
                ][::sign])
                transl = bac_translate(loc.extract(seq))

                if (p := transl.find('*')) < 0:
                    yield loc, transl, self_relframe_transl
                    assert len(transl) == len(self_relframe_transl)
                else:
                    # A stop in the stitched translation: split into the two
                    # whole-codon pieces on either side of the origin. The asserts
                    # below check that the stop is the codon at the junction.
                    assert '*' not in transl[p+1:]
                    e1, s2 = s1 + (len(seq)-s1) // 3 * 3, e2 % 3
                    loc1 = FeatureLocation(ExactPosition(s1), ExactPosition(e1), sign)
                    loc2 = FeatureLocation(ExactPosition(s2), ExactPosition(e2), sign)
                    if sign > 0:
                        yield loc1, transl[:p],   self_relframe_transl[:p]
                        yield loc2, transl[p+1:], self_relframe_transl[p+1:]
                        assert len(loc2) == len(self_relframe_transl[p+1:]) * 3 and (len(seq)-s1) // 3 == p
                    else:
                        yield loc2, transl[:p],   self_relframe_transl[:p]
                        yield loc1, transl[p+1:], self_relframe_transl[p+1:]
                        assert len(loc2) == p * 3 and (len(seq)-s1) // 3 == len(self_relframe_transl[p+1:])
        else:
            # Linear record: emit the trailing (loc1) and leading (loc2) fragment of
            # every frame, trimmed to whole codons. No coding-overlap filter here.
            for self_absfr, (e2, s1) in cross_pm.items():
                e1, s2 = s1 + (len(seq)-s1) // 3 * 3, e2 % 3
                loc1 = FeatureLocation(ExactPosition(s1), ExactPosition(e1), sign)
                loc2 = FeatureLocation(ExactPosition(s2), ExactPosition(e2), sign)
                transl1 = bac_translate(loc1.extract(seq))
                transl2 = bac_translate(loc2.extract(seq))
                self_relframe_transl1 = _relframe_per_codon(
                    _relframe_per_base(coding_absframe[s1:e1], self_absfr), sign)
                self_relframe_transl2 = _relframe_per_codon(
                    _relframe_per_base(coding_absframe[s2:e2], self_absfr), sign)
                yield loc1, transl1, self_relframe_transl1
                yield loc2, transl2, self_relframe_transl2
                

In [4]:
# Table of reference genomes; the export loop below reads its
# `reference_assembly` and `species_code` columns.
refs_tsv = path2.metadata/'expression'/'Kim+20SciData_refs.tsv'
refs = pd.read_csv(refs_tsv, sep='\t')

In [5]:
# Write one FASTA file per species holding the translated non-coding reading
# frames that pass the filters below.
genome_dir = path2.pubdata/'expression'/'Kim+20SciData'/'reference'
fasta_dir  = path2.data/'expression'/'Kim+20SciData'/'noncoding_og'/'input'

for acc, code in zip(refs['reference_assembly'], refs['species_code']):
    # Keep only the longest record of the assembly.
    rec = max(dloader.load_genome(acc, dirname=genome_dir), key=len)

    selected = []
    for loc, transl, relfr in iter_noncoding_reading_frames(rec):
        if len(transl) < 30:
            continue  # translation shorter than 30 residues
        if not (relfr >= RelFrame.RC0).any():
            continue  # no codon labelled with an RC* relative frame
        if not (np.count_nonzero(relfr < RelFrame.RC0) / len(relfr) < 0.3):
            continue  # 30% or more of the codons are not RC*-labelled
        tag = f'{rec.id}{loc}'
        selected.append(SeqRecord(Seq(transl), id=tag, name=tag, description=tag))

    SeqIO.write(selected, fasta_dir/code, 'fasta')

```
docker-compose run --rm -w /data/expression/Kim+20SciData/noncoding_og sonicparanoid sonicparanoid -i input/ -o output/ -t 32 -p default >& data/expression/Kim+20SciData/noncoding_og/sonic.log
```