In [1]:
from pafpy import PafFile
from tqdm.notebook import tqdm
from collections import defaultdict
import intervaltree as itree
from Bio import SeqIO
import matplotlib.pyplot as plt
from scipy import stats
import numpy as np
from collections import defaultdict
import time
plt.rcParams["figure.figsize"] = (10, 6)
plt.rcParams["figure.facecolor"] = "white"
plt.rcParams.update({'font.size': 18})
import math, re

In [2]:
from glob import glob
# from hurry.filesize import size
import psutil
import shlex
import csv
from pathlib import Path
import itertools
import random
from collections import namedtuple
from dataclasses import dataclass, field

In [3]:
Minimap2Params = namedtuple("Minimap2Params", "is_ont n")
MashMap3Params = namedtuple("MashMap3Params", "L k ss pi n")
MashMap2Params = namedtuple("MashMap2Params", "L k w pi n")
MashMapParams = namedtuple("MashMapParams", "L k ss pi n tool")
ParameterSet = namedtuple("ParameterSet", "dataset mashmap2_params mashmap3_params minimap2_params")
Dataset = namedtuple("Dataset", "name reference queries")

@dataclass
class RunProfile:
    time: int = 0
    memory: int = 0

In [4]:
ss_to_dens_5000 = {10: 0.0027033015587942942,
 20: 0.004999545524382726,
 30: 0.007225297850085516,
 40: 0.009414666280896978,
 50: 0.011580825266216262,
 60: 0.0137304687797538,
 70: 0.015867562596763904,
 75: 0.0169,
 80: 0.017994721638507495,
 90: 0.020113733315396117,
 100: 0.022225882956137456,
 110: 0.024332211034229525,
 120: 0.026433437214099118,
 130: 0.028530192910771867,
 140: 0.03062297923626709,
 150: 0.032712196059314504,
 160: 0.03479819307413521,
 170: 0.03688125618779697,
 180: 0.038961631636590104,
 190: 0.04103953307815283,
 200: 0.04310220928250878,
 300: 0.06375817159996389,
 400: 0.08429124072361874,
 500: 0.10474345862936014,
 600: 0.12513590112065218,
 700: 0.14548170418253295,
 800: 0.16578947227428895,
 900: 0.18606471061147348,
 1000: 0.206311821699839}
ss_to_w = {ss: int(np.round(2/dens)) - 1 for (ss, dens) in ss_to_dens_5000.items()}


In [6]:
human_ref = "/home/Users/blk6/Data/assemblies/human/T2T_v2.1.fna"
ONT_indel_balanced_reads = {ANI: f"/home/Users/blk6/Data/reads/human/ONT-{ANI}-nosd-balanced-d2-5000.fa" for ANI in [99, 98, 95]}

chm13_ONT_indel_balanced_datasets = {ANI: Dataset(f"chm13-ONT-indel-balanced-{ANI}-reads", human_ref, (ONT_indel_balanced_reads[ANI],)) for ANI in ONT_indel_balanced_reads}

In [7]:
dataset_run_profile = defaultdict(        # Dataset name
    lambda: defaultdict(                  # Tool ParamsObj
        lambda: {                         
            "map": RunProfile(), 
            "align": RunProfile()
        }
    )
)

In [8]:
toolnames = [
    "mashmap2",
    "mashmap3"]
runs = defaultdict(list)

# HiFi run
default_pi = 95
L = 5000
n = 1
sketch_sizes = [20, 50, 80, 100, 150, 200, 300, 400]
kmers = [19]

for k in kmers:
    for ss in sketch_sizes:

        for ANI, dataset in chm13_ONT_indel_balanced_datasets.items():
            pi = ANI-5 # What should this be???
            for tool in toolnames:
                if "empty" in tool:
                    continue
                mm3_params = MashMapParams(L, k, ss, pi, n, tool=tool)
                runs[dataset].append(mm3_params)



In [None]:
CONCURRENT_RUNS = 1
THREADS = 16
run_name_to_fstream = defaultdict(lambda: defaultdict(dict))
active_runs = {}
with open("runtimes.txt", "w") as profile_out:
    profile_out.write(f"Run-basename\ttime")

