In [1]:
import pysam
import pyfastx

In [2]:
fa = pyfastx.Fasta("/ccb/salz4-3/hji20/off-target-probe-checker/otpc/test_data/xenium_human_breast_gene_expression_panel_probe_sequences.fasta")

In [12]:
def convert_md2bit_nucmer(s, tstart):
    running = ""
    bit_s = ""
    for c in s:
        if c.isdigit():
            running += c
        else:
            if len(running) > 0:
                bit_s += '1' * int(running)
            bit_s += '0'
            running = ""
    if len(running) > 0:
        bit_s += '1' * (int(running) - tstart)
    return bit_s

def convert_cigar2bit(tup):
    bit_s = ""
    left_clip = 0
    right_clip = 0
    ins_info = []
    for x in tup:
        op, l = x
        if op == 0: # match
            bit_s += '1' * l
        elif op == 1 or op == 4: # soft clip or ins
            if op == 4:
                if len(bit_s) == 0:
                    left_clip = l
                else:
                    right_clip = l
            if op == 1:
                ins_info.append((bit_s.count('1'), l))
            bit_s += '0' * l
    return bit_s, (left_clip, right_clip), ins_info

def bitwise_and(s1, s2):
    out = ""
    assert len(s1) == len(s2)
    for i in range(len(s1)):
        if s1[i] == '1' and s2[i] == '1':
            out += '1'
        else:
            out += '0'
    return out

def char2sym(char):
    if char == '0':
        return 'X'
    return '='

def compress_bvec(bvec):
    out = []
    curr_char = bvec[0]
    ctr = 1
    for char in bvec[1:]:
        if char == curr_char:
            ctr += 1
        else:
            out.append(f"{char2sym(curr_char)}{ctr}")
            curr_char = char
            ctr = 1
    out.append(f"{char2sym(curr_char)}{ctr}")
    return ''.join(out)

def convert_md2bit_nucmer_del(s, tstart):
    running = ""
    bit_s = ""
    ignore = False
    mismatch_info = []
    for c in s:
        if c.isdigit():
            ignore = False
            running += c
        else:
            if ignore: continue
            if c == '^':
                if len(running) > 0:
                    bit_s += '1' * int(running)
                running = ""
                ignore = True
                continue
            else:
                if len(running) > 0:
                    bit_s += '1' * int(running)
                mismatch_info.append(len(bit_s))
                bit_s += '0'
                running = ""
    if len(running) > 0:
        bit_s += '1' * (int(running) - tstart)
    return bit_s, mismatch_info

def convert_cigar2bit_del(tup, n, mismatch_info):
    bit_s_lst = []
    bit_s = ""
    running = 0
    mut_ctr = 0
    for x in tup:
        op, l = x
        if op == 0: # match
            temp = '1' * l
            # TODO: test if this behaves as expected
            # switch '1' to '0' if a mismatch is reported in this stretch of matches
            if len(mismatch_info) > 0 and mut_ctr != len(mismatch_info):
                temp_lst = list(temp)
                print(running)
                print(l)
                for m in mismatch_info:
                    if m >= running and m < running + l:
                        temp_lst[m - running] = '1'
                        mut_ctr += 1
                temp = ''.join(temp_lst)
            running += l
            bit_s += temp
        elif op == 1 or op == 4: # soft clip or ins
            bit_s += '0' * l
        elif op == 2:
            temp = '0' * len(bit_s)
            bit_s += '0' * (n - len(bit_s))
            bit_s_lst.append(bit_s)
            bit_s = temp
    bit_s_lst.append(bit_s)
    return bit_s_lst

In [27]:
convert_md2bit_nucmer("5a0a3g6a144", 122)

'1111100111011111101111111111111111111111'

