Commit

Merge 010a8c4 into 11b42ec
OriHoch committed Aug 8, 2018
2 parents 11b42ec + 010a8c4 commit df18505
Showing 4 changed files with 77 additions and 0 deletions.
14 changes: 14 additions & 0 deletions PROCESSORS.md
@@ -109,6 +109,20 @@ def dump_to_zip(out_file,
#### dump_to_sql
Store the results in a relational database (creates one or more tables or updates existing tables)
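
A minimal usage sketch (hedged: the `tables` mapping and the SQLAlchemy-style `engine` string below reflect common dataflows usage and are assumptions, not part of this diff):

```python
from dataflows import Flow, load, dump_to_sql

# Assumed API: `tables` maps an output table name to the resource that
# feeds it; `engine` is a SQLAlchemy-style connection string.
Flow(
    load('data/mydata.csv'),
    dump_to_sql(
        {'my_table': {'resource-name': 'mydata'}},
        engine='sqlite:///output.db',
    ),
).process()
```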

#### cache
Cache the results of running a series of steps; if the cache exists, it is loaded instead of running the steps.

Cache invalidation must be handled manually, by deleting the cache path.

```python
def cache(cache_path, *steps):
    pass
```

- `cache_path` - path to a unique directory that will hold the cache for the provided series of steps
- `steps` - step functions to run to create the cache (the same as the `Flow` class arguments)
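
For example, a minimal sketch (the cache directory and source URL are illustrative):

```python
from dataflows import Flow, cache, load

# The first run executes `load` and writes its output under .cache/my-flow;
# later runs find the cached datapackage.json and stream from it instead.
results, dp, stats = Flow(
    cache('.cache/my-flow',
          load('https://example.com/data.csv')),
).results()
```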


### Manipulate row-by-row
#### delete_fields
Delete fields (columns) from streamed resources
1 change: 1 addition & 0 deletions dataflows/processors/__init__.py
@@ -12,3 +12,4 @@
from .find_replace import find_replace
from .sort_rows import sort_rows
from .unpivot import unpivot
from .cache import cache
26 changes: 26 additions & 0 deletions dataflows/processors/cache.py
@@ -0,0 +1,26 @@
import os
from datapackage import Package
from dataflows import PackageWrapper, DataStream, Flow
from . import dump_to_path


def cache(cache_path, *steps):
    cache_package_json_path = os.path.join(cache_path, 'datapackage.json')

    def processor(package: PackageWrapper):
        if os.path.exists(cache_package_json_path):
            # Cache hit: stream the saved datapackage instead of running the steps.
            print('using cache data from {}'.format(cache_path))
            dp = Package(cache_package_json_path)
            res_iter = (resource.iter(keyed=True) for resource in dp.resources)
        else:
            # Cache miss: run the steps and persist their output to cache_path.
            print('loading fresh data, saving cache to: {}'.format(cache_path))
            ds: DataStream = Flow(*steps + (dump_to_path(cache_path),))._chain()._process()
            dp = ds.dp
            res_iter = ds.res_iter
        # Append the cached (or freshly produced) resources to the incoming
        # package, then yield the updated descriptor, the upstream rows,
        # and finally the cached/new rows.
        for resource in dp.resources:
            package.pkg.add_resource(resource.descriptor)
        yield package.pkg
        yield from package
        yield from res_iter

    return processor
36 changes: 36 additions & 0 deletions tests/test_lib.py
@@ -227,3 +227,39 @@ def test_duplicate_many_rows():
results, _, _ = f.results()
assert len(results[0]) == 10000
assert len(results[1]) == 10000


def test_cache():
    import os
    from dataflows import cache

    stats = {'a': 0, 'foo': 0}

    def incr_stat(name):
        stats[name] += 1
        return stats[name]

    cache_path = '.cache/test_cache'
    expected_files = ['datapackage.json', 'res_1.csv', 'res_2.csv']

    # Start from a clean slate: remove any cache left by a previous run.
    for f in expected_files:
        if os.path.exists(cache_path + '/' + f):
            os.remove(cache_path + '/' + f)
    if os.path.exists(cache_path):
        os.rmdir(cache_path)

    f = Flow(
        cache(cache_path,
              ({'a': incr_stat('a'), 'i': i} for i in range(10)),
              ({'foo': incr_stat('foo')} for _ in range(20)))
    )

    # Run the same flow three times; only the first (cache-miss) run should
    # execute the generators, later runs stream from the cache on disk.
    for _ in range(3):
        results, *_ = f.results()
        assert results == [[{'a': i+1, 'i': i} for i in range(10)],
                           [{'foo': i+1} for i in range(20)]]

    # The counters advanced once per row, proving the generators ran only
    # on the first of the three runs.
    assert stats['a'] == 10
    assert stats['foo'] == 20
    for f in expected_files:
        assert os.path.exists(cache_path + '/' + f)
