In [None]:
import random
from cyvcf2 import VCF

In [None]:
# ==========
# Parameters
# ==========
# Everything a user is expected to tune lives in this cell.

# Path to the filtered, biallelic VCF/BCF that the SNPs are drawn from.
input_vcf = "merged.a9.filtered.qual99_fmissing0.2.maf0.05.biallelic.bcf"

# How many SNPs to randomly sample for each replicate.
num_snps = 200

# How many replicates to generate.
num_replicates = 10

# Prefix for the output NEXUS (.nex) files. Derived from num_snps so the file
# names always state the number of SNPs that was actually sampled.
output_prefix = f"random_snps_{num_snps}"

# Set to an integer for reproducible results, e.g. 42; None leaves the RNG unseeded.
random_seed = None

In [None]:
# Seed Python's global RNG only when a seed was configured; when random_seed
# is None the RNG is left untouched.
if random_seed is None:
    pass
else:
    random.seed(random_seed)

In [None]:
# ------------------------------------------------------------
# Step 1: load every variant of the input file into memory
# ------------------------------------------------------------
vcf = VCF(input_vcf)
samples = vcf.samples
# The full variant list is kept so that the cells below can sample from it.
records = list(vcf)

print(f"Loaded {len(records)} variants from {input_vcf}.")
print(f"Samples in VCF: {samples}")

# Stop here when the file cannot supply the requested number of SNPs.
if num_snps > len(records):
    raise ValueError(
        f"ERROR: The VCF has only {len(records)} variants, "
        f"but you requested {num_snps}."
    )


In [None]:
# Allele pair -> integer code (0 = hom REF, 1 = het, 2 = hom ALT).
# Any pair that is not listed (a negative allele value, or an allele index
# above 1) is written as '-'.
_GENOTYPE_CODES = {(0, 0): '0', (0, 1): '1', (1, 0): '1', (1, 1): '2'}

for rep_index in range(1, num_replicates + 1):
    print(f"Generating replicate {rep_index} of {num_replicates} ...")

    # Step 2a: draw this replicate's SNPs without replacement.
    chosen_records = random.sample(records, num_snps)

    # Step 2b/2c: build one list of codes per sample, one code per chosen SNP.
    sample2seq = {name: [] for name in samples}
    for variant in chosen_records:
        calls = variant.genotypes  # one [allele1, allele2, ...] entry per sample
        for idx, name in enumerate(samples):
            allele_pair = (calls[idx][0], calls[idx][1])
            sample2seq[name].append(_GENOTYPE_CODES.get(allele_pair, '-'))

    # Step 2d: assemble the NEXUS text, then write it in one go.
    out_nexus = f"{output_prefix}_rep{rep_index}.nex"
    nexus_lines = [
        "#NEXUS\n",
        "[SNP matrix in integer format: 0=homREF, 1=het, 2=homALT, -=missing]\n\n",
        "Begin data;\n",
        f"\tDimensions ntax={len(samples)} nchar={num_snps};\n",
        '\tFormat datatype=integerdata symbols="012" gap=-;\n',
        "\tMatrix\n",
    ]
    # One matrix row per sample: name, a tab, then the concatenated codes.
    nexus_lines.extend(
        f"{name}\t{''.join(sample2seq[name])}\n" for name in samples
    )
    nexus_lines.append("\t;\nEnd;\n")

    with open(out_nexus, "w") as out_f:
        out_f.writelines(nexus_lines)

    print(f"  -> Wrote replicate {rep_index} NEXUS: {out_nexus}")

print("All replicates done!")