In [19]:
fn = "/ccb/salz4-3/hji20/off-target-probe-checker/otpc/test_results/dev_nucmer/main.sam"
unaligned = []
ainfos = dict()
with pysam.AlignmentFile(fn, 'r') as fh:
    for brec in fh:
        qname = brec.query_name
        if brec.is_unmapped:
            unaligned.append(qname)
            continue
        elif brec.is_supplementary:
            continue
        else:
            tname = brec.reference_name
            qlen = len(fa[qname].seq)
            crit_bvec = "0" * 5 + "1" * (qlen - 2 * 5) + "0" * 5 # pad = 5 for testing
            assert len(crit_bvec) == qlen
            crit_dvec = int(crit_bvec, 2)
            if qname not in ainfos:
                ainfos[qname] = set()
            # store relevant infos
            cigar = brec.cigarstring
            cigar_tups = brec.cigartuples
            num_mismatch = int(brec.get_tag('NM'))
            md_tag = brec.get_tag('MD')
            tstart = brec.reference_start
            if cigar == f'{qlen}M':
                if num_mismatch == 0:
                    ainfos[qname].add((tname, f'={qlen}'))
                else:
                    md_bvec = convert_md2bit_nucmer(md_tag, tstart)
                    if crit_dvec & int(md_bvec, 2) == crit_dvec:
                        ainfos[qname].add((tname, compress_bvec(md_bvec)))
            else:
                if 'D' in cigar:
                    md_bvec, mismatch_info = convert_md2bit_nucmer_del(md_tag, tstart)
                    cigar_bvecs = convert_cigar2bit_del(cigar_tups, qlen, mismatch_info)
                else:
                    cigar_bvec, clip_info, ins_info = convert_cigar2bit(cigar_tups)
                    if num_mismatch == 0:
                        final_bvec = cigar_bvec
                    else:
                        md_bvec = convert_md2bit_nucmer(md_tag, tstart)
                        if len(ins_info) > 0:
                            temp = ""
                            prev_p = None
                            for i in range(len(ins_info)):
                                p, l = ins_info[i]
                                if i == 0:
                                    temp += md_bvec[:p] + '0' * l
                                else:
                                    temp += md_bvec[prev_p:p] + '0' * l
                                prev_p = p
                            print(ins_info)
                            temp += md_bvec[ins_info[-1][0]:]
                            print(temp)
                            temp = '0' * clip_info[0] + temp + '0' * clip_info[1]
                            print(temp)
                            print(cigar_bvec)
                            assert len(temp) == len(cigar_bvec) # sanity check
                            final_bvec = bitwise_and(cigar_bvec, temp)
                        else:
                            temp = '0' * clip_info[0] + md_bvec + '0' * clip_info[1]
                            assert len(temp) == len(cigar_bvec) # sanity check
                            final_bvec = bitwise_and(cigar_bvec, temp)
                        if crit_dvec & int(final_bvec, 2) == crit_dvec:
                            ainfos[qname].add((tname, compress_bvec(md_bvec)))

[(6, 1)]
111111011111111111111111111111111011111
0111111011111111111111111111111111011111
0111111011111111111111111111111111111111
[(6, 1)]
111111011111111111111111111111111011111
0111111011111111111111111111111111011111
0111111011111111111111111111111111111111
[(30, 1)]
1111011111111111111111111111110111111111
1111011111111111111111111111110111111111
1111111111111111111111111111110111111111
[(30, 1)]
1111011111111111111111111111110111111111
1111011111111111111111111111110111111111
1111111111111111111111111111110111111111
[(30, 1)]
1111011111111111111111111111110111111111
1111011111111111111111111111110111111111
1111111111111111111111111111110111111111
[(12, 2)]
1111111111110011111111111111111111111111
1111111111110011111111111111111111111111
1111111111110011111111111111111111111111
[(12, 2)]
1111111111110011111111111111111111111111
1111111111110011111111111111111111111111
1111111111110011111111111111111111111111
0
23
23
15
0
11
11
3
0
11
11
3
[(20, 1), (22, 3)]
11111111111111111111011