# Notebook to call ROH in parallel [Legacy as of February2020, refactored with new functions]
Includes Notebooks that import the code for the calling ROHs on Mosaics, and then functions for various cases to parallelize it

@Author: Harald Ringbauer, June 2019

In [7]:
import numpy as np
import os as os
import sys as sys
import multiprocessing as mp
import pandas as pd
import socket

socket_name = socket.gethostname()
print(socket_name)
if socket_name == "VioletQueen":
    path = "/home/harald/git/HAPSBURG/"   # The Path on Harald's machine
elif socket_name.startswith("midway2"):
    print("Midway jnovmbre partition detected.")
    path = "/project2/jnovembre/hringbauer/HAPSBURG/"  # The Path on Midway Cluster
else: 
    raise RuntimeWarning("Not compatible machine. Check!!")

os.chdir(path)  # Set the right Path (in line with Atom default)
print(os.getcwd()) # Show the current working directory. Should be HAPSBURG/Notebooks/ParallelRuns
print(f"CPU Count: {mp.cpu_count()}")



sys.path.append("./package/hapsburg/")  # Since now we are in the Root Directory
from hmm_inference import HMM_Analyze   # Do not move. Should be after sys.path..

from PackagesSupport.parallel_runs.helper_functions import split_up_roh_df, prepare_path
from PackagesSupport.parallel_runs.helper_functions import prepare_path, create_folders, postprocess_iid
#from PackagesSupport.hapsburg_run import hapsb_chrom, hapsb_ind

midway2-0402.rcc.local
Midway jnovmbre partition detected.
/project2/jnovembre/hringbauer/HAPSBURG
CPU Count: 28


# Define Functions and Paralellize Wrappers

In [3]:
def analyze_individual(iid, ch=3, n_ref=503, save=True, save_fp=False,
                       path_mosaic="./Simulated/1000G_Mosaic/TSI/ch3_5cm/",
                       exclude_pops=["TSI", ], prefix_out="", 
                       roh_in=1, roh_out=10, roh_jump=100, e_rate=0.001, logfile=True):
    """Run the analysis for one individual and chromosome.
    Wrapper for HMM Class"""
    
    ### Prepare output folder, and pipe output to log file if wanted
    path_out = prepare_path(path_mosaic, iid, ch, prefix_out, logfile=logfile)
    
    ### Do the full HMM Analysis
    hmm = HMM_Analyze(cython=2, p_model="MosaicHDF5", e_model="diploid_gt",
                      manual_load=True, save=save, save_fp=save_fp)  
    # "diploid_gt" for analysis of dpld. "haploid" for analysis of haploid

    ### Load and prepare the pre-processing Model
    hmm.load_preprocessing_model()              # Load the preprocessing Model
    hmm.p_obj.set_params(destroy_phase=True, prefix_out_data=prefix_out,
                        excluded=exclude_pops, h5_path_targets=path_mosaic + "data.h5",
                        base_out_folder=os.path.join(path_mosaic, "output",""))
    ### Set paths for reference: DELETE when run for with European Reference!!
    hmm.p_obj.set_params(h5_path1000g = "./Data/1000Genomes/HDF5/1240kHDF5/all1240/chr", 
                         meta_path_ref = "./Data/1000Genomes/Individuals/meta_df_all.csv")
    
    hmm.load_data(iid=iid, ch=ch, n_ref=n_ref)  # Load the actual Data
    hmm.load_secondary_objects()  # Load transition, emission and postprocessing objects
    
    ### Set the Parameters
    hmm.t_obj.set_params(roh_in=roh_in, roh_out=roh_out, roh_jump=roh_jump)
    hmm.e_obj.set_params(e_rate=e_rate)
    
    #hmm.calc_viterbi_path(save=save)           # Calculate the Viterbi Path.
    hmm.calc_posterior(save=save)              # Calculate the Posterior.
    hmm.post_processing(save=save)             # Do the Post-Processing.
    
    ### Split up the (only works for Mosaic so be careful when transferring this code)
    split_up_roh_df(path_mosaic, path_out, iid)
    
    print(f"Analysis of {iid} and Chr. {ch} successfully concluded!")

#########################################################
#########################################################
### Do the Read Count Analysis Function

