In [13]:
import os
import pysam

In [14]:
path1 = "data/C57BL_6NJ.mgp.v5.snps.dbSNP142.vcf.gz"
path2 = "data/DBA_2J.mgp.v5.snps.dbSNP142.vcf.gz"
path3 = "results/C57BL_6NJ_DBA_2J.mgp.v5.snps.dbSNP142.vcf"

def load_vcf(path):
    snps = dict()
    with pysam.VariantFile(path) as f:
        for record in f:
            snps[(record.chrom, record.pos)] = record
    return snps

snps1 = load_vcf(path1)
snps2 = load_vcf(path2)

In [19]:
ps1 = snps1.keys()
ps2 = snps2.keys()
ps3 = list(sorted(ps1 | ps2))
print(len(ps1), len(ps2), len(ps3), sep="\t")

140111	5872394	5938142


In [85]:
array = []

for ps in ps3:
    snp1 = snps1.get(ps)
    snp2 = snps2.get(ps)
    if snp1 is None:
        if snp2 is None:
            assert False
        else:
            # only DBA
            snp = snp2
            chrom = "chr%s" % snp.chrom
            pos = snp.pos
            ref = snp.ref
            i1, i2 = snp.samples["DBA_2J"]["GT"]
            a1, a2 = snp.alleles[i1], snp.alleles[i2]
            if list(snp.filter)[0] != "PASS":
                continue
            assert a1 == a2
            assert a1 != ref
            array.append([chrom, pos, ref, [ref, a1]])
    else:
        if snp2 is None:
            # only C57
            snp = snp1
            chrom = "chr%s" % snp.chrom
            pos = snp.pos
            ref = snp.ref
            i1, i2 = snp.samples["C57BL_6NJ"]["GT"]
            a1, a2 = snp.alleles[i1], snp.alleles[i2]
            if list(snp.filter)[0] != "PASS":
                continue
            assert a1 == a2
            assert a1 != ref
            array.append([chrom, pos, ref, [a1, ref]])
        else:
            # both
            snp = snp1
            chrom = "chr%s" % snp.chrom
            pos = snp.pos
            ref = snp.ref
            
            snp = snp1
            i1, i2 = snp.samples["C57BL_6NJ"]["GT"]
            a1, a2 = snp.alleles[i1], snp.alleles[i2]
            if list(snp.filter)[0] != "PASS":
                continue
            assert a1 == a2
            assert a1 != ref
            a_c57 = a1
            
            snp = snp2
            i1, i2 = snp.samples["DBA_2J"]["GT"]
            a1, a2 = snp.alleles[i1], snp.alleles[i2]
            if list(snp.filter)[0] != "PASS":
                continue
            assert a1 == a2
            assert a1 != ref
            a_dba = a1
            
            array.append([chrom, pos, ref, [a_c57, a_dba]])
len(array)

5183895

In [87]:
with pysam.VariantFile(path1) as f, open(path3, "w+") as fw:
    fw.write("##fileformat=VCFv4.2\n")
    fw.write("##FORMAT=<ID=GT,Number=1,Type=String,Description=\"Genotype\">\n")
    fw.write("##FORMAT=<ID=PS,Number=1,Type=String,Description=\"Phase set for GT\">\n")
    for chrom in f.header.contigs:
        fw.write("##contig=<ID=chr%s,length=%d>\n" % (chrom, f.header.contigs[chrom].length))
    fw.write("#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tC57_6NJ/DBA_2J\n")
    for chrom, pos, ref, (pat, mat) in array:
        alleles = [ref]
        if pat not in alleles:
            alleles.append(pat)
        if mat not in alleles:
            alleles.append(mat)
        gt = "%s|%s" % (alleles.index(pat), alleles.index(mat))
        line = "\t".join(map(str, [chrom, pos, ".", ref, ",".join(alleles[1:]), ".", "PASS", ".", "GT:PS", gt + ":PATMAT"]))
        fw.write(line + "\n")
assert os.system("bgzip %s" % path3) == 0
assert os.system("tabix -p vcf %s.gz" % path3) == 0