In [76]:
import msprime
import os
import subprocess
import tskit

In [95]:
# Constants

VFC_DIR = "generatedVcf"
PLINK_DIR = "generatedPlink"
PLINK1 = "plink"
PLINK_CMD = "plink2"

if not os.path.exists(VFC_DIR):
  os.mkdir(VFC_DIR)

if not os.path.exists(PLINK_DIR):
  os.mkdir(PLINK_DIR)

In [78]:
def generate(n_samples: int, length: int, recombination_rate: float = 1e-8, mutation_rate: float = 1e-8, population_size: float = 10**4, random_seed: int = 42) -> tskit.TreeSequence:
  ts = msprime.sim_ancestry(
      samples=[msprime.SampleSet(n_samples)],
      population_size=population_size,
      recombination_rate=recombination_rate,
      sequence_length=length,
      random_seed=random_seed,
  )
  mts = msprime.sim_mutations(ts, rate=mutation_rate, random_seed=random_seed)
  return mts  

In [79]:
def output_generated(n_samples: int, length: int, n_samples_str: str | None = None, length_str: str | None = None, dir: str = VFC_DIR) -> str:
  mts = generate(n_samples, length)
  print(f"Simulation done (Samples: {n_samples}, length: {length}): {mts.num_trees} trees and {mts.num_sites} sites")

  if n_samples_str is None:
    n_samples_str = str(n_samples)
  if length_str is None:
    length_str = str(length)

  file_base = f"sim_len-{length_str}_samples-{n_samples_str}"
  out_vcf = f"{dir}/{file_base}.vcf"

  with open(out_vcf, "w") as vcf_file:
      mts.write_vcf(vcf_file)
  print("VCF output done")
  return file_base, out_vcf

In [80]:
def sampleSnps(file_base: str, snp_lim: int, dir: str = PLINK_DIR, snp_lim_str: str | None = None):
  new_file_base = f"{file_base}_snplim-{snp_lim_str if snp_lim_str else snp_lim}"
  out_dir = f"{dir}/{new_file_base}"
  meta_out = f"{out_dir}/meta"

  if not os.path.exists(out_dir):
    os.mkdir(out_dir)
    if not os.path.exists(meta_out):
      os.mkdir(meta_out)

  input_files = f"{dir}/{file_base}/data"
  output_files = f"{out_dir}/data"

  recode_file = f"{meta_out}/recode"
  cut_file = f"{meta_out}/cut.map"
  shuf_file = f"{meta_out}/shuf.map"

  recode_args = [PLINK1, "--bfile", input_files, "--recode", "--out", recode_file]
  recode_proc = subprocess.run(recode_args, capture_output=True)

  cut_args = ["cut", "-f", "2", f"{recode_file}.map"]
  cut_proc = subprocess.run(cut_args, capture_output=True)
  with open(cut_file, "w+") as cut_file_out:
    cut_file_out.write(cut_proc.stdout.decode("utf-8"))

  shuf_args = ["shuf", "-n", f"{snp_lim}", cut_file]
  shuf_proc = subprocess.run(shuf_args, capture_output=True)
  with open(shuf_file, "w+") as shuf_file_out:
    shuf_file_out.write(shuf_proc.stdout.decode("utf-8"))

  extract_args = [PLINK1, "--bfile", input_files, "--extract", shuf_file, "--make-bed", "--out", output_files]
  extract_proc = subprocess.run(extract_args, capture_output=True)

  print("Sampling SNPs Done")


In [86]:
def convertToPlink(vcf_path: str, file_base: str, dir: str = PLINK_DIR, max_maf: float | None = None, max_alleles: int = 2):

  new_file_base = file_base
  if max_maf:
    new_file_base = f"{new_file_base}_maf-{max_maf}"

  out_dir = f"{dir}/{new_file_base}"
  out_file = f"{out_dir}/data"
    
  if not os.path.exists(out_dir):
    os.mkdir(out_dir)
  
  args = [PLINK_CMD, "--vcf", vcf_path, "--max-alleles", str(max_alleles), "--make-bed", "--out", out_file]
  if max_maf:
    args.extend(["--max-maf", str(max_maf)])

  a = subprocess.run(args, capture_output=True)
  print("VCF to PLINK done")
  return a

In [87]:
# Generating the VCF File
# file_base, vcf_file = output_generated(5000, 10000000, "5k", "10MB")

# Generating the Plink from VCF
# proc = convertToPlink(vcf_file, file_base)

In [96]:
# Varying population size

# vcf_inputs = [
#   (5000, 10000000, "5k", "10MB"),
#   (10000, 10000000, "10k", "10MB"),
#   (15000, 10000000, "15k", "10MB"),
#   (20000, 10000000, "20k", "10MB")
# ]
# vcf_inputs = [
#   (10000, 50000000, "10k", "50MB"),
#   (25000, 50000000, "25k", "50MB"),
#   (50000, 50000000, "50k", "50MB"),
#   (75000, 50000000, "75k", "50MB"),
#   (100000, 50000000, "100k", "50MB")
# ]
vcf_inputs = [
  (10000, 10000000, "10k", "10MB"),
  (25000, 10000000, "25k", "10MB"),
  (50000, 10000000, "50k", "10MB"),
  (75000, 10000000, "75k", "10MB"),
  (100000, 10000000, "100k", "10MB")
]
plink_inputs = (30000, "30k")

for i in vcf_inputs:
  file_base, vcf_file = output_generated(*i)
  proc = convertToPlink(vcf_file, file_base, PLINK_DIR)
  sampleSnps(file_base, plink_inputs[0], PLINK_DIR, plink_inputs[1])

Simulation done (Samples: 10000, length: 10000000): 38901 trees and 41516 sites
VCF output done
VCF to PLINK done
Sampling SNPs Done
Simulation done (Samples: 25000, length: 10000000): 42951 trees and 45350 sites
VCF output done
VCF to PLINK done
Sampling SNPs Done
Simulation done (Samples: 50000, length: 10000000): 45420 trees and 48693 sites
VCF output done
VCF to PLINK done
Sampling SNPs Done
Simulation done (Samples: 75000, length: 10000000): 47058 trees and 49718 sites
VCF output done
VCF to PLINK done
Sampling SNPs Done
Simulation done (Samples: 100000, length: 10000000): 48146 trees and 50571 sites
VCF output done
VCF to PLINK done
Sampling SNPs Done
