### Test steps of HMM analysis - before bringing them together in a module.
Eventually this will contain unit tests
(and code that can be transformed into unit tests).

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import socket as socket
import os as os
import sys as sys
import multiprocessing as mp
import h5py
import allel
import itertools as it

# Work out which machine the notebook runs on; the project path is only
# known for hosts whose name starts with "compute-".
socket_name = socket.gethostname()
print(socket_name)

# Guard clause: stop right away on any other host, since `path` would
# otherwise be undefined further down.
if not socket_name.startswith("compute-"):
    raise RuntimeWarning("Not compatible machine. Check!!")

print("HSM O2 Computational partition detected.")
path = "/n/groups/reich/hringbauer/git/hapBLOCK/"  # The Path on Harvard Cluster

# Set the right Path (in line with Atom default)
os.chdir(path)

# The relative entry below resolves against the directory set by os.chdir,
# so the project imports must come after both calls.
sys.path.append("./python3/")
from emission import load_emission_model
from transition import load_transition_model
from hmm import load_fwd_bwd_func
from loaddata import load_loaddata

# Report the working directory and the number of CPUs.
print(os.getcwd())
print(f"CPU Count: {mp.cpu_count()}")

compute-a-16-118.o2.rc.hms.harvard.edu
HSM O2 Computational partition detected.
/n/groups/reich/hringbauer/git/hapBLOCK
CPU Count: 32


# Create some test data

In [2]:
### Test data for biallelic case
# Every haplotype array has 4 rows of 2 values; each row sums to one
# (presumably per-locus probabilities of the two alleles - TODO confirm).
ht1 = np.array([[0, 1], [1, 0], [0, 1], [0.1, 0.9]])
ht2 = np.array([[0, 1], [1, 0], [1, 0], [0.5, 0.5]])
# ht3 and ht4 hold the same values as ht2, as independent arrays.
ht3 = ht2.copy()
ht4 = ht2.copy()

# Stack along a new leading axis: hts has shape (2, 4, 2), hts2 has (4, 4, 2).
hts = np.stack([ht1, ht2], axis=0)
hts2 = np.stack([ht1, ht2, ht3, ht4], axis=0)

# One value per row of the haplotype arrays.
p = np.array([0.9, 0.8, 0.7, 0.5])

# Test Emission Module

In [13]:
# Load the "haploid_gl" emission model and evaluate it on the stacked test
# haplotypes hts2 (3-D input) together with p from the test-data cell
# (p is presumably a vector of allele frequencies - TODO confirm).
e = load_emission_model(e_model="haploid_gl")
e_mat =e.give_emission_matrix(hts2, p)

In [14]:
# Expected emission matrix for the test data: 5 rows x 4 columns
# (presumably states x loci - TODO confirm against the emission module).
res_exp = np.array([[0.6561, 0.0016, 0.0189, 0.0625],
                    [0.729 , 0.008 , 0.    , 0.0625],
                    [0.729 , 0.008 , 0.    , 0.0625],
                    [0.729 , 0.008 , 0.063 , 0.0625],
                    [0.729 , 0.008 , 0.063 , 0.0625]])
# assert_allclose instead of a bare `assert np.isclose(...).all()`:
# it reports which entries differ, rejects a shape mismatch instead of
# silently broadcasting, and is not stripped when Python runs with -O.
# rtol/atol are the np.isclose defaults, so the tolerance is unchanged.
np.testing.assert_allclose(e_mat, res_exp, rtol=1e-5, atol=1e-8)

In [7]:
# Same emission model as for the 3-D input, but fed the 2-D slice
# hts2[:,:,0], i.e. only the first of the two values stored per row.
e = load_emission_model(e_model="haploid_gl")
e_mat =e.give_emission_matrix(hts2[:,:,0], p)

In [8]:
# Expected emission matrix for the 2-D input: 5 rows x 4 columns, with the
# same values as expected for the 3-D input.
res_exp = np.array([[0.6561, 0.0016, 0.0189, 0.0625],
                    [0.729 , 0.008 , 0.    , 0.0625],
                    [0.729 , 0.008 , 0.    , 0.0625],
                    [0.729 , 0.008 , 0.063 , 0.0625],
                    [0.729 , 0.008 , 0.063 , 0.0625]])
# assert_allclose instead of a bare `assert np.isclose(...).all()`:
# it reports which entries differ, rejects a shape mismatch instead of
# silently broadcasting, and is not stripped when Python runs with -O.
# rtol/atol are the np.isclose defaults, so the tolerance is unchanged.
np.testing.assert_allclose(e_mat, res_exp, rtol=1e-5, atol=1e-8)

# Test Transition Module