for dataset, params_list in runs.items():
    for params in params_list:
        dataset_name = dataset.name
        ref = dataset.reference
        queries = dataset.queries
        run_base_name = f"{params.tool}-ss{params.ss}-k{params.k}-p{params.pi}-n{params.n}-L{params.L}"
        output_dir = f"/home/Users/blk6/Contribute/wfmash/benchmarking/{dataset_name}/{run_base_name}"
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        if "mashmap3" in params.tool:
            index_flag = ""
            cmd_args = f"""
                /usr/bin/time -v /home/Users/blk6/Contribute/MashMap/build-release/bin/mashmap
                --noMerge 
                --blockLength 1 
                {index_flag}
                -k {params.k} 
                --sketchSize {params.ss} 
                -t {THREADS} 
                -n {params.n} 
                --pi {params.pi} 
                -s {params.L} 
                -r {ref} 
                -q {' '.join(queries)} 
                """.replace("\n", " ")
            stdout_file = open(f"{output_dir}/{params.tool}.approx.paf", 'w')
        elif params.tool == "mashmap2":
            cmd_args = f"""
                /usr/bin/time -v /home/Users/blk6/Contribute/MashMap-orig/mashmap
                -k {params.k} 
                --window_size {math.ceil(ss_to_w[params.ss])} 
                -t {THREADS}
                --pi {params.pi} 
                -s {params.L} 
                -o {output_dir}/{params.tool}.approx.paf
                -r {ref} -q {' '.join(queries)}""".replace("\n", " ")
            stdout_file = open(f"{output_dir}/{params.tool}.out", 'w')
        elif params.tool == "minimap2":
            cmd_args = f"""
                /usr/bin/time -v minimap2
                {ref} {' '.join(queries)}
                -t {THREADS}
                -x map-ont""".replace("\n", " ")
            stdout_file = open(f"{output_dir}/{params.tool}.approx.paf", 'w')
        cmd_args = re.sub(' +', ' ', cmd_args)
        # print(cmd_args)
        run_name_to_fstream[run_base_name]["stdout"] = stdout_file
        stderr_file = open(f"{output_dir}/{params.tool}.err", 'w')
        run_name_to_fstream[run_base_name]["stderr"] = stderr_file
        print(f"Running {run_base_name}")
        active_runs[run_base_name] = (psutil.Popen(shlex.split(cmd_args), stdout=stdout_file, stderr=stderr_file), params.tool)
        time_tracker = 0
        while len(active_runs) >= CONCURRENT_RUNS:
            active_run_pairs = list(active_runs.items())
            for rbn, (p, st) in active_run_pairs:
                try:
                    if p.status() == "zombie":
                        raise psutil.NoSuchProcess(p.pid)
                    curtime = time.time()
                    if curtime > time_tracker + 5:
                        dataset_run_profile[dataset_name][run_base_name]["map"].memory = max(dataset_run_profile[dataset_name][run_base_name]["map"].memory, p.memory_info().rss)
                    t2 = p.cpu_times()
                    dataset_run_profile[dataset_name][run_base_name]["map"].time = t2.system + t2.user + t2.children_system + t2.children_user
                except psutil.NoSuchProcess as e:
                    with open("runtimes.txt", "a") as profile_out:
                        profile_out.write(f"{'EMPTY' if dataset_name == 'empty-dataset' else ''}-{rbn}\t{dataset_run_profile[dataset_name][run_base_name]['map'].time : .2f}\n")
                    print(f"Done {rbn}\t{dataset_run_profile[dataset_name][run_base_name]['map'].time : .2f}")
                    run_name_to_fstream[rbn]["stdout"].close()
                    run_name_to_fstream[rbn]["stderr"].close()
                    del run_name_to_fstream[rbn]
                    del active_runs[rbn]
            if curtime > time_tracker + 5:
                time_tracker = curtime

time_tracker = 0
while len(active_runs) > 0:
    active_run_pairs = list(active_runs.items())
    for rbn, (p, st) in active_run_pairs:
        t2 = None
        try:
            curtime = time.time()
            if curtime > time_tracker + 5:
                dataset_run_profile[dataset_name][run_base_name]["map"].memory = max(dataset_run_profile[dataset_name][run_base_name]["map"].memory, p.memory_info().rss)
            t2 = p.cpu_times()
            dataset_run_profile[dataset_name][run_base_name]["map"].time = t2.system + t2.user + t2.children_system + t2.children_user
            if p.status() == "zombie":
                raise psutil.NoSuchProcess(p.pid)
        except psutil.NoSuchProcess as e:
            with open("runtimes.txt", "a") as profile_out:
                profile_out.write(f"{'EMPTY' if dataset_name == 'empty-dataset' else ''}-{rbn}\t{dataset_run_profile[dataset_name][run_base_name]['map'].time : .2f}\n")
            print(f"Done {rbn}\t{dataset_run_profile[dataset_name][run_base_name]['map'].time : .2f}")
            run_name_to_fstream[rbn]["stdout"].close()
            run_name_to_fstream[rbn]["stderr"].close()
            del run_name_to_fstream[rbn]
            del active_runs[rbn]
    if curtime > time_tracker + 5:
        time_tracker = curtime
                        