def analyze_individual_rc(iid, ch=3, n_ref=503, save=True, save_fp=False,
                          path_mosaic="./Simulated/1000G_Mosaic/TSI/RC1.0/ch3_5cm/",
                          exclude_pops=["TSI", ], prefix_out="",
                          roh_in=1, roh_out=10, roh_jump=100, e_rate=0.01, e_rate_ref=0.001, logfile=True):
    """Run the analysis for one individual and chromosome on readcount data
    Wrapper for HMM Class"""
    
    ### Create Folder if needed, and pipe output if wanted
    path_output = os.path.join(path_mosaic, "output", "")  # Include the "output" Folder
    path_out = prepare_path(path_output, iid, ch, prefix_out, logfile=logfile)
    
    if os.path.exists(os.path.join(path_out, "roh.csv")):
        return
    
    hmm = HMM_Analyze(cython=2, p_model="MosaicHDF5", e_model="readcount", post_model="Standard",
                      manual_load=True, save=save, save_fp=save_fp)

    # Load and prepare the pre-processing Model
    hmm.load_preprocessing_model()              # Load the preprocessing Model
    hmm.p_obj.set_params(readcounts=True, destroy_phase=False,
                         prefix_out_data=prefix_out, excluded=exclude_pops,
                         h5_path_targets=path_mosaic + "data.h5",
                         base_out_folder=path_output)

    ### Set paths for reference: DELETE when run for with European Reference!!
    hmm.p_obj.set_params(h5_path1000g = "./Data/1000Genomes/HDF5/1240kHDF5/all1240/chr", 
                         meta_path_ref = "./Data/1000Genomes/Individuals/meta_df_all.csv")

    hmm.load_data(iid=iid, ch=ch, n_ref=n_ref)  # Load the actual Data
    hmm.load_secondary_objects()

    ### Set the Parameters
    hmm.e_obj.set_params(e_rate=e_rate, e_rate_ref=e_rate_ref)
    hmm.t_obj.set_params(roh_in=roh_in, roh_out=roh_out, roh_jump=roh_jump)

    #hmm.calc_viterbi_path(save=save)           # Calculate the Viterbi Path.
    hmm.calc_posterior(save=save)              # Calculate the Posterior.
    hmm.post_processing(save=save)             # Do the Post-Processing.
    
    ### Split up the (only works for Mosaic so be careful when transferring this code)
    split_up_roh_df(path_mosaic, path_out, iid)
    
#########################################################
#########################################################
    
def multi_run(fun, prms, processes = 4):
    """Implementation of running in Parallel.
    fun: Function
    prms: The Parameter Files
    processes: How many Processes to use"""
    print(f"Running {len(prms)} jobs in parallel.")
    with mp.Pool(processes = processes) as pool:
        results = pool.starmap(fun, prms)

# Run Parallel Calling on TSI (single Target HDF5)

In [19]:
### Prepare Parameter files and run
#### Create the parameters array for the starmap:
ch = 3
n_ref = 503  # 2504 503
save=True
save_fp=False
base_path="./Simulated/1000G_Mosaic/TSI5/"
exclude_pops = ["TSI", ]
prefix_out = "test_d/"  # allRef
roh_in = 100 
roh_out= 100
roh_jump= 300
e_rate = 0.001  # The Error Rate
n = 100
logfile = False

lengths = [0, 2, 4, 6, 8, 10] # Which Block Lengths to test
lengths = [4, 0]  # Only do the relevant ones for key performance testing

### Create list of IIDs and of Folders
iids = ["iid" + str(i) for i in range(n)]   # Prepare List of iids
folders = [base_path + "ch" + str(ch) + "_" + str(int(l)) + "cm/" for l in lengths]  # Prepare Length folders

### Create the List of Parameter Lists (input for starmap)
prms = []

for f in folders:
    for iid in iids:
        new_par = [iid, ch, n_ref, save, save_fp, f, exclude_pops, prefix_out, roh_in, roh_out, roh_jump, e_rate, logfile]
        prms.append(new_par)  # Append to the Parameters

assert(len(prms[0])==13)   # The function takes 13 Parameters as input

In [None]:
multi_run(analyze_individual, prms, processes = 5)