In [3]:
# Load the "standard" transition model, set its three rate parameters and
# request the rate matrix without the 3x3 reduction (submat33=False).
t = load_transition_model(t_model="standard")
t.set_params(ibd_in = 0.0005, ibd_out = 0.001, ibd_jump = 0.05, recalculate=False)
t_mat = t.calc_transition_rate(submat33=False)
### Sanity Check: the sums along axis 1 of the rate matrix must all be zero.
# assert_allclose gives a readable report on failure and survives `python -O`;
# rtol/atol are the np.isclose defaults, so the tolerance is unchanged.
np.testing.assert_allclose(np.sum(t_mat, axis=1), 0, rtol=1e-5, atol=1e-8,
                           err_msg="Transition rate matrix rows do not sum to 0")

### Calculate Full Transition Matrix

In [8]:
### Test producing the full transition matrix
# 100 evenly spaced values 0.01, 0.02, ..., 1.0: the leading 0 of the
# 101-point linspace is dropped by [1:].
# (presumably map distances between neighbouring loci - TODO confirm)
r_vec = np.linspace(0,1.0,101)[1:]
t = load_transition_model(t_model="standard")
t.set_params(ibd_in = 0.0005, ibd_out = 0.001, ibd_jump = 0.05, recalculate=False)
# submat33=False asks for the non-reduced matrices; the recorded output
# shape is (100, 5, 5), i.e. one 5x5 matrix per entry of r_vec.
t_mat_full = t.full_transition_matrix(r_vec, n=4,submat33=False)
np.shape(t_mat_full)

(100, 5, 5)

In [None]:
# Sum each matrix in t_mat_full over its last axis and display the result.
# NOTE(review): presumably every sum should be 1 if these are transition
# probabilities - confirm, and consider turning this into an assertion.
np.sum(t_mat_full, axis=2)

In [10]:
### Test producing the full transition matrix
# Same r_vec and model parameters as in the submat33=False cell, but here
# full_transition_matrix is called with its default for submat33; the
# recorded output shape is (100, 3, 3).
# Note that this rebinds the global t_mat, which the transition-rate cell
# had set to a single rate matrix.
r_vec = np.linspace(0,1.0,101)[1:]
t = load_transition_model(t_model="standard")
t.set_params(ibd_in = 0.0005, ibd_out = 0.001, ibd_jump = 0.05, recalculate=False)
t_mat = t.full_transition_matrix(r_vec, n=4)
np.shape(t_mat)

Reference Number: 4


(100, 3, 3)

In [None]:
# Element-wise difference between the upper-left 3x3 block of every matrix
# in t_mat_full and the corresponding matrix in t_mat.
# NOTE(review): this displays the whole array; a summary such as the maximum
# absolute difference may be easier to read - confirm what is expected here.
t_mat_full[:,:3,:3] - t_mat

### Test HMM Module

In [17]:
# Fetch the forward-backward routine registered under the name "FiveState".
fwd_bwd = load_fwd_bwd_func(h_model="FiveState")

In [18]:
# e_mat contains exact zeros, so np.log(e_mat) yields -inf there and NumPy
# emits "RuntimeWarning: divide by zero encountered in log".
# The resulting values are left unchanged; only the warning is silenced, and
# only around this one log call, so warnings raised inside fwd_bwd stay visible.
with np.errstate(divide="ignore"):
    e_mat_log = np.log(e_mat)
# NOTE(review): e_mat and t_mat are globals set by earlier cells; which
# versions are live depends on execution order - confirm the intended pairing.
post0 = fwd_bwd(e_mat_log, t_mat, in_val=1e-4, full=False)

Total Log likelihood: -13.180


  """Entry point for launching an IPython kernel.


In [21]:
# Exponentiate post0 and sum along axis 1; the recorded output is all ones,
# which suggests post0 holds log-posteriors normalised along that axis
# (TODO confirm against the hmm module).
np.sum(np.exp(post0), axis=1)

array([1., 1., 1., 1.])

# Test loading HDF5 data

In [11]:
# Load data for the two individuals "SUC002" and "SUC003" on chromosome 3
# through the "hdf5" loader.
# NOTE(review): this rebinds the global p that the test-data cell defined,
# so re-running the emission test cells after this one uses the loaded p
# instead of the 4-element test vector - consider a distinct name.
# (hts_l, p, m are presumably haplotypes, allele frequencies and map
# positions - TODO confirm against the loaddata module.)
l = load_loaddata(l_model="hdf5", path="./data/hdf5/1240k_v43/ch")
l.set_params(iids=["SUC002", "SUC003"], ch=3)
hts_l, p, m = l.load_all_data()

Minimum Genetic Map: 0.0000 Morgan
Maximum Genetic Map: 2.2326 Morgan
Gaps bigger than 0.1 cM: 214
Maximum Gap: 0.2348 cM
Upper Gap Cutoff: 5.0000 cM
