In [121]:
import os
import numpy as np
from Bio import SeqIO
from Bio.Seq import Seq as RawSeq

# Make sure the scratch directory that the later cells write into exists.
os.makedirs(name="./cache", exist_ok=True)

In [250]:
# --- Synthetic-insert parameters -------------------------------------------
ALPHABET = "ACGT"      # symbols the random inserts are drawn from
INSERT_LEN = 40_000    # length of every generated insert

# --- Test-case geometry -----------------------------------------------------
END_LEN = 600          # bases taken from each side of the insert as an end read
CUTT_END = 1000        # bases removed from one side in the *_only cases
SHORT_LEN = 30         # bases trimmed from a contig in the *_slightly_short cases

# Fix NumPy's global RNG so the generated inserts are the same on every run.
np.random.seed(12345)

class TestCases:
    """Synthetic scaffolding scenarios built from one random insert each.

    Every ``case_*`` method takes the full insert ``s`` and returns a 4-tuple
    ``(fwd, rev, contigs, expected)``:

    * ``fwd`` -- the first ``END_LEN`` bases of ``s`` (``str``), or ``None``
      when the case deliberately omits the forward end read.
    * ``rev`` -- the first ``END_LEN`` bases of the reverse complement of
      ``s``, or ``None`` when the case omits the reverse end read.
    * ``contigs`` -- list of assembly fragments derived from ``s``.
    * ``expected`` -- the sequence the resulting scaffold is compared against.

    The notebook's driver discovers cases by iterating ``TestCases.__dict__``
    for names starting with ``"case"``, so definition order determines which
    random insert each case receives; keep the order stable and do not give
    helpers a ``case`` prefix.
    """

    @classmethod
    def generaete_insert(cls):
        """Draw a random insert of ``INSERT_LEN`` bases over ``ALPHABET``.

        Uses NumPy's global RNG.  The misspelled name is kept on purpose
        because callers reference it.
        """
        return RawSeq("".join(ALPHABET[i] for i in np.random.choice(len(ALPHABET), INSERT_LEN, replace=True)))

    @staticmethod
    def _end_reads(s: RawSeq):
        """Return ``(fwd, rev)``: the leading ``END_LEN`` bases of ``s`` and of its reverse complement."""
        return str(s)[:END_LEN], s.reverse_complement(inplace=False)[:END_LEN]

    def case_perfect(self, s: RawSeq):
        """Single contig equal to the whole insert."""
        fwd, rev = TestCases._end_reads(s)
        return fwd, rev, [s], s

    def case_perfect_rc(self, s: RawSeq):
        """Single contig equal to the reverse complement of the whole insert."""
        fwd, rev = TestCases._end_reads(s)
        return fwd, rev, [s.reverse_complement(inplace=False)], s

    def case_assembly_slightly_short(self, s: RawSeq):
        """Single contig missing ``SHORT_LEN`` bases on both sides."""
        fwd, rev = TestCases._end_reads(s)
        return fwd, rev, [str(s)[SHORT_LEN:len(s)-SHORT_LEN]], s

    def case_assembly_slightly_short_rc(self, s: RawSeq):
        """Reverse-complemented contig missing ``SHORT_LEN`` bases on both sides."""
        fwd, rev = TestCases._end_reads(s)
        return fwd, rev, [s.reverse_complement(inplace=False)[SHORT_LEN:len(s)-SHORT_LEN]], s

    def case_needs_scaffold(self, s: RawSeq):
        """Two contigs that overlap by ``SCAFFOLD_OVERLAP`` bases around the midpoint."""
        fwd, rev = TestCases._end_reads(s)
        mid = len(s)//2
        SCAFFOLD_OVERLAP = 1000
        return fwd, rev, [str(s)[:mid+(SCAFFOLD_OVERLAP//2)], str(s)[(mid-SCAFFOLD_OVERLAP//2):]], s

    def case_needs_scaffold_off_center(self, s: RawSeq):
        """Two overlapping contigs whose junction sits 5,000 bases past the midpoint."""
        fwd, rev = TestCases._end_reads(s)
        SCAFFOLD_OVERLAP = 2000
        mid = (len(s)//2)+5_000
        return fwd, rev, [str(s)[:mid+SCAFFOLD_OVERLAP//2], str(s)[mid-SCAFFOLD_OVERLAP//2:]], s

    def case_needs_scaffold_rc(self, s: RawSeq):
        """Two overlapping contigs, the second one taken from the reverse complement."""
        fwd, rev = TestCases._end_reads(s)
        mid = len(s)//2
        SCAFFOLD_OVERLAP = 1000
        return fwd, rev, [str(s)[:mid+SCAFFOLD_OVERLAP//2], s.reverse_complement(inplace=False)[:mid+SCAFFOLD_OVERLAP//2]], s

    def case_needs_scaffold_slightly_short_rc(self, s: RawSeq):
        """Like ``case_needs_scaffold_rc`` but each contig starts ``SHORT_LEN`` bases late."""
        fwd, rev = TestCases._end_reads(s)
        mid = len(s)//2
        SCAFFOLD_OVERLAP = 1000
        return fwd, rev, [str(s)[SHORT_LEN:mid+SCAFFOLD_OVERLAP//2], s.reverse_complement(inplace=False)[SHORT_LEN:mid+SCAFFOLD_OVERLAP//2]], s

    def case_fwd_only(self, s: RawSeq):
        """Contig lacking the last ``CUTT_END`` bases of the insert."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[:len(s)-CUTT_END]
        return fwd, rev, [ss], ss

    def case_rev_only(self, s: RawSeq):
        """Contig lacking the first ``CUTT_END`` bases of the insert."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[CUTT_END:]
        return fwd, rev, [ss], ss

    def case_fwd_missing(self, s: RawSeq):
        """Whole insert as contig, but no forward end read is supplied."""
        _, rev = TestCases._end_reads(s)
        return None, rev, [s], s

    def case_rev_missing(self, s: RawSeq):
        """Whole insert as contig, but no reverse end read is supplied."""
        fwd, _ = TestCases._end_reads(s)
        return fwd, None, [s], s

    def case_fwd_only_rc(self, s: RawSeq):
        """Reverse-complemented contig lacking the last ``CUTT_END`` bases of the insert."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[:len(s)-CUTT_END]
        return fwd, rev, [RawSeq(ss).reverse_complement(inplace=False)], ss

    def case_rev_only_rc(self, s: RawSeq):
        """Reverse-complemented contig lacking the first ``CUTT_END`` bases of the insert."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[CUTT_END:]
        return fwd, rev, [RawSeq(ss).reverse_complement(inplace=False)], ss

    def case_fwd_only_slightly_short(self, s: RawSeq):
        """``case_fwd_only`` contig additionally missing its first ``SHORT_LEN`` bases."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[:len(s)-CUTT_END]
        return fwd, rev, [ss[SHORT_LEN:]], ss

    def case_rev_only_slightly_short(self, s: RawSeq):
        """``case_rev_only`` contig additionally missing its last ``SHORT_LEN`` bases."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[CUTT_END:]
        # The bound must be relative to ``ss``: ``ss`` is already CUTT_END bases
        # shorter than ``s``, so ``len(s)-SHORT_LEN`` lies past its end and
        # would trim nothing.
        return fwd, rev, [ss[:len(ss)-SHORT_LEN]], ss

    def case_fwd_only_slightly_short_rc(self, s: RawSeq):
        """Reverse-complemented ``case_fwd_only`` contig missing its last ``SHORT_LEN`` bases."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[:len(s)-CUTT_END]
        return fwd, rev, [RawSeq(ss).reverse_complement(inplace=False)[:-SHORT_LEN]], ss

    def case_rev_only_slightly_short_rc(self, s: RawSeq):
        """Reverse-complemented ``case_rev_only`` contig missing its first ``SHORT_LEN`` bases."""
        fwd, rev = TestCases._end_reads(s)
        ss = str(s)[CUTT_END:]
        return fwd, rev, [RawSeq(ss).reverse_complement(inplace=False)[SHORT_LEN:]], ss

# Output FASTA files consumed by the scaffolding run.
fwd_ends = "./cache/fwds.fa"
rev_ends = "./cache/revs.fa"
contigs = "./cache/contigs.fa"

expected = {}   # case name -> sequence the scaffold is compared against
cases = []      # case names, in definition order

with open(fwd_ends, "w") as fwd_out, open(rev_ends, "w") as rev_out, open(contigs, "w") as contig_out:
    for k, case_fn in TestCases.__dict__.items():
        # Only the case_* methods are scenarios; skip everything else on the class.
        if k.startswith("case"):
            cases.append(k)
            # One fresh random insert per case, drawn in definition order.
            insert = TestCases.generaete_insert()
            _fwd, _rev, _assemblies, _expected = case_fn(TestCases, insert)
            # A missing (None) end read is simply left out of its FASTA file.
            if _fwd:
                fwd_out.write(f">{k}\n{_fwd}\n")
            if _rev:
                rev_out.write(f">{k}\n{_rev}\n")
            # Contigs are named <case>_<index>.
            for i, s in enumerate(_assemblies):
                contig_out.write(f">{k}_{i}\n{s}\n")
            expected[k] = RawSeq(str(_expected))

In [251]:
test_results_dir = "./cache/test_results"

# Shell script that puts the local FabFos checkout and its conda environment
# on the paths, then runs the scaffolder on the generated FASTA files.
# Python removes each backslash-newline pair inside this literal, so the
# fabfos invocation reaches the shell as one line.
fabfos_cmd = f"""\
    export PYTHONPATH=/home/tony/workspace/tools/FabFos/src:$PYTHONPATH
    export PATH=/home/tony/lib/miniforge3/envs/fabfos/bin:$PATH
    /home/tony/lib/miniforge3/envs/fabfos/bin/python -m fabfos run \
        --overwrite \
        --min_length 20000 \
        --min_length_range 2000 \
        --gap_str N \
        -t 14 \
        -a {contigs} \
        --endf {fwd_ends} \
        --endr {rev_ends} \
        --ends_facing \
        --end_regex "\\w+" \
        -o {test_results_dir} \
"""

# Last expression of the cell: the status returned by os.system is displayed.
os.system(fabfos_cmd)

[33mBuilding DAG of jobs...[0m
[33mUsing shell: /usr/bin/bash[0m
[33mProvided cores: 14[0m
[33mRules claiming more threads will be scaled down.[0m
[33mJob stats:
job                count
---------------  -------
acquire_contigs        1
scaffold               1
target                 1
total                  3
[0m
[33mSelect jobs to execute...[0m
[32m[0m
[32m[Mon Feb  5 14:52:05 2024][0m
[32mrule acquire_contigs:
    input: internals/temp_reads/original_reads.json, internals/temp_assembly/assemblies.json
    output: internals/temp_assembly/contigs.json
    jobid: 2
    reason: Forced execution
    threads: 14
    resources: tmpdir=/tmp[0m
[32m[0m
[33mBuilding DAG of jobs...[0m
[33mUsing shell: /usr/bin/bash[0m
[33mProvided cores: 14[0m
[33mRules claiming more threads will be scaled down.[0m
[33mJob stats:
job                count
---------------  -------
acquire_contigs        1
scaffold               1
target                 1
total                  3
[0m

0

In [252]:
# Scaffolds produced by the run, keyed by FASTA record id.
actual = dict(
    (record.id, record.seq)
    for record in SeqIO.parse(f"{test_results_dir}/scaffolds.fna", "fasta")
)
# Quick sanity check: number of scaffolds vs. number of test cases.
len(actual), len(expected)

(18, 18)

In [257]:
# Compare every generated case against the scaffold of the same name and
# report all problems before declaring success.
_err = False
for k in cases:
    if k not in actual:
        print("missing", k)
        _err = True
        # No scaffold to compare with; indexing actual[k] below would raise
        # KeyError and abort the report for the remaining cases.
        continue

    if expected[k] != actual[k]:
        print("unequal", k)
        # Length difference (expected - actual) as a first hint of what went wrong.
        print(len(expected[k])-len(actual[k]))
        _err = True
if not _err:
    print("all tests passed")

all tests passed


In [258]:
# Inspect one case by hand: lengths, their difference, then both sequences.
k = "case_needs_scaffold_slightly_short_rc"
print(k)
expected_seq = expected[k]
actual_seq = actual[k]
expected_len, actual_len = len(expected_seq), len(actual_seq)
print(expected_len, actual_len, expected_len - actual_len)
print(expected_seq)
print(actual_seq)

case_needs_scaffold_slightly_short_rc
40000 40000 0
AGTAGATGTAAAAGAGGTGTATTGGGTGGGAGCAAATATAGGATTGGGCCATATTCTACCCTCCCGGTTAGGGGTAGGTGGAAGGCTTATAGCTTGCGAACCCGCGCGTTTATCGCACAGGTAAATCTGTAAGTTCAGCGATCCCGGCGAGACATCATTGGTGGATCTGCGTATCGTCCGTAATTCTCAATCGTGGGAACTTACCCCCCTGCACAGTAAGTACTATACTACTGACTGAGTTCTGACACGTTATGCTCTAATATACACTGGTGCGCATAGGTCTGATGCAACTGCCGGACCCACATCGCGGTCGACATACAGTGTGACTTAAGATTTCAAACCGTGCGCATTACTCCAGTGAGCCAACTCAAGCTCACCCATTCGTATCAACTGCTCTCTCTTGCTTTCTGACCATCTTGTTTGTAGAAGAAGCACTCTACGTAGAGCTCTCGTTCTGTTGGTCTGTGTTAGATGCCTTCCGAGGGCTTAAGGGAAGTTTCGTCGATTGGCAAAAGCGTCCGTCACTACGTTCCTCAGAATACGAATGAATGAACTAATCCTCTCAGCGACCGAAGATTACATGCGCTGCGACAGGGATTTCACAGAAATGTCGAAGACCCACACAACCGTTGCTTTAGATGACAATCGGTTGATCACATTAACCAGGTCACGTAAGGGACCTTATCTCAGTGCTCATCACCTCAGCGGATTACAGAGGAAAAGACATATATGTTACACCTAAGTGCGACTAGAGGAAGAACCAGAGCCATAGGCACGAGCCTAGAAACATGGAGATAGCATATTCTATGGTAGTAGACATTCACAGACCCCTGGCCGTGTCCCACCCCGCTGTGTCCCCACTATTCTCACTCCTGTATCATGGGGATCTCGTATAATCCATCTTTGCACGAGAAAGTACCGGACGGAAGAAACATACATATAATCTCA

In [259]:
# Scratch check: reverse complement of a short fragment.
probe = RawSeq("CAACTTCTCAAATGGACCCGGTCTTTACT")
probe.reverse_complement(inplace=False)

Seq('AGTAAAGACCGGGTCCATTTGAGAAGTTG')

In [260]:
# Scratch check: the window [e:s] of a string corresponds to the window
# [len-s:len-e] of the reversed string (same characters, opposite order).
x = "0123456789"
s, e = 5, 1
n_chars = len(x)
x[e:s], "".join(reversed(x))[n_chars - s:n_chars - e]

('1234', '4321')