In [6]:
import filecmp
import glob

filecmp.cmp(
    'integration/expected/isolate_1/isolate_1.bam',
    'integration/observed/isolate_1/isolate_1.bam',
    shallow=False,
)

True

In [17]:
expected = 'integration/expected/isolate_1'
observed = 'integration/observed/isolate_1'

dircmp = filecmp.dircmp(expected, observed)
match, mismatch, errors = filecmp.cmpfiles(expected, observed, dircmp.left_list, False)

In [27]:
from re import sub, finditer
from typing import Dict

def _subtract_indels_from_counts(upper_reads: str, base_counts: Dict[str, int]) -> None:
    for indel in finditer(r'([+|-])(\d+)(\w+)', upper_reads):
        indel_size = int(indel.group(2))
        indel_sequence = indel.group(3)[:indel_size]
        for base in base_counts:
            base_counts[base] -= indel_sequence.count(base)

def _base_ratios_from_reads(reads: str, depth: int, ref: str) -> Dict[str, float]:
    upper_reads = reads.upper()

    # ^G. marks the start of a read segment where the ASCII minus 33 of G is the mapping quality
    # and the character AFTER those 2 is the actual read - in this case a match to the reference.
    # Drop the first 2 characters as they are not actual reads
    if '^' in upper_reads:
        upper_reads = sub('\^.', '', upper_reads)

    base_counts = {base: upper_reads.count(base) for base in 'ACTG'}
    if ref in 'ACTG':
        base_counts[ref] += upper_reads.count('.') + upper_reads.count(',')
    if '+' in upper_reads or '-' in upper_reads:
        _subtract_indels_from_counts(upper_reads, base_counts)
    # Only calculate ratios for present bases
    base_ratios = {
        base: round(base_counts[base] / depth, 3)
        for base in base_counts if base_counts[base]
    }
    return base_ratios

In [None]:
from typing import Optional, List, Tuple

def _get_genotype_and_valid_bases_and_valid_ratios(
    ref: str, bases: List[str], ratio_strs: list, hetero_min: float, hetero_max: float
) -> Optional[Tuple[str, str, str]]:

    ratios = map(float, ratio_strs)

    genotype = ''
    valid_bases = ''
    valid_ratios = ''

    for (base, ratio, ratio_str) in zip(bases, ratios, ratio_strs):
        if ratio < hetero_min:
            continue
        valid_ratios += ratio_str + ','
        if ratio >= hetero_max:
            genotype = '0/0' if base == ref else '1/1'
            valid_bases = base + base
            break
        valid_bases += base

    if not valid_bases:
        return None

    if not genotype:
        if len(valid_bases) == 1:
            if valid_bases == ref:  # If only ref has min <= freq <= max
                return None
            genotype = '1/1'
            valid_bases = valid_bases + valid_bases
        elif len(valid_bases) == 2:
            genotype = '0/1' if ref in valid_bases else '1/2'
        else:
            genotype = '?'

    valid_ratios = valid_ratios.rstrip(',')
    return genotype, valid_bases, valid_ratios

In [None]:
def test_get_genotype_and_valid_bases_and_valid_ratios():
        columns = [
            'ref', 'bases', 'ratios', 'h_min', 'h_max', 'genotype', 'v_bases', 'v_ratios'
        ]
        cases = [
            ('A', ['A', 'T'], ['0.9', '0.1'], .2, .8, '0/0', 'AA', '0.9'),
            ('A', ['T', 'A'], ['0.1', '0.9'], .2, .8, '0/0', 'AA', '0.9'),
            ('A', ['A', 'T'], ['0.5', '0.5'], .2, .8, '0/1', 'AT', '0.5,0.5'),
            ('A', ['T', 'A'], ['0.5', '0.5'], .2, .8, '0/1', 'TA', '0.5,0.5'),
            ('A', ['A', 'T'], ['0.1', '0.9'], .2, .8, '1/1', 'TT', '0.9'),
            ('A', ['T', 'C'], ['0.5', '0.5'], .2, .8, '1/2', 'TC', '0.5,0.5'),
            ('A', ['A','C','T'], ['0.3', '0.4', '0.3'], .2, .8, '?', 'ACT', '0.3,0.4,0.3'),
        ]
        for ref, bases, ratio_strs, hetero_min, hetero_max, genotype, valid_bases, valid_ratios in cases:
            assert _get_genotype_and_valid_bases_and_valid_ratios(
                ref, bases, ratio_strs, hetero_min, hetero_max
            ) == (genotype, valid_bases, valid_ratios)

test_genotype_and_validate_bases

In [48]:
cases = [
    ('.' * 22,                     22, 'G', {'G': 1}),
    ('.' * 9 + 'T',                10, 'A', {'A': .9, 'T': .1}),
    ('.' * 4 + 'T' * 4 + 'G' * 8,  16, 'A', {'A': 0.25, 'T': 0.25, 'G': 0.5}),
    ('.' * 4 + 'A' * 4 + 'G' * 8,  16, 'T', {'A': 0.25, 'T': 0.25, 'G': 0.5}),
    ('.,' * 3 + 'T' * 4,           10, 'A', {'A': .6, 'T': .4}),
    ('.,' * 3 + 'T' * 4 + '$',     10, 'A', {'A': .6, 'T': .4}),
    ('.,' * 3 + 'T' * 4 + '^',     10, 'A', {'A': .6, 'T': .4}),
    ('.,' * 3 + '+3ATA' + 'T' * 4, 10, 'A', {'A': .6, 'T': .4}),
    ('.,' * 3 + '-3ATA' + 'T' * 4, 10, 'A', {'A': .6, 'T': .4}),
    # ^G. marks the start of a read segment where the ASCII minus 33 of G is the mapping quality
    # and the character AFTER those 2 is the actual read - in this case a match to the reference
    ('.....^G.....',               10, 'T', {'T': 1.0}), 

]
for reads, depth, ref, base_ratios in cases:
    assert _base_ratios_from_reads(reads, depth, ref) == base_ratios

In [47]:
_base_ratios_from_reads('.' * 5 + '^G.' + '.' * 4, 10, 'A')

{'A': 1.0}

In [42]:
'A' * 4 + 'T' * 4 + 'G' * 8

'AAAATTTTGGGGGGGG'

In [41]:
_base_ratios_from_reads('AAAATTTTGGGGGGGG', 16, 'A')

{'A': 0.25, 'T': 0.25, 'G': 0.5}

In [21]:
from os.path import join

for filename in dircmp.left_list:
    assert filecmp.cmp(join(expected, filename), join(observed, filename), shallow=False)

AssertionError: 

In [13]:

dircmp.same_files
dircmp.left_list

['isolate_1.bam',
 'isolate_1.fasta',
 'isolate_1.pileup',
 'isolate_1.sam',
 'isolate_1_exons.fasta',
 'isolate_1_exons_concat.fasta',
 'isolate_1_porechopped.fastq.gz',
 'isolate_1_snp_freq.tsv',
 'isolate_1_snp_ratios.tsv',
 'isolate_1_sorted.bam']