Skip to content

Commit

Permalink
Merge d3e7514 into fd36cec
Browse files Browse the repository at this point in the history
  • Loading branch information
OriHoch committed Sep 16, 2018
2 parents fd36cec + d3e7514 commit cc29dc5
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 13 deletions.
7 changes: 4 additions & 3 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin
Loads data from various source types (local files, remote URLs, Google Spreadsheets, databases...)

```python
def load(path, name=None, resources=None, **options):
def load(source, name=None, resources=None, **options):
pass
```

- `path` - location of the data that is to be loaded. This can be either:
- `source` - location of the data that is to be loaded. This can be either:
- a local path (e.g. `/path/to/the/data.csv`)
- a remote URL (e.g. `https://path.to/the/data.csv`)
- Other supported links, based on the current support of schemes and formats in [tabulator](https://github.com/frictionlessdata/tabulator-py#schemes)
- a local path or remote URL to a datapackage.json file (e.g. `https://path.to/data_package/datapackage.json`)
- `resources` - required if path points to a datapackage.json file. Value should be one of the following:
- a tuple containing (datapackage_descriptor, resources_iterator)
- `resources` - optional, relevant only if source points to a datapackage.json file or datapackage / resources tuple. Value should be one of the following:
- Name of a single resource to load
- A regular expression matching resource names to load
- A list of resource names to load
Expand Down
27 changes: 18 additions & 9 deletions dataflows/processors/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,32 @@

class load(DataStreamProcessor):

def __init__(self, load_source, name=None, resources=None, **options):
    """Initialize a load processor.

    :param load_source: where to load data from. Either a local path,
        a remote URL, a path/URL to a ``datapackage.json`` file, or a
        ``(datapackage_descriptor, resources_iterator)`` tuple.
    :param name: optional name for the loaded resource.
    :param resources: which resources to load when the source is a
        datapackage: a single name, a regular expression, or a list of
        names (``None`` matches everything).
    :param options: extra keyword options kept for later use when the
        resource descriptor is built (e.g. ``format``).
    """
    # NOTE(review): the merged code renamed `path` to `load_source`;
    # the pre-merge duplicate definition lines were dropped here.
    super(load, self).__init__()
    self.load_source = load_source
    self.options = options
    self.name = name
    # Decides which resource names from a datapackage should be kept.
    self.resource_matcher = ResourceMatcher(resources)
    # Populated by process_datapackage() when loading a datapackage.json.
    self.load_dp = None

def process_datapackage(self, dp: Package):
if os.path.basename(self.path) == 'datapackage.json':
self.load_dp = Package(self.path)
if isinstance(self.load_source, tuple):
datapackage_descriptor, _ = self.load_source
for resource_descriptor in datapackage_descriptor['resources']:
if self.resource_matcher.match(resource_descriptor['name']):
dp.add_resource(resource_descriptor)
elif os.path.basename(self.load_source) == 'datapackage.json':
self.load_dp = Package(self.load_source)
for resource in self.load_dp.resources:
if self.resource_matcher.match(resource.name):
dp.add_resource(resource.descriptor)
else:
if os.path.exists(self.path):
base_path = os.path.dirname(self.path) or '.'
self.path = os.path.basename(self.path)
if os.path.exists(self.load_source):
base_path = os.path.dirname(self.load_source) or '.'
self.load_source = os.path.basename(self.load_source)
else:
base_path = None
descriptor = dict(path=self.path,
descriptor = dict(path=self.load_source,
profile='tabular-data-resource')
if 'format' in self.options:
descriptor['format'] = self.options['format']
Expand All @@ -43,7 +48,11 @@ def process_datapackage(self, dp: Package):

def process_resources(self, resources):
yield from super(load, self).process_resources(resources)
if self.load_dp is not None:
if isinstance(self.load_source, tuple):
datapackage_descriptor, resources = self.load_source
yield from (resource for resource, descriptor in zip(resources, datapackage_descriptor['resources'])
if self.resource_matcher.match(descriptor['name']))
elif self.load_dp is not None:
yield from (resource.iter(keyed=True) for resource in self.load_dp.resources
if self.resource_matcher.match(resource.name))
else:
Expand Down
37 changes: 36 additions & 1 deletion tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,48 @@ def test_load_from_package():
).process()

ds = Flow(
load('data/load_from_package/datapackage.json', resources=None)
load('data/load_from_package/datapackage.json')
).datastream()

assert len(ds.dp.resources) == 1
assert [list(res) for res in ds.res_iter] == [[{'foo': 'bar'}]]


def test_load_from_package_resource_matching():
    """`load` should keep only the datapackage resource named in `resources`."""
    from dataflows import dump_to_path, load

    # First, materialize a datapackage on disk with two one-row resources.
    writer_flow = Flow(
        [{'foo': 'bar'}],
        [{'foo': 'baz'}],
        dump_to_path('data/load_from_package')
    )
    writer_flow.process()

    # Reload it, asking for the second resource only.
    stream = Flow(
        load('data/load_from_package/datapackage.json', resources=['res_2'])
    ).datastream()

    assert len(stream.dp.resources) == 1
    assert [list(resource) for resource in stream.res_iter] == [[{'foo': 'baz'}]]



def test_load_from_package_resources():
    """`load` should accept a (descriptor, resources_iterator) tuple as source."""
    from dataflows import load

    # Describe two CSV resources that share a single string field.
    descriptors = []
    for i in range(2):
        descriptors.append({'name': 'my-resource-{}'.format(i),
                            'path': 'my-resource-{}.csv'.format(i),
                            'schema': {'fields': [{'name': 'foo', 'type': 'string'}]}})
    datapackage = {'resources': descriptors}

    # Matching lazy row iterators, one generator per described resource.
    resources = ((row for row in [{'foo': 'bar{}'.format(i)}, {'foo': 'baz{}'.format(i)}])
                 for i in range(2))

    data, dp, *_ = Flow(
        load((datapackage, resources), resources=['my-resource-1']),
    ).results()

    # Only the requested resource should survive, rows intact.
    assert len(dp.resources) == 1
    assert dp.get_resource('my-resource-1').descriptor['path'] == 'my-resource-1.csv'
    assert data[0][1] == {'foo': 'baz1'}


def test_cache():
import os
import shutil
Expand Down

0 comments on commit cc29dc5

Please sign in to comment.