In [None]:
# Reverse complement if needed, /rc previously added from bam
import sys
from Bio import SeqIO
in_fname = sys.argv[1]
out_fname = sys.argv[2]

complement = {'A':'T', 'C':'G', 'G':'C', 'T':'A'}
with open(in_fname) as f, open(out_fname, 'w') as out_f:
    for hap in SeqIO.parse(f, format="fasta"):
        out_seqname = hap.name.split(':')[0].replace('/rc', '').replace('h1', 'hap1').replace('h2', 'hap2')
        out_f.write(f">{out_seqname}\n")
        if '/rc' in hap.name:
            out_f.write(''.join([complement[z] for z in hap.seq[::-1]]) + '\n')
        else:
            out_f.write(str(hap.seq) + '\n')

In [None]:
# Filter out haplotypes that map to multiple contigs/ multiple places on contig
from Bio import SeqIO
from collections import defaultdict
import sys

in_fname = sys.argv[1]
out_fname_no_multiply_mapping = sys.argv[2]

seqs_per_hap = defaultdict(list)
with open(in_fname) as f:
    for hap in SeqIO.parse(f, format="fasta"):
        seqs_per_hap[hap.name.split(':')[0].replace('/rc', '')].append(str(hap.seq))

with open(out_fname_no_multiply_mapping, 'w') as out_f:
    for hap in sorted(seqs_per_hap):
        if len(set(seqs_per_hap[hap])) == 1:
            out_f.write(f'>{hap}\n{seqs_per_hap[hap][0]}\n')
        else:
            pass #print('Skipped ' + hap)


In [None]:
#=======Create k-mer plot with colored blocks for CYP2D6 and CYP2D7===========

In [None]:
!git clone https://github.com/jwertz01/KmerShade.git
!cd KmerShade; snakemake --cores 1

In [None]:
#=======Assign star alleles===========

In [None]:
# Align haplotype sequences to reverse-complemented star allele reference sequences from PharmVar
!wget -O CYP2D6-6.2.1_pharmvar.zip "https://www.pharmvar.org/get-download-file?name=CYP2D6&refSeq=ALL&fileType=zip&version=current"
!minimap2 -x asm20 -a -Y --MD --eqx --sam-hit-only -N 2000 all_contigs_concat_revcomp_no_multiply_mapping.fa CYP2D6-6.2.1_pharmvar/CYP2D6.haplotypes.revcomp.fa > cyp_seqs_aligned_star_alleles.sam

In [None]:
# Convert and index
!samtools view -b cyp_seqs_aligned_star_alleles.sam | samtools sort > cyp_seqs_aligned_star_alleles.bam
!samtools index cyp_seqs_aligned_star_alleles.bam

In [None]:
'''Get star allele site matches via CIGAR string parsing of reference star
allele vs. haplotype alignments'''
import argparse
import sys
import os
from collections import defaultdict, namedtuple
import pysam
from pathlib import Path

VcfRec = namedtuple('VcfRec', ['pos', 'ref', 'alt'])

def main():
    args = parse_args()
    assert args.pharmvar_dir.is_dir()
    sites_per_star_allele = load_star_allele_definitions(
        args.pharmvar_dir, args.seq_start_pos
    )
    matched_sites = defaultdict(lambda: defaultdict(set))

    sam = args.sam
    align_f = pysam.AlignmentFile(sam, 'rb') if (
        sam.suffix == '.bam'
    ) else pysam.AlignmentFile(sam)
    for i, r in enumerate(align_f):
        if i % 10000 == 0:
            print(i)
        process_alignment(
            r, sites_per_star_allele, matched_sites, args.exclude_haps,
            args.indel_buffer
        )

    write_output(matched_sites, args.out)


