In [None]:
#hide
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
# default_exp petastorm

In [None]:
import contextlib
import math
import os
import re
import shutil

import numpy as np
import pandas as pd
import torch
from petastorm import make_batch_reader, TransformSpec
from petastorm.pytorch import DataLoader as PetaDataLoader
from pyarrow.parquet import ParquetFile, ParquetReader
from torch import tensor
from torch.utils.data import TensorDataset, DataLoader as TorchDataLoader, IterableDataset

def _init_filenames(log, filename_param, rex=None):
    # note to self: I think that comes from petastorm/parquet but I don't really remember now :/
    FILE_PREFIX = 'file:'
    
    if rex is None:
        return filename_param

    filename = filename_param[len(FILE_PREFIX):]
    if not os.path.isdir(filename):
        raise ValueError(f"Filteri ng only possible for dirs, {filename} is not a one")

    paths = [os.path.join(dp, f) for dp, dn, fn in os.walk(filename) for f in fn]
    res = list(map(
        lambda f: FILE_PREFIX + f,
        filter(lambda f: re.match(rex, f) is not None, paths)
    ))
    if (len(res) == 0):
        raise ValueError(f"0 files remained out ot {len(paths)} - seems regex is too restrictive")

    if (len(res) == len(paths)):
        raise ValueError(f"{len(paths)} files remained out ot {len(paths)} - seems regex is a no op")

    log.debug(f"{filename_param} -> {len(res)} files out of {len(paths)} remained after applying filter ({rex})")
    return res;

In [None]:
# hide

# TODO: see if there are nicer ways of importing from other notebooks
import sys
# Make the sibling nbdev project importable so its test helpers and logging setup can be reused.
# The membership check keeps this cell idempotent: re-running it no longer stacks duplicate
# entries onto sys.path.
if './kaggle-m5-nbdev' not in sys.path:
    sys.path.insert(1, './kaggle-m5-nbdev')
from core import test_eq, test_err, configure_logging
log = configure_logging('./tmp', 'test_log', con_log_lvl='DEBUG')

Logging already initialized


In [None]:
test_dir = './tmp/_init_filenames_test'
# Start from an empty directory. Files from a previous run would otherwise sit next to the
# newly written (uuid-named) parquet files, and the tests below expect exactly one file per partition.
shutil.rmtree(test_dir, ignore_errors=True)
fixture_df = pd.DataFrame({
    'id': np.arange(0, 1000),
    'sell_price': np.arange(0, 1000)
})
fixture_df['partition'] = fixture_df['id'] % 2
fixture_df.to_parquet(test_dir, partition_cols=['partition'])
# Every path under test_dir (the dir itself, sub-dirs and files), sorted like `find | sort`.
files = sorted(
    [test_dir] + [
        os.path.join(dirpath, name)
        for dirpath, dirnames, filenames in os.walk(test_dir)
        for name in dirnames + filenames
    ]
)
files

['./tmp/_init_filenames_test',
 './tmp/_init_filenames_test/partition=0',
 './tmp/_init_filenames_test/partition=0/5846db13c7b744a5991598700f307860.parquet',
 './tmp/_init_filenames_test/partition=1',
 './tmp/_init_filenames_test/partition=1/b7b316e71f764762959315955ed5c26c.parquet']

In [None]:
filename = f'file:{test_dir}'
# without a regex the URL is passed through untouched
test_eq(filename, _init_filenames(log, filename))

# keep only the file under partition=1 (the last entry of the sorted `files` listing)
filtered_list = _init_filenames(log, filename, rex='.*partition=1')
test_eq([f'file:{files[-1]}'], filtered_list)

# a regex that keeps every file, or none of them, is rejected
test_err(lambda: _init_filenames(log, filename, rex='.*'), '2 files remained out ot 2')
test_err(lambda: _init_filenames(log, filename, rex='aa'), '0 files remained out ot 2')

