Merge pull request #8 from ContinuumIO/caching
Test to run with directory caching
martindurant committed Sep 4, 2018
2 parents e567875 + 62016d8 commit 5d122d5
Showing 54 changed files with 59 additions and 4 deletions.
4 changes: 2 additions & 2 deletions docs/source/conf.py
@@ -59,9 +59,9 @@
# built documents.
#
# The short X.Y version.
version = u'0.0.1'
version = u'0.2.1'
# The full version, including alpha/beta/rc tags.
release = u'0.0.1'
release = u'0.2.1'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
18 changes: 18 additions & 0 deletions docs/source/quickstart.rst
@@ -83,3 +83,21 @@
Parquet data also plays well with Dask parallel processing, so the method ``to_dask`` should also
be considered. Importantly, sub-selecting from the columns of the Dask data-frame will prevent
unnecessary loading of the non-required columns, even when column selection has not been included
in the catalog entry user parameters.
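
A small sketch of this pattern (the catalog file, the entry name ``mydata`` and the column names
here are hypothetical):

.. code-block:: python

    import intake

    cat = intake.open_catalog('catalog.yaml')  # hypothetical catalog file
    source = cat.mydata()                      # 'mydata' is a hypothetical parquet entry
    ddf = source.to_dask()                     # lazy Dask data-frame, nothing loaded yet
    subset = ddf[['a', 'b']]                   # sub-select columns before computing
    result = subset.compute()                  # only columns 'a' and 'b' are read from the files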

Caching
~~~~~~~

Parquet data-sets can be a single file, a list of files, or a whole directory tree. The first two
can be cached using the standard "files" cache type, but the latter requires the "dir" cache type
to capture the whole structure. An example may look like:

.. code-block:: yaml

    cache:
      - type: dir
        regex: '{{ CATALOG_DIR }}/split'
        argkey: urlpath
        depth: 4
Here the extra ``depth`` parameter indicates the number of directory levels that should be
scanned.
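
Once such a cache block is present, the entry is used as normal: the first access copies the
directory tree into the local cache, and later reads work from the local copy. A minimal sketch
(the catalog path is hypothetical; the entry name ``split`` matches the example above):

.. code-block:: python

    import intake

    cat = intake.open_catalog('catalog.yaml')  # hypothetical catalog containing the entry above
    source = cat.split()
    df = source.read()                         # first read fetches the tree into the cache
    print(source.cache_dirs)                   # local directories holding the cached copies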
6 changes: 4 additions & 2 deletions intake_parquet/source.py
@@ -48,10 +48,11 @@ def __init__(self, urlpath, metadata=None,
    def _get_schema(self):
        import fastparquet as fp
        from dask.bytes.core import get_fs_token_paths
        urlpath, *_ = self._get_cache(self._urlpath)
        if self._pf is None:
            # copied from dask to allow remote
            fs, fs_token, paths = get_fs_token_paths(
                self._urlpath, mode='rb', storage_options=self._storage_options)
                urlpath, mode='rb', storage_options=self._storage_options)

            if len(paths) > 1:
                pf = fp.ParquetFile(paths, open_with=fs.open, sep=fs.sep)
@@ -124,12 +124,13 @@ def to_dask(self):
        Create a lazy dask-dataframe from the parquet data
        """
        import dask.dataframe as dd
        urlpath, *_ = self._get_cache(self._urlpath)
        # More efficient to call dask function directly.
        self._load_metadata()
        columns = self._kwargs.get('columns', None)
        index = self._kwargs.get('index', None)
        filters = self._kwargs.get('filters', [])
        self._df = dd.read_parquet(self._urlpath, columns=columns, index=index,
        self._df = dd.read_parquet(urlpath, columns=columns, index=index,
                                   filters=filters,
                                   storage_options=self._storage_options)
        self._schema = None
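
Both changes follow the same pattern: resolve the (possibly cached) location of the data before
handing it to fastparquet or dask. A rough illustration of that pattern, not the library's actual
code (the helper name below is hypothetical; ``_get_cache`` is the method used in the diff above):

    import dask.dataframe as dd

    def read_parquet_with_cache(source):
        # Illustration only: _get_cache() returns local cache locations when a
        # cache is configured for this source; otherwise the original urlpath
        # comes back unchanged, so remote reads keep working.
        urlpath, *_ = source._get_cache(source._urlpath)
        return dd.read_parquet(urlpath, storage_options=source._storage_options)
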
10 changes: 10 additions & 0 deletions tests/cache_cat.yaml
@@ -0,0 +1,10 @@
sources:
  split:
    driver: parquet
    args:
      urlpath: '{{ CATALOG_DIR }}/split'
    cache:
      - type: dir
        regex: '{{ CATALOG_DIR }}/split'
        argkey: urlpath
        depth: 4
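
The test below exercises this catalog by pointing intake's local cache directory at a temporary
location before opening it; the same setting can be used outside of tests. A short sketch of that
setup (the temporary-directory handling here is illustrative):

    import tempfile
    import intake

    # Redirect intake's local cache to a scratch directory (illustrative)
    intake.config.conf['cache_dir'] = tempfile.mkdtemp()
    cat = intake.open_catalog('tests/cache_cat.yaml')
    source = cat.split()
    print(type(source.cache[0]))  # intake.source.cache.DirCache for a 'dir' cache block
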
Binary files added under tests/split/ (including _metadata; contents not shown)
25 changes: 25 additions & 0 deletions tests/test_source.py
@@ -1,11 +1,15 @@
import fastparquet
import glob
import msgpack
import os
import pickle
import pytest
import shutil

from intake_parquet.source import ParquetSource
import intake

intake.registry['parquet'] = ParquetSource # because pytest defers import
here = os.path.dirname(__file__)
path = os.path.join(here, 'test.parq')
path2 = os.path.join(here, 'test2.parq')
@@ -102,3 +106,24 @@ def test_filter():
    finally:
        import shutil
        shutil.rmtree(d)


def test_with_cache():
    import tempfile
    d = tempfile.mkdtemp()
    old = intake.config.conf['cache_dir']
    expected = fastparquet.ParquetFile(os.path.join(here, 'split')).to_pandas()
    try:
        intake.config.conf['cache_dir'] = d
        cat = intake.open_catalog(os.path.join(here, 'cache_cat.yaml'))
        s = cat.split()
        assert isinstance(s.cache[0], intake.source.cache.DirCache)
        outfiles = s.cache[0].load(s._urlpath, output=False)
        assert outfiles
        assert outfiles[0].startswith(s.cache_dirs[0])
        loc = s.cache[0]._path(s._urlpath)
        assert glob.glob(loc + '/*/*/*.parquet')
        assert s.read().equals(expected)
    finally:
        shutil.rmtree(d)
        intake.config.conf['cache_dir'] = old