def parse_args():
    '''Parse command line arguments'''
    p = argparse.ArgumentParser(
        description='Identifies star allele site matches from reference star '
        'allele vs. haplotype alignments'
    )
    p.add_argument(
        'sam', type=Path, help='SAM file with star allele reference sequences '
        '(query) aligned to haplotypes (reference). Please use minimap2 '
        'with --eqx and with a high -N.'
    )
    p.add_argument(
        'pharmvar_dir', type=Path, help='Directory with PharmVar VCF files'
    )
    p.add_argument('out', type=Path, help='Final star allele TSV')
    p.add_argument(
        '--seq_start_pos', type=int, default=42124499, 
        help='Starting position of star allele sequences on the chromosome '
        '(42124499)'
    )
    p.add_argument(
        '--exclude_haps', nargs='+',
        help='List of haplotypes to exclude, e.g. SV haplotypes (none)',
        default=[]
    )
    p.add_argument(
        '--indel_buffer', type=int,
        help='Number of surrounding bases that must match between star allele '
        'and haplotype to count as a site match for indels (20).',
        default=20
    )
    if len(sys.argv) < 4:
        p.print_help()
        sys.exit(0)
    return p.parse_args()


def load_star_allele_definitions(vcf_dir, seq_start_pos):
    '''Load star allele definitions from VCF files.'''
    sites_per_star_allele = defaultdict(list)
    for fname in vcf_dir.glob('*.vcf'):
        star_allele = fname.stem
        offset = 0
        with open(fname) as f:
            for line in f:
                if not line.strip('"').startswith('#'):
                    chrom, pos, _, ref, alt, *_ = line.strip().split('\t')
                    # Position of variant from start of reference star allele
                    pos = int(pos) - seq_start_pos + offset
                    sites_per_star_allele[star_allele].append(
                        VcfRec(pos, ref, alt)
                    )
                    # Account for position shifts due to indels
                    offset += len(alt) - len(ref)
    return sites_per_star_allele


def process_alignment(
    r, sites_per_star_allele, matched_sites, exclude_haps, indel_buffer
):
    '''Process a single alignment record.'''
    star_allele = r.query_name.replace('*', '_')
    hap = r.reference_name
    if hap in exclude_haps:
        return
    star_allele_sites = sites_per_star_allele[star_allele]
    sites_pos = [z.pos for z in star_allele_sites]
    star_to_hap_pos = {}
    start_hap_pos = None
    for star_pos, hap_pos in r.get_aligned_pairs():  # (ref, hap)
        if start_hap_pos is None and hap_pos is not None:
            start_hap_pos = hap_pos
        if star_pos in sites_pos:
            star_to_hap_pos[star_pos] = hap_pos
    # Sequences for aligned segments
    hap_seq = r.get_reference_sequence().upper()
    star_seq = r.query_sequence.upper()
    for site in star_allele_sites:
        hap_pos = star_to_hap_pos[site.pos]
        if hap_pos is None:
            continue
        # Pos from start of alignment rather than hap seq
        hap_pos -= start_hap_pos
        match_found = check_site_match(
            site, star_seq, hap_pos, hap_seq, indel_buffer, hap, star_allele
        )
        if match_found:
            matched_sites[hap][star_allele].add(site)


def check_site_match(
    site, star_seq, hap_pos, hap_seq, buffer, hap, star_allele
):
    '''Check if a site matches between star allele and haplotype.'''
    hap_alt = hap_seq[hap_pos:hap_pos + len(site.alt)]
    if len(site.ref) == len(site.alt):
        return hap_alt == site.alt
    else:  # Handle indels
        surrounding_hap = hap_seq[
            max(0, hap_pos - buffer):hap_pos + len(site.alt) + buffer
        ]
        surrounding_star = star_seq[
            max(0, site.pos - buffer):site.pos + len(site.alt) + buffer
        ]
        return surrounding_hap == surrounding_star


def write_output(matched_sites, out_file):
    '''Write matched sites to the output file.'''
    with open(out_file, 'w') as out_f:
        out_f.write('Sample_hap\tStar_allele\tMatched_sites\n')
        for sample_hap, star_alleles in sorted(matched_sites.items()):
            for star_allele, sites in sorted(star_alleles.items()):
                sites_str = ','.join(
                    f"{s.pos}:{s.ref}>{s.alt}" for s in sorted(sites)
                )
                out_f.write(
                    f"{sample_hap}\t{star_allele.replace('CYP2D6_', '*')}\t"
                    f"{sites_str}\n"
                )

