In [159]:
import pysam
import difflib
from typing import NamedTuple

In [160]:
REF_FASTA = '../insilico-builder/test_data/ref_38/hg38.fa'

In [161]:
fasta = pysam.FastaFile(REF_FASTA)

In [162]:
# ref = 'TCAGTTACAGGGTGGGT'
# alt = 'TAAAC'
# pos = 16459345
# chrom = '10'
# ref = 'TCCCTATGCAGAGTAAGGGGAGATGGAAGAAAC'
# alt = 'TTCTGATGGGGGAGATGGAAGAACA'
# pos = 133662439
# chrom = '10'
ref = 'GAGCG'
alt = 'GTT'
pos = 116367644
chrom = '10'

padding = 5

before = fasta.fetch(chrom, pos-padding-1, pos-1).lower()
middle = fasta.fetch(chrom, pos-1, pos+len(ref)-1).upper()
if middle != ref:
    raise Exception('Reference mismatch')
after = fasta.fetch(chrom, pos+len(ref)-1, pos+len(ref)+padding-1).lower()

old = before + middle + after
new = before + alt + after
print(old)
print(new)

diff = difflib.ndiff(old.upper(), new.upper())
print('|'.join(diff))
diff = difflib.ndiff(old.upper(), new.upper())


class Variant(NamedTuple):
    chrom: str
    start_pos: int
    seq: str
    length: int


variant_pos = pos - padding - 1
building_seq = ''
pending_variants = []
seq_type = ' '
for line in diff:
    start_char, _, char = line
    if start_char == ' ':
        if len(building_seq) > 0:
            var_len = len(building_seq) if seq_type == '+' else -len(building_seq)
            pending_variants.append(Variant(chrom, variant_pos, building_seq, var_len))
            if seq_type == '-':
                variant_pos += len(building_seq)
            building_seq = ''
            seq_type = ' '
        variant_pos += 1
    elif start_char == '-':
        if seq_type == '+':
            pending_variants.append(Variant(chrom, variant_pos, building_seq, len(building_seq)))
            building_seq = ''
        building_seq += char
        seq_type = start_char
    elif start_char == '+':
        if seq_type == '-':
            pending_variants.append(Variant(chrom, variant_pos, building_seq, -len(building_seq)))
            building_seq = ''
            variant_pos += len(building_seq)
        building_seq += char
        seq_type = start_char
    else:
        raise Exception(f'Unknown diff line: {line}')

if len(building_seq) > 0:
    var_len = len(building_seq) if seq_type == '+' else -len(building_seq)
    pending_variants.append(Variant(chrom, variant_pos, building_seq, var_len))

print(pending_variants)


tgacaGAGCGagact
tgacaGTTagact
  T|  G|  A|  C|  A|  G|+ T|+ T|- A|- G|- C|- G|  A|  G|  A|  C|  T
[Variant(chrom='10', start_pos=116367644, seq='TT', length=2), Variant(chrom='10', start_pos=116367644, seq='AGCG', length=-4)]


In [163]:


def handle_pending_snv(start_pos, ref_seq, alt_seq):
    print('SNV')
    for pos in range(len(ref_seq)):
        ref_base = ref_seq[pos]
        alt_base = alt_seq[pos]
        if ref_base != alt_base:
            print(f'{chrom}\t{start_pos}\t{ref_base}\t{alt_base}')
        start_pos += 1

def handle_standard_variant(variant):
    chrom = variant.chrom
    pos = variant.start_pos
    if variant.length > 0:
        ref = fasta.fetch(chrom, pos-1, pos).upper()
        alt = ref + variant.seq
    else:
        ref = fasta.fetch(chrom, pos-1, pos-variant.length).upper()
        alt = ref[0]
    print(f'{chrom}\t{pos}\t{ref}\t{alt}')


def handle_compound_indel(variant_1, variant_2):
    var_len = max(abs(variant_1.length), abs(variant_2.length))
    ref_seq = fasta.fetch(variant_1.chrom, variant_1.start_pos-1, variant_1.start_pos+var_len-1).upper()
    for pos in range(var_len):
        ref_base = ref_seq[pos]
        alt_base = variant_1.seq[pos] if variant_1.length > 0 else variant_1.seq[-pos-1]
        if ref_base != alt_base:
            print(f'{variant_1.chrom}\t{variant_1.start_pos+pos}\t{ref_base}\t{alt_base}')


