In [1]:
import os
from pathlib import Path
import numpy as np
import pickle as pk
import h5py


######################################################################################
######################################################################################

def save_to_hdf5(data_input: dict, hdf5_path: Path, gzip: bool = True) -> Path:
    """Write a genotype/phenotype dictionary to an HDF5 file.

    The file is opened in ``"w"`` mode, so an existing file at ``hdf5_path``
    is replaced. The layout written is::

        metadata/loci               UTF-8 strings from data_input["loci"]
        metadata/phenotype_names    UTF-8 strings from data_input["phenotype_names"]
        strains/<strain>/phenotype  float64 array from data_input["phenotypes"][i]
        strains/<strain>/genotype   int8 array from data_input["genotypes"][i]

    Args:
        data_input: Mapping with the keys ``"loci"``, ``"phenotype_names"``,
            ``"strain_names"``, ``"phenotypes"`` and ``"genotypes"``. The
            phenotype and genotype sequences are indexed by the position of
            the strain in ``"strain_names"``.
        hdf5_path: Destination of the HDF5 file.
        gzip: When true, the per-strain genotype datasets are gzip-compressed.
            Genotype datasets are always stored chunked.

    Returns:
        ``hdf5_path``, unchanged.

    Raises:
        KeyError: If one of the required keys is missing from ``data_input``.
        IndexError: If ``"phenotypes"`` or ``"genotypes"`` has fewer entries
            than ``"strain_names"``.
    """
    str_dt = h5py.string_dtype(encoding="utf-8")
    strain_names = list(data_input["strain_names"])

    with h5py.File(hdf5_path, "w") as h5f:
        metadata_group = h5f.create_group("metadata")

        loci_array = np.array(data_input["loci"], dtype=str_dt)
        metadata_group.create_dataset("loci", data=loci_array)

        pheno_names_array = np.array(data_input["phenotype_names"], dtype=str_dt)
        metadata_group.create_dataset("phenotype_names", data=pheno_names_array)

        strains_group = h5f.create_group("strains")

        # One sub-group per strain, holding that strain's two arrays.
        for idx, strain_id in enumerate(strain_names):
            strain_grp = strains_group.create_group(strain_id)

            pheno = np.array(data_input["phenotypes"][idx], dtype=np.float64)
            strain_grp.create_dataset("phenotype", data=pheno)

            genotype = np.array(data_input["genotypes"][idx], dtype=np.int8)
            strain_grp.create_dataset(
                "genotype",
                data=genotype,
                chunks=True,
                compression="gzip" if gzip else None,
            )

    # Report a summary only: interpolating the whole input dictionary would
    # dump every genotype and phenotype value to stdout.
    print(
        f"{hdf5_path} generated from {len(strain_names)} strains, "
        f"{loci_array.size} loci and {pheno_names_array.size} phenotypes."
    )

    return hdf5_path
# Start with an empty container for the parsed phenotype/genotype data.
out_dict = dict()


In [2]:

######################################################################################
######################################################################################
# Read phenotype file
# Whitespace-delimited table: the first row holds the phenotype names after a
# leading column label, and every later row is a strain name followed by its
# phenotype values. The `with` block closes the handle as soon as the text has
# been read instead of leaving it open for the lifetime of the kernel.
with open('test_sim_WF_1kbt_100kups_5mb_p.txt', 'r') as phen_file:
    phens = [line.split() for line in phen_file.read().split('\n')]
phens = [fields for fields in phens if fields]  # Skip empty / whitespace-only lines

# Extract phenotype information
out_dict = {}
out_dict['phenotype_names'] = phens[0][1:]  # Header of pheno names from first row
out_dict['strain_names'] = [fields[0] for fields in phens[1:]]  # Strain names from first column

# Convert phenotypes to float, handling NA values.
# NOTE(review): 'NA' is stored as 0, which cannot be told apart from a measured
# zero afterwards; np.nan may be more appropriate - confirm with the consumers
# of this dictionary before changing it.
out_dict['phenotypes'] = [
    [0 if value == 'NA' else float(value) for value in fields[1:]]
    for fields in phens[1:]
]


In [3]:
# Number of phenotype values parsed for the second strain (row index 1).
len(out_dict['phenotypes'][1])

25

In [1]:

# Read genotype file - CSV format with headers
# The `with` block closes the handle as soon as the text has been read instead
# of leaving it open for the lifetime of the kernel.
with open('test_sim_WF_1kbt_100kups_5mb_g.txt', 'r') as genotype_file:
    gens = genotype_file.read().split('\n')
gens = [line.split(',') for line in gens if line]  # Split by comma

# Extract locus names - first row contains locus names directly (no empty cell for sample column)
out_dict['loci'] = gens[0]  # The entire first row contains locus names

# Process genotypes - CSV format with row names but no column name for sample IDs
# Each allele code is re-coded as a two-element indicator pair; anything that
# is not '0' or '1' (missing or unexpected) becomes [0, 0].
new_coding_dict = {'0': [1, 0], '1': [0, 1]}
out_dict['genotypes'] = []

# NOTE(review): the sample ID in the first column is discarded and never
# compared with out_dict['strain_names']; this presumably relies on both files
# listing individuals in the same order - confirm.
# For each individual row (starting from row 1 to skip header)
for row in gens[1:]:
    if len(row) <= 1:  # Skip empty lines or incomplete rows
        continue

    # Process all genotypes for this individual (skip first column which is sample ID).
    # Whitespace is stripped once per field before the lookup.
    ind_genotypes = [new_coding_dict.get(geno.strip(), [0, 0]) for geno in row[1:]]

    out_dict['genotypes'].append(ind_genotypes)

# Write to output file
# import pickle as pk
# pk.dump(out_dict, open(snakemake.output[0], 'wb'))

: 

In [14]:
# Number of loci encoded for the first individual (row index 0).
len(out_dict['genotypes'][0])


10000