if __name__ == '__main__':
    main()



In [None]:
'''Determine the best star allele based on site and genomic matches.'''
import argparse
import os
import sys
import csv
import re
import math
from collections import defaultdict, namedtuple
from pathlib import Path
from pysam import *

VcfRec = namedtuple('VcfRec', ['pos', 'ref', 'alt'])
StarMatch = namedtuple('StarMatch', ['star_allele', 'match', 'total'])

def main():
    args = parse_args()
    assert args.pharmvar_dir.is_dir()
    best_genomic_matches = find_best_genomic_matches(
        args.sam, args.exclude_haps, args.ignore_alleles
    )
    sites_per_star_allele = load_star_allele_definitions(
        args.pharmvar_dir, args.seq_start_pos
    )
    all_matched_sites = read_in_all_matched_sites(
        args.matched_sites, args.exclude_haps, args.ignore_alleles
    )

    with open(args.out, 'w') as out_f:
        w = csv.DictWriter(out_f, delimiter='\t', fieldnames=[
            'Hap', 'Final star allele', 'Best genomic match', 
            'Best genomic match %', 'Best site match', 'Best site match %', 
            'Second-best site match (DIFFERENT major star allele)',
            'Second-best site match %',
        ])
        w.writeheader()
        for hap in all_matched_sites:
            write_alleles(
                w, hap, all_matched_sites[hap], sites_per_star_allele,
                [z[0] for z in best_genomic_matches[hap]]
            )


def parse_args():
    '''Parse command line arguments'''
    p = argparse.ArgumentParser(
        description='Identifies the best star allele per haplotype based on '
        'genomic and site matching'
    )
    p.add_argument(
        'sam', type=Path, help='SAM file with star allele reference sequences '
        '(query) aligned to haplotypes (reference). Please use minimap2 '
        'with --eqx and with a high -N.'
    )
    p.add_argument(
        'matched_sites', type=Path, help='File with matched sites, with header '
        'Sample_hap\tStar_allele\tMatched_sites, where matched sites '
        'is a comma-separated list of pos:ref>alt'
    )
    p.add_argument(
        'pharmvar_dir', type=Path, help='Directory with PharmVar VCF files'
    )
    p.add_argument('out', type=Path, help='Final star allele TSV')
    p.add_argument(
        '--seq_start_pos', type=int, default=42124499, 
        help='Starting position of star allele sequences on the chromosome '
        '(42124499)'
    )
    # exact same sequence as *n.001 or *n.002
    star_alleles_w_dups = [
        '*1', '*106', '*107', '*108', '*110', '*112', '*113', '*114',
        '*115', '*116', '*120', '*122', '*124', '*137', '*140', '*167',
        '*169', '*170', '*173', '*174', '*18', '*19', '*2', '*23', '*24',
        '*25', '*26', '*27', '*3', '*33', '*34', '*38', '*39', '*4', '*43',
        '*44', '*48', '*50', '*62', '*7', '*71', '*75', '*81', '*86', '*89',
        '*9', '*91', '*92', '*93', '*96', '*97'
    ]
    p.add_argument(
        '--ignore_alleles', nargs='+',
        help=f"List of star alleles to ignore, e.g. duplicates "
        f"({', '.join(star_alleles_w_dups)})",
        default=star_alleles_w_dups
    )
    p.add_argument(
        '--exclude_haps', nargs='+',
        help='List of haplotypes to exclude, e.g. SV haplotypes (none)',
        default=[]
    )
    if len(sys.argv) < 5:
        p.print_help()
        sys.exit(0)
    return p.parse_args()