DEBUG [2020-07-10 19:31:18] root: file:./tmp/_init_filenames_test -> 1 files out of 2 remained after applying filter (.*partition=1)


In [None]:
class PetastormLeakyDescriptorsPatch():
    """Monkey-patch pyarrow's ``ParquetFile`` so the source it was opened with gets closed.

    Call ``patch_leaking_fd`` first. It patches ``ParquetFile`` globally, once per process,
    and also registers a plain binary opener. Then call ``patch_loader`` on a petastorm
    loader to install that opener as the loader filesystem's ``open``.
    """

    # opener registered by patch_leaking_fd; None means class patching has not happened yet
    pre_open_fds = None

    def patch_leaking_fd(self, log):
        """Patch ``ParquetFile.__init__``/``__exit__``/``__del__`` to close the stored source.

        Repeated calls leave the already-patched class alone and only log.

        Args:
            log: logger that reports whether patching happened.
        """
        def _patched_init(self, source, *args, **kwargs):
            # remember the source so _exit can close it later; extra args are forwarded unchanged
            self.source = source
            return ParquetFile.__old_init__(self, source, *args, **kwargs)

        def _exit(self, *args, **kwargs):
            # getattr: __del__ can run after __exit__ already deleted the attribute
            source = getattr(self, 'source', None)
            if hasattr(source, 'close'):
                source.close()
                del self.source

        def _bopen(fn, mode='rb'):
            # NOTE(review): mode is optional so callers passing fs.open(path, mode) still work;
            # confirm which modes petastorm actually uses
            return open(fn, mode)

        PetastormLeakyDescriptorsPatch.pre_open_fds = _bopen
        if not hasattr(ParquetFile, '__old_init__'):
            log.debug("Patching")
            ParquetFile.__old_init__ = ParquetFile.__init__

            ParquetFile.__init__ = _patched_init
            ParquetFile.__exit__ = _exit
            ParquetFile.__del__ = _exit

        else:
            log.debug("Already patched")

    def patch_loader(self, loader):
        """Install the registered opener as ``loader.reader.dataset.fs.open``.

        Raises:
            ValueError: if ``patch_leaking_fd`` has not been called yet.
        """
        opener = PetastormLeakyDescriptorsPatch.pre_open_fds
        if opener is None:
            raise ValueError("Loader patching can't happen before class patching")
        loader.reader.dataset.fs.open = opener

In [None]:
# Read the fixture through a plain petastorm loader. It is used as a context manager (as
# ParquetIterableDataset.__iter__ does) so the reader is shut down when iteration ends,
# presumably via reader stop/join in DataLoader.__exit__.
with PetaDataLoader(
    reader=make_batch_reader('file:./tmp/_init_filenames_test/',
        schema_fields=['id'],
        workers_count=1
    ),
    batch_size=128,
    shuffling_queue_capacity=100000
) as loader:
    # TODO: no leak warning appears here - does that mean the descriptor leak is gone?
    for batch in loader:
        pass

  column_as_pandas = column.data.chunk(0).to_pandas()


