In [1]:
# Enable IPython's autoreload extension (mode 2) so edits to imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
pip install --upgrade polars

Collecting polars
  Downloading polars-0.20.31-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Downloading polars-0.20.31-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (28.8 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m28.8/28.8 MB[0m [31m53.2 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0mm
[?25hInstalling collected packages: polars
  Attempting uninstall: polars
    Found existing installation: polars 0.20.25
    Uninstalling polars-0.20.25:
      Successfully uninstalled polars-0.20.25
Successfully installed polars-0.20.31
Note: you may need to restart the kernel to use updated packages.


In [2]:
from bystro.api import auth, annotation
from bystro.proteomics.annotation_interface import get_annotation_result_from_query, execute_query
import pandas as pd
import json

In [3]:
# Load the ancestry results. The encoding is given explicitly so the read does
# not depend on the machine's locale default.
with open('ancestry_results.json', 'r', encoding='utf-8') as f:
    ancestry_json = json.load(f)

In [4]:
# For every sample, pick the superpopulation whose probability interval has the
# highest midpoint, and record one summary dict per sample.
results = []
for row in ancestry_json['results']:
    max_superpop_prob = 0
    max_superpop_label = None

    for superpop, value in row['superpops'].items():
        # Midpoint of the reported [lowerBound, upperBound] interval.
        mean = (value['lowerBound'] + value['upperBound']) / 2

        if mean > max_superpop_prob:
            max_superpop_label = superpop
            max_superpop_prob = mean

    results.append({
        'sample_id': row['sampleId'],
        # Store the winning label; the loop variable `superpop` is merely the
        # last key iterated and is unrelated to the maximum.
        'superpop_top_hit': max_superpop_label,
        'superpop_top_hit_prob': max_superpop_prob
    })

In [None]:
# One row per sample with columns sample_id, superpop_top_hit, superpop_top_hit_prob.
# NOTE(review): `results` is reassigned by the completed-jobs cell further down,
# so this cell must run before that one.
ancestry_df = pd.DataFrame(results)
ancestry_df

In [3]:
import os
from getpass import getpass

# Credentials must never live in the notebook: read them from the environment
# and fall back to an interactive prompt (getpass does not echo the password).
# NOTE(review): the password that used to be hard-coded here is in version
# history and should be rotated.
user = auth.login(
    os.environ.get('BYSTRO_EMAIL') or input('Bystro email: '),
    os.environ.get('BYSTRO_PASSWORD') or getpass('Bystro password: '),
    'https://bystro-dev.emory.edu',
    print_result=False,
)
# Search index queried by the example cells at the end of the notebook.
index = '661be6b20f228023b55a6927_657a50d4b2d0278938ba791d'

Existing session found, logging out


In [4]:
# Find the first completed job whose name contains 'big_daly' and fetch its
# full record.
# NOTE: this reuses the name `results`, replacing the ancestry summaries built
# in an earlier cell.
results = annotation.get_jobs('completed')
job_id = None
index_name = None
for res in results:
    if 'big_daly' in res.name:
        job_id = res._id
        break

if job_id is None:
    # Fail loudly here instead of calling get_jobs(job_id=None) with no job selected.
    raise LookupError("No completed job with 'big_daly' in its name was found")

job = annotation.get_jobs(job_id=job_id)

In [5]:
# Names of the files the job produced, keyed by output kind.
job['outputFileNames']

{'annotation': 'big_daly_vcf-20240414142242561-20240414142242572.annotation.tsv.gz',
 'statistics': {'json': 'big_daly_vcf-20240414142242561-20240414142242572.statistics.json',
  'qc': 'big_daly_vcf-20240414142242561-20240414142242572.statistics.qc.tsv',
  'tab': 'big_daly_vcf-20240414142242561-20240414142242572.statistics.tsv'},
 'config': 'hg38.yml',
 'sampleList': 'big_daly_vcf-20240414142242561-20240414142242572.sample_list',
 'dosageMatrixOutPath': 'big_daly_vcf-20240414142242561-20240414142242572.dosage.feather',
 'header': 'big_daly_vcf-20240414142242561-20240414142242572.annotation.header.json',
 'log': 'big_daly_vcf-20240414142242561-20240414142242572.annotation.log.txt'}

In [6]:
# Search index name recorded on the job; used as `index_name` for the query below.
index_name = job['search']['indexName']

In [7]:
# Show the id of the job selected above.
job_id

'661be6b20f228023b55a6927'

In [12]:
# Total disk usage of the local "tmp" directory (summarised, human-readable).
!du -sh "tmp"

92G	tmp


In [None]:
import os
import psutil

process = psutil.Process(os.getpid())


def _rss_gb():
    """Resident set size of this kernel process, in units of 1e9 bytes."""
    return process.memory_info().rss / 1e9


print(f"Before run: {_rss_gb()} GB")

# By default the result has one sample per row (i.e. `melt_samples=True`).
# Only the listed refSeq fields are requested, exploded on 'refSeq.name2';
# the result is written to `output_path` and returned as an Arrow table.
query_result_df_array_of_structs_select_fields = get_annotation_result_from_query(
    query_string="cadd:>10",
    index_name=index_name,
    bystro_api_auth=user,
    fields=[
        'refSeq.name2',
        'refSeq.name',
        'refSeq.exonicAlleleFunction',
        'refSeq.siteType',
    ],
    explode_field='refSeq.name2',
    tmp_dir='/mnt/ssd2/tmp',
    output_path='/mnt/ssd2/example_cadd_10_query.feather',
    max_threads=8,
    as_arrow_table=True,
)

print(f"After run: {_rss_gb()} GB")

Before run: 0.209395712 GB


In [3]:
import glob

# glob returns matches in arbitrary order; sort so the shard order (and hence
# the row order of anything built from the shards) is the same on every run.
feather_files = sorted(glob.glob('/mnt/ssd2/tmp/*.feather'))
# Destinations for the combined outputs.
output_parquet_file = '/mnt/ssd2/example_cadd_10_query.parquet'
output_feather_file = '/mnt/ssd2/example_cadd_10_query.feather'

In [4]:
import pyarrow.dataset as ds

# Treat all Feather shards as a single dataset and display its schema.
dataset = ds.dataset(feather_files, format="feather")
dataset.schema

chrom: string
pos: int64
vcfPos: int64
inputRef: string
alt: string
type: string
id: string
locus: string
sample: string
dosage: int64
refSeq.name2: string
refSeq.name: string
refSeq.exonicAlleleFunction: string
refSeq.siteType: string
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 1882

In [2]:
# Too memory heavy
# Failed experiment, kept for reference: the batches are written into `sink`,
# a pa.BufferOutputStream, i.e. an in-memory buffer rather than a file, so the
# written output accumulates in RAM. The run was interrupted manually.

import pyarrow as pa
import pyarrow.dataset as ds
sink = pa.BufferOutputStream()

dataset = ds.dataset(feather_files, format="feather")

# Write every record batch of the dataset into one Arrow IPC file stream.
with pa.ipc.new_file(sink, dataset.schema) as writer:
    # Readahead is disabled to limit how many batches/fragments are in flight.
    for batch in dataset.to_batches(batch_readahead=0, fragment_readahead=0):
        writer.write_batch(batch)

KeyboardInterrupt: 

In [None]:
# Convert the Feather shards into one Parquet file, one record batch at a time.
# `pq.write_table` expects an in-memory pyarrow.Table rather than a Dataset, so
# the batches are streamed through a ParquetWriter instead of materialising
# the whole dataset.

import pyarrow.dataset as ds
import pyarrow.parquet as pq

dataset = ds.dataset(feather_files, format="feather")

# The context manager closes the writer even if a batch fails to write.
with pq.ParquetWriter(output_parquet_file, dataset.schema) as writer:
    for batch in dataset.to_batches():
        writer.write_batch(batch)

In [None]:
# Too memory heavy: every shard is read in full and all of its batches are
# kept in a list before anything is written.

# Imported here so the cell does not depend on imports made by other cells.
import pyarrow as pa
from pyarrow import feather

collected_batches = []

for feather_file in feather_files:
    # Read the Feather file
    table = feather.read_table(feather_file)

    # Collect batches
    collected_batches.extend(table.to_batches())

# Combine collected batches into a single table
combined_table = pa.Table.from_batches(collected_batches)

# Write the combined table to a Feather file
# NOTE: placeholder path; this also overrides the `output_feather_file` set earlier.
output_feather_file = "/path/to/output/combined.feather"
# pyarrow.feather has no `write_table`; the writer function is `write_feather`.
feather.write_feather(combined_table, output_feather_file)

In [2]:
# Slow but OK: convert one shard at a time, so only a single shard's table is
# held by this loop at any moment.

from pyarrow import feather
import pyarrow.parquet as pq

writer = None
try:
    for feather_file in feather_files:
        # Read the Feather file
        table = feather.read_table(feather_file)

        if writer is None:
            # The schema is only known once the first shard has been read.
            writer = pq.ParquetWriter(output_parquet_file, table.schema)

        # Append this shard to the Parquet file
        writer.write_table(table)
finally:
    # Close even when a shard fails or the cell is interrupted, so the file is
    # finalised and the handle released.
    if writer is not None:
        writer.close()

In [None]:
# too memory hungry
# (read_parquet loads the whole Parquet file into a pandas DataFrame)
import pandas as pd

df = pd.read_parquet(output_parquet_file, engine='pyarrow')
df

In [4]:
import polars
# Lazily scan the Parquet file; this builds a LazyFrame rather than loading the data here.
lazy_df = polars.scan_parquet(output_parquet_file)

In [5]:
# Display the lazy frame created by the scan above.
lazy_df

In [None]:
import pyarrow.parquet as pq

# Read the Parquet file with pyarrow, requesting a memory-mapped source.
# NOTE: despite its name, `df` holds the return value of pq.read_table here
# (a pyarrow Table), not a pandas DataFrame.
df = pq.read_table(output_parquet_file, memory_map=True)
df

In [2]:
import pyarrow as pa
from pyarrow import feather
import psutil
import os

def _concatenate_feather_files_memory_mapped(file_list: list[str], output_file: str) -> pa.Table:
    """Concatenate Feather files into one Arrow table, reporting memory usage.

    Each file is opened with ``memory_map=True`` and the process RSS (in units
    of 1e9 bytes) is printed before reading, after each file, and at the end.

    Args:
        file_list: Paths of the Feather files to concatenate, in order.
        output_file: Where to write the concatenated table as Feather. A falsy
            value (``""`` or ``None``) skips writing.

    Returns:
        The concatenated ``pa.Table``.

    Raises:
        ValueError: If ``file_list`` is empty.
    """
    if not file_list:
        # Fail with a clear message before any work is done.
        raise ValueError("file_list must contain at least one Feather file")

    process = psutil.Process(os.getpid())
    print(f"Memory usage before concatenating: {process.memory_info().rss / 1e9}")

    # Open the Feather files using memory mapping
    tables = []
    for i, file in enumerate(file_list):
        tables.append(feather.read_table(file, memory_map=True))
        # Nothing has been concatenated yet at this point; this tracks the reads.
        print(f"Memory usage after reading file {i}: {process.memory_info().rss / 1e9}")

    # Concatenate the Arrow Tables
    concatenated_table = pa.concat_tables(tables)

    # Optionally, write the concatenated table to a new Feather file
    if output_file:
        feather.write_feather(concatenated_table, output_file)

    print(f"Memory usage after concatenating: {process.memory_info().rss / 1e9}")

    return concatenated_table


In [3]:
# res = _concatenate_feather_files_memory_mapped(files, '/mnt/ssd2/example_cadd_10_query.feather')

In [4]:
import pyarrow.dataset as ds
import pyarrow as pa
# `files` is not defined in any visible cell; the shard list built earlier is
# `feather_files`.
dat = ds.dataset(feather_files, format='arrow')
# Fixed batch size with minimal readahead, to keep few batches/fragments in flight.
scanner = dat.scanner(batch_size=131000, batch_readahead=1, fragment_readahead=1)

In [5]:
# Hive-flavoured partitioning keyed on the `chrom` and `pos` columns.
# NOTE(review): partitioning on `pos` presumably yields one partition per
# distinct position — confirm this is intended.
partition_schema = pa.schema([
    pa.field("chrom", pa.string()),
    pa.field("pos", pa.int64()),
])
new_part = ds.partitioning(partition_schema, flavor="hive")

In [None]:
# Write the scanned shards out as a partitioned Arrow dataset, with at most
# eight output files open at a time.
ds.write_dataset(
    scanner,
    base_dir='/mnt/ssd2/example_cadd_10_query.feather',
    format='arrow',
    partitioning=new_part,
    max_open_files=8,
)

In [None]:
from pyarrow import feather
# `feather.read_` does not exist. `read_feather` returns a pandas DataFrame
# (matching the `df2` name); use `read_table` instead for a pyarrow Table.
df2 = feather.read_feather('foo', memory_map=True)

In [2]:
import os
import psutil

# Resident set size of this kernel process, in units of 1e9 bytes.
process = psutil.Process(os.getpid())
rss_gb = process.memory_info().rss / 1e9
print(f"{rss_gb} GB")


9.15685376 GB


In [13]:
# Display the query result (rendered as a pyarrow.Table in the recorded output).
query_result_df_array_of_structs_select_fields

pyarrow.Table
chrom: string
pos: int64
vcfPos: int64
inputRef: string
alt: string
type: string
id: string
locus: string
sample: string
dosage: int64
refSeq.name2: string
refSeq.name: string
refSeq.exonicAlleleFunction: string
refSeq.siteType: string
----
chrom: [["chr1","chr1","chr1","chr1","chr1",...,"chr16","chr16","chr16","chr16","chr16"],["chr16","chr16","chr16","chr16","chr16",...,"chrX","chrX","chrX","chrX","chrX"]]
pos: [[63454892,63454892,63454892,63454892,63454892,...,81149720,81149720,81149720,81149720,81149720],[81149720,81149720,81149720,81149720,81149720,...,155941919,155941919,155941919,155941919,155941919]]
vcfPos: [[63454892,63454892,63454892,63454892,63454892,...,81149720,81149720,81149720,81149720,81149720],[81149720,81149720,81149720,81149720,81149720,...,155941919,155941919,155941919,155941919,155941919]]
inputRef: [["G","G","G","G","G",...,"T","T","T","T","T"],["T","T","T","T","T",...,"G","G","G","G","G"]]
alt: [["A","A","A","A","A",...,"A","A","A","A","A"],["A","A

In [None]:
# (rows, columns) of the query result.
query_result_df_array_of_structs_select_fields.shape

In [None]:
import pandas as pd
# Load a single Feather file into a pandas DataFrame and display it.
df = pd.read_feather('0.feather')
df

In [None]:
# Total bytes used by `df`; deep=True also counts the contents of object columns.
df.memory_usage(deep=True).sum()

In [None]:
# Read the concatenated Feather file into `df` (replacing the single-file frame),
# requesting memory mapping. Relies on `feather` having been imported by an earlier cell.
df = feather.read_feather('concat.feather', memory_map=True)

In [None]:
# Row count of `df`, in millions.
df.shape[0]/1e6

In [None]:
import psutil
import os
# Resident set size of this kernel process, in units of 1e9 bytes.
psutil.Process(os.getpid()).memory_info().rss / 1e9

In [None]:
# On-disk size of the concatenated Feather file.
! du -ah 'concat.feather'

In [None]:
# On-disk size of the single Feather file.
! du -ah '0.feather'

In [None]:
# Concatenate twelve copies of '0.feather' into 'concat.feather'.
# The helper is defined with a leading underscore; calling it without the
# underscore raises NameError.
_concatenate_feather_files_memory_mapped(['0.feather'] * 12, output_file='concat.feather')

In [None]:
# 4358583520 divided by 1e6 (presumably a byte count expressed in MB — confirm).
4358583520/1e6

In [None]:
# Show the kernel's current working directory.
pwd

In [None]:
# Label each row by whether it belongs to sample "1847". The `sample` column is
# a string column, so the comparison must use a string; the integer 1847 could
# never match.
# NOTE(review): `.apply` is a pandas Series method — this assumes the query
# result is a pandas DataFrame, not the pyarrow.Table shown earlier; confirm.
query_result_df_array_of_structs_select_fields['sample'].apply(lambda x: "FOO" if x == "1847" else "BAR")

In [None]:
# Write the query result to a CSV file in the working directory.
# NOTE(review): `to_csv` is a pandas DataFrame method; the result displayed
# earlier in this notebook is a pyarrow.Table — confirm the type before running.
query_result_df_array_of_structs_select_fields.to_csv('query_result_df_array_of_structs_select_fields_exploded.csv')

In [None]:
# Keep only the rows for the locus "chr17:40699291:G:A".
# NOTE(review): boolean-mask indexing like this is pandas syntax; confirm the
# result is a DataFrame rather than the pyarrow.Table shown earlier.
query_result_df_array_of_structs_select_fields[query_result_df_array_of_structs_select_fields['locus'] == "chr17:40699291:G:A"]

In [None]:
# Join the query result to the per-sample ancestry summary, matching the
# result's `sample` to `ancestry_df.sample_id`.
# NOTE(review): `merge` is a pandas DataFrame method; confirm the query result
# is a DataFrame rather than the pyarrow.Table shown earlier.
merged_df = query_result_df_array_of_structs_select_fields.merge(ancestry_df, left_on='sample', right_on='sample_id')

In [None]:
# You can choose to also "melt" the dataframe by a field,
# so that when the field has an array value, it is flattened
# When the primary key of the track isn't found, by default we force that
# field to have 1 value per row, and warn you of this behavior.
# To change it, set `force_flatten_exploded_field=False`
#
# The query is a raw string: `\-` is not a valid Python escape sequence, so a
# plain string literal triggers an invalid-escape warning. The raw string has
# exactly the same runtime value.
query_result_df = get_annotation_result_from_query(
    query_string=r"(type:del && (alt:\-6 || alt:\-1)) || (cadd:>40)",
    index_name=index,
    bystro_api_auth=user,
    melt_samples=True,
    explode_field='refSeq.name2',
    fields=['refSeq.name2', 'refSeq.spID', 'refSeq.name'],
    force_flatten_exploded_field=False
)
query_result_df[['refSeq.name2', 'refSeq.spID']]

In [None]:
# You can disable sample melting by setting `melt_samples=False`
query_result_df = await async_get_annotation_result_from_query(
    query_string="(type:del && (alt:\-6 || alt:\-1)) || (cadd:>40)",
    index_name=index,
    bystro_api_auth=user,
    fields=['refSeq.name2', 'refSeq.name'],
    melt_samples=False,
    explode_field='refSeq.name2'
)
query_result_df.head(n=10)

In [None]:
# Tracks with multiple fields can be kept as dictionaries rather than being
# flattened into separate columns: pass `structs_of_arrays=False`.
# This mode is currently incompatible with `explode_field` and `fields`.
query_result_df_array_of_structs_no_melt = get_annotation_result_from_query(
    query_string="*", index_name=index, bystro_api_auth=user, structs_of_arrays=False,
)
query_result_df_array_of_structs_no_melt.head(n=10)