# Imports

In [52]:
from subprocess import Popen
from pathlib import Path
import mmh3
import screed
import numpy as np
import re

# Utility functions

Define utility functions.

In [53]:
def get_uuid():
    return str(uuid.uuid4())

def load_genome_sequence(filepath):
    with open(filepath, 'r') as f:
        lines = f.readlines()
    sequence = ''.join([line.strip() for line in lines if not line.startswith('>')])
    return sequence.upper()
    
def encode_seq(seq, k):
    vocab = {'A':0, 'C':1, 'G':2, 'T':3}
    seq = [vocab[c] for c in seq if c in vocab]
    chunks = [seq[i:i+k] for i in range(0, len(seq)-k, k)]
    return np.array(chunks)  

# https://sourmash.readthedocs.io/en/latest/kmers-and-minhash.html

def build_kmers(sequence, ksize):
    kmers = []
    n_kmers = len(sequence) - ksize + 1

    for i in range(n_kmers):
        kmer = sequence[i : i + ksize]
        kmers.append(kmer)

    return kmers
    
def read_kmers_from_file(filename, ksize):
    all_kmers = []
    for record in screed.open(filename):
        sequence = record.sequence
        
        sequence = re.sub(r'[^ACGT]', 'N', sequence.upper())        
        kmers = build_kmers(sequence, ksize)
        all_kmers += kmers

    return all_kmers

def hash_kmer(kmer):
    rc_kmer = screed.rc(kmer)

    if kmer < rc_kmer:
        canonical_kmer = kmer
    else:
        canonical_kmer = rc_kmer

    hash = mmh3.hash64(canonical_kmer, 42)[0]
    if hash < 0:
        hash += 2**64
        
    return hash

def hash_kmers(kmers):
    hashes = []
    for kmer in kmers:
        hashes.append(hash_kmer(kmer))
    return hashes

def jaccard_similarity(a, b):
    a = set(a)
    b = set(b)

    intersection = len(a.intersection(b))
    union = len(a.union(b))

    return intersection / union

# Reference simulation

Execute the reference simulation.

In [22]:
# Reference values

M = 0
c=str(Path("data", "random_genome_chr_index.csv"))
r=str(Path("data", "combined_curated_TE_lib_ATOSZM_selected.fasta"))
o="data"
maxidn=95
minidn=80
maxsd=20
minsd=1
a=0.7
b=0.5
i=0.001
m=5
n=1
k = 100   
batch_size = 64

p="reference_simulation"
cmd = f"tegenomesimulator -M {M} -p {p} -c {c} -r {r} -o {o} -a {a} -b {b} -i {i} -m {m} -n {n} --maxidn {maxidn} --minidn {minidn} --maxsd {maxsd} --minsd {minsd}"
Popen(cmd, shell=True).wait()

Mode: 0
Running Random Synthesized mode.
Prefix: reference_simulation
Repeat: data/combined_curated_TE_lib_ATOSZM_selected.fasta
Chromosome Index: data/random_genome_chr_index.csv
Genome File: None
Alpha: 0.7
Beta: 0.5
Max Copies: 5
Min Copies: 1
Upper bound of mean identity: 95
Lower bound of mean identity: 80
Upper bound of sd of mean identity: 20
Lower bound of sd of mean ideneity: 1
Max chance of intact insertion: 0.001
Seed: 1
Output Directory: data

TE library table generated successfully. Output logged to data/TEgenomeSimulator_reference_simulation_result/TEgenomeSimulator.log
mode=0, running prep_yml_config.py for Random Genome Mode.

Config file generated successfully. Output logged to data/TEgenomeSimulator_reference_simulation_result/TEgenomeSimulator.log

Genome with non-overlap random TE insertions was generated successfully. Output logged to data/TEgenomeSimulator_reference_simulation_result/TEgenomeSimulator.log

Genome with non-overlap random and nested TE insertions wa

0

Load the genome sequence data to string.

In [60]:
reference_genome_sequence_path = f"{o}/TEgenomeSimulator_{p}_result/{p}_genome_sequence_out_final.fasta"
reference_kmer_sequences = read_kmers_from_file(reference_genome_sequence_path, 21)
reference_hashed_sequences = hash_kmers(reference_kmer_sequences)
reference_hashed_sequences[:5]

[16569318863910026406,
 10700716329976813798,
 41015238908800100,
 3845901871115767651,
 7715406540985638717]

In [62]:
jaccard_similarity(reference_hashed_sequences, reference_hashed_sequences)

1.0

In [None]:
import numpy as np
import pandas as pd

from calisim.data_model import (
	DistributionModel,
	ParameterDataType,
	ParameterSpecification,
)
from calisim.optimisation import OptimisationMethod, OptimisationMethodModel
from calisim.utils import get_examples_outdir

observed_data = reference_hashed_sequences

parameter_spec = ParameterSpecification(
	parameters=[
		DistributionModel(
			name="maxidn",
			distribution_name="uniform",
			distribution_args=[90, 97],
			data_type=ParameterDataType.DISCRETE,
		),
		DistributionModel(
			name="minidn",
			distribution_name="uniform",
			distribution_args=[75, 85],
			data_type=ParameterDataType.DISCRETE,
		),
		DistributionModel(
			name="maxsd",
			distribution_name="uniform",
			distribution_args=[15, 25],
			data_type=ParameterDataType.DISCRETE,
		),
		DistributionModel(
			name="minsd",
			distribution_name="uniform",
			distribution_args=[1, 4],
			data_type=ParameterDataType.DISCRETE,
		),
		DistributionModel(
			name="a",
			distribution_name="uniform",
			distribution_args=[0.5, 0.9],
			data_type=ParameterDataType.CONTINUOUS,
		),
		DistributionModel(
			name="b",
			distribution_name="uniform",
			distribution_args=[0.3, 0.7],
			data_type=ParameterDataType.CONTINUOUS,
		),
	]
)

def objective(
	parameters: dict, simulation_id: str, observed_data: np.ndarray | None, t: pd.Series
) -> float | list[float]:
	simulation_parameters = dict(h0=34.0, l0=5.9, t=t, gamma=0.84, delta=0.026)

	for k in ["alpha", "beta"]:
		simulation_parameters[k] = parameters[k]

	simulated_data = model.simulate(simulation_parameters).lynx.values
	metric = MeanSquaredError()
	discrepancy = metric.calculate(observed_data, simulated_data)
	return discrepancy


outdir = get_examples_outdir()
specification = OptimisationMethodModel(
	experiment_name="optuna_optimisation",
	parameter_spec=parameter_spec,
	observed_data=reference_hashed_sequences,
	outdir=outdir,
	method="tpes",
	directions=["minimize"],
	output_labels=["Jaccard Similarity"],
	n_iterations=3,
	method_kwargs=dict(n_startup_trials=50),
	calibration_func_kwargs=dict(t=observed_data.year),
)

calibrator = OptimisationMethod(
	calibration_func=objective, specification=specification, engine="optuna"
)

calibrator.specify().execute().analyze()

result_artifacts = "\n".join(calibrator.get_artifacts())
print(f"View results: \n{result_artifacts}")