In [None]:
from Bio import SeqIO
from Bio.SeqIO.QualityIO import FastqGeneralIterator
from scipy.stats import binom
from functools import reduce
from time import sleep
from Bio import SeqIO
from subprocess import Popen

import mappy as mp
import numpy as np
import pandas as pd
import altair as alt

import gzip
import random
import itertools
import time
import random
import math
import os
import subprocess
import paramiko
import dateutil.parser
import re
import signal

server_machine='www.example.com'

TARGET_CONTIGS = {
    'mrsa' : 'CP000253.1',
    'cov' :'MN908947.3'
}

MAPPING_PRESETS = {
    'wgs' : 'sr',
    'ont': 'map-ont'
}

DATABASES = {
    'NEGATIVE' : {
        'mrsa' : 'human',
        'cov' : 'human'
    },
    'POSITIVE' : {
        'mrsa' : 'mrsa',
        'cov' : 'covid'        
    },
    'COMBINED' : {
        'mrsa' : 'mrsa_human',
        'cov' : 'cov_human'            
    }
}

CHUNK_SIZES = [1_000,2_500,5_000,10_000,25_000,50_000,100_000,300_000,1_000_000,3_000_000]
BUFFER_SIZES = [1_000,2_500,5_000,10_000,25_000,50_000,100_000,300_000,1_000_000,3_000_000]

# Step 1: Generating of Test Data

We have three isolates for Sars-Cov2 and three isolates for MRSA as well as three individuals from the 1000 genomes project. For each isolate we have Nanopore and Illumina reads.
We mix 10.000 reads of human reads with 990.000 pathogen reads to create test data.

In [None]:
READS_HUMAN = 10_000
READS_PATHOGEN = 990_000

SAMPLES = {}
SAMPLES['HUMAN1']={
    'ONT' : '20210519_210512_21-lee-006_PCT0053_2-A9-D9_guppy-5.0.11-sup-prom_fastq_pass.fastq.gz',
    'WGS_1' : 'SRR015521_1.filt.fastq.gz' ,
    'WGS_2' : 'SRR015521_2.filt.fastq.gz' 
}
SAMPLES['HUMAN2']={
    'ONT' : '20211102_211027_21-lee-006_PCT0053_2-A5-D5_guppy-5.0.11-sup-prom_fastq_pass.fastq.gz',
    'WGS_1' : 'ERR055344_1.filt.fastq.gz' ,
    'WGS_2' : 'ERR055344_2.filt.fastq.gz' 
}
SAMPLES['HUMAN3']={
    'ONT' : '20210913_210831_21-lee-006_PCT0053_2-A1-D1_guppy-5.0.11-sup-prom_fastq_pass.fastq.gz',
    'WGS_1' : 'ERR016221_1.filt.fastq.gz' ,
    'WGS_2' : 'ERR016221_2.filt.fastq.gz' 
}

SAMPLES['MRSA1']={
    'ONT' : 'SRR25890280.fastq',
    'WGS_1' : 'SRR25890263_1.fastq' ,
    'WGS_2' : 'SRR25890263_2.fastq' 
}
SAMPLES['MRSA2']={
    'ONT' : 'SRR25890279.fastq',
    'WGS_1' : 'SRR25890262_1.fastq' ,
    'WGS_2' : 'SRR25890262_2.fastq' 
}
SAMPLES['MRSA3']={
    'ONT' : 'SRR25890268.fastq',
    'WGS_1' : 'SRR25890261_1.fastq' ,
    'WGS_2' : 'SRR25890261_2.fastq' 
}

SAMPLES['COV1']={
    'ONT' : 'SRR16555792.fastq',
    'WGS_1' : 'SRR16555926_1.fastq' ,
    'WGS_2' : 'SRR16555926_2.fastq' 
}
SAMPLES['COV2']={
    'ONT' : 'SRR16555741.fastq',
    'WGS_1' : 'SRR16555757_1.fastq' ,
    'WGS_2' : 'SRR16555757_2.fastq' 
}
SAMPLES['COV3']={
    'ONT' : 'SRR16555592.fastq',
    'WGS_1' : 'SRR16555506_1.fastq' ,
    'WGS_2' : 'SRR16555506_2.fastq' 
}

In [None]:
#pre-sample human for runtime efficiency

cmd = f"seqtk sample -s100 {SAMPLES['HUMAN1']['WGS_1']} 100000 > human1.wgs1.fq"
!{cmd}
cmd = f"seqtk sample -s100 {SAMPLES['HUMAN1']['WGS_2']} 100000 > human1.wgs2.fq"
!{cmd}

cmd = f"seqtk sample -s100 {SAMPLES['HUMAN2']['WGS_1']} 100000 > human2.wgs1.fq"
!{cmd}
cmd = f"seqtk sample -s100 {SAMPLES['HUMAN2']['WGS_2']} 100000 > human2.wgs2.fq"
!{cmd}

cmd = f"seqtk sample -s100 {SAMPLES['HUMAN3']['WGS_1']} 100000 > human3.wgs1.fq"
!{cmd}
cmd = f"seqtk sample -s100 {SAMPLES['HUMAN3']['WGS_2']} 100000 > human3.wgs2.fq"
!{cmd}

In [None]:
cmd = f"seqtk sample -s100 {SAMPLES['HUMAN1']['ONT']} 100000 > human1.ont.fq"
!{cmd}

cmd = f"seqtk sample -s100 {SAMPLES['HUMAN2']['ONT']} 100000 > human2.ont.fq"
!{cmd}

cmd = f"seqtk sample -s100 {SAMPLES['HUMAN3']['ONT']} 100000 > human3.ont.fq"
!{cmd}


In [None]:
def get_x_random_reads(files,x,seed=50123):
    
    #Determine read count total
    print('Pass 1, determining total read count')
    read_count = 0
    fs = []
    for pair_idx, f in enumerate(files):
        print(f'Adding {f} to pool')
        if f.endswith('.gz'):
            fs.append(SeqIO.parse(gzip.open(f, "rt"),'fastq'))
        else:
            fs.append(SeqIO.parse(f,'fastq'))
    for stream in zip(*fs):
        read_count += 1
    print(f'Found a total of {read_count} reads')
    
    random.seed(seed)
    selected_ids = set(random.sample(range(read_count),x))
    
    print('Pass 2, selecting reads')
    selected_reads = []
    fs = []    
    for pair_idx, f in enumerate(files):
        print(f'Adding {f} to pool')
        if f.endswith('.gz'):
            fs.append(SeqIO.parse(gzip.open(f, "rt"),'fastq'))
        else:
            fs.append(SeqIO.parse(f,'fastq'))
        
    for idx, reads in enumerate(zip(*fs)):
        if idx in selected_ids:
            selected_ids.remove(idx)
            selected_reads.append(reads)
            if len(selected_ids) == 0:
                break
    return selected_reads

In [None]:
def add_suffix(reads,suffix):
    for read_idx,paired_reads in enumerate(reads):
        for read in paired_reads:
            read.id += '_'+suffix

In [None]:
reads_mrsa_1=get_x_random_reads([SAMPLES['MRSA1']['WGS_1'],SAMPLES['MRSA1']['WGS_2']],330_000+62892)
#sample 2 only has 204215 reads
reads_mrsa_2=get_x_random_reads([SAMPLES['MRSA2']['WGS_1'],SAMPLES['MRSA2']['WGS_2']],204_215)
reads_mrsa_3=get_x_random_reads([SAMPLES['MRSA3']['WGS_1'],SAMPLES['MRSA3']['WGS_2']],330_000+62893)

reads_human_1=get_x_random_reads(['human1.wgs1.fq','human1.wgs2.fq'],3333)
reads_human_2=get_x_random_reads(['human2.wgs1.fq','human2.wgs2.fq'],3333)
reads_human_3=get_x_random_reads(['human3.wgs1.fq','human3.wgs2.fq'],3334)

