fix: fixed scalability issues related to too many small files being generated
amanas committed May 20, 2022
1 parent ba74281 commit b4ba095
Showing 3 changed files with 32 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -142,3 +142,4 @@ dmypy.json
.DS_Store

tests/wip.py
+dataproc-init-dev.sh
27 changes: 21 additions & 6 deletions src/dnarecords/writer.py
@@ -62,7 +62,7 @@ class DNARecordsWriter:
    ...
    :param expr: a Hail expression. Currently, only expressions coercible to numeric are supported
-    :param block_size: block size to handle transposing the matrix
+    :param block_size: entries per block in the internal operations
    :param staging: path to staging directory to use for intermediate data. Default: /tmp/dnarecords/staging.
    """
    from typing import TYPE_CHECKING
@@ -75,12 +75,13 @@ class DNARecordsWriter:
    _j_blocks: set
    _nrows: int
    _ncols: int
+    _sparsity: float
    _chrom_ranges: dict
    _mt: 'MatrixTable'
    _skeys: 'DataFrame'
    _vkeys: 'DataFrame'

-    def __init__(self, expr: 'Expression', block_size=(10000, 10000), staging: str = '/tmp/dnarecords/staging'):
+    def __init__(self, expr: 'Expression', block_size: int = int(1e6), staging: str = '/tmp/dnarecords/staging'):
        self._assert_expr_type(expr)
        self._expr = expr
        self._block_size = block_size
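Note: `block_size` changes meaning here, from a fixed `(rows, cols)` tuple to a single budget of roughly how many stored entries each block should hold; the block dimensions are now derived from the matrix shape and sparsity (see `_get_block_size` below). A minimal usage sketch, assuming a Hail MatrixTable `mt` with a numeric `dosage` entry field (the variable names are illustrative, not from this repository):

import dnarecords as dr

# The default budget of int(1e6) entries per block usually needs no tuning;
# pass a smaller value to trade fewer entries per block for more blocks.
writer = dr.writer.DNARecordsWriter(mt.dosage, block_size=int(1e6))
writer.write('/tmp/dnarecords/output', sparse=True)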
@@ -144,13 +145,26 @@ def _set_max_nrows_ncols(self):
        self._nrows = self._mt.count_rows()
        self._ncols = self._mt.count_cols()

+    def _set_sparsity(self):
+        mts = self._mt.head(10000, None)
+        entries = mts.key_cols_by().key_rows_by().entries().to_spark().filter('v is not null').count()
+        self._sparsity = entries / (mts.count_rows() * mts.count_cols())
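Note: the sparsity estimate is sampled rather than exact — `head(10000, None)` keeps the first 10,000 rows and all columns, so `_sparsity` is the ratio of non-null entries to entries scanned in that slice. This keeps the estimate cheap, at the cost of assuming the head of the matrix is representative of the whole.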

+    def _get_block_size(self):
+        import math
+        M, N, S = self._nrows + 1, self._ncols + 1, self._sparsity + 1e-6
+        B = self._block_size / S
+        m = math.ceil(math.sqrt(B * M / N))
+        n = math.ceil(math.sqrt(B * N / M))
+        return m, n
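Note: the dimensions are chosen so that `m / n ≈ M / N` (blocks keep the matrix aspect ratio) and `m * n ≈ B = block_size / S`, i.e. each block covers enough cells that its expected count of stored (non-null) entries is about `block_size`. A worked example with hypothetical numbers (not from the repository):

import math

M, N, S = 100_000, 10_000, 0.1        # ~100k variants, ~10k samples, 10% non-null
B = int(1e6) / S                      # 1e7 cells per block
m = math.ceil(math.sqrt(B * M / N))   # 10_000 rows per block
n = math.ceil(math.sqrt(B * N / M))   # 1_000 cols per block
assert round(m * n * S) == 1_000_000  # ≈ block_size stored entries per block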

    def _build_ij_blocks(self):
        import pyspark.sql.functions as F
-        m, n = self._block_size
+        m, n = self._get_block_size()
        df = self._mt.key_cols_by().key_rows_by().entries().to_spark().filter('v is not null')
-        df = df.withColumn('ib', F.floor(F.col('i')/F.lit(m)))
-        df = df.withColumn('jb', F.floor(F.col('j')/F.lit(n)))
-        df.write.partitionBy('ib', 'jb').mode('overwrite').parquet(self._kv_blocks_path)
+        df = df.withColumn('ib', F.expr(f"i div {m}"))
+        df = df.withColumn('jb', F.expr(f"j div {n}"))
+        df.repartition('ib', 'jb').write.partitionBy('ib', 'jb').mode('overwrite').parquet(self._kv_blocks_path)
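Note: this is the heart of the fix. Without `repartition('ib', 'jb')`, every Spark task holding rows for a given `(ib, jb)` pair writes its own part-file into that pair's directory, so each block directory accumulates as many small files as there were upstream partitions. Repartitioning by the same columns first routes all rows of a pair to one task, so each `partitionBy` directory receives a single Parquet file (assuming default writer settings, e.g. no `maxRecordsPerFile` cap). The switch from `F.floor(F.col('i')/F.lit(m))` to SQL's integral `div` is equivalent for non-negative indices but stays in integer arithmetic. A standalone toy sketch of the repartition-before-partitionBy pattern, with made-up data:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toy = spark.range(1_000_000).select(
    (F.col('id') % 4).alias('ib'),
    (F.col('id') % 8).alias('jb'),
    F.rand().alias('v'))
# One task per (ib, jb) hash bucket -> one file per output directory.
toy.repartition('ib', 'jb').write.partitionBy('ib', 'jb').mode('overwrite').parquet('/tmp/kv-blocks-demo')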

    def _set_ij_blocks(self):
        import re
@@ -329,6 +343,7 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
        if sparse:
            self._filter_out_zeroes()
        self._set_max_nrows_ncols()
+        self._set_sparsity()
        self._build_ij_blocks()
        self._set_ij_blocks()
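Note: ordering matters here — `_set_sparsity()` has to run after `_set_max_nrows_ncols()` and before `_build_ij_blocks()`, because `_get_block_size()` reads `_nrows`, `_ncols` and `_sparsity` to derive the block dimensions.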

10 changes: 10 additions & 0 deletions tests/test_writer.py
@@ -2,6 +2,8 @@
import pandas as pd
import pytest
import dnarecords as dr
+import glob
+from itertools import groupby


def check_dosage_sample_wise(skeys, vkeys, mt, dna, approx):
@@ -116,3 +118,11 @@ def test_raises_exception_when_not_tfrecord_format_and_not_parquet_format(dosage
    with pytest.raises(Exception) as ex_info:
        dr.writer.DNARecordsWriter(dosage_1kg.dosage).write(dosage_output, tfrecord_format=False, parquet_format=False)
    assert ex_info.match(r"^At least one of tfrecord_format, parquet_format must be True$")


+def test_generates_single_file_per_block(dosage_output):
+    kv_blocks = dosage_output.replace('/output', '/staging/kv-blocks')
+    pairs = [f.rsplit('/', 1) for f in glob.glob(f"{kv_blocks}/*/*/*.parquet")]
+    for r, g in groupby(pairs, key=lambda t: t[0]):
+        files = list(g)
+        assert len(files) == 1, f"kv-block {r} contains more than one file"
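Note: `itertools.groupby` only groups adjacent items, which suffices here because `glob` walks the `ib=*/jb=*` directories one at a time, so part-files sharing a parent directory come back consecutively.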
