# Measure performance for different methods of querying variant data

First, let's print the exact git commit of the thesis-index code I am using.

In [1]:
# Record the exact thesis-index commit these benchmarks were run against
!git -C ../../../thesis-index rev-parse HEAD

41a96d85dbbb3b5910206a5fff4455dcb1cb1785


In [2]:
from storage.variant.io.SnippyVariantsReader import SnippyVariantsReader
from pathlib import Path
from os import listdir, path

snippy_dir = Path('..', 'phylogeny')
# Every sub-directory of the snippy output directory is treated as one sample
sample_dirs = list(filter(path.isdir, (snippy_dir / entry for entry in listdir(snippy_dir))))
variants_reader = SnippyVariantsReader(sample_dirs)

Helper functions that wrap a call to another function and measure its runtime.

In [4]:
import time
from statistics import mean, stdev
import pandas as pd

def get_runtime(func, **args):
    """Call ``func(**args)`` once and measure how long it takes.

    Parameters
    ----------
    func : callable
        Function to time.
    **args
        Keyword arguments forwarded unchanged to ``func``.

    Returns
    -------
    tuple
        ``(value, seconds)``: the return value of ``func`` and the elapsed time in seconds.
    """
    # perf_counter is monotonic and high-resolution; time.time() can jump if the
    # system clock is adjusted while a benchmark is running.
    start = time.perf_counter()
    value = func(**args)
    return value, time.perf_counter() - start


def runtime_iteration(func, table: pd.DataFrame = None, name: str = None, num_samples: int = None, iterations = 5, print_result=True, **args):
    """Time ``func(**args)`` over several iterations and optionally record a summary row.

    Parameters
    ----------
    func : callable
        Function to benchmark; called as ``func(**args)`` on every iteration.
    table : pandas.DataFrame, optional
        Results table containing (at least) the columns 'Case' and 'Number of Samples'.
        When given, any existing row with the same (``name``, ``num_samples``) is
        replaced by one new summary row. When None, nothing is recorded.
    name : str, optional
        Value stored in the 'Case' column.
    num_samples : int, optional
        Value stored in the 'Number of Samples' column.
    iterations : int
        Number of timed calls; must be at least 1.
    print_result : bool
        If True, the distinct return values are printed (so they must be hashable)
        and the first one is stored in the 'Result' column; otherwise 'Result' is ``pd.NA``.
    **args
        Keyword arguments forwarded to ``func``.

    Returns
    -------
    tuple
        ``(table, results)``: the updated table (None if no table was passed) and the
        list of return values, one per iteration.

    Raises
    ------
    ValueError
        If ``iterations`` is less than 1.
    """
    if iterations < 1:
        raise ValueError(f'iterations must be at least 1, got {iterations}')

    runtimes = []
    results = []
    for _ in range(iterations):
        result, runtime = get_runtime(func, **args)
        runtimes.append(runtime)
        results.append(result)

    m_runtime = mean(runtimes)
    # stdev() needs at least two data points
    s_runtime = stdev(runtimes) if len(runtimes) > 1 else 0

    summary = f'Runtime (m+-s): {m_runtime:0.2f} +- {s_runtime:0.2f} seconds, iters {iterations}'
    if print_result:
        result_value = results[0]
        print(f'{summary}, result {set(results)}')
    else:
        result_value = pd.NA
        print(summary)

    if table is not None:
        # Drop previously-run matching results so there are no duplicate cases
        drop_index = table[(table['Case'] == name) & (table['Number of Samples'] == num_samples)].index
        table = table.drop(drop_index, axis='index')

        # DataFrame.append was deprecated in pandas 1.4 and removed in 2.0; pd.concat replaces it.
        new_row = pd.DataFrame([{
            'Case': name,
            'Number of Samples': num_samples,
            'Iterations': iterations,
            'Mean runtime (s)': m_runtime,
            'Stdev runtime (s)': s_runtime,
            'Result': result_value
        }])
        table = pd.concat([table, new_row], ignore_index=True)

    return table, results

# Test variants union

Let's test getting the set of all variants found in a list of samples (the union).

In [14]:
# Sample sets shared by every benchmark case below
case1_samples = ['SH12-001']
case2_samples = [
    "SH14-004", "SH13-001", "SH14-011", "SH14-016", "SH09-29",
    "SH12-008", "SH14-010", "SH14-028", "SH10-30", "SH12-007",
]

# Empty results table; runtime_iteration() adds one row per (case, number of samples)
results_table = pd.DataFrame(
    data=[],
    columns=['Case', 'Number of Samples', 'Iterations', 'Mean runtime (s)', 'Stdev runtime (s)', 'Result'],
)
results_table

Unnamed: 0,Case,Number of Samples,Iterations,Mean runtime (s),Stdev runtime (s),Result


## 1. From VCF files

### 1.1. VCF including loading file

In [6]:
from typing import List, Set

def get_union_from_vcf(samples: List[str]) -> int:
    """Read the variant tables of `samples` and count their distinct variants.

    Reading the files is part of the measured work: a new SnippyVariantsReader
    is built on every call.

    Parameters
    ----------
    samples : list of str
        Names of sample sub-directories of ``snippy_dir``.

    Returns
    -------
    int
        Size of the union of the samples' 'CHROM:POS:REF:ALT' keys.
    """
    table = SnippyVariantsReader([snippy_dir / name for name in samples]).get_variants_table()
    table['SPDI'] = table['CHROM'] + ':' + table['POS'].astype(str) + ':' + table['REF'] + ':' + table['ALT']

    seen = set()
    for name in samples:
        in_sample = table['SAMPLE'] == name
        seen = seen.union(set(table[in_sample]['SPDI'].tolist()))

    return len(seen)

# Time reading the VCFs plus computing the union, for both sample sets
for case_header, case_samples in (
    (f'Case 1: Single ({len(case1_samples)}) sample', case1_samples),
    (f'\nCase 2: {len(case2_samples)} samples', case2_samples),
):
    print(case_header)
    results_table, results = runtime_iteration(get_union_from_vcf, results_table, 'Load VCF as DataFrame',
                                               len(case_samples), samples=case_samples)
results_table

Case 1: Single (1) sample
Runtime (m+-s): 1.14 +- 0.03 seconds, iters 5, result {17193}

Case 2: 10 samples
Runtime (m+-s): 11.91 +- 0.06 seconds, iters 5, result {36920}


Unnamed: 0,Case,Number of Samples,Iterations,Mean runtime (s),Stdev runtime (s),Result
0,Load VCF as DataFrame,1,5,1.135049,0.030889,17193
1,Load VCF as DataFrame,10,5,11.914648,0.061704,36920


### 1.2. VCF without loading file

In [7]:
import pandas as pd

def get_union_from_vcf_no_load(var_df: pd.DataFrame) -> int:
    """Count the distinct variants across all samples of an already-loaded variants table.

    Parameters
    ----------
    var_df : pandas.DataFrame
        Table with 'CHROM', 'POS', 'REF', 'ALT' and 'SAMPLE' columns ('CHROM',
        'REF' and 'ALT' must hold strings, since they are concatenated).
        The frame is not modified.

    Returns
    -------
    int
        Size of the union, over every sample present in ``var_df``, of the
        'CHROM:POS:REF:ALT' keys.
    """
    # Build the key as a standalone Series rather than assigning var_df['SPDI']:
    # the caller's frame is shared across benchmark iterations and must not be mutated.
    spdi = var_df['CHROM'] + ':' + var_df['POS'].astype(str) + ':' + var_df['REF'] + ':' + var_df['ALT']

    vars_union = set()
    for sample in set(var_df['SAMPLE'].tolist()):
        vars_union = vars_union.union(set(spdi[var_df['SAMPLE'] == sample].tolist()))

    return len(vars_union)

# The variants table is loaded outside the timed call, so only the union is measured
for case_header, case_samples in (
    (f'Case 1: Single ({len(case1_samples)}) sample', case1_samples),
    (f'\nCase 2: {len(case2_samples)} samples', case2_samples),
):
    print(case_header)

    sample_dirs = [snippy_dir / s for s in case_samples]
    variants_reader = SnippyVariantsReader(sample_dirs)
    var_df = variants_reader.get_variants_table()
    results_table, results = runtime_iteration(get_union_from_vcf_no_load, results_table,
                                               'Load VCF as DataFrame (no file load)', len(case_samples),
                                               var_df=var_df)
results_table

Case 1: Single (1) sample
Runtime (m+-s): 0.02 +- 0.00 seconds, iters 5, result {17193}

Case 2: 10 samples
Runtime (m+-s): 0.36 +- 0.02 seconds, iters 5, result {36920}


Unnamed: 0,Case,Number of Samples,Iterations,Mean runtime (s),Stdev runtime (s),Result
0,Load VCF as DataFrame,1,5,1.135049,0.030889,17193
1,Load VCF as DataFrame,10,5,11.914648,0.061704,36920
2,Load VCF as DataFrame (no file load),1,5,0.024153,0.002352,17193
3,Load VCF as DataFrame (no file load),10,5,0.36359,0.022145,36920


## 2. From relational database

To test this out, I first have to load all the VCF files into the database, which takes a while. Since I'm only testing a specific query (finding the union of all variants in some samples), I don't include this loading time in the results table; it is only printed below for reference.

In [8]:
# One-off setup: load every snippy sample directory under ../phylogeny into the MySQL database.
# The elapsed time is printed for reference only and is not added to results_table.
# NOTE(review): the connection string, including credentials, is hard-coded here; consider
# reading it from an environment variable instead.
start = time.time()
!variants --database-connection 'mysql+pymysql://test:test@localhost/thesis?charset=utf8mb4' \
    --seqrepo-dir seq_repo --verbose \
    load-snippy --reference-file ../input/S_HeidelbergSL476.fasta.gz ../phylogeny
end = time.time()
print(f'Took {(end-start)/60:0.1f} minutes')

[32m2021-02-24 13:50:10[0m [1;30mINFO[0m [34mstorage.main,53:[0m Connecting to database mysql+pymysql://test:test@localhost/thesis?charset=utf8mb4
[32m2021-02-24 13:50:10[0m [1;30mINFO[0m [34mstorage.main,56:[0m Use seqrepo directory seq_repo
Loading ../phylogeny
[32m2021-02-24 13:50:10[0m [1;30mERROR[0m [34mstorage.load_variants_common,94:[0m [31mSamples ['SH14-013', 'SH14-001', 'SH13-006', 'SH12-009', 'SH12-011', 'SH12-004', 'SH14-008', 'SH14-026', 'SH12-007', 'SH13-005', 'SH14-018', 'SH13-003', 'SH13-002', 'SH14-020', 'SH14-014', 'SH11-001', 'SH14-011', 'SH14-002', 'SH12-005', 'SH13-007', 'SH14-024', 'SH13-004', 'SH14-012', 'SH10-015', 'SH13-008', 'SH14-006', 'SH14-023', 'SH10-014', 'SH14-009', 'SH12-008', 'SH12-010', 'SH12-013', 'SH10-002', 'SH12-003', 'SH14-004', 'SH14-025', 'SH14-019', 'SH14-007', 'SH10-30', 'SH14-017', 'SH12-014', 'SH14-027', 'SH12-002', 'SH12-001', 'SH14-003', 'SH08-001', 'SH14-010', 'SH14-021', 'SH14-016', 'SH11-002', 'SH09-29', 'SH14-028', 

In [9]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def create_session(connection_string: str = None):
    """Open a new SQLAlchemy ORM session on the thesis database.

    A new engine is created on every call; the caller is responsible for
    closing the returned session.

    Parameters
    ----------
    connection_string : str, optional
        SQLAlchemy database URL. If omitted, the THESIS_DB_URL environment
        variable is used, falling back to the local test MySQL database
        'mysql+pymysql://test:test@localhost/thesis?charset=utf8mb4'.

    Returns
    -------
    sqlalchemy.orm.Session
        A new, unbound-to-any-transaction session.
    """
    import os

    if connection_string is None:
        # Allow overriding the hard-coded local test credentials without editing the notebook
        connection_string = os.environ.get('THESIS_DB_URL',
                                           'mysql+pymysql://test:test@localhost/thesis?charset=utf8mb4')

    engine = create_engine(connection_string, echo=False)
    Session = sessionmaker(bind=engine)
    return Session()

### 2.1. No checking for proper reference/sequence name

In [10]:
from storage.variant.model import Sample

def get_union_from_relational_db(samples: List[str]) -> int:
    """Count distinct variants across `samples` via the ORM relationship ``Sample.variants``.

    No filtering on reference genome/sequence is applied.

    Parameters
    ----------
    samples : list of str
        Sample names to look up in the ``Sample`` table.

    Returns
    -------
    int
        Number of distinct variant ids across the matching samples.
    """
    session = create_session()
    try:
        sample_objs = session.query(Sample).filter(Sample.name.in_(samples)).all()

        vars_union = set()
        for sample in sample_objs:
            # sample.variants may be lazy-loaded, so it must be read while the session is open
            svars = {v.id for v in sample.variants}
            vars_union = vars_union.union(svars)
    finally:
        # Release the session; previously every benchmark iteration left one open
        session.close()

    return len(vars_union)

for case_header, case_samples in (
    (f'Case 1: Single ({len(case1_samples)}) sample', case1_samples),
    (f'\nCase 2: {len(case2_samples)} samples', case2_samples),
):
    print(case_header)
    results_table, results = runtime_iteration(get_union_from_relational_db, results_table,
                                               'Relational DB (no check proper ref)', len(case_samples),
                                               samples=case_samples)

Case 1: Single (1) sample
Runtime (m+-s): 0.42 +- 0.04 seconds, iters 5, result {17193}

Case 2: 10 samples
Runtime (m+-s): 3.24 +- 0.06 seconds, iters 5, result {36920}


### 2.2. Checking for proper reference/sequence name from database

In [15]:
from storage.variant.model import VariationAllele, ReferenceSequence, Reference

def get_union_from_relational_db_via_query(samples: List[str]) -> int:
    """Count distinct variants across `samples`, restricted to reference 'S_HeidelbergSL476' in SQL.

    The reference restriction is expressed as joins Sample -> variants ->
    ReferenceSequence -> Reference with a filter on ``Reference.name``.

    Parameters
    ----------
    samples : list of str
        Sample names to look up in the ``Sample`` table.

    Returns
    -------
    int
        Number of distinct ``VariationAllele`` ids returned by the query.
    """
    session = create_session()
    try:
        variants = session.query(VariationAllele) \
            .select_from(Sample) \
            .join(Sample.variants) \
            .join(ReferenceSequence) \
            .join(Reference) \
            .filter(Reference.name == 'S_HeidelbergSL476') \
            .filter(Sample.name.in_(samples)) \
            .all()

        vars_union = {v.id for v in variants}
    finally:
        # Release the session; previously every benchmark iteration left one open
        session.close()

    return len(vars_union)

for case_header, case_samples in (
    (f'Case 1: Single ({len(case1_samples)}) sample', case1_samples),
    (f'\nCase 2: {len(case2_samples)} samples', case2_samples),
):
    print(case_header)
    results_table, results = runtime_iteration(get_union_from_relational_db_via_query, results_table,
                                               'Relational DB (proper ref as query)', len(case_samples),
                                               samples=case_samples)

Case 1: Single (1) sample
Runtime (m+-s): 2.19 +- 0.03 seconds, iters 5, result {17193}

Case 2: 10 samples
Runtime (m+-s): 3.06 +- 0.14 seconds, iters 5, result {36920}


### 2.3. Checking for proper reference/sequence name in code

In [17]:
from storage.variant.model import Sample, Reference, ReferenceSequence

def get_union_from_relational_db_via_code(samples: List[str]) -> int:
    """Count distinct variants across `samples`, restricted to reference 'S_HeidelbergSL476' in Python.

    The ids of the reference's sequences are fetched with one query; each
    sample's variants are then filtered on ``sequence_id`` in Python.

    Parameters
    ----------
    samples : list of str
        Sample names to look up in the ``Sample`` table.

    Returns
    -------
    int
        Number of distinct variant ids on the reference's sequences.
    """
    session = create_session()
    try:
        sample_objs = session.query(Sample).filter(Sample.name.in_(samples)).all()
        ref_sequences = session.query(ReferenceSequence) \
            .join(Reference) \
            .filter(Reference.name == 'S_HeidelbergSL476') \
            .all()

        ref_sequence_ids = {r.id for r in ref_sequences}

        vars_union = set()
        for sample in sample_objs:
            # sample.variants may be lazy-loaded, so it must be read while the session is open
            svars = {v.id for v in sample.variants if v.sequence_id in ref_sequence_ids}
            vars_union = vars_union.union(svars)
    finally:
        # Release the session; previously every benchmark iteration left one open
        session.close()

    return len(vars_union)

for case_header, case_samples in (
    (f'Case 1: Single ({len(case1_samples)}) sample', case1_samples),
    (f'\nCase 2: {len(case2_samples)} samples', case2_samples),
):
    print(case_header)
    results_table, results = runtime_iteration(get_union_from_relational_db_via_code, results_table,
                                               'Relational DB (proper ref as code)', len(case_samples),
                                               samples=case_samples)

Case 1: Single (1) sample
Runtime (m+-s): 0.41 +- 0.04 seconds, iters 5, result {17193}

Case 2: 10 samples
Runtime (m+-s): 3.30 +- 0.06 seconds, iters 5, result {36920}


## 3. From Bloom filter

I am testing out Bloom filters, but I note that I don't think there's any way to extract out the specific set of variants within a Bloom filter afterwards. At least not without storing a separate mapping of variant identifiers to the specific hashes used by the Bloom filters.

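Nor do I think I can even get the number of elements in a Bloom filter, so the Bloom filter cases below record the `repr` of the merged filter as their result instead of a variant count.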

### 3.1. In-memory Bloom filter

In [18]:
from typing import Dict
import copy

from bloom_filter import BloomFilter

def load_vcf_to_bloom_filters(samples: List[str], max_elements: int = 10**6,
                              error_rate: float = 0.1) -> Dict[str, BloomFilter]:
    """Read the variant tables of `samples` and build one Bloom filter of variant keys per sample.

    Parameters
    ----------
    samples : list of str
        Names of sample sub-directories of ``snippy_dir``.
    max_elements : int
        Capacity passed to each ``BloomFilter`` (default 10**6, the previous hard-coded value).
    error_rate : float
        Error rate passed to each ``BloomFilter`` (default 0.1, the previous hard-coded value).

    Returns
    -------
    dict
        Mapping of sample name -> Bloom filter containing that sample's
        'CHROM:POS:REF:ALT' keys.
    """
    sample_dirs = [snippy_dir / s for s in samples]
    var_df = SnippyVariantsReader(sample_dirs).get_variants_table()
    var_df['SPDI'] = var_df['CHROM'] + ':' + var_df['POS'].astype(str) + ':' + var_df['REF'] + ':' + var_df['ALT']

    sample_filters = {}
    for sample in samples:
        bloom = BloomFilter(max_elements=max_elements, error_rate=error_rate)
        for v in var_df[var_df['SAMPLE'] == sample]['SPDI'].tolist():
            bloom.add(v)
        sample_filters[sample] = bloom

    return sample_filters

def union_from_bloom_filters(sample_filters: Dict[str, 'BloomFilter']) -> str:
    """Merge all per-sample Bloom filters into one and return the merged filter's repr.

    A Bloom filter cannot list or count its elements, so the repr is returned as
    a stand-in result. The value is a string, not a variant count.

    Parameters
    ----------
    sample_filters : dict
        Mapping of sample name -> Bloom filter. The caller's filters are not
        modified: the union is accumulated into a deep copy of the last one.

    Returns
    -------
    str
        ``repr`` of the merged filter.

    Raises
    ------
    ValueError
        If ``sample_filters`` is empty.
    """
    if not sample_filters:
        raise ValueError('At least one Bloom filter is required to compute a union')

    *other_samples, last_sample = sample_filters
    # This relies on union() merging into `merged` in place (its return value is unused),
    # so work on a copy to keep the caller's filters intact across benchmark iterations.
    merged = copy.deepcopy(sample_filters[last_sample])
    for sample in other_samples:
        merged.union(sample_filters[sample])

    return repr(merged)

# Building the filters (reading VCFs + adding every variant) is timed but not stored in
# results_table; only the union step is recorded.
print(f'Case 1: Single ({len(case1_samples)}) sample')
print('Building Bloom Filters')
no_table, results_case1 = runtime_iteration(load_vcf_to_bloom_filters, print_result=False, samples=case1_samples)
print('Using Bloom Filters')
# Reuse the filters produced by the first build iteration
results_table, results = runtime_iteration(union_from_bloom_filters, results_table, 'Bloom filter (union)', len(case1_samples), sample_filters=results_case1[0])

print(f'\nCase 2: {len(case2_samples)} samples')
print('Building Bloom Filters')
no_table, results_case2 = runtime_iteration(load_vcf_to_bloom_filters, print_result=False, samples=case2_samples)
print('Using Bloom Filters')
# Reuse the filters produced by the first build iteration
results_table, results = runtime_iteration(union_from_bloom_filters, results_table, 'Bloom filter (union)', len(case2_samples), sample_filters=results_case2[0])

Case 1: Single (1) sample
Building Bloom Filters
Runtime (m+-s): 1.57 +- 0.07 seconds, iters 5
Using Bloom Filters
Runtime (m+-s): 0.00 +- 0.00 seconds, iters 5, result {'BloomFilter(ideal_num_elements_n=1000000, error_rate_p=0.100000, num_bits_m=4792530)'}

Case 2: 10 samples
Building Bloom Filters
Runtime (m+-s): 15.95 +- 0.05 seconds, iters 5
Using Bloom Filters
Runtime (m+-s): 0.30 +- 0.01 seconds, iters 5, result {'BloomFilter(ideal_num_elements_n=1000000, error_rate_p=0.100000, num_bits_m=4792530)'}


## 4. BCFTools

In [19]:
import tempfile
from pathlib import Path
import subprocess
from io import StringIO

def load_vcf_to_bcf(samples: List[str]) -> Dict[str, Path]:
    """Convert each sample's ``snps.vcf.gz`` to an indexed BCF file in a new temporary directory.

    Parameters
    ----------
    samples : list of str
        Sample names; each one's input is ``snippy_dir / <sample> / 'snps.vcf.gz'``.

    Returns
    -------
    dict
        Mapping of sample name -> path of the written ``<sample>.bcf``, on which
        ``bcftools index`` has been run. The temporary directory is not removed.

    Raises
    ------
    Exception
        If ``bcftools view`` or ``bcftools index`` fails for a sample.
    """
    out_dir = Path(tempfile.mkdtemp())
    converted = {}

    # dict.fromkeys de-duplicates sample names while keeping their first-seen order
    for sample in dict.fromkeys(samples):
        vcf_file = snippy_dir / sample / 'snps.vcf.gz'
        bcf_file = out_dir / f'{sample}.bcf'
        steps = (
            ['bcftools', 'view', str(vcf_file), '-o', str(bcf_file), '-O', 'b', '-l', '9'],
            ['bcftools', 'index', str(bcf_file)],
        )
        try:
            for step in steps:
                subprocess.run(step, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               check=True, text=True)
        except subprocess.CalledProcessError as e:
            err_msg = str(e.stderr.strip())
            raise Exception(f'Could not run bcftools on vcf_file=[{vcf_file}]: error {err_msg}')
        converted[sample] = bcf_file

    return converted

def union_from_bcf_files(sample_files: Dict[str, Path]) -> int:
    """Count the distinct variants across per-sample BCF files using bcftools.

    With two or more files, ``bcftools isec -c none -n +1`` lists every site present
    in at least one file. For a single file, isec is skipped and the records are read
    with ``bcftools view --no-header | cut -f 1,2,4,5`` instead.

    Parameters
    ----------
    sample_files : dict
        Mapping of sample name -> BCF file path.

    Returns
    -------
    int
        Number of distinct 'CHROM:POS:REF:ALT' keys; 0 if ``sample_files`` is empty
        or the commands produce no records.

    Raises
    ------
    Exception
        If any of the external commands exits with a non-zero status.
    """
    if not sample_files:
        return 0

    if len(sample_files) == 1:
        (only_file,) = sample_files.values()
        command = ['bcftools', 'view', '--threads', '1', '--no-header', str(only_file)]
    else:
        command = ['bcftools', 'isec', '-c', 'none', '-n', '+1', '--threads', '1']
        command.extend(str(f) for f in sample_files.values())

    try:
        if len(sample_files) == 1:
            view_proc = subprocess.Popen(command, stdout=subprocess.PIPE, text=True)
            try:
                result = subprocess.run(['cut', '-f', '1,2,4,5'], stdin=view_proc.stdout,
                                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                        check=True, text=True)
            finally:
                # Close our end of the pipe (so bcftools gets SIGPIPE if cut exited early)
                # and reap the process so it does not linger as a zombie.
                view_proc.stdout.close()
                view_returncode = view_proc.wait()
            if view_returncode != 0:
                raise subprocess.CalledProcessError(view_returncode, command)
        else:
            result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                    check=True, text=True)
    except subprocess.CalledProcessError as e:
        # e.cmd is the command that actually failed; stderr is None when it was not captured
        err_msg = (e.stderr or '').strip()
        raise Exception(f'Could not run [{" ".join(map(str, e.cmd))}]: error {err_msg}') from e

    if not result.stdout.strip():
        return 0

    # Read everything as text so numeric chromosome names or allele strings like 'NA'
    # are not converted to numbers/NaN before the key is built.
    var_df = pd.read_csv(StringIO(result.stdout), sep='\t', names=['CHROM', 'POS', 'REF', 'ALT', 'FILES'],
                         dtype=str, keep_default_na=False)
    spdi = var_df['CHROM'] + ':' + var_df['POS'] + ':' + var_df['REF'] + ':' + var_df['ALT']
    return len(set(spdi.tolist()))

# Converting VCF -> BCF is timed but not stored in results_table; only the union step is recorded.
# Note: every conversion iteration writes its BCF files into a new temporary directory.
print(f'Case 1: Single ({len(case1_samples)}) sample')
print('Converting to BCF')
no_table, results_case1 = runtime_iteration(load_vcf_to_bcf, print_result=False, samples=case1_samples)
print('Finding union')
# Reuse the BCF files produced by the first conversion iteration
results_table, results = runtime_iteration(union_from_bcf_files, results_table, 'BCFTools', len(case1_samples), sample_files=results_case1[0])

print(f'\nCase 2: {len(case2_samples)} samples')
print('Converting to BCF')
no_table, results_case2 = runtime_iteration(load_vcf_to_bcf, print_result=False, samples=case2_samples)
print('Finding union')
# Reuse the BCF files produced by the first conversion iteration
results_table, results = runtime_iteration(union_from_bcf_files, results_table, 'BCFTools', len(case2_samples), sample_files=results_case2[0])

Case 1: Single (1) sample
Converting to BCF
Runtime (m+-s): 0.40 +- 0.01 seconds, iters 5
Finding union
Runtime (m+-s): 0.16 +- 0.01 seconds, iters 5, result {17193}

Case 2: 10 samples
Converting to BCF
Runtime (m+-s): 3.98 +- 0.04 seconds, iters 5
Finding union
Runtime (m+-s): 0.30 +- 0.01 seconds, iters 5, result {36920}


In [20]:
# Summary of every case recorded in results_table.
# NOTE(review): the saved output below has no 'Load VCF as DataFrame' or
# 'Relational DB (no check proper ref)' rows, presumably because the cell that creates
# results_table (In [14]) was re-run after those cells; use Restart & Run All for a complete table.
results_table

Unnamed: 0,Case,Number of Samples,Iterations,Mean runtime (s),Stdev runtime (s),Result
0,Relational DB (proper ref as query),1,5,2.187969,0.025792,17193
1,Relational DB (proper ref as query),10,5,3.062717,0.137326,36920
2,Relational DB (proper ref as code),1,5,0.407293,0.036488,17193
3,Relational DB (proper ref as code),10,5,3.297895,0.062367,36920
4,Bloom filter (union),1,5,0.000238,0.000252,"BloomFilter(ideal_num_elements_n=1000000, erro..."
5,Bloom filter (union),10,5,0.297692,0.005987,"BloomFilter(ideal_num_elements_n=1000000, erro..."
6,BCFTools,1,5,0.16271,0.01477,17193
7,BCFTools,10,5,0.304076,0.008903,36920


## 5. MongoDB