Running 600 jobs in parallel.
Set Output Log path: ./Simulated/1000G_Mosaic/YRI/ch3_0cm/output/iid0/chr/3/allRef/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/YRI/ch3_0cm/output/iid90/chr/3/allRef/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/YRI/ch3_2cm/output/iid20/chr/3/allRef/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/YRI/ch3_0cm/output/iid30/chr/3/allRef/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/YRI/ch3_0cm/output/iid60/chr/3/allRef/hmm_run_log.txt


In [1]:
print("Hello? Blizzard?")
print("Run complete")

Hello? Blizzard?
Run complete


In [16]:
len(prms)

200

### Do a single example run

In [17]:
%%time
analyze_individual(*prms[0])

Using Low-Mem Cython Linear Speed Up.
Loaded Pre Processing Model: MosaicHDF5
Loading Individual: iid0

Loaded 77652 variants
Loaded 100 individuals
HDF5 loaded from ./Simulated/1000G_Mosaic/TSI5/ch3_4cm/data.h5

Loaded 77652 variants
Loaded 2504 individuals
HDF5 loaded from ./Data/1000Genomes/HDF5/1240kHDF5/all1240/chr3.hdf5

Intersection on Positions: 77652
Nr of Matching Refs: 77652 / 77652
Full Intersection Ref/Alt Identical: 77652 / 77652
2397 / 2504 Individuals included in Reference
Extraction of 2 Haplotypes Complete!
Extraction of 1000 Haplotypes Complete!
Reduced to markers called 77652 / 77652
(Fraction SNP: 1.0)
Successfully saved to: ./Simulated/1000G_Mosaic/TSI5/ch3_4cm/output/iid0/chr3/test_d/
Shuffling phase of target...
Successfully loaded Data from: ./Simulated/1000G_Mosaic/TSI5/ch3_4cm/output/iid0/chr3/test_d/
Loaded Emission Model: diploid_gt
Loaded Transition Model: model
Loaded Post Processing Model: Standard
Minimum Genetic Map: 0.0000
Maximum Genetic Map: 2.2326


# Call ROHS Blocks within multiple target HDF5s

In [20]:
### Prepare Parameter files and run
#### Create the parameters array for the starmap:
ch = 3
n_ref = 503
save=True
save_fp=False
base_path="./Simulated/1000G_Mosaic/"
exclude_pops = ["TSI", ]
prefix_out = "ROH1000/" # Initially: None

roh_in = 100 
roh_out= 100
roh_jump= 1000
e_rate = 0.001  # The Error Rate

n = 100
#targets = ["CHB", "CLM", "YRI"]
targets = ["TSI5",]
lengths = [0, 2, 4, 6, 8, 10]
logfile = True

### Create list of IIDs and of Folders
iids = ["iid" + str(i) for i in range(n)]   # Prepare List of iids

### Create the List of Parameter Lists (input for starmap)
prms = []

for t in targets:
    base_path1 = base_path + t + "/"
    folders = [base_path1 + "ch" + str(ch) + "_" + str(int(l)) + "cm/" for l in lengths]  # Prepare Length folders
    for f in folders:
        for iid in iids:
            new_par = [iid, ch, n_ref, save, save_fp, f, exclude_pops, prefix_out, roh_in, roh_out, roh_jump, e_rate, logfile]
            prms.append(new_par)  # Append to the Parameters

assert(len(prms[0])==13)   # The function takes 12 Parameters as input

In [4]:
#multi_run(analyze_individual, [prms[0],], processes = 1)  # To Trouble Shoot
len(prms)

600

In [5]:
multi_run(analyze_individual, prms, processes = 8)

Running 600 jobs in parallel.
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/output/iid57/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_2cm/output/iid33/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_2cm/output/iid14/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/output/iid38/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/output/iid19/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/output/iid76/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/output/iid0/chr3/ROH1000/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/output/iid95/chr3/ROH1000/hmm_run_log.txt


In [6]:
print("Hello? Blizzard?")

Hello? Blizzard?


# Call ROH for multiple error levels (and multiple lengths)