reads_mrsa = reads_mrsa_1+reads_mrsa_2+reads_mrsa_3
add_suffix(reads_mrsa,'pat')
reads_human = reads_human_1+reads_human_2+reads_human_3
add_suffix(reads_human,'hom')

reads = reads_mrsa+reads_human

for pair_idx in range(len(reads[0])):
    SeqIO.write((read[pair_idx] for read in reads),f'mrsa_{pair_idx+1}_wgs.fq','fastq')

In [None]:
reads_cov_1=get_x_random_reads([SAMPLES['COV1']['WGS_1'],SAMPLES['COV1']['WGS_2']],330_000)
reads_cov_2=get_x_random_reads([SAMPLES['COV2']['WGS_1'],SAMPLES['COV2']['WGS_2']],330_000)
reads_cov_3=get_x_random_reads([SAMPLES['COV3']['WGS_1'],SAMPLES['COV3']['WGS_2']],330_000)

reads_human_1=get_x_random_reads(['human1.wgs1.fq','human1.wgs2.fq'],3333,seed=123)
reads_human_2=get_x_random_reads(['human2.wgs1.fq','human2.wgs2.fq'],3333,seed=123)
reads_human_3=get_x_random_reads(['human3.wgs1.fq','human3.wgs2.fq'],3334,seed=123)

reads_cov = reads_cov_1+reads_cov_2+reads_cov_3
add_suffix(reads_cov,'pat')
reads_human = reads_human_1+reads_human_2+reads_human_3
add_suffix(reads_human,'hom')

reads = reads_cov+reads_human

for pair_idx in range(len(reads[0])):
    SeqIO.write((read[pair_idx] for read in reads),f'cov_{pair_idx+1}_wgs.fq','fastq')

In [None]:
reads_mrsa_1=get_x_random_reads([SAMPLES['MRSA1']['ONT']],82_500)
reads_mrsa_2=get_x_random_reads([SAMPLES['MRSA2']['ONT']],82_500)
reads_mrsa_3=get_x_random_reads([SAMPLES['MRSA3']['ONT']],82_500)

reads_human_1=get_x_random_reads(['human1.ont.fq'],833,seed=999)
reads_human_2=get_x_random_reads(['human2.ont.fq'],833,seed=999)
reads_human_3=get_x_random_reads(['human3.ont.fq'],834,seed=999)

reads_mrsa = reads_mrsa_1+reads_mrsa_2+reads_mrsa_3
add_suffix(reads_mrsa,'pat')
reads_human = reads_human_1+reads_human_2+reads_human_3
add_suffix(reads_human,'hom')

reads = reads_mrsa+reads_human

for pair_idx in range(len(reads[0])):
    SeqIO.write((read[pair_idx] for read in reads),f'mrsa_{pair_idx+1}_ont.fq','fastq')

In [None]:
reads_cov_1=get_x_random_reads([SAMPLES['COV1']['ONT']],82_500)
reads_cov_2=get_x_random_reads([SAMPLES['COV2']['ONT']],82_500)
reads_cov_3=get_x_random_reads([SAMPLES['COV3']['ONT']],82_500)

reads_human_1=get_x_random_reads(['human1.ont.fq'],833,seed=333)
reads_human_2=get_x_random_reads(['human2.ont.fq'],833,seed=333)
reads_human_3=get_x_random_reads(['human3.ont.fq'],834,seed=333)

reads_cov = reads_cov_1+reads_cov_2+reads_cov_3
add_suffix(reads_cov,'pat')
reads_human = reads_human_1+reads_human_2+reads_human_3
add_suffix(reads_human,'hom')

reads = reads_cov+reads_human

for pair_idx in range(len(reads[0])):
    SeqIO.write((read[pair_idx] for read in reads),f'cov_{pair_idx+1}_ont.fq','fastq')

# Step 2: Running ReadItAndKeep and evaulating the results

In [None]:
#Install ReadItAndKeep conda environment according to readme
cmd = "mamba create -y -n readitandkeep -c conda-forge -c bioconda read-it-and-keep"
!{cmd}

In [None]:
#COV Illumina
cmd = 'source ~/miniforge3/etc/profile.d/conda.sh && conda activate readitandkeep && readItAndKeep --ref_fasta MN908947.3.no_poly_A.fa --tech illumina --reads1 cov_1_wgs.fq --reads2 cov_2_wgs.fq -o riak_cov_wgs'
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 990_000
total_p = 10_000