def handle_pending_variants(pending_variants):
    i = 0
    while i < len(pending_variants):
        if i == len(pending_variants) - 1:
            handle_standard_variant(pending_variants[i])
            i += 1
            break
        # Check for compound indel
        if pending_variants[i].start_pos == pending_variants[i+1].start_pos:
            handle_compound_indel(pending_variants[i], pending_variants[i+1])
            i += 2
        else:
            handle_standard_variant(pending_variants[i])
            i += 1

        


print(pending_variants)

# TODO: Take into account chromosome

# Search for SNVs
i = 0
req_offset = 0
alt_offset = 0
str_offset = pos - padding
print('old len =', len(old))
print('new len =', len(new))
print(f'str offset {str_offset}')
print(f'ref end {str_offset+len(old)}')
non_matched = []
while i < len(pending_variants):
    matched = False
    curr_variant = pending_variants[i]
    size_diff = curr_variant.length
    for j in range(i+1, len(pending_variants)):
        next_variant = pending_variants[j]
        size_diff += next_variant.length
        if size_diff == 0:
            print(f'SNVS {pending_variants[i:j+1]}')
            var_start_pos = curr_variant.start_pos + 1
            var_end_pos = next_variant.start_pos+1
            var_end_pos += -next_variant.length if next_variant.length < 0 else 0
            ref_str_start = var_start_pos-str_offset
            ref_str_end = var_end_pos-str_offset
            alt_str_start = alt_offset+var_start_pos-str_offset
            alt_str_end = alt_offset+var_end_pos-str_offset
            ref_seq = old[ref_str_start:ref_str_end]
            alt_seq = new[alt_str_start:alt_str_end]
            print(ref_str_start, ref_str_end, ref_seq, var_start_pos, var_end_pos)
            print(alt_str_start, alt_str_end, alt_seq, var_start_pos, var_end_pos)
            handle_pending_snv(var_start_pos, ref_seq, alt_seq)
            i = j + 1
            matched = True
            break
    if not matched:
        non_matched.append(curr_variant)
        alt_offset += curr_variant.length
        i += 1

handle_pending_variants(non_matched)


[Variant(chrom='10', start_pos=116367644, seq='TT', length=2), Variant(chrom='10', start_pos=116367644, seq='AGCG', length=-4)]
old len = 15
new len = 13
str offset 116367639
ref end 116367654
REST
10	116367644	G	GTT
10	116367644	GAGCG	G


In [164]:


def complete_diff(chrom, pos, ref, alt, previous_seq):
    if len(ref) > 1 and len(alt) > 1:
        min_len = min(len(ref), len(alt))
        for i in range(min_len):
            if ref[i] != alt[i]:
                print(f'{chrom}\t{pos+i}\t{ref[i]}\t{alt[i]}')
        if len(ref) == len(alt):
            return
        previous_seq = ref[:min_len] + previous_seq
        ref = ref[min_len-1:]
        alt = ref[0] + alt[min_len:]
        pos = pos + i
    if len(alt) > 1:
        for i, previous_char in enumerate(previous_seq[::-1]):
            if previous_char != alt[1]:
                break
        ref = pending_seq[-1-i]
        alt = ref + alt[1:]
        pos = pos - i
    if len(ref) > 1:
        for i, previous_char in enumerate(previous_seq[::-1]):
            if previous_char + ref[:-1] != ref:
                break
        alt = pending_seq[-1-i]
        ref = alt + ref[1:]
        pos = pos - i
    print(f'{chrom}\t{pos}\t{ref}\t{alt}')


variant_pos = pos - padding - 1
ref_seq = ''
alt_seq = ''
pending_seq = ''
for line in diff:
    start_char, _, char = line
    if start_char == ' ':
        if len(ref_seq) > 1 or len(alt_seq) > 1:
            # INS or DEL
            complete_diff(chrom, variant_pos, ref_seq, alt_seq, pending_seq)
            variant_pos += len(ref_seq) - 1
        ref_seq = char
        alt_seq = char
        pending_seq += char
        variant_pos += 1
    elif start_char == '-':
        ref_seq += char
    elif start_char == '+':
        alt_seq += char
    else:
        raise Exception('Unknown diff line' + line)

if len(ref_seq) > 1 or len(alt_seq) > 1:
    complete_diff(chrom, variant_pos, ref_seq, alt_seq, pending_seq)