
Commit

Merge 5488093 into 11b42ec
OriHoch committed Aug 9, 2018
2 parents 11b42ec + 5488093 commit f2b16c6
Showing 7 changed files with 210 additions and 16 deletions.
56 changes: 55 additions & 1 deletion PROCESSORS.md
@@ -32,14 +32,20 @@ DataFlows comes with a few built-in processors which do most of the heavy lifting
Loads data from various source types (local files, remote URLs, Google Spreadsheets, databases...)

```python
def load(path, name=None, **options):
def load(path, name=None, resources=False, **options):
    pass
```

- `path` - location of the data that is to be loaded. This can be either:
  - a local path (e.g. `/path/to/the/data.csv`)
  - a remote URL (e.g. `https://path.to/the/data.csv`)
  - any other URL scheme or format currently supported by [tabulator](https://github.com/frictionlessdata/tabulator-py#schemes)
  - a local path or remote URL to a datapackage.json file (e.g. `https://path.to/data_package/datapackage.json`)
- `resources` - required if `path` points to a datapackage.json file (see the example after this list). Value should be one of the following:
  - Name of a single resource to load
  - A regular expression matching resource names to load
  - A list of resource names to load
  - `None`, to load all resources
- `options` - extra options that depend on the loaded file format (e.g. `sheet` for Excel files; see the tabulator link above)
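
For example, a minimal sketch of loading from a datapackage; the URL is the placeholder used above and the `sales-.*` pattern is hypothetical:

```python
from dataflows import Flow, load, printer

# Load every resource from a remote datapackage (resources=None matches all)
Flow(
    load('https://path.to/data_package/datapackage.json', resources=None),
    printer()
).process()

# Load only the resources whose names match a regular expression
Flow(
    load('https://path.to/data_package/datapackage.json', resources='sales-.*'),
    printer()
).process()
```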

#### printer
@@ -109,6 +115,54 @@ def dump_to_zip(out_file,
#### dump_to_sql
Store the results in a relational database (creates one or more tables or updates existing tables)
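
A hedged sketch of typical usage; the `tables` mapping, the `res_1` resource name and the `engine` connection string are assumptions and may not match the exact signature:

```python
from dataflows import Flow, dump_to_sql

Flow(
    [{'city': 'paris', 'population': 2141000}],
    # Assumed API shape: map output table names to the resources that feed them
    dump_to_sql(
        {'cities': {'resource-name': 'res_1'}},
        engine='sqlite:///cities.db'
    )
).process()
```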

#### cache
Caches the results of running a series of steps. If the cache exists, it is loaded instead of running the steps.

Cache invalidation must be handled manually, by deleting the cache path.

```python
def cache(*steps, cache_path):
    pass
```

- `*steps` - step functions to run to create the cache (same as the `Flow` class arguments)
- `cache_path` - path to a unique directory that will hold the cache for the provided series of steps

```python
f = Flow(
    cache(
        load('http://example.com/large_resource.csv'),
        load('http://example.com/another_large_resource.csv'),
        cache_path='.cache/unique-cache-identifier'
    ),
    printer()
)

# first run will load the resources and save to the cache_path
f.process()

# next runs will load the resources from the cache_path
f.process()
```
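
To force a refresh, delete the cache path before processing again. A minimal sketch using the standard library, continuing the flow `f` defined above:

```python
import shutil

# Invalidate the cache by removing the cached datapackage directory
shutil.rmtree('.cache/unique-cache-identifier', ignore_errors=True)

# This run re-loads the resources and rebuilds the cache
f.process()
```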

The cache processor should always be the first step in the flow. You can nest cache processors for incremental caching.

In the following example, you can refresh `another_resource` by deleting its cache path; the `very_large_resources` cache will not be refreshed.

```python
f = Flow(
    cache(
        cache(
            load('http://example.com/very_large_resource.csv'),
            load('http://example.com/another_very_large_resource.csv'),
            cache_path='.cache/very_large_resources'
        ),
        load('http://example.com/another_resource.csv'),
        cache_path='.cache/another_resource'
    )
)
```

### Manipulate row-by-row
#### delete_fields
Delete fields (columns) from streamed resources
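
A minimal sketch, assuming `delete_fields` accepts a list of field names to remove; the sample row and field names are illustrative:

```python
from dataflows import Flow, delete_fields, printer

Flow(
    [{'name': 'paris', 'population': 2141000, 'internal_id': 17}],
    # Drop the column we do not want in the output
    delete_fields(['internal_id']),
    printer()
).process()
```
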
18 changes: 18 additions & 0 deletions TUTORIAL.md
@@ -379,3 +379,21 @@ In the next example we're removing an entire resource in a package processor - t
DataFlows comes with a few built-in processors which do most of the heavy lifting in many common scenarios, leaving you to implement only the minimum code that is specific to your problem.

A complete list, which also includes an API reference for each one of them, can be found in the [Built-in Processors](PROCESSORS.md) page.

## Nested Flows

The flow object itself can be used as a step in another flow.

```python
def upper(row):
    for k in row:
        row[k] = row[k].upper()

def lower_first_letter(row):
    for k in row:
        row[k] = row[k][0].lower() + row[k][1:]

text_processing_flow = Flow(upper, lower_first_letter)

assert Flow([{'foo': 'bar'}], text_processing_flow).results()[0] == [[{'foo': 'bAR'}]]
```
10 changes: 7 additions & 3 deletions dataflows/base/flow.py
@@ -14,12 +14,16 @@ def results(self):
    def process(self):
        return self._chain().process()

    def _chain(self):
    def datastream(self, ds=None):
        return self._chain(ds)._process()

    def _chain(self, ds=None):
        from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader

        ds = None
        for link in self.chain:
            if isinstance(link, DataStreamProcessor):
            if isinstance(link, Flow):
                ds = link._chain(ds)
            elif isinstance(link, DataStreamProcessor):
                ds = link(ds)
            elif isfunction(link):
                sig = signature(link)
1 change: 1 addition & 0 deletions dataflows/processors/__init__.py
@@ -12,3 +12,4 @@
from .find_replace import find_replace
from .sort_rows import sort_rows
from .unpivot import unpivot
from .cache import cache
20 changes: 20 additions & 0 deletions dataflows/processors/cache.py
@@ -0,0 +1,20 @@
import os
from dataflows import Flow, DataStream
from . import dump_to_path, load


def cache(*steps, cache_path):
    cache_package_json_path = os.path.join(cache_path, 'datapackage.json')

    def processor(package):
        if os.path.exists(cache_package_json_path):
            print('using cache data from {}'.format(cache_path))
            flow = Flow(load(cache_package_json_path, resources=None))
        else:
            print('loading fresh data, saving cache to: {}'.format(cache_path))
            flow = Flow(*steps + (dump_to_path(cache_path),))
        ds = flow.datastream(DataStream(package.pkg, package))
        yield ds.dp
        yield from ds.res_iter

    return processor
41 changes: 29 additions & 12 deletions dataflows/processors/load.py
@@ -2,30 +2,47 @@

from datapackage import Package, Resource
from .. import DataStreamProcessor
from ..helpers.resource_matcher import ResourceMatcher


class load(DataStreamProcessor):

    def __init__(self, path, name=None, **options):
    def __init__(self, path, name=None, resources=False, **options):
        super(load, self).__init__()
        self.path = path
        self.options = options
        self.name = name
        if resources is False:
            self.resource_matcher = None
        else:
            self.resource_matcher = ResourceMatcher(resources)

    def process_datapackage(self, dp: Package):
        if os.path.exists(self.path):
            base_path = os.path.dirname(self.path)
            self.path = os.path.basename(self.path)
        if self.resource_matcher:
            self.load_dp = Package(self.path)
            for resource in self.load_dp.resources:
                if self.resource_matcher.match(resource.name):
                    dp.add_resource(resource.descriptor)
        else:
            base_path = None
        self.res = Resource(dict(path=self.path, profile='tabular-data-resource', **self.options), base_path=base_path)
        self.res.infer(confidence=1, limit=1000)
        if self.name is not None:
            self.res.descriptor['name'] = self.name
            self.res.descriptor['path'] = '{name}.{format}'.format(**self.res.descriptor)
        dp.add_resource(self.res.descriptor)
            if os.path.exists(self.path):
                base_path = os.path.dirname(self.path)
                self.path = os.path.basename(self.path)
            else:
                base_path = None
            self.res = Resource(dict(path=self.path,
                                     profile='tabular-data-resource',
                                     **self.options), base_path=base_path)
            self.res.infer(confidence=1, limit=1000)
            if self.name is not None:
                self.res.descriptor['name'] = self.name
                self.res.descriptor['path'] = '{name}.{format}'.format(**self.res.descriptor)
            dp.add_resource(self.res.descriptor)
        return dp

    def process_resources(self, resources):
        yield from super(load, self).process_resources(resources)
        yield self.res.iter(keyed=True)
        if self.resource_matcher:
            yield from (resource.iter(keyed=True) for resource in self.load_dp.resources
                        if self.resource_matcher.match(resource.name))
        else:
            yield self.res.iter(keyed=True)
80 changes: 80 additions & 0 deletions tests/test_lib.py
@@ -227,3 +227,83 @@ def test_duplicate_many_rows():
    results, _, _ = f.results()
    assert len(results[0]) == 10000
    assert len(results[1]) == 10000


def test_flow_as_step():
    def upper(row):
        for k in row:
            row[k] = row[k].upper()

    def lower_first_letter(row):
        for k in row:
            row[k] = row[k][0].lower() + row[k][1:]

    text_processing_flow = Flow(upper, lower_first_letter)

    assert Flow([{'foo': 'bar'}], text_processing_flow).results()[0] == [[{'foo': 'bAR'}]]


def test_load_from_package():
    from dataflows import dump_to_path, load

    Flow(
        [{'foo': 'bar'}],
        dump_to_path('data/load_from_package')
    ).process()

    ds = Flow(
        load('data/load_from_package/datapackage.json', resources=None)
    ).datastream()

    assert len(ds.dp.resources) == 1
    assert [list(res) for res in ds.res_iter] == [[{'foo': 'bar'}]]


def test_cache():
    import os
    import shutil
    from dataflows import cache

    stats = {'a': 0, 'foo': 0}

    def incr_stat(name):
        stats[name] += 1
        return stats[name]

    cache_path = '.cache/test_cache'
    expected_files = ['datapackage.json', 'res_1.csv', 'cached_resource.csv']

    shutil.rmtree(cache_path, ignore_errors=True)

    def load_data(resource_name):

        def processor(package):
            package.pkg.add_resource({'name': resource_name,
                                      'path': resource_name+'.csv',
                                      'schema': {'fields': [{'name': 'foo', 'type': 'integer'}]}})
            yield package.pkg
            yield from package
            yield ({'foo': incr_stat('foo')} for _ in range(20))

        return processor

    f = Flow(
        cache(
            cache(
                ({'a': incr_stat('a'), 'i': i} for i in range(10)),
                cache_path=cache_path + '/first_cache'
            ),
            load_data('cached_resource'),
            cache_path=cache_path
        )
    )

    for i in range(3):
        results, *_ = f.results()
        assert results == [[{'a': i+1, 'i': i} for i in range(10)],
                           [{'foo': i+1} for i in range(20)]], 'failed iteration {}'.format(i)

    assert stats['a'] == 10
    assert stats['foo'] == 20
    for f in expected_files:
        assert os.path.exists(cache_path + '/' + f)