for record in SeqIO.parse(gzip.open('riak_cov_wgs.reads_1.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

In [None]:
#COV Ont
cmd = 'source ~/miniforge3/etc/profile.d/conda.sh && conda activate readitandkeep && readItAndKeep --ref_fasta MN908947.3.no_poly_A.fa --tech ont --reads1 cov_1_ont.fq -o riak_cov_ont'
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 247_500
total_p = 2_500

for record in SeqIO.parse(gzip.open('riak_cov_ont.reads.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

In [None]:
#MRSA Ont
cmd = 'source ~/miniforge3/etc/profile.d/conda.sh && conda activate readitandkeep && readItAndKeep --ref_fasta GCA_000013425.1_ASM1342v1_genomic.fna --tech ont --reads1 mrsa_1_ont.fq -o riak_mrsa_ont'
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 247_500
total_p = 2_500

for record in SeqIO.parse(gzip.open('riak_mrsa_ont.reads.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

In [None]:
#MRSA WGS
cmd = 'source ~/miniforge3/etc/profile.d/conda.sh && conda activate readitandkeep && readItAndKeep --ref_fasta GCA_000013425.1_ASM1342v1_genomic.fna --tech illumina --reads1 mrsa_1_wgs.fq --reads2 mrsa_2_wgs.fq -o riak_mrsa_wgs'
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 990_000
total_p = 10_000

for record in SeqIO.parse(gzip.open('riak_mrsa_wgs.reads_1.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

# Step 3: Running Hostile and evaulating the results

In [None]:
#Install hostile conda environment according to readme
cmd = "mamba create -y -n hostile -c conda-forge -c bioconda hostile"
!{cmd}

In [None]:
# Cov WGS

cmd = "source ~/miniforge3/etc/profile.d/conda.sh && conda activate hostile && hostile clean --fastq1 cov_1_wgs.fq --fastq2 cov_2_wgs.fq --out-dir hostile_output"
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 990_000
total_p = 10_000

for record in SeqIO.parse(gzip.open('hostile_output/cov_1_wgs.clean_1.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat/1'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

In [None]:
# MRSA WGS

cmd = "source ~/miniforge3/etc/profile.d/conda.sh && conda activate hostile && hostile clean --fastq1 mrsa_1_wgs.fq --fastq2 mrsa_2_wgs.fq --out-dir hostile_output"
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 990_000
total_p = 10_000

for record in SeqIO.parse(gzip.open('hostile_output/mrsa_1_wgs.clean_1.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat/1'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

In [None]:
# MRSA ONT

cmd = "source ~/miniforge3/etc/profile.d/conda.sh && conda activate hostile && hostile clean --fastq1 mrsa_1_ont.fq --out-dir hostile_output"
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 247_500
total_p = 2_500

for record in SeqIO.parse(gzip.open('hostile_output/mrsa_1_ont.clean.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

In [None]:
# COV ONT

cmd = "source ~/miniforge3/etc/profile.d/conda.sh && conda activate hostile && hostile clean --fastq1 cov_1_ont.fq --out-dir hostile_output"
!{cmd}

In [None]:
#positive: filtered
tn = 0
fn = 0
total_n = 247_500
total_p = 2_500

for record in SeqIO.parse(gzip.open('hostile_output/cov_1_ont.clean.fastq.gz','rt'),'fastq'):
    if record.id.endswith('_pat'):
        tn += 1
    else:
        fn += 1

#filtered falsely     
fp = total_n-tn
#filtered correctly
tp = total_p-fn

print(tn,fn,fp,tp)
print(sum([tn,fn,fp,tp]))

# Step 4: Running SWGTS and evaulating the results

In [None]:
#COMBINED: Uses a database where one reference is labeled as the target (MINIMAP2_POSITIVE_CONTIG)
#NEGATIVE: Only host database is provided and all hits are discarded

swgts_tuples = []

#We need to evaluate all three operation modes
for method in ['NEGATIVE','POSITIVE','COMBINED']:
    for pathogen in ['mrsa','cov']:
        for technology in ['ont','wgs']:
            for buffer_size in BUFFER_SIZES:
                #For Illumina WGS we only need one experiment as this is not dependent on buffer size
                print(method,pathogen,technology,buffer_size)
                if technology == 'wgs' and buffer_size != 3_000_000:
                    continue
                if os.path.exists(f'../swgts/swgts-submit/outfolder_{method}/{pathogen}_1_{technology}.filtered.fq'):
                    print('Detected previous submission, skipping transmission ...')
                else:
                    FILTER_MODE = 'NEGATIVE' if method == 'NEGATIVE' else 'COMBINED'
                    cmd='sed -i -E "s|(FILTER_MODE:\\sstr\\s=\\s)(.+)|\\1\''+\
                    FILTER_MODE+'\'|" ../swgts/input/config_filter.py'
                    !{cmd}
                    cmd= 'sed -i -E "s|(MINIMAP2_POSITIVE_CONTIG:\\sstr\\s=\\s)(.+)|\\1\''+\
                    TARGET_CONTIGS[pathogen]+'\'|" ../swgts/input/config_filter.py'
                    !{cmd}
                    cmd='sed -i -E "s|(MAPPING_PRESET:\\sstr\\s=\\s)(.+)|\\1\''+MAPPING_PRESETS[technology]+'\'|" ../swgts/input/config_filter.py'
                    !{cmd}
                    cmd='sed -i -E "s|(\'databases/)(.+)(\.mmi\'\))|\\1'+\
                        DATABASES[method][pathogen]+\
                        '\\3|" ../swgts/input/config_filter.py'
                    !{cmd}

                    cmd='sed -i -E "s|(MAXIMUM_PENDING_BYTES:\\sint\\s=\\s)(.+)|\\1'+\
                        str(300_000_000)+\
                        '|" ../swgts/input/config_api.py'
                    !{cmd}

                    cmd='docker compose --project-directory ../swgts down 2>/dev/null'
                    !{cmd}
                    print('Docker down')
                    cmd='docker compose --project-directory ../swgts up --detach --wait 2>/dev/null'
                    !{cmd}
                    print('Docker up')
                    sleep(15)
                    #Run Client 
                    pathprefix = '../../rebenching_revision/'
                    add_args_wgs = '' if technology == 'ont' else pathprefix+pathogen+'_2_'+technology+'.fq --paired'

                    cmd = f'. ~/miniforge3/etc/profile.d/conda.sh && conda activate swgts-submit && cd ~/sgftp_sandbox/swgts/swgts-submit/ && python3 -m swgts-submit --outfolder outfolder_{method} --verbose --server https://localhost/api {pathprefix}{pathogen}_1_{technology}.fq {add_args_wgs}'
                    completed_process = subprocess.run(cmd,shell=True)
                    if completed_process.returncode != 0:
                        print(completed_process)
                        assert(False)

                #For Nanopore we need to vary the buffer size since this affects performance
                for buffer_size in reversed(
                    [1_000,2_500,5_000,10_000,25_000,50_000,100_000,300_000,1_000_000,3_000_000]
                ):
                    print(buffer_size)

                    #positive: filtered
                    tn = 0
                    fn = 0
                    total_n = 247_500 if technology == 'ont' else 990_000
                    total_p = 2_500 if technology == 'ont' else 10_000

                    for record in SeqIO.parse(
                        f'../swgts/swgts-submit/outfolder_{method}/{pathogen}_1_{technology}.filtered.fq','fastq'
                    ):
                        if len(record) <= buffer_size:
                            if record.id.endswith('_pat'):
                                tn += 1
                            else:
                                fn += 1

                    #filtered falsely     
                    fp = total_n-tn
                    #filtered correctly
                    tp = total_p-fn

                    swgts_tuples.append((technology,pathogen,buffer_size,method,tn,fn,fp,tp,sum([tn,fn,fp,tp])))         


In [None]:
results_swgts = pd.DataFrame(swgts_tuples,
             columns=[
                 'Technology','Pathogen','Buffer Size','Method',
                 'TN','FN','FP','TP','Sum'
                     ])

results_swgts

In [None]:
results_swgts['Dataset'] = results_swgts['Technology'].apply(lambda x: x.upper())+'_'+results_swgts['Pathogen'].apply(lambda x: x.upper())
results_swgts['Precision'] = results_swgts['TP']/(results_swgts['TP']+results_swgts['FP'])
results_swgts['Recall'] = results_swgts['TP']/(results_swgts['TP']+results_swgts['FN'])
results_swgts['F1'] = 2*results_swgts['TP']/(2*results_swgts['TP']+results_swgts['FN']+results_swgts['FP'])

results_swgts[['Buffer Size','Dataset','Method','TN','FN','FP','TP','Precision','Recall','F1']]

# Step 5: Buffer Size Simulations

We use the InfiniumOmni2 Chip as a basis

## Align real human reads and evaluate SNP coverage

In [None]:
#Read Omnium Chip Annotations (SNPs) and transform into 0-based coordinates
annotations = pd.read_csv('InfiniumOmni2-5-8v1-5_A1.hg19.annotated.txt',sep='\t',dtype={
    'Chr' : str
})
annotation_positions = {}

for chromosome in annotations['Chr'].unique():
    #trasform to 0 based
    annotation_positions[chromosome] = set(int(x)-1 for x in annotations[annotations['Chr']==chromosome]['MapInfo'].unique())


In [None]:
genome_length = 0
with gzip.open('Homo_sapiens.GRCh37.dna.primary_assembly.fa.gz','rt') as reference_file:
    for record in SeqIO.parse(reference_file,'fasta'):
        genome_length += len(record)

In [None]:
snp_count = len(annotations)

In [None]:
print(f'Assuming a genome length of {genome_length} with {snp_count} SNPs')

In [None]:
# Binomial Expected Values

print('Calculating the values based on binomial distribution ...')

tuples_binom = []

for buffer_size in [500,1000,2500,5000,10000,25000,50000,100000,300000,1000000,3000000]:
    mean, var = binom.stats(buffer_size, snp_count/genome_length, moments='mv')
    std_dev = math.sqrt(var)
    print(buffer_size,mean,std_dev)
    tuples_binom.append((buffer_size,mean,std_dev))
    
binom_df = pd.DataFrame(tuples_binom,columns=['Buffer Size','Mean','Std. Deviation'])

In [None]:
random.seed(1856663)
result_tuples = []

aligner = mp.Aligner('Homo_sapiens.GRCh37.dna.primary_assembly.mmi', preset='map-ont', best_n=1)
reads = []
with open('human1.ont.fq','r') as file1, open('human2.ont.fq','r') as file2,open('human3.ont.fq','r') as file3:
    print('1')
    reads += list(
        x[1] for x in FastqGeneralIterator(file1)
    )
    print('2')
    reads+=list(
        x[1] for x in FastqGeneralIterator(file2)
    )
    print('3')
    reads+=list(
        x[1] for x in FastqGeneralIterator(file3)
    )
print('All Nanopore Reads in Memory')

for target_buffer_size in [500,1000,2500,5000,10000,25000,50000,100000,300000,1000000,3000000]:
    print(target_buffer_size)

    for repeat in range(10):
        hits = set()

        random.shuffle(reads)
        buffer_size = 0
        while buffer_size < target_buffer_size:
            for read in reads:
                excess = (len(read) + buffer_size)-target_buffer_size
                if excess > 0:
                    read = read[:-excess]
                for hit in aligner.map(read):
                    if hit.ctg in annotation_positions:
                        for pos in range(hit.r_st,hit.r_en+1):
                            if pos in annotation_positions[hit.ctg]:
                                hits.add((hit.ctg,pos))
                buffer_size += len(read)

        result_tuples.append(
            ('Nanopore',repeat,target_buffer_size,buffer_size,len(hits))
        )


aligner = mp.Aligner('Homo_sapiens.GRCh37.dna.primary_assembly.mmi', preset='sr', best_n=1)
reads = []
with open('human1.wgs1.fq','r') as file11,\
    open('human2.wgs1.fq','r') as file21,\
    open('human3.wgs1.fq','r') as file31,\
    open('human1.wgs2.fq','r') as file12,\
    open('human2.wgs2.fq','r') as file22,\
    open('human3.wgs2.fq','r') as file32:
    print('1')
    reads += list(
        zip(
            (x[1] for x in FastqGeneralIterator(file11)),
            (x[1] for x in FastqGeneralIterator(file12))                
        )
    )
    print('2')
    reads += list(
        zip(
            (x[1] for x in FastqGeneralIterator(file21)),
            (x[1] for x in FastqGeneralIterator(file22))                
        )
    )
    print('3')
    reads += list(
        zip(
            (x[1] for x in FastqGeneralIterator(file31)),
            (x[1] for x in FastqGeneralIterator(file32))                
        )
    )
print('All Illumina Reads in Memory')

for target_buffer_size in [500,1000,2500,5000,10000,25000,50000,100000,300000,1000000,3000000]:
    print(target_buffer_size)
    for repeat in range(10):
        hits = set()

        random.shuffle(reads)
        buffer_size = 0
        while buffer_size < target_buffer_size:
            for read1,read2 in reads:
                excess = (len(read1)+len(read2) + buffer_size)-target_buffer_size
                if excess > 0:
                    if excess > len(read2):
                        excess_read1 = excess-len(read2)
                        read2 = ''
                        read1 = read1[:-excess_read1]
                    else:
                        read2 = read2[:-excess]
                for hit in aligner.map(read1,read2):
                    if hit.ctg in annotation_positions:
                        for pos in range(hit.r_st,hit.r_en+1):
                            if pos in annotation_positions[hit.ctg]:
                                hits.add((hit.ctg,pos))
                buffer_size += (len(read1)+len(read2))

        result_tuples.append(
            ('Illumina',repeat,target_buffer_size,buffer_size,len(hits))
        )
    
aligner = None
reads = None

In [None]:
simulation_df = pd.DataFrame(result_tuples,columns=['Method','Repeat','Target Buffer Size','Buffer Size','Hits'])

In [None]:
# Save intermediates
simulation_df.to_csv('simulation_human_snp.csv',index=False)
binom_df.to_csv('binom_human_snp.csv',index=False)


In [None]:
simulation_df = pd.read_csv('simulation_human_snp.csv')
binom_df = pd.read_csv('binom_human_snp.csv')

## Create Paper Figure

In [None]:
binom_df['Low'] = binom_df['Mean']-binom_df['Std. Deviation']
binom_df['High'] = binom_df['Mean']+binom_df['Std. Deviation']


(alt.Chart(simulation_df).mark_boxplot().encode(
    x=alt.X('Buffer Size:O',title='Buffer Size (Bases)'),
    y=alt.Y(
        'Hits:Q',
        title='Covered Panel SNPs',
        scale=alt.Scale(type='symlog'
                       ),
        axis=alt.Axis(values=[0,10,50,100,500,1000,1500,2000])
    ),
    xOffset='Method:N',
    color='Method:N'
)+alt.Chart(binom_df).mark_line(color='black').encode(
    x='Buffer Size:O',
    y='Mean'
)+alt.Chart(binom_df).mark_point(
    color='black',
    filled=True,
    fill='white',
    strokeWidth=2,
    opacity=1,
    stroke='black'
).encode(
    x='Buffer Size:O',
    y='Mean'
)+alt.Chart(pd.DataFrame([(92)],columns=['Y'])
           ).mark_rule(strokeDash=[2,1]).encode(
    y='Y'
)

).properties(width=300).configure_legend(
    symbolFillColor=None
).resolve_scale(color='shared')



## Combine tables into readable supplementary table

In [None]:
sim_df_transformed = simulation_df.groupby(
    ['Method','Buffer Size']
)['Hits'].agg(
    Mean=np.mean,Std=np.std
).reset_index().pivot(
    index='Buffer Size',
    columns='Method',
    values=['Mean','Std']
)
sim_df_transformed.columns = [col[0] + f"_{col[1]}" for col in sim_df_transformed.columns]

In [None]:
#Combine Tables and add Mean/Std. Deviation
final = pd.merge(sim_df_transformed,binom_df,on='Buffer Size').sort_values(by='Buffer Size')
#Generate Buffer Size Modelig Table
final.to_csv('BufferSizeModeling.csv',index=False)

# Step 6: Performance Benchmarking

## Subsample the data sets

In [None]:
random.seed(270923)

for pathogen in ['mrsa','cov']:
    for technology in ['ont','wgs']:
        print(technology,pathogen)
        #Total read count / bases should be able to fill the buffer
        SAMPLE_SIZE = 50_000 if technology == 'wgs' else 20_000
        if os.path.exists(f'benchmark_{pathogen}_{technology}_1.fq'):
            print(f'Skipping {pathogen}/{technology}, already existing ...')
        if technology == 'wgs':
            records = []
            for r1,r2 in zip(SeqIO.parse(f'{pathogen}_1_wgs.fq','fastq'),SeqIO.parse(f'{pathogen}_2_wgs.fq','fastq')):
                records.append((r1,r2))
            random_sample = random.sample(records,SAMPLE_SIZE)
            r1s = (x[0] for x in random_sample)
            r2s = (x[1] for x in random_sample)
            SeqIO.write(r1s,f'benchmark_{pathogen}_{technology}_1.fq','fastq')
            SeqIO.write(r2s,f'benchmark_{pathogen}_{technology}_2.fq','fastq')
        elif technology == 'ont':
            records = []
            for r in SeqIO.parse(f'{pathogen}_1_ont.fq','fastq'):
                records.append(r)
            random_sample = random.sample(records,SAMPLE_SIZE)
            SeqIO.write(random_sample,f'benchmark_{pathogen}_{technology}_1.fq','fastq')

## Measuring Chunk Size and Buffer Size Impact


In [None]:
REPEATS = 5

method = 'NEGATIVE'

for chunk_size in reversed(CHUNK_SIZES):
    for buffer_size in reversed(BUFFER_SIZES):
        if chunk_size > buffer_size:
            continue
        for technology in ['ont','wgs']:
            if technology == 'wgs':
                continue
            for pathogen in ['mrsa','cov']:
                if pathogen == 'mrsa':
                    continue

                print(chunk_size,buffer_size,pathogen,technology)
                if os.path.exists(f'bufferchunk/{chunk_size}_{buffer_size}_{pathogen}_{technology}.csv'):
                    continue

                client = paramiko.SSHClient()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                try:

                    client.connect(hostname=server_machine, username='root')
                    #Set Config Accordingly
                    FILTER_MODE = 'NEGATIVE' if method == 'NEGATIVE' else 'COMBINED'
                    client.exec_command(
                        'sed -i -E "s|(FILTER_MODE:\\sstr\\s=\\s)(.+)|\\1\''+FILTER_MODE+'\'|" /srv/swgts-deploy/input/config_filter.py'
                    )
                    client.exec_command(
                        'sed -i -E "s|(MINIMAP2_POSITIVE_CONTIG:\\sstr\\s=\\s)(.+)|\\1\''+TARGET_CONTIGS[pathogen]+'\'|" /srv/swgts-deploy/input/config_filter.py'
                    )
                    client.exec_command(
                        'sed -i -E "s|(MAPPING_PRESET:\\sstr\\s=\\s)(.+)|\\1\''+MAPPING_PRESETS[technology]+'\'|" /srv/swgts-deploy/input/config_filter.py'
                    )
                    client.exec_command(
                        'sed -i -E "s|(\'databases/)(.+)(\.mmi\'\))|\\1'+\
                        DATABASES[method][pathogen]+\
                        '\\3|" /srv/swgts-deploy/input/config_filter.py'
                    )

                    #Adjust buffer
                    client.exec_command(
                        'sed -i -E "s|(MAXIMUM_PENDING_BYTES:\\sint\\s=\\s)(.+)|\\1'+str(buffer_size)+'|" /srv/swgts-deploy/input/config_api.py'
                    ) 

                    stdin, stdout, stderr = client.exec_command(
                        'docker compose --project-directory /srv/swgts-deploy down -v'
                    )
                    stdout.readlines() #Pseudo-Block
                    stdin, stdout, stderr = client.exec_command(
                        'docker compose --project-directory /srv/swgts-deploy up --detach --wait'
                    )                
                    stdout.readlines() #Pseudo-Block

                except:
                    print("[!] Cannot connect to the SSH Server")
                    assert(False)

                client.close()
                #Run Client 
                sleep(12) #Server needs a couple of seconds post docker return code
                pathprefix = '../../rebenching_revision/'
                add_args_wgs = '' if technology == 'ont' else pathprefix+'benchmark_'+pathogen+'_'+technology+'_2'+'.fq --paired'

                cmd = f'. ~/miniforge3/etc/profile.d/conda.sh && conda activate swgts-submit && cd ~/sgftp_sandbox/swgts/swgts-submit/ && python3 -m swgts-submit --count {chunk_size} --server https://{server_machine}/api {pathprefix}benchmark_{pathogen}_{technology}_1.fq {add_args_wgs} --verbose'
                
                with open(f'bufferchunk/{chunk_size}_{buffer_size}_{pathogen}_{technology}.csv','w') as outfile,\
                     open(f'bufferchunk/{chunk_size}_{buffer_size}_{pathogen}_{technology}.output','w') as outstream:

                    #Repeat X times
                    for repeat in range(REPEATS+1):
                        print(f'Repeat: {repeat}')
                        pre = time.time()
                        completed_process = subprocess.run(cmd,shell=True,stdout = outstream,stderr=outstream)
                        if completed_process.returncode != 0:
                            print(completed_process)
                            assert(False)
                        post = time.time()
                        if repeat == 0:
                            #Discard first measurement to get rid of startup effects
                            continue
                        elapsed = post-pre
                        outfile.write(
                            f'{pathogen},{elapsed},{repeat},{chunk_size},{buffer_size},{technology}\n'
                        )


In [None]:
length_tuples = []
server_tuples = []
lengths_histo = []

#Parse exact base counts for rates
for pathogen in ['mrsa','cov']:
    #ont
    total = 0
    server = {x:0 for x in BUFFER_SIZES}
    for read in SeqIO.parse(f'benchmark_{pathogen}_ont_1.fq','fastq'):
        lengths_histo.append((pathogen,len(read),read.id.endswith('hom')))
        total += len(read)
        for buffer_size in BUFFER_SIZES:
            if len(read) <= buffer_size:
                server[buffer_size] += len(read)
    length_tuples.append((pathogen,'ont',total))
    for k,v in server.items():
        server_tuples.append(
            (pathogen,'ont',k,v/total)
        )

    #wgs
    total = 0
    server = {x:0 for x in BUFFER_SIZES}
    for r1,r2 in zip(
        SeqIO.parse(f'benchmark_{pathogen}_wgs_1.fq','fastq'),
        SeqIO.parse(f'benchmark_{pathogen}_wgs_2.fq','fastq')
    ):
        total += len(r1)+len(r2)
        for buffer_size in BUFFER_SIZES:
            if (len(r1)+len(r2)) <= buffer_size:
                server[buffer_size] += (len(r1)+len(r2))
    length_tuples.append((pathogen,'wgs',total))
    for k,v in server.items():
        server_tuples.append(
            (pathogen,'wgs',k,v/total)
        )

In [None]:
length_df = pd.DataFrame(length_tuples,columns=['Pathogen','Technology','Bases'])


In [None]:
server_df = pd.DataFrame(server_tuples,columns=['Pathogen','Technology','Buffer Size','Fraction Processed'])


In [None]:
from datetime import date

frames = []
for chunk_size in CHUNK_SIZES:
    for buffer_size in BUFFER_SIZES:
        for technology in ['ont','wgs']:
            if chunk_size > buffer_size:
                continue

            for pathogen in ['mrsa','cov']:
                if os.path.exists(f'bufferchunk/{chunk_size}_{buffer_size}_{pathogen}_{technology}.csv'):
                    frames.append(
                        pd.read_csv(
                            f'bufferchunk/{chunk_size}_{buffer_size}_{pathogen}_{technology}.csv',
                            header=None,sep=',',
                            names=['Pathogen','Time (s)','Repeat','Chunk Size','Buffer Size','Technology']
                        )
                    )
                

benchmark = pd.concat(frames)
benchmark = pd.merge(benchmark,length_df,on=['Pathogen','Technology'],how='left')
benchmark = pd.merge(benchmark,server_df,on=['Pathogen','Technology','Buffer Size'],how='left')

benchmark['Bases/s'] = benchmark['Bases'] / benchmark['Time (s)'] 
benchmark['KBases/s'] = round(benchmark['Bases/s'] / 1000)
benchmark['Percentage Processed'] = benchmark['Fraction Processed'].apply(lambda x : round(x*100,2))

benchmark['Technology'] = benchmark['Technology'].map(
    {
        'ont' : 'ONT',
        'wgs' : 'Illumina'
    }
)

benchmark['Pathogen'] = benchmark['Pathogen'].map(
    {
        'mrsa' : 'MRSA',
        'cov' : 'CoV'
    }
)

aggregated_benchmark = benchmark.groupby(
    ['Pathogen','Technology','Buffer Size','Chunk Size','Percentage Processed'],as_index=False
)['Time (s)','Bases/s','KBases/s'].mean()



In [None]:
aggregated_benchmark.to_csv(
    f'BenchmarkChunkBuffer_{date.today()}.csv',index=False)

In [None]:
length_distributions = pd.DataFrame(lengths_histo,columns=['Pathogen','Length','Human'])
def get_bin(x):
    for size in BUFFER_SIZES:
        if x <= size:
            return size
length_distributions['Bin'] = length_distributions['Length'].apply( get_bin)
length_distributions['Read Type'] = length_distributions['Human'].map({
    False : 'Pathogen',
    True : 'Human'
})

length_distributions['Pathogen'] = length_distributions['Pathogen'].map(
    {
        'mrsa' : 'MRSA',
        'cov' : 'CoV'
    }
)

## Creating the Panel for the Paper Figure

In [None]:
data = benchmark.groupby(['Buffer Size','Chunk Size','Pathogen','Technology'],as_index=False)['KBases/s'].median()
alt.data_transformers.disable_max_rows()
c1 =(alt.Chart(
    data, height=300,width=300
).mark_rect().encode(
    x=alt.X('Buffer Size:O',title=None),
    y='Chunk Size:O',
    color=alt.Color('KBases/s',scale=alt.Scale(type='symlog'),title='Kilobases / s')
)+alt.Chart(
    data, height=300,width=300
).mark_text().encode(
    x=alt.X('Buffer Size:O',title=None,axis=None),
    y='Chunk Size:O',
    text='KBases/s'
)).facet(
    row='Technology',
    column='Pathogen'
)
c2 = alt.Chart(length_distributions,height=300,width=300).mark_bar().encode(
    x=alt.X('Bin:O',title='Buffer Size'),
    y=alt.Y('count(Bin)',scale=alt.Scale(type='symlog'),title='Additional Reads <= Buffer',
            axis=alt.Axis(values=[0,10,100,1000,10000,20000],format=".1e")),
    xOffset='Read Type',
    color='Read Type',
    column=alt.Column('Pathogen',title=None,header=None),
    tooltip=['count(Bin)']
)

c1&c2

In [None]:
data = benchmark.groupby(
    ['Buffer Size','Chunk Size','Pathogen','Technology'],as_index=False
)['KBases/s'].median().groupby(
    ['Buffer Size','Pathogen','Technology'],as_index=False
).max()
data['Dataset'] = data['Technology']+'/'+data['Pathogen']

c1 = alt.Chart(benchmark,height=300).mark_rect(point=True).encode(
    x='Buffer Size:O',
    color='mean(KBases/s)',
    y='Chunk Size:O'
)+alt.Chart(benchmark,height=300).mark_text().encode(
    x='Buffer Size:O',
    text=alt.Text('mean(KBases/s)',format=".0f"),
    y='Chunk Size:O'
)


c1.properties(width=300)

## Evaluating Client/Server Scaling

We evaluate the maximum possible bandwith between the machines

In [None]:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
    client.connect(hostname=server_machine, username='root')
    client.exec_command('iperf3 -s -p 443 -1')
    cmd = f'iperf3 --verbose -c {server_machine} -p 443 > iperf3.output'
    print('Client Starting ...')
    !{cmd}
except:
    print("[!] Cannot connect to the SSH Server")
    exit()
    
client.close()

with open('iperf3.output','r') as infile:
    print(infile.read())

In [None]:
CLIENT_COUNTS=range(1,31)#[1,2,3,20,30]#range(10,11)
MEASUREMENT_PERIOD = 30

CONFIGURATIONS = {
1:  {'chunk_size' : 2_500, 'buffer_size' : 10_000, 'pathogen' : 'mrsa','threads' : 7,'technology':'wgs'},
2:    {'chunk_size' : 2_500, 'buffer_size' : 10_000, 'pathogen' : 'mrsa','threads' : 3,'technology':'wgs'},
4:    {'chunk_size' : 25_000, 'buffer_size' : 100_000, 'pathogen' : 'cov','threads' : 3,'technology':'ont'},
#5:    {'chunk_size' : 25_000, 'buffer_size' : 100_000, 'pathogen' : 'cov','threads' : 5,'technology':'ont'},
6:    {'chunk_size' : 25_000, 'buffer_size' : 100_000, 'pathogen' : 'cov','threads' : 7,'technology':'ont'},
#7:    {'chunk_size' : 25_000, 'buffer_size' : 100_000, 'pathogen' : 'cov','threads' : 9,'technology':'ont'}
#8:    {'chunk_size' : 25_000, 'buffer_size' : 100_000, 'pathogen' : 'cov','threads' : 1,'technology':'ont'}

}


description_map = { 
    1: '2.500 Chunk Size 10.000 Buffer Size MRSA Illumina 7 Threads',
    2: '2.500 Chunk Size 10.000 Buffer Size MRSA Illumina 3 Threads',
    3: '25.000 Chunk Size 100.000 Buffer Size COV ONT 7 Threads',
    4: '25.000 Chunk Size 100.000 Buffer Size COV ONT 3 Threads',
    5: '25.000 Chunk Size 100.000 Buffer Size COV ONT 5 Threads',
    6: '25.000 Chunk Size 100.000 Buffer Size COV ONT 7 Threads',
    7: '25.000 Chunk Size 100.000 Buffer Size COV ONT 9 Threads',
    8: '25.000 Chunk Size 100.000 Buffer Size COV ONT 1 Threads',


}

In [None]:
method = 'NEGATIVE'

result_tuples = []

for idx,configuration in CONFIGURATIONS.items():
    
    print(f'Running configuration {idx} : {configuration} ')
    
    pathogen = configuration['pathogen']
    technology = configuration['technology']
    buffer_size = configuration['buffer_size']
    chunk_size = configuration['chunk_size']
    
    processes = {}

    vmstat_client = paramiko.SSHClient()
    vmstat_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    dockerstat_client = paramiko.SSHClient()
    dockerstat_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    ifstat_client = paramiko.SSHClient()
    ifstat_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    redis_client = paramiko.SSHClient()
    redis_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:

        client.connect(hostname=server_machine, username='root')
       
        #Set Config Accordingly
        FILTER_MODE = 'NEGATIVE' if method == 'NEGATIVE' else 'COMBINED'
        client.exec_command(
            'sed -i -E "s|(FILTER_MODE:\\sstr\\s=\\s)(.+)|\\1\''+FILTER_MODE+'\'|" /srv/swgts-deploy/input/config_filter.py'
        )
        client.exec_command(
            'sed -i -E "s|(MINIMAP2_POSITIVE_CONTIG:\\sstr\\s=\\s)(.+)|\\1\''+TARGET_CONTIGS[pathogen]+'\'|" /srv/swgts-deploy/input/config_filter.py'
        )
        client.exec_command(
            'sed -i -E "s|(MAPPING_PRESET:\\sstr\\s=\\s)(.+)|\\1\''+MAPPING_PRESETS[technology]+'\'|" /srv/swgts-deploy/input/config_filter.py'
        )
        
        client.exec_command(
            'sed -i -E "s|(\'databases/)(.+)(\.mmi\'\))|\\1'+\
            DATABASES[method][pathogen]+\
            '\\3|" /srv/swgts-deploy/input/config_filter.py'
        )

        #Adjust buffer
        client.exec_command(
            'sed -i -E "s|(MAXIMUM_PENDING_BYTES:\\sint\\s=\\s)(.+)|\\1'+str(buffer_size)+'|" /srv/swgts-deploy/input/config_api.py'
        ) 
        
        #Adjust worker threadsMC
        client.exec_command(
            'sed -i -E "s|(WORKER_THREADS:\\sint\\s=\\s)(.+)|\\1'+str(configuration['threads'])+'|" /srv/swgts-deploy/input/config_filter.py'
        ) 

        
        for client_count in CLIENT_COUNTS:
            
            print(f'Using {client_count} clients')
            
            stdin, stdout, stderr = client.exec_command(
                'docker compose --project-directory /srv/swgts-deploy down -v'
            )
            stdout.readlines() #Pseudo-Block
            stdin, stdout, stderr = client.exec_command(
                'docker compose --project-directory /srv/swgts-deploy up --detach'
            )                
            stdout.readlines() #Pseudo-Block
            #Run Client 
            sleep(12) #Server needs a couple of seconds post docker return code


            !{f'curl --insecure https://{server_machine}/api/server-status'}
            
            logfiles = []
            
            vmstat_client.connect(hostname=server_machine, username='root')
            ifstat_client.connect(hostname=server_machine, username='root')
            redis_client.connect(hostname=server_machine, username='root')
            dockerstat_client.connect(hostname=server_machine, username='root')

            
            vmstat_client.exec_command(
                f'vmstat -n -t 1 > benchmarks/benchmark_{idx}_{client_count}_vm.txt',
                get_pty=True
            )

            ifstat_client.exec_command(
                f'while true; do echo "$(date)" && iftop -i enX0 -t -s 1 | grep "Total send and receive rate" | tr "\\n" "\\t" || break; done > /root/benchmarks/benchmark_{idx}_{client_count}_if.txt\n',
                get_pty=True
            )
            
            dockerstat_client.exec_command(
                f'while true; do docker stats --format "{{{{.Name}}}}\t{{{{.CPUPerc}}}}\t{{{{.NetIO}}}}\t{{{{.BlockIO}}}}" || break; done > /root/benchmarks/benchmark_{idx}_{client_count}_do.txt\n',
                get_pty=True
            )
            
            
            for clientidx in range(client_count):
                
                logfile = open(f'benchmarks_client_counts/benchmark_{idx}_{client_count}_{clientidx}_cl.txt','w')
                
                processes[clientidx] =  Popen(
                    f'. ~/miniforge3/etc/profile.d/conda.sh && conda activate swgts-submit'+
                    f' && echo "$(date)" && cd ~/sgftp_sandbox/swgts/swgts-submit/ &&'+
                    f' python3 '+
                    f'-m swgts-submit --count {chunk_size} --verbose --server https://{server_machine}/api'+
                    f' {pathprefix}{pathogen}_1_{technology}.fq {add_args_wgs} 2>&1 | tr "\\015" "\n"',
                    shell=True,
                    preexec_fn= os.setsid, 
                    stdout = logfile,
                    stderr = logfile
                )               
                
                
                logfiles.append(logfile)
                
            sleep(MEASUREMENT_PERIOD)
            
            stdin, stdout, stderr = redis_client.exec_command(
                f'docker exec -it swgts-deploy-redis-1 redis-cli GET stats:bases > benchmarks/benchmark_{idx}_{client_count}_bs.txt',
                get_pty=True
            )
            stdout.readlines() #Pseudo-Block
                        
            vmstat_client.close()
            ifstat_client.close()
            redis_client.close()
            dockerstat_client.close()
            
            for p in processes.values():
                os.killpg(os.getpgid(p.pid), signal.SIGTERM) 

            for l in logfiles:
                l.close()
                
            #break
        client.close()
        
    except Exception as e:
        print(e)
        redis_client.close()
        vmstat_client.close()
        ifstat_client.close()
        dockerstat_client.close()
        for p in processes.values():
            os.killpg(os.getpgid(p.pid), signal.SIGTERM) 
        for l in logfiles:
            l.close()
    #break
            
processes = None

## Transfering server-side benchmark files for local evaluation

Requires rsync to run

In [None]:
cmd = f'rsync root@{server_machine}:~/benchmarks/benchmark* benchmarks_client_counts --info=progress2'

!{cmd}

In [None]:
tuples = []

for s in CONFIGURATIONS:
    for c in CLIENT_COUNTS:
        try:
            #Step 1: Get Client
            with open(f'benchmarks_client_counts/benchmark_{s}_{c}_if.txt','r') as if_file:
                rates = []
                dead_clients = 0
                timeouts = 0
                wrong_estimates = 0
                for cidx in range(c):
                    with open(f'benchmarks_client_counts/benchmark_{s}_{c}_{cidx}_cl.txt','r') as cl_file:
                        #skip first and last 1/5 of lines to avoid measuring startup/end effects
                        lines = cl_file.readlines()
                        fifth = len(lines)//5
                        date = dateutil.parser.parse(lines[0])

                        #Scan for crashes
                        crash_found = False
                        for line in lines:
                            if line.startswith('Traceback'):
                                dead_clients += 1
                                crash_found = True
                                break
                        if crash_found:
                            break

                        current_line_is_timeout = False
                        for line in lines[fifth:-fifth]:
                            if line.startswith('Received timeout'):
                                if current_line_is_timeout:
                                    wrong_estimates += 1
                                else:
                                    current_line_is_timeout = True
                                timeouts += 1
                                continue
                            else:
                                current_line_is_timeout = False
                            m=re.search(',(.+?)[(reads/s)|(read pairs/s)]',line)
                            if m == None:
                                continue #EOF
                            try:
                                if '?' in m.group(1):
                                    continue
                                rates.append(float(m.group(1)))
                            except:
                                #No rate estimate yet
                                if 'reads/s' in line:
                                    rates.append(0)
                                else:
                                    print(s,c,cidx,line,m.group(1))
                                    assert(False)

                mean_client_rate = np.mean(rates)
                std_client_rate = np.std(rates)

                vm_data = pd.read_csv(f'benchmarks_client_counts/benchmark_{s}_{c}_vm.txt', 
                                      delim_whitespace=True, header=None,skiprows=2, names=[
         'r'  ,'b'  , 'swpd' ,'free' ,  'buff', 'cache','si',
                                          'so','bi','bo' ,'in', 'cs' ,'us', 'sy' ,'id', 'wa', 'st','date','time'           
                ])

                mean_user_cpu = np.mean(vm_data['us'])
                std_user_cpu = np.std(vm_data['us'])

                mean_kernel_cpu = np.mean(vm_data['sy'])
                std_kernel_cpu = np.std(vm_data['sy'])

                mean_idle_cpu = np.mean(vm_data['id'])
                std_idle_cpu = np.std(vm_data['id'])

                mean_waitio_cpu = np.mean(vm_data['wa'])
                std_waitio_cpu = np.std(vm_data['wa'])

                serv_rates = []
                lines = if_file.readlines()
                fifth = len(lines)//5       
                for line in lines[fifth:-fifth]:
                    data = line.split()[5]
                    if data.endswith('Kb'):
                        value /= 1024
                        value = float(data[:-2])
                    elif data.endswith('Mb'):
                        value = float(data[:-2])
                    elif data.endswith('b'):
                        value = float(data[:-1])
                        value /= (1024*1024)
                    else:
                        #Something went wrong
                        print(data)
                        assert(False)
                    serv_rates.append(value)
                mean_serv_rate = np.mean(serv_rates)
                std_serv_rate = np.std(serv_rates)

                server_rate_bases = -1
                with open(f'benchmarks_client_counts/benchmark_{s}_{c}_bs.txt','r') as bsfile:
                    server_rate_bases = int(bsfile.read()[1:-2])

                tdf = pd.read_csv(f'benchmarks_client_counts/benchmark_{s}_{c}_do.txt',
                                  sep='\t',skiprows=[-1],
                                 header=None,names=['Container','CPU','NetIO','BlockIO']).dropna()
                tdf['Service'] = tdf['Container'].apply(lambda x: x.split('-')[-2])
                tdf['CPU'] = tdf['CPU'].apply(lambda x: float(x[:-1]))

                cpumeans = tdf.groupby('Service')[['CPU']].mean()
                tuples.append(
                    (
                        c,
                        s,
                        mean_client_rate,
                        std_client_rate,
                        mean_user_cpu,
                        std_user_cpu,
                        mean_kernel_cpu,
                        std_kernel_cpu,
                        mean_idle_cpu,
                        std_idle_cpu,
                        mean_waitio_cpu,
                        std_waitio_cpu,
                        mean_serv_rate,
                        std_serv_rate,
                        server_rate_bases,
                        timeouts,
                        dead_clients,
                        wrong_estimates,
                        cpumeans.loc['redis']['CPU'],
                        cpumeans.loc['api']['CPU'],
                        cpumeans.loc['traefik']['CPU'],
                        cpumeans.loc['filter']['CPU']
                    )
                )
        except:
            print(c,s)
df = pd.DataFrame(tuples,columns=[
    'Clients',
    'Scenario',
    'Client Reads/s Rate (Mean)',
    'Client Reads/s Rate (Std. Dev.)',
    'Server CPU Usage User (Mean)',
    'Server CPU Usage User (Std. Dev.)',
    'Server CPU Usage Kernel (Mean)',
    'Server CPU Usage Kernel (Std. Dev.)',
    'Server CPU Usage Idle (Mean)',
    'Server CPU Usage Idle (Std. Dev.)',
    'Server CPU Usage Wait I/O (Mean)',
    'Server CPU Usage wait I/O (Std. Dev.)',
    'Server Bandwith Usage MByte/s (Mean)',
    'Server Bandwith Usage MByte/s (Std. Dev.)',
    'Server Total Bases Processed',
    'Timeouts',
    'Dead Clients',
    'Wrong Estimates',
    'CPU Redis',
    'CPU API',
    'CPU Traefik',
    'CPU Filter'

])

In [None]:
df['Scenario Description'] = df['Scenario'].map(description_map)
df['Chunk'] = df['Scenario Description'].apply(lambda x: x.split()[0])
df['Throughput MBases/s'] = df['Server Total Bases Processed']/(MEASUREMENT_PERIOD*1_000_000)
df['MBases/s processed per Client'] = df['Server Total Bases Processed']/(df['Clients']*MEASUREMENT_PERIOD*1_000_000)
df['Server CPU Usage User (Mean) + Kernel (Mean)'] = df['Server CPU Usage User (Mean)']+df['Server CPU Usage Kernel (Mean)']
df['Threads'] = df['Scenario Description'].apply(lambda x : int(x.split()[-2]))
df['Wrong Estimates Fraction'] = df['Wrong Estimates']/df['Timeouts']

In [None]:
def gen_chart(frame):
    charts = []
    for measurement in [
        'Throughput MBases/s',
        'MBases/s processed per Client',
        'Server CPU Usage User (Mean) + Kernel (Mean)',
        'Server Bandwith Usage MByte/s (Mean)'
    ]:
        c = alt.Chart(frame).mark_line().encode(
            x='Clients',
            y=str(measurement),
            color=alt.Color('Scenario Description:N',sort=[
                '2.500 Chunk Size 10.000 Buffer Size MRSA Illumina 7 Threads',
                '2.500 Chunk Size 10.000 Buffer Size MRSA Illumina 3 Threads',
                '25.000 Chunk Size 100.000 Buffer Size COV ONT 7 Threads',
                '25.000 Chunk Size 100.000 Buffer Size COV ONT 3 Threads'
            ])
        )
        charts.append(c)
    return reduce(lambda x,y : x|y,charts)

c1 = gen_chart(df[df['Chunk'] == '2.500'])
c2 = gen_chart(df[df['Chunk'] != '2.500'])


(c1&c2).configure_legend(labelLimit=0,orient='bottom')

In [None]:
subframe = df


charts = []
for measurement in [
    'Dead Clients',
    'Timeouts',
    'MBases/s processed per Client',
    'Server CPU Utilization (%)',
    'Server Bandwith Usage MByte/s (Mean)',
    'Wrong Estimates Fraction',
    'CPU Traefik',
    'CPU Redis',
    'CPU API',
    'CPU Filter'
]:
    c = alt.Chart(subframe).mark_rect().encode(
        x='Threads:O',
        y='Clients:O',
        color=str(measurement),
        tooltip=[str(measurement)]
    )
    charts.append(c)
    
reduce(lambda x,y : x|y,charts).configure_legend(labelLimit=0,orient='bottom').resolve_scale(color='independent')

In [None]:
df.to_csv('BenchmarkSpeedClients.csv',index=False)

In [None]:
charts = []
for measurement in [
     'Server CPU Usage User (Mean)',
     'Server CPU Usage Kernel (Mean)',
     'Server CPU Usage Idle (Mean)',
     'Senrver CPU Usage Wait I/O (Mean)'
]:
    c = alt.Chart(df).mark_line().encode(
        x='Clients',
        y=str(measurement),
        color='Scenario Description:N'
    )
    charts.append(c)
    
reduce(lambda x,y : x|y,charts).configure_legend(labelLimit=0,orient='bottom')

## Comparability of Operation Modes w.r.t. Speed

In [None]:
time_tuples = []

#We need to evaluate all three operation modes
for method in ['NEGATIVE','POSITIVE','COMBINED']:
    for pathogen in ['cov']:
        for technology in ['ont']:
            buffer_size = 3_000_000
            chunk_size = 100_000
            #For Illumina WGS we only need one experiment as this is not dependent on buffer size
            print(method,pathogen,technology)
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:

                client.connect(hostname=server_machine, username='root')
                FILTER_MODE = 'NEGATIVE' if method == 'NEGATIVE' else 'COMBINED'
                
                cmd='sed -i -E "s|(FILTER_MODE:\\sstr\\s=\\s)(.+)|\\1\''+\
                FILTER_MODE+'\'|" ../swgts/input/config_filter.py'
                client.exec_command(cmd)
                cmd= 'sed -i -E "s|(MINIMAP2_POSITIVE_CONTIG:\\sstr\\s=\\s)(.+)|\\1\''+\
                TARGET_CONTIGS[pathogen]+'\'|" ../swgts/input/config_filter.py'
                client.exec_command(cmd)
                cmd='sed -i -E "s|(MAPPING_PRESET:\\sstr\\s=\\s)(.+)|\\1\''+MAPPING_PRESETS[technology]+'\'|" ../swgts/input/config_filter.py'
                client.exec_command(cmd)
                cmd='sed -i -E "s|(\'databases/)(.+)(\.mmi\'\))|\\1'+\
                    DATABASES[method][pathogen]+\
                    '\\3|" ../swgts/input/config_filter.py'
                client.exec_command(cmd)

                cmd ='sed -i -E "s|(MAXIMUM_PENDING_BYTES:\\sint\\s=\\s)(.+)|\\1'+str(buffer_size)+'|" /srv/swgts-deploy/input/config_api.py'

                client.exec_command(cmd)

                stdin, stdout, stderr = client.exec_command(
                    'docker compose --project-directory /srv/swgts-deploy down -v'
                )
                stdout.readlines() #Pseudo-Block
                stdin, stdout, stderr = client.exec_command(
                    'docker compose --project-directory /srv/swgts-deploy up --detach --wait'
                )                
                stdout.readlines() #Pseudo-Block
                sleep(12)
                client.close()
            except:
                pass
            for repeat in range(1,4):
            
                #Run Client 
                pathprefix = '../../rebenching_revision/'

                cmd = f'. ~/miniforge3/etc/profile.d/conda.sh && conda activate swgts-submit && cd ~/sgftp_sandbox/swgts/swgts-submit/ && python3 -m swgts-submit --count {chunk_size} --server https://{server_machine}/api {pathprefix}{pathogen}_1_{technology}.fq {add_args_wgs} --verbose'
                now = time.time()
                completed_process = subprocess.run(cmd,shell=True)
                elapsed = time.time()-now
                time_tuples.append((elapsed,method,pathogen,technology,repeat,chunk_size,buffer_size))
                if completed_process.returncode != 0:
                    print(completed_process)
                    assert(False)


In [None]:
length_tuples = []

#Parse exact base counts for rates
for pathogen in ['cov']:
    #ont
    total = 0
    server = {x:0 for x in BUFFER_SIZES}
    for read in SeqIO.parse(f'{pathogen}_1_ont.fq','fastq'):
        lengths_histo.append((pathogen,len(read),read.id.endswith('hom')))
        total += len(read)
        for buffer_size in BUFFER_SIZES:
            if len(read) <= buffer_size:
                server[buffer_size] += len(read)
    length_tuples.append((pathogen,'ont',total))
length_df = pd.DataFrame(length_tuples,columns=['Pathogen','Technology','Bases'])


In [None]:
time_df = pd.DataFrame(
    time_tuples,
    columns=[
        'Time','Mode','Pathogen','Technology','Repeat','Chunk','Buffer'
    ]
)
time_df = pd.merge(time_df,length_df,on=['Pathogen','Technology'],how='left')
time_df['Bases/s'] = time_df['Bases']/time_df['Time']
time_df.groupby('Mode')['Bases/s'].mean()
time_df['Mode'] = time_df['Mode'].map({
    'NEGATIVE' : 'Host Subtraction',
    'POSITIVE' : 'Simple Pathogen Retention',
    'COMBINED' : 'Host-Competitive Pathogen Retention'
})
time_df

In [None]:
alt.Chart(time_df).mark_point().encode(
    x='Mode',
    y='Bases/s'
)