In [None]:
# Do per contig
#
# For every replicate: draw `num_snps` random SNPs, group them by contig and
# write one NEXUS file per contig that received at least one SNP. Within a
# file the SNPs are ordered by position.
#
# The genotype codes are accumulated per contig while the records are read.
# Slicing one shared per-sample list by SNP count is NOT safe here: the codes
# would be in sorted contig order while the contigs would be visited in an
# unrelated order, so files could receive another contig's SNPs.
for rep_index in range(1, num_replicates + 1):
    print(f"Generating replicate {rep_index} of {num_replicates} ...")

    # 2a. Randomly sample SNPs
    chosen_records = random.sample(records, num_snps)

    # Sort the records by contig and position so each contig's SNPs are
    # appended in genomic order and contigs are first seen in sorted order.
    chosen_records.sort(key=lambda r: (r.CHROM, r.start))

    # 2b. Structures keyed by contig:
    #   contig2sample2seq[contig][sample] -> list of codes for that contig
    #   contig2num_snps[contig]           -> number of SNPs on that contig
    contig2sample2seq = {}
    contig2num_snps = {}

    # 2c. Fill sequence data (0,1,2,-)
    for record in chosen_records:
        contig = record.CHROM
        if contig not in contig2sample2seq:
            contig2sample2seq[contig] = {sample: [] for sample in samples}
            contig2num_snps[contig] = 0
        contig2num_snps[contig] += 1
        sample2seq = contig2sample2seq[contig]

        genotypes = record.genotypes  # [ [g1, g2, phased, ...], [g1, g2, ...], ...]
        for i, sample in enumerate(samples):
            g1, g2 = genotypes[i][0], genotypes[i][1]

            if g1 < 0 or g2 < 0:
                # Missing genotype => '-'
                code = '-'
            elif g1 == 0 and g2 == 0:
                code = '0'
            elif (g1 == 0 and g2 == 1) or (g1 == 1 and g2 == 0):
                code = '1'
            elif g1 == 1 and g2 == 1:
                code = '2'
            else:
                # Shouldn't happen in a well-filtered biallelic VCF,
                # but just in case, treat as missing:
                code = '-'

            sample2seq[sample].append(code)

    # 2d. Write NEXUS for this replicate per contig.
    # Every contig in the dict has at least one SNP, so no empty file is written.
    for contig, snpcount in contig2num_snps.items():
        out_nexus = f"{output_prefix}_rep{rep_index}_{contig}.nex"
        sample2seq = contig2sample2seq[contig]

        with open(out_nexus, "w") as out_f:
            # Header
            out_f.write("#NEXUS\n")
            out_f.write("[SNP matrix in integer format: 0=homREF, 1=het, 2=homALT, -=missing]\n\n")
            out_f.write("Begin data;\n")
            out_f.write(f"\tDimensions ntax={len(samples)} nchar={snpcount};\n")
            out_f.write('\tFormat datatype=integerdata symbols="012" gap=-;\n')
            out_f.write("\tMatrix\n")

            # Matrix lines
            for sample in samples:
                seq_str = "".join(sample2seq[sample])
                out_f.write(f"{sample}\t{seq_str}\n")

            out_f.write("\t;\nEnd;\n")

        print(f"  -> Wrote replicate {rep_index} NEXUS: {out_nexus}")


print("All replicates done!")

In [None]:
from cyvcf2 import VCF

# Export every SNP of one contig, ordered by position, to a single NEXUS file.
# The contig name is set once here and used for both the log message and the query.
contig_name = "ptg000423c"

print(f"Generating SNP dataset from contig {contig_name} ...")

# Open BCF file and access the specific contig directly.
# NOTE: this rebinds `vcf`, `records` and `samples`; re-run the loading cell
# before re-running the random-sampling cells, otherwise they will sample
# from this contig only.
# NOTE(review): the region query presumably needs an index next to the
# BCF file; confirm that one exists.
vcf = VCF(input_vcf)
records = list(vcf(contig_name))  # Random access to contig

# An empty matrix (nchar=0) is of no use downstream, so fail loudly instead.
if not records:
    raise ValueError(
        f"ERROR: No variants found on contig {contig_name} in {input_vcf}."
    )

# Sort SNPs by position
chosen_records = sorted(records, key=lambda r: r.POS)

# Prepare a structure to hold the integer-coded genotype for each sample
samples = vcf.samples
sample2seq = {sample: [] for sample in samples}

# Fill sequence data (0,1,2,-)
for record in chosen_records:
    genotypes = record.genotypes  # [ [g1, g2, phased, ...], [g1, g2, ...], ...]
    for i, sample in enumerate(samples):
        g1, g2 = genotypes[i][0], genotypes[i][1]

        if g1 < 0 or g2 < 0:
            # Missing genotype => '-'
            code = '-'
        elif g1 == 0 and g2 == 0:
            # 0 = REF, 1 = ALT for each allele
            code = '0'
        elif (g1 == 0 and g2 == 1) or (g1 == 1 and g2 == 0):
            code = '1'
        elif g1 == 1 and g2 == 1:
            code = '2'
        else:
            # Any other allele combination is treated as missing.
            code = '-'

        sample2seq[sample].append(code)

# Write NEXUS file
out_nexus = f"{output_prefix}.nex"
with open(out_nexus, "w") as out_f:
    # Header
    out_f.write("#NEXUS\n")
    out_f.write("[SNP matrix in integer format: 0=homREF, 1=het, 2=homALT, -=missing]\n\n")
    out_f.write("Begin data;\n")
    out_f.write(f"\tDimensions ntax={len(samples)} nchar={len(chosen_records)};\n")
    out_f.write('\tFormat datatype=integerdata symbols="012" gap=-;\n')
    out_f.write("\tMatrix\n")

    # Matrix lines: sample name, a tab, then the concatenated codes
    for sample in samples:
        seq_str = "".join(sample2seq[sample])
        out_f.write(f"{sample}\t{seq_str}\n")

    out_f.write("\t;\nEnd;\n")

print(f"  -> Wrote NEXUS file: {out_nexus}")
print("All SNP data processing done!")