In [3]:
### Prepare Parameter files and run
#### Create the parameters array for the starmap:
ch = 3
n_ref = 503
save=True
save_fp=False
base_path="./Simulated/1000G_Mosaic/TSI5/"
exclude_pops = ["TSI", ]
roh_in = 100 
roh_out= 100
roh_jump= 385
e_rate = 0.01  # The Error Rate
n = 100
prefix_out = "e01/"   # Error saved in folder structure

### The arrays to iterate over
lengths = [0, 2, 4, 6, 8, 10] # For chromosomes
error_vec = np.logspace(-3,-1, 8)

### Create list of IIDs and of Folders
iids = ["iid" + str(i) for i in range(n)]   # Prepare List of iids
logfile = True
#folders = [base_path + "ch" + str(ch) + "_" + str(int(l)) + "cm/" for l in lengths]  # Prepare Length folders

### Create the List of Parameter Lists (input for starmap)
prms = []

for l in lengths:
    for e in error_vec:
        e_print = str(round(e, 4)).split(".")[1] # Extract four digits after decimal         
        f = base_path + "ch3_" + str(l) + "cm/error/" + e_print + "/"   

        for iid in iids:
            new_par = [iid, ch, n_ref, save, save_fp, f, exclude_pops, prefix_out, roh_in, roh_out, roh_jump, e_rate, logfile]
            prms.append(new_par)  # Append to the Parameters

assert(len(prms[0])==13)   # The function takes 12 Parameters as input

In [None]:
multi_run(analyze_individual, prms, processes = 8)

Running 4800 jobs in parallel.
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/error/001/output/iid0/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/error/0072/output/iid0/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/error/0019/output/iid50/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/error/0139/output/iid50/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_2cm/error/0037/output/iid50/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/error/0518/output/iid0/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/error/1/output/iid50/chr3/e01/hmm_run_log.txt
Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_2cm/error/0019/output/iid0/chr3/e01/hmm_run_log.txt


In [None]:
print("Hello? Blizzard?")

# Call ROH for multiple downsample levels (and multiple lengths)

In [9]:
### Prepare Parameter files and run
#### Create the parameters array for the starmap:
ch = 3
n_ref = 503
save=True
save_fp=False
base_path="./Simulated/1000G_Mosaic/TSI5/"   #TSI5 CHB
exclude_pops = ["TSI", ]
roh_in = 100 
roh_out= 100
roh_jump= 385
e_rate = 0.001    # The Error Rate
n = 100
prefix_out = "diploidGT/"   #  e01/ Error saved in folder structure

### The arrays to iterate over
lengths = [0, 2, 4, 6, 8, 10] 
missing_vec = np.linspace(0.1, 1.0, 10)
#missing_vec = np.array([0.1, 0.2])
logfile = True

### Create list of IIDs and of Folders
iids = ["iid" + str(i) for i in range(n)]   # Prepare List of iids

### Create the List of Parameter Lists (input for starmap)
prms = []

for l in lengths:
    for m in missing_vec:
        m_print = str(round(m, 4)).split(".")[1] # Extract four digits after decimal         
        f = base_path + "ch3_" + str(l) + "cm/missing/" + m_print + "/"   

        for iid in iids:
            new_par = [iid, ch, n_ref, save, save_fp, f, exclude_pops, prefix_out, roh_in, roh_out, roh_jump, e_rate, logfile]
            prms.append(new_par)  # Append to the Parameters

assert(len(prms[0])==13)   # The function takes 12 Parameters as input

In [None]:
multi_run(analyze_individual, prms, processes = 6)

Running 6000 jobs in parallel.
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/missing/1/output/iid0/chr3/diploidGT/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_2cm/missing/1/output/iid0/chr3/diploidGT/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/missing/3/output/iid50/chr3/diploidGT/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/missing/8/output/iid50/chr3/diploidGT/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_0cm/missing/6/output/iid0/chr3/diploidGT/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/ch3_2cm/missing/3/output/iid50/chr3/diploidGT/hmm_run_log.txt


In [None]:
print("Hello? Blizzard?")

# Call ROH for ReadCount data (Normal or Lambda)
For Lambda change folder name (parameter f below)

In above function analyze_individual_rc it is hard-coded which reference files to use

