Commit
perf: added metadata taste functionality to the reader. Improved performance of block transposition
amanas committed May 20, 2022
1 parent b4ba095 commit 26b2353
Showing 2 changed files with 28 additions and 10 deletions.
26 changes: 21 additions & 5 deletions src/dnarecords/reader.py
@@ -75,13 +75,22 @@ def _types_dict():
'double': tf.float32}

@staticmethod
def _pandas_safe_read_parquet(path):
def _pandas_safe_read_parquet(path, columns, taste):
import pandas as pd
import tensorflow as tf

files = tf.io.gfile.glob(f'{path}/*.parquet')
if files:
return pd.concat(pd.read_parquet(f) for f in files)
if columns:
if taste:
return pd.read_parquet(files[0], columns=columns)
else:
return pd.concat(pd.read_parquet(f, columns=columns) for f in files)
if not columns:
if taste:
return pd.read_parquet(files[0])
else:
return pd.concat(pd.read_parquet(f) for f in files)
return None
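The new `columns`/`taste` handling boils down to a simple pattern: a taste reads only the first parquet shard, a full read concatenates every shard, and `columns` narrows the projection in either case. A minimal standalone sketch of that pattern (the directory path and column names in the usage comment are hypothetical, not taken from this commit):

import pandas as pd
import tensorflow as tf

def read_parquet_dir(path, columns=None, taste=False):
    # Glob the parquet shards under path; read just the first one for a
    # taste, otherwise concatenate them all. columns=None means all columns.
    files = tf.io.gfile.glob(f'{path}/*.parquet')
    if not files:
        return None
    if taste:
        return pd.read_parquet(files[0], columns=columns)
    return pd.concat(pd.read_parquet(f, columns=columns) for f in files)

# Hypothetical usage: peek at two columns of the first shard only.
# preview = read_parquet_dir('/data/dnarecords/metadata/vkeys', columns=['locus', 'alleles'], taste=True)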

@staticmethod
@@ -94,11 +103,14 @@ def _pandas_safe_read_json(path):
return pd.concat(pd.read_json(f) for f in files)
return None

def metadata(self) -> Dict[str, DataFrame]:
def metadata(self, vkeys_columns: List[str] = None, skeys_columns: List[str] = None, taste: bool = False) -> Dict[str, DataFrame]:
"""Gets the metadata associated to the DNARecords dataset as a dictionary of names to pandas DataFrames.
:rtype: Dict[str, DataFrame].
:return: the metadata associated to the DNARecords as a dictionary of names to pandas DataFrames.
:param vkeys_columns: columns to return from variant metadata files (potentially big files). Defaults to None (all columns).
:param skeys_columns: columns to return from sample metadata files (potentially big files). Defaults to None (all columns).
:param taste: The full metadata DataFrames can be huge, so you can take a taste of them without running into memory issues and then decide which columns to fetch metadata for. Defaults to False.
See Also
--------
@@ -109,8 +121,12 @@ def metadata(self) -> Dict[str, DataFrame]:
result = {}
tree = dr.helper.DNARecordsUtils.dnarecords_tree(self._dnarecords_path)
for k, v in tree.items():
if k in ['skeys', 'vkeys', 'swpfs', 'vwpfs', 'swrfs', 'vwrfs']:
result.update({k: self._pandas_safe_read_parquet(v)})
if k == 'skeys':
result.update({k: self._pandas_safe_read_parquet(v, skeys_columns, taste)})
if k == 'vkeys':
result.update({k: self._pandas_safe_read_parquet(v, vkeys_columns, taste)})
if k in ['swpfs', 'vwpfs', 'swrfs', 'vwrfs']:
result.update({k: self._pandas_safe_read_parquet(v, None, False)})
if k in ['swpsc', 'vwpsc', 'swrsc', 'vwrsc']:
result.update({k: self._pandas_safe_read_json(v)})
return result
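With these parameters, a caller can take a cheap taste of the key DataFrames first, inspect which columns exist, and then re-read the full metadata restricted to the columns of interest. A hedged usage sketch (the reader class name, its constructor argument, and the column names are assumptions for illustration, not taken from this diff):

import dnarecords as dr

reader = dr.reader.DNARecordsReader('/data/my_dnarecords')  # hypothetical class and path

# Cheap taste: reads a single parquet shard per key file.
preview = reader.metadata(taste=True)
print(preview['vkeys'].columns)

# Full read, limited to the columns actually needed (hypothetical column names).
meta = reader.metadata(vkeys_columns=['locus', 'alleles'], skeys_columns=['s'])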
12 changes: 7 additions & 5 deletions src/dnarecords/writer.py
@@ -280,6 +280,7 @@ def _write_dnarecords(output, output_schema, dna_blocks, write_mode, gzip, tfrec
if tfrecord_format:
df_writer = df_writer.format("tfrecord").option("recordType", "Example")
if gzip:
# Needs huge overhead memory
df_writer = df_writer.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
else:
df_writer = df_writer.format('parquet')
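For context, the tfrecord branch configures Spark's DataFrameWriter for a TFRecord data source (presumably a spark-tfrecord style connector), while the else branch falls back to plain parquet. A stripped-down sketch of the same writer configuration (the DataFrame `df`, the write mode, and the output path are placeholders):

df_writer = df.write.mode('overwrite')
df_writer = df_writer.format('tfrecord').option('recordType', 'Example')
# Gzip-compressed tfrecords are what needs the large overhead memory noted above.
df_writer = df_writer.option('codec', 'org.apache.hadoop.io.compress.GzipCodec')
df_writer.save('/data/output/dnarecords-tfrecords')  # placeholder path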
@@ -333,11 +334,17 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
if not tfrecord_format and not parquet_format:
raise Exception('At least one of tfrecord_format, parquet_format must be True')

otree = DNARecordsUtils.dnarecords_tree(output)

self._set_mt()
self._index_mt()
self._set_vkeys_skeys()
self._set_chrom_ranges()
self._update_vkeys_by_chrom_ranges()

self._vkeys.write.mode(write_mode).parquet(otree['vkeys'])
self._skeys.write.mode(write_mode).parquet(otree['skeys'])

self._select_ijv()
self._filter_out_undefined_entries()
if sparse:
@@ -347,8 +354,6 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
self._build_ij_blocks()
self._set_ij_blocks()

otree = DNARecordsUtils.dnarecords_tree(output)

if variant_wise:
self._build_dna_blocks('i')
if tfrecord_format:
@@ -370,6 +375,3 @@ def write(self, output: str, sparse: bool = True, sample_wise: bool = True, vari
self._write_dnarecords(otree['swpar'], otree['swpsc'], f'{self._sw_dna_staging}/*', write_mode,
gzip, False)
self._write_key_files(otree['swpar'], otree['swpfs'], False, write_mode)

self._vkeys.write.mode(write_mode).parquet(otree['vkeys'])
self._skeys.write.mode(write_mode).parquet(otree['skeys'])