In [None]:
#export
class ParquetIterableDataset(IterableDataset):
    """Stream single samples out of a parquet dataset through a petastorm loader.

    Each row becomes ``{'features': tensor([price_or_zero, price_is_nan]),
    'targets': tensor([sales_dollars])}``, both float32. A NaN ``sell_price`` is replaced by 0
    and flagged. ``sales_dollars`` falls back to ``-1.`` when the batch has no such column.

    Args:
        filename: dataset URL (e.g. ``'file:./tmp/...'``) passed to ``make_batch_reader``.
        log: logger for debug messages.
        rex: optional regex selecting a subset of files (see ``_init_filenames``).
        batch_size: petastorm loader batch size. Rows are still yielded one at a time.
        shuffling_queue_capacity: size of the petastorm loader's shuffling queue.
        schema_fields: columns to read; defaults to ``_DEFAULT_SCHEMA_FIELDS``.
    """

    # columns read when schema_fields is not given
    _DEFAULT_SCHEMA_FIELDS = (
        'id', 'item_id', 'dept_id', 'cat_id', 'day_id',
        'sales', 'day_date_str', 'month_id', 'date', 'wm_yr_wk',
        'snap_flag', 'sell_price', 'sales_dollars', 'store_id', 'state_id',
    )
    # Nominal length reported by __len__. It can be an arbitrarily large value to prevent WARN
    # logs and seems to be ignored anyway.
    _NOMINAL_LEN = 1913 * 30490

    def __init__(self, filename, log, rex=None, batch_size=128,
                 shuffling_queue_capacity=100000, schema_fields=None):
        super().__init__()
        self._filename_param = filename
        self.filename_param = filename
        self.rex_param = rex
        self.log = log
        self.batch_size = batch_size
        self.shuffling_queue_capacity = shuffling_queue_capacity
        self.schema_fields = list(self._DEFAULT_SCHEMA_FIELDS if schema_fields is None else schema_fields)
        # resolved URL, or list of file URLs when rex filtered the directory
        self.filename = _init_filenames(log, filename, rex)

    def _init_petaloader(self):
        """Build a fresh petastorm ``DataLoader`` (new reader on every call)."""
        reader = make_batch_reader(self.filename,
                                   schema_fields=self.schema_fields,
                                   workers_count=1)
        return PetaDataLoader(reader=reader, batch_size=self.batch_size,
                              shuffling_queue_capacity=self.shuffling_queue_capacity)

    def __len__(self):
        return self._NOMINAL_LEN

    @staticmethod
    def _row_to_item(batch, idx):
        """Convert row ``idx`` of a petastorm batch into a ``{'features', 'targets'}`` sample."""
        price = float(batch['sell_price'][idx])
        sales_dollars = float(batch['sales_dollars'][idx]) if ('sales_dollars' in batch) else -1.
        price_is_nan = math.isnan(price)
        # TODO: this starts to look like feature extraction, doesn't belong here
        price_or_zero = 0. if price_is_nan else price
        # Downstream pytorch code needs float32. The targets dtype is pinned too, so it does not
        # depend on whether 'sales_dollars' was present in the batch.
        return {'features': tensor([price_or_zero, float(price_is_nan)], dtype=torch.float32),
                'targets': tensor([sales_dollars], dtype=torch.float32)}

    def __iter__(self):
        """Yield one sample dict per row.

        Only the main process is supported: inside a torch DataLoader worker, the first
        ``next()`` raises ValueError.
        """
        self.log.debug(f"Iterator created on {self._filename_param}")
        if torch.utils.data.get_worker_info() is not None:
            raise ValueError("Not implemented for multithreading")

        count_cells = 0
        count_batches = 0
        with self._init_petaloader() as loader:
            for batch in loader:
                count_batches += 1
                # TODO: propagate petaloader's batches without breaking them into individual items
                for idx in range(len(batch['sell_price'])):
                    count_cells += 1
                    yield self._row_to_item(batch, idx)

        self.log.debug(f'Done iterating: {count_batches} batches / ({count_cells} cells) ')

In [None]:
ds = ParquetIterableDataset('file:./tmp/_init_filenames_test/', log)

# Close the partially consumed generator explicitly. This exits its `with` block and so shuts
# down the underlying petastorm loader right away, instead of leaving it to garbage collection.
with contextlib.closing(iter(ds)) as rows:
    row = next(rows)
# the fixture has no 'sales_dollars' column, so targets fall back to -1
test_eq(tensor([-1.]), row['targets'])

# a second full pass over the same dataset must yield every fixture row again
test_eq(1000, sum(1 for _ in ds))

DEBUG [2020-07-10 19:31:22] root: Iterator created on file:./tmp/_init_filenames_test/
DEBUG [2020-07-10 19:31:23] root: Iterator created on file:./tmp/_init_filenames_test/
DEBUG [2020-07-10 19:31:24] root: Done iterating: 8 batches / (1000 cells) 