def find_best_genomic_matches(sam, exclude_haps=[], ignore_alleles=[]):
    '''Find best-matching star allele(s) for each haplotype by scoring CIGAR 
    string over the genomic locus
    '''
    best_genomic_matches = defaultdict(list)
    align_f = AlignmentFile(sam, 'rb') if (
        sam.suffix == '.bam'
    ) else AlignmentFile(sam)
    for r in align_f:
        hap = r.reference_name
        star_allele = r.query_name.replace('CYP2D6', '')
        if hap in exclude_haps or star_allele in ignore_alleles:
            continue
        # genomic match score = num bases match - (mismatch + del + indel)
        score = 0
        for op, op_len in r.cigartuples:
            if op == CEQUAL:
                score += op_len
            elif op in [CDIFF, CINS, CDEL]:
                score -= op_len
        # Reset best matches if star allele is higher-scoring
        if (
            (hap not in best_genomic_matches) or
            (score > best_genomic_matches[hap][0][1])
        ):
            best_genomic_matches[hap] = [(star_allele, score)]
        # Add to best matches if same score
        elif score == best_genomic_matches[hap][0][1] and (
            (star_allele, score) not in best_genomic_matches[hap]
        ):
            best_genomic_matches[hap].append((star_allele, score))
    return best_genomic_matches


def load_star_allele_definitions(vcf_dir, seq_start_pos):
    '''Load star allele definitions from VCF files'''
    sites_per_star_allele = defaultdict(list)
    for fname in vcf_dir.glob('*.vcf'):
        star_allele = fname.stem.replace('CYP2D6_', '*')
        offset = 0
        with open(fname) as f:
            for line in f:
                if not line.strip('"').startswith('#'):
                    chrom, pos, _, ref, alt, *_ = line.strip().split('\t')
                    # Position of variant from start of reference star allele
                    pos = int(pos) - seq_start_pos + offset
                    sites_per_star_allele[star_allele].append(
                        VcfRec(pos, ref, alt)
                    )
                    # Account for position shifts due to indels
                    offset += len(alt) - len(ref)
    return sites_per_star_allele


def read_in_all_matched_sites(
    matched_sites_fname, exclude_haps=[], ignore_alleles=[]
):
    '''Read in matching sites between each star allele and hap'''
    all_matched_sites = defaultdict(lambda: defaultdict(set))
    with open(matched_sites_fname) as f:
        r = csv.DictReader(f, delimiter='\t')
        for line in r:
            hap = line['Sample_hap']
            star_allele = line['Star_allele']
            if hap in exclude_haps or star_allele in ignore_alleles:
                continue
            for site_str in line['Matched_sites'].split(','):
                all_matched_sites[hap][star_allele].add(
                    VcfRec(int(z[0]), z[1], z[2]) for z in
                    re.split(r'[:>]', site_str)
                )
    return all_matched_sites


def write_alleles(
    w, hap, all_matched_sites_hap, sites_per_star_allele, best_genomic_star
):
    '''Write output TSV line combining best genomic match and site match star
    alleles for haplotype.'''
    site_matches = []
    for star_allele in all_matched_sites_hap:
        matched_sites = len(all_matched_sites_hap[star_allele])
        total_sites = len(sites_per_star_allele[star_allele])
        assert matched_sites <= total_sites
        site_matches.append(
            StarMatch(star_allele, matched_sites, total_sites)
        )
    (
        best_site_percentage, best_site_matches,
        second_best_site_percentage, second_best_site_matches
    ) = get_best_site_matches(site_matches)

    final_star_allele = get_final_star_allele(
        best_genomic_star,
        [z.star_allele for z in best_site_matches],
        best_site_percentage
    )
    out_d = {
        'Hap':hap,
        'Final star allele': final_star_allele,
        'Best genomic match': ','.join(best_genomic_star),
        'Best genomic match %': match_frac_str([
            z for z in site_matches if z.star_allele in best_genomic_star
        ]),
        'Best site match': ','.join([
            z.star_allele for z in best_site_matches
        ]),
        'Best site match %': match_frac_str(best_site_matches),
        'Second-best site match (DIFFERENT major star allele)': ','.join([
            z.star_allele for z in second_best_site_matches
        ]),
        'Second-best site match %': match_frac_str(
            second_best_site_matches
        )
    }
    w.writerow(out_d)


