Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Refactor and generalize plink to VCF conversion with support for more…

… non-reference cases
  • Loading branch information...
commit 904b30aef9cf2cb0656e8ad83b988cc676f1492a 1 parent 3465522
@chapmanb authored
Showing with 92 additions and 34 deletions.
  1. +92 −34 nextgen/scripts/plink_to_vcf.py
View
126 nextgen/scripts/plink_to_vcf.py
@@ -23,24 +23,33 @@
from bx.seq import twobit
def main(ped_file, map_file, ref_file):
- pbed_prefix = convert_to_plink_bed(ped_file, map_file)
- vcf_file = convert_bed_to_vcf(pbed_prefix, ped_file)
+ base_dir = os.getcwd()
+ pbed_prefix = convert_to_plink_bed(ped_file, map_file, base_dir)
+ vcf_file = convert_bed_to_vcf(pbed_prefix, ped_file, base_dir)
fix_nonref_positions(vcf_file, ref_file)
-def convert_to_plink_bed(ped_file, map_file):
- plink_cl = "p-link" # from ubuntu package, 'plink' otherwise
+def convert_to_plink_bed(ped_file, map_file, base_dir):
+ # from ubuntu package, 'plink' otherwise
+ for plink_cl in ["p-link", "plink"]:
+ try:
+ subprocess.check_call([plink_cl, "--help"])
+ break
+ except:
+ pass
plink_prefix = os.path.splitext(os.path.basename(ped_file))[0]
- work_dir = os.path.join(os.path.dirname(ped_file), "vcfconvert")
+ work_dir = os.path.join(base_dir, "vcfconvert")
if not os.path.exists(work_dir):
os.makedirs(work_dir)
out_base = os.path.join(work_dir, plink_prefix)
- if not os.path.exists("{}.bed".format(out_base)):
- subprocess.check_call([plink_cl, "--ped", ped_file, "--map", map_file,
+ if not os.path.exists("{0}.bed".format(out_base)):
+ subprocess.check_call([plink_cl, "--noweb",
+ "--ped", ped_file, "--map", map_file,
"--make-bed", "--out", out_base])
return out_base
-def convert_bed_to_vcf(pbed_prefix, ped_file):
- out_file = "{}.vcf".format(os.path.splitext(ped_file)[0])
+def convert_bed_to_vcf(pbed_prefix, ped_file, base_dir):
+ out_file = os.path.join(base_dir,
+ "{0}.vcf".format(os.path.splitext(os.path.basename(ped_file))[0]))
if not os.path.exists(out_file):
subprocess.check_call(["pseq", pbed_prefix, "new-project"])
subprocess.check_call(["pseq", pbed_prefix, "load-plink",
@@ -50,16 +59,69 @@ def convert_bed_to_vcf(pbed_prefix, ped_file):
stdout=out_handle)
return out_file
+def fix_line_problems(parts):
+ """Fix problem alleles and reference/variant bases in VCF line.
+ """
+ varinfo = parts[:9]
+ genotypes = []
+ # replace haploid calls
+ for x in parts[9:]:
+ if len(x) == 1:
+ x = "./."
+ genotypes.append(x)
+ if varinfo[3] == "0": varinfo[3] = "N"
+ if varinfo[4] == "0": varinfo[4] = "N"
+ return varinfo, genotypes
+
+def fix_vcf_line(parts, ref_base):
+ """Orient VCF allele calls with respect to reference base.
+
+ Handles cases with ref and variant swaps. strand complements.
+ """
+ swap = {"1/1": "0/0", "0/1": "0/1", "0/0": "1/1", "./.": "./."}
+ complements = {"G": "C", "A": "T", "C": "G", "T": "A", "N": "N"}
+ varinfo, genotypes = fix_line_problems(parts)
+ ref, var = varinfo[3:5]
+ # non-reference regions or non-informative, can't do anything
+ if ref_base in [None, "N"] or set(genotypes) == set(["./."]):
+ varinfo = None
+ # matching reference, all good
+ elif ref_base == ref:
+ assert ref_base == ref, (ref_base, parts)
+ # swapped reference and alternate regions
+ elif ref_base == var or ref in ["N", "0"]:
+ varinfo[3] = var
+ varinfo[4] = ref
+ genotypes = [swap[x] for x in genotypes]
+ # reference is on alternate strand
+ elif ref_base != ref and complements[ref] == ref_base:
+ varinfo[3] = complements[ref]
+ varinfo[4] = complements[var]
+ # unspecified alternative base
+ elif ref_base != ref and var in ["N", "0"]:
+ varinfo[3] = ref_base
+ varinfo[4] = ref
+ genotypes = [swap[x] for x in genotypes]
+ # swapped and on alternate strand
+ elif ref_base != ref and complements[var] == ref_base:
+ varinfo[3] = complements[var]
+ varinfo[4] = complements[ref]
+ genotypes = [swap[x] for x in genotypes]
+ else:
+ print "Did not associate ref {0} with line: {1}".format(
+ ref_base, varinfo)
+ if varinfo is not None:
+ return varinfo + genotypes
+
def fix_nonref_positions(in_file, ref_file):
"""Fix Genotyping VCF positions where the bases are all variants.
The plink/pseq output does not handle these correctly, and
has all reference/variant bases reversed.
"""
- swap = {"1/1": "0/0", "0/1": "0/1", "0/0": "1/1", "./.": "./."}
- complements = {"G": "C", "A": "T", "C": "G", "T": "A"}
+ ignore_chrs = ["."]
ref2bit = twobit.TwoBitFile(open(ref_file))
- out_file = apply("{}-fix{}".format, os.path.splitext(in_file))
+ out_file = apply("{0}-fix{1}".format, os.path.splitext(in_file))
with open(in_file) as in_handle:
with open(out_file, "w") as out_handle:
@@ -67,29 +129,25 @@ def fix_nonref_positions(in_file, ref_file):
if line.startswith("#"):
out_handle.write(line)
else:
- parts = line.rstrip("\t\n").split("\t")
+ parts = line.rstrip("\r\n").split("\t")
pos = int(parts[1])
- ref_base = ref2bit[parts[0]].get(pos-1, pos).upper()
- ref, var = parts[3:5]
- if ref_base == var or ref == "N":
- base = parts[:9]
- base[3] = var
- base[4] = ref
- swap_genotypes = [swap[x] for x in parts[9:]]
- parts = base + swap_genotypes
- #print "swapped", pos
- elif ref_base == ref:
- #print "same", pos
- assert ref_base == ref, (ref_base, parts)
- elif ref_base != ref and var in ["N", "0"]:
- parts[3] = ref_base
- elif ref_base != ref and complements[ref] == ref_base:
- parts[3] = complements[ref]
- parts[4] = complements[var]
- else:
- raise ValueError("Cannot associate ref {} with line: {}".format(
- ref_base, parts))
- out_handle.write("\t".join(parts) + "\n")
+ # handle chr/non-chr naming
+ if parts[0] not in ref2bit.keys():
+ parts[0] = parts[0].replace("chr", "")
+ ref_base = None
+ if parts[0] not in ignore_chrs:
+ try:
+ ref_base = ref2bit[parts[0]].get(pos-1, pos).upper()
+ except Exception, msg:
+ # off the end of the chromosome
+ if str(msg).startswith("end before start"):
+ print msg
+ else:
+ print parts
+ raise
+ parts = fix_vcf_line(parts, ref_base)
+ if parts is not None:
+ out_handle.write("\t".join(parts) + "\n")
return out_file
if __name__ == "__main__":
Please sign in to comment.
Something went wrong with that request. Please try again.