In [None]:
import duckdb
from pathlib import Path

from varpile import process_chromosome


In [None]:
file_path = Path("/Users/vinter/data/geert/GC138853-i.vqsr_filtered.vcf.gz")
out_path = file_path.with_name(file_path.name.rstrip(".vcf.gz"))

In [None]:
%%bash
time python allele_counts.py

In [None]:
%%time
%run allele_counts
process_chromosome(file_path, out_path, "chr1")

- with Parsing:  9.5 s
- with iter_allele: 16.2 s
- with data.txt output: 18 s
- parquet conversion is 1.5s
- with parquet conversion: 21 s
- 10 threads (per chromosome): 5s

Alternatively
- bcftools query -f '%CHROM %POS %REF %ALT [%GT]' : 19.2 s

Size:
- data.txt: 99M
- data.parquet snappy: 38M 
- data.parquet GZIP: 22M
- data.parquet ZSTD: 26M


AGGREGATE chr1 benchmark
- 10 GB data (3 duplicated files, each roughly 3Mb): 36.5s
- 10 GB data (3 duplicated files, each roughly 3Mb), mem_limit = 2GB: 1m 28s


## 1. Create temporary parquet files

In [None]:
%run allele_counts # import allel_counts code

#input_dir = Path("/Users/vinter/data/geert/")
input_dir = Path("/Users/vinter/MyData/test_data/")
input_files = list(input_dir.glob("*.vcf.gz")) # list of input files
# tmp_dir is for temporary parquet files
tmp_dir = input_dir / "tmp"

# Convert input files to temporary files (per chromosomes)
chromosome = "chr1"
for input_file in input_files:
    name = input_file.name.rstrip("vcf.gz") + ".parquet"
    process_chromosome(input_file, tmp_dir / name, chromosome)

duckdb.read_parquet(f"{tmp_dir}/**/*.parquet").limit(4)

## 2. Merge temporary parquet files

In [None]:
%%time
duckdb.query(f"""
    select
    pos, ref, alt,
    sum(case AC when DP > 10 then AC else 0 end)::int AC,
    sum(case AC_hom when DP > 10 then AC_hom else 0 end)::int AC_hom,
    sum(case AC_hemi when DP > 10 then AC_hemi else 0 end)::int AC_hemi,
    -- DP stat counts
    count(*)::int n_samples,  -- number of samples
    sum(DP)::double DP_sum,
    sum(DP**2)::double DP2_sum,
    -- array_agg(DP) DPs, -- for debug
    from read_parquet('{tmp_dir}/*/chromosome=chr1/*.parquet', hive_partitioning = false)
    group by *
    order by pos, ref, alt
""").write_parquet(f'{tmp_dir}/chr1.parquet')

## 3. Calculate mean and standard deviation using n_samples, DP_sum and DP2_sum

In [None]:
rel = duckdb.read_parquet(f'{tmp_dir}/chr1.parquet')
duckdb.query("""
select
 * exclude (DP_sum, DP2_sum),
 (DP_sum/n_samples) as DP_mean,
 sqrt((DP2_sum - 2*DP_mean*DP_sum + n_samples*(DP_mean**2)) / n_samples)  as DP_std,
 from rel
""").limit(100).df()