def get_final_star_allele(
    best_genomic_stars, best_site_stars, best_site_percentage
):
    final_star_allele = 'Ambiguous'
    if math.isclose(best_site_percentage, 1.0):
        final_star_alleles = set(best_site_stars).intersection(
            best_genomic_stars
        )
        if len(final_star_alleles) == 1:
            final_star_allele = list(final_star_alleles)[0]
    # Has no sites- assign if best genomic match and no adequate site match
    elif '*1.001' in best_genomic_stars:
        final_star_allele = '*1.001'
    return final_star_allele


def get_best_site_matches(sites):
    '''Get star alleles with highest and second-highest % matching and total
    sites for hap, where second-highest is from different major star allele'''
    # decreasing % then # of sites matched
    sites = sorted(
        sites, key=lambda x:(float(x.match) / x.total, x.total), reverse=True
    )
    best_percentage = match_frac(sites[0])
    best_num_sites = sites[0].total
    second_best_percentage = None
    second_best_num_sites = None
    best_site_matches = []
    second_best_site_matches = []  # from different major star allele only
    for site in sites:
        if (
            math.isclose(match_frac(site), best_percentage) and
            site.total == best_num_sites
        ):  # float equals
            best_site_matches.append(site)
        elif (
            site.star_allele.split('.')[0] in
            [z.star_allele.split('.')[0] for z in best_site_matches]
        ):
            continue
        elif second_best_percentage is None:
            second_best_percentage = match_frac(site)
            second_best_site_matches.append(site)
            second_best_num_sites = site.total
        elif (
            math.isclose(match_frac(site), second_best_percentage) and
            site.total == second_best_num_sites
        ):
            second_best_site_matches.append(site)
        else:
            break
    return (
        best_percentage, best_site_matches, second_best_percentage,
        second_best_site_matches
    )


def match_frac(site):
    return float(site.match) / site.total


def match_frac_str(site_list):
    return ','.join([f'{site.match}/{site.total}' for site in site_list])


if __name__ == '__main__':
    main()


In [None]:
# Get diplotypes
import sys
from collections import defaultdict

star_alleles_tsv = sys.argv[1]

best_star_allele_per_hap = {}
with open(star_alleles_tsv) as f:
    f.readline()
    for line in f:
        line = line.strip().split('\t')
        hap = line[0]
        best_star_allele = line[-1]
        if best_star_allele != 'Ambiguous':
            best_star_allele_per_hap[hap] = best_star_allele

diplotypes = {}
for hap in best_star_allele_per_hap:
    sample = hap.split('_')[0]
    if sample not in diplotypes:
        diplotypes[sample] = [None, None]
    star_allele_main = best_star_allele_per_hap[hap].split('.')[0]
    if hap.endswith('_hap1'):
        diplotypes[sample][0] = star_allele_main
    else:
        diplotypes[sample][1] = star_allele_main

samples_per_diplotype = defaultdict(set)
for sample in diplotypes:
    hap1, hap2 = diplotypes[sample]
    hap1_int = int(hap1.replace('*', '')) if (hap1) else None
    hap2_int = int(hap2.replace('*', '')) if (hap2) else None
    dip_strs = [
        f"{hap1 if (hap1) else '?'}",
        f"{hap2 if (hap2) else '?'}"
    ]
    # sort
    dip_str = '/'.join(dip_strs[::-1]) if (hap2_int is not None and (hap1_int is None or hap2_int < hap1_int)) else '/'.join(dip_strs)
    samples_per_diplotype[dip_str].add(sample)
for x in sorted(samples_per_diplotype.items(), key=lambda z:len(z[1]), reverse=True):
    print(f"{x[0]}\t{len(x[1])}")