In [4]:
### Prepare Parameter files and run
#### Create the parameters array for the starmap:
ch = 3
n_ref=2504
save=True
save_fp=False
base_path="./Simulated/1000G_Mosaic/TSI5/"   #TSI5 CHB
exclude_pops = ["TSI", ]
roh_in = 100 
roh_out= 100
roh_jump= 385
e_rate = 0.001    # The Error Rate for Read Count
e_rate_ref = 0.001 # The  Error Rate for Reference Genotypes
n = 100  # The Number of Individuals
prefix_out = "allref/"   #  e01/ e001/ Error saved in folder structure ROH385

### The arrays to iterate over
lengths = [0, 2, 4, 6, 8, 10] 

mean_rcs = np.linspace(0.1, 1, 10)
#mean_rcs= [mean_rcs[-1],]
logfile = True

### Create list of IIDs and of Folders
iids = ["iid" + str(i) for i in range(n)]   # Prepare List of iids

### Create the List of Parameter Lists (input for starmap)
prms = []

for m_rc in mean_rcs:
    for l in lengths:      
        #f = base_path + "lambda_rc" + f"{m_rc:.1f}" + "/ch3_" + str(l) + "cm/"   # lambda_rc or rc
        f = base_path + "lambda_rc" + str(m_rc) + "/ch3_" + str(l) + "cm/"   # lambda_rc or rc
        
        for iid in iids:
            new_par = [iid, ch, n_ref, save, save_fp, f, exclude_pops, prefix_out, roh_in, roh_out, roh_jump, e_rate, e_rate_ref, logfile]
            prms.append(new_par)  # Append to the Parameters

assert(len(prms[0]) == 14)  # The RC function takes 14 Parameters as input

In [None]:
multi_run(analyze_individual_rc, prms[3000:], processes = 6)

Running 3000 jobs in parallel.
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_0cm/output/iid0/chr3/allref/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_2cm/output/iid25/chr3/allref/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_4cm/output/iid50/chr3/allref/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_6cm/output/iid75/chr3/allref/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.7000000000000001/ch3_0cm/output/iid25/chr3/allref/hmm_run_log.txt
Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_10cm/output/iid0/chr3/allref/hmm_run_log.txt


In [7]:
print("Hello? Blizzard?")

Hello? Blizzard?


### Delete unnecessary folders
(Introduced by happy little accident where "output" was forgotten in prepare path

In [None]:
for prm in prms:
    print(f"Doing {prm[0]}...")
    f0 = os.path.join(prm[5], prm[0], "")
    !rm -r $f0

In [None]:
os.path.join(f, )

### Test run for single individual

In [12]:
print(len(prms))

6000


In [17]:
prms[3000]

['iid0',
 3,
 2504,
 True,
 False,
 './Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_0cm/',
 ['TSI'],
 'allref/',
 100,
 100,
 385,
 0.001,
 0.001,
 False]

In [15]:
%%time
analyze_individual_rc(*prms[3300])

Using Low-Mem Cython Linear Speed Up.
Loaded Pre Processing Model: MosaicHDF5
Loading Individual: iid0
Creating folder ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_6cm/output/iid0/chr3/allref/...

Loaded 77650 variants
Loaded 100 individuals
HDF5 loaded from ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_6cm/data.h5

Loaded 77652 variants
Loaded 2504 individuals
HDF5 loaded from ./Data/1000Genomes/HDF5/1240kHDF5/all1240/chr3.hdf5

Intersection on Positions: 77650
Nr of Matching Refs: 77650 / 77650
Full Intersection Ref/Alt Identical: 77650 / 77650
2397 / 2504 Individuals included in Reference
Extraction of 2 Haplotypes complete
Extraction of 4794 Haplotypes complete
Reduced to markers called 28745 / 77650
(Fraction SNP: 0.37018673535093366)
Successfully saved to: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_6cm/output/iid0/chr3/allref/
Loading Readcounts...
Mean Readcount markers loaded: 1.62362
Successfully loaded Data from: ./Simulated/1000G_Mosaic/TSI5/lambda_rc0.6/ch3_6cm/outpu

# Area 51

### Test single parameter run
Set logfile=False in analyze_individual to print output

In [10]:
analyze_individual_rc(*prms[0])

Set Output Log path: ./Simulated/1000G_Mosaic/TSI5/lambda_rc1.0/ch3_8cm/output/iid0/chr3/hmm_run_log.txt
