In [123]:
import joblib
import cyvcf2
import gzip
import pandas as pd
import re
from collections import defaultdict

## gff logic

In [115]:
def init_gff(file):
    field_names = ['seqid', 'source', 'type', 'start', 'end', 'score', 'strand', 'phase', 'attributes']
    gff = pd.read_csv(file, sep='\t', comment='#', names=field_names)
    gff['ID'] = gff['attributes'].str.extract(r'ID=transcript:([^;]+)')
    gff = gff[gff['ID'].notna()]
    
    return gff

def get_transcript_range(gff, transcript_id):
    transcript = gff[gff['ID'] == transcript_id]

    if transcript.shape[0] == 0:
        return None, None, None
    
    if transcript.shape[0] > 1:
        raise Exception('Unsupported more than 1 transcript')

    return transcript['seqid'].values[0], transcript['start'].values[0], transcript['end'].values[0]

gff = init_gff('data/Homo_sapiens.GRCh38.110.chromosome.21.gff3.gz')

## get transcript references

In [116]:
import gzip

def init_transcript_reference(file_path):
    transcript_reference = {}
    with gzip.open(file_path, 'rt') as file:  # 'rt' mode to read as text
        name = None
        seq_list = []
        for line in file:
            line = line.strip()  # Remove trailing newline characters
            if line.startswith(">"):
                if name:  # If not the first sequence, save the previous sequence
                    transcript_reference[name] = ''.join(seq_list)
                    seq_list = []
                name = line[1:].split()[0]  # Get the name, discard the ">"
            else:
                seq_list.append(line)
        if name:  # Save the last sequence
            transcript_reference[name] = ''.join(seq_list)
    return transcript_reference

# Usage
file_path = 'data/reference_sequences.fasta.gz'
transcript_reference = init_transcript_reference(file_path)

## iterate through transcripts

In [135]:
gff_transcripts = gff['ID'].unique()
vcf = cyvcf2.VCF('data/example.vcf.gz')

mutation_dict = {
    "*frameshift": "R",
    "*frameshift&stop_retained": "Q",
    "*inframe_deletion": "C",
    "*inframe_insertion": "J",
    "*missense": "N",
    "*missense&inframe_altering": "K",
    "*stop_gained": "X",
    "*stop_gained&inframe_altering": "A",
    "frameshift": "F",
    "frameshift&start_lost": "V",
    "frameshift&stop_retained": "B",
    "inframe_deletion": "D",
    "inframe_deletion&stop_retained": "P",
    "inframe_insertion": "I",
    "inframe_insertion&stop_retained": "Z",
    "missense": "M",
    "missense&inframe_altering": "Y",
    "phi": "E",
    "start_lost": "0",
    "start_lost&splice_region": "U",
    "stop_gained": "G",
    "stop_gained&inframe_altering": "T",
    "stop_lost": "L",
    "stop_lost&frameshift": "W"
}

# take a bcftools csq string and return the transcript and instruction
# an instruction is defined as:
# (mutation_code, ref_pos, alt_pos, alt_seq, ref_seq_len)

def process_csq(csq):
    csq = csq.split('|')
    
    mutation_type = csq[0]
    if mutation_type not in mutation_dict:
        return None, None

    code = mutation_dict[mutation_type]

    transcript = csq[2]

    match = re.search(r"(\d+)([A-Za-z]+)>(\d+)([A-Za-z]+)", csq[5])

    if match:
        ref_pos, ref_seq, alt_pos, alt_seq = match.groups()

    else:
        return None, None
    
    return transcript, (code, ref_pos, alt_pos, alt_seq, len(ref_seq))
import re

# Initialize a counter for unique instruction IDs
instruction_id_counter = 0
instruction_to_id = {}

def get_instruction_id(instruction):
    global instruction_id_counter
    # Convert instruction tuple to a string to use as a dictionary key
    instruction_key = str(instruction)
    if instruction_key not in instruction_to_id:
        # Assign a new ID to the instruction if it's not already present
        instruction_to_id[instruction_key] = instruction_id_counter
        instruction_id_counter += 1
    return instruction_to_id[instruction_key]

# Process the instructions for each transcript
for transcript in gff_transcripts:
    instructions = defaultdict(set)

    chr, start, end = get_transcript_range(gff, transcript_id=transcript)
    for record in vcf(f'{chr}:{start}-{end}'):
        bcsq = record.INFO.get('BCSQ')
        if bcsq is None:
            continue

        for csq in bcsq.split(','):
            csq_transcript, instruction = process_csq(csq)
            if csq_transcript != transcript or not instruction:
                continue

            instruction_id = get_instruction_id(instruction)
            for individual_call, individual in zip(record.genotypes, vcf.samples):
                for haplotype in [0, 1]:
                    if individual_call[haplotype] == 1:
                        instructions[f'{individual}_{haplotype}'].add(instruction_id)

    # process a transcript's instructions here:
    if len(instructions) != 0:
        print(instructions)
        

defaultdict(<class 'set'>, {'HG02024_1': {0, 2, 3, 4, 5}, 'HG02025_1': {0, 2, 3, 4, 5, 6, 7}, 'HG02026_0': {0, 1, 2, 3, 4, 5, 6, 7}, 'HG02024_0': {2, 3, 4, 5, 6, 7}, 'HG02025_0': {2, 3, 4, 5, 6, 7}, 'HG02026_1': {2, 3, 4, 5}})
defaultdict(<class 'set'>, {'HG02024_1': {8, 10, 11, 12, 13}, 'HG02025_1': {8, 10, 11, 12, 13, 14, 15}, 'HG02026_0': {8, 9, 10, 11, 12, 13, 14, 15}, 'HG02024_0': {10, 11, 12, 13, 14, 15}, 'HG02025_0': {10, 11, 12, 13, 14, 15}, 'HG02026_1': {10, 11, 12, 13}})
defaultdict(<class 'set'>, {'HG02024_1': {16, 18, 19, 20, 21}, 'HG02025_1': {16, 18, 19, 20, 21, 22, 23}, 'HG02026_0': {16, 17, 18, 19, 20, 21, 22, 23}, 'HG02024_0': {18, 19, 20, 21, 22, 23}, 'HG02025_0': {18, 19, 20, 21, 22, 23}, 'HG02026_1': {18, 19, 20, 21}})
defaultdict(<class 'set'>, {'HG02024_1': {24, 26, 27, 28, 29}, 'HG02025_1': {24, 26, 27, 28, 29, 30, 31}, 'HG02026_0': {24, 25, 26, 27, 28, 29, 30, 31}, 'HG02024_0': {26, 27, 28, 29, 30, 31}, 'HG02025_0': {26, 27, 28, 29, 30, 31}, 'HG02026_1': {26, 27