Skip to content

Commit

Permalink
Merge d377937 into 0db3831
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Oct 10, 2018
2 parents 0db3831 + d377937 commit 2d88781
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 61 deletions.
19 changes: 17 additions & 2 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def load(source, name=None, resources=None, **options):
- A regular expression matching resource names to load
- A list of resource names to load
- `None` indicates to load all resources
- The index of the resource in the package
- `options` - based on the loaded file, extra options (e.g. `sheet` for Excel files etc., see the link to tabulator above)

#### printer
Expand Down Expand Up @@ -219,6 +220,7 @@ def delete_fields(fields, resources=None):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package

#### add_computed_field
Add field(s) to streamed resources
Expand Down Expand Up @@ -253,6 +255,7 @@ def add_computed_field(fields, resources=None):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package


#### find_replace.py
Expand All @@ -273,18 +276,25 @@ def find_replace(fields, resources=None):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package

#### set_type.py
Sets a field's data type and type options and validates its data based on its new type definition.

This processor modifies the last resource in the package.
By default, this processor modifies the last resource in the package.

```python
def set_type(name, **options):
def set_type(name, resources=-1, **options):
pass
```

- `name` - the name of the field to modify
- `resources`
- A name of a resource to operate on
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package
- `options` - options to set for the field. Most common ones would be:
- `type` - set the data type (e.g. `string`, `integer`, `number` etc.)
- `format` - e.g. for date fields
Expand Down Expand Up @@ -318,6 +328,7 @@ def sort_rows(key, resources=None, reverse=False):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package
- `reverse` - Set to True to return results in descending order

#### unpivot.py
Expand All @@ -344,6 +355,7 @@ def unpivot(unpivot_fields, extra_keys, extra_value, resources=None):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package


#### filter_rows.py
Expand All @@ -362,6 +374,7 @@ def filter_rows(equals=tuple(), not_equals=tuple(), resources=None):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package

Both `in` and `out` should be a list of dicts.

Expand Down Expand Up @@ -391,6 +404,7 @@ def update_resource(resources, **metadata):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package
- `metadata` - Any allowed property (according to the [spec](https://frictionlessdata.io/specs/data-resource/#metadata)) can be provided here.

You can use `update_resource` to rename a resource like so:
Expand Down Expand Up @@ -418,6 +432,7 @@ def concatenate(fields, target={}, resources=None):
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package
Resources to concatenate must appear in consecutive order within the data-package.

#### duplicate.py
Expand Down
2 changes: 1 addition & 1 deletion dataflows/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.16
0.0.17
8 changes: 7 additions & 1 deletion dataflows/helpers/resource_matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@

class ResourceMatcher(object):

def __init__(self, resources):
def __init__(self, resources, datapackage):
self.resources = resources
if resources is None:
self.resources = None
elif isinstance(self.resources, str):
self.resources = re.compile('^' + self.resources + '$')
self.re = True
elif isinstance(self.resources, int):
if isinstance(datapackage, dict):
self.resources = [datapackage['resources'][self.resources]['name']]
else:
self.resources = [datapackage.resources[self.resources].name]
self.re = False
else:
assert isinstance(self.resources, list)
self.re = False
Expand Down
3 changes: 1 addition & 2 deletions dataflows/processors/add_computed_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ def process_resource(fields, rows):

def add_computed_field(fields, resources=None):

matcher = ResourceMatcher(resources)

def func(package):
matcher = ResourceMatcher(resources, package.pkg)
for resource in package.pkg.descriptor['resources']:
if matcher.match(resource['name']):
descriptor = resource
Expand Down
5 changes: 2 additions & 3 deletions dataflows/processors/concatenate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ def concatenator(resources, all_target_fields, field_mapping):


def concatenate(fields, target={}, resources=None):
matcher = ResourceMatcher(resources)

def func(package):
matcher = ResourceMatcher(resources, package.pkg)
# Prepare target resource
if 'name' not in target:
target['name'] = 'concat'
Expand Down Expand Up @@ -102,8 +102,7 @@ def func(package):
yield package.pkg

it = iter(package)
while True:
resource = next(it)
for resource in it:
if matcher.match(resource.res.name):
resource_chain = \
itertools.chain([resource],
Expand Down
3 changes: 1 addition & 2 deletions dataflows/processors/delete_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ def process_resource(rows, fields):

def delete_fields(fields, resources=None):

matcher = ResourceMatcher(resources)

def func(package):
matcher = ResourceMatcher(resources, package.pkg)
dp_resources = package.pkg.descriptor.get('resources', [])
for resource in dp_resources:
if matcher.match(resource['name']):
Expand Down
15 changes: 8 additions & 7 deletions dataflows/processors/filter_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ def process_resource(rows, conditions):

def filter_rows(equals=tuple(), not_equals=tuple(), resources=None):

matcher = ResourceMatcher(resources)

conditions = [
(operator.eq, k, v)
for o in equals
Expand All @@ -23,10 +21,13 @@ def filter_rows(equals=tuple(), not_equals=tuple(), resources=None):
for k, v in o.items()
]

def func(rows):
if matcher.match(rows.res.name):
return process_resource(rows, conditions)
else:
return rows
def func(package):
matcher = ResourceMatcher(resources, package.pkg)
yield package.pkg
for r in package:
if matcher.match(r.res.name):
yield process_resource(r, conditions)
else:
yield r

return func
33 changes: 19 additions & 14 deletions dataflows/processors/find_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@
from ..helpers.resource_matcher import ResourceMatcher


def find_replace(fields, resources=None):
def _find_replace(rows, fields):
for row in rows:
for field in fields:
for pattern in field.get('patterns', []):
row[field['name']] = re.sub(
str(pattern['find']),
str(pattern['replace']),
str(row[field['name']]))
yield row


matcher = ResourceMatcher(resources)
def find_replace(fields, resources=None):

def func(rows):
if matcher.match(rows.res.name):
for row in rows:
for field in fields:
for pattern in field.get('patterns', []):
row[field['name']] = re.sub(
str(pattern['find']),
str(pattern['replace']),
str(row[field['name']]))
yield row
else:
yield from rows
def func(package):
matcher = ResourceMatcher(resources, package.pkg)
yield package.pkg
for rows in package:
if matcher.match(rows.res.name):
yield _find_replace(rows, fields)
else:
yield rows

return func
4 changes: 3 additions & 1 deletion dataflows/processors/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ def __init__(self, load_source, name=None, resources=None, **options):
self.load_source = load_source
self.options = options
self.name = name
self.resource_matcher = ResourceMatcher(resources)
self.resources = resources
self.load_dp = None

def process_datapackage(self, dp: Package):
if isinstance(self.load_source, tuple):
datapackage_descriptor, _ = self.load_source
dp.descriptor.setdefault('resources', [])
self.resource_matcher = ResourceMatcher(self.resources, datapackage_descriptor)
for resource_descriptor in datapackage_descriptor['resources']:
if self.resource_matcher.match(resource_descriptor['name']):
dp.add_resource(resource_descriptor)
elif os.path.basename(self.load_source) == 'datapackage.json':
self.load_dp = Package(self.load_source)
self.resource_matcher = ResourceMatcher(self.resources, self.load_dp)
dp.descriptor.setdefault('resources', [])
for resource in self.load_dp.resources:
if self.resource_matcher.match(resource.name):
Expand Down
31 changes: 16 additions & 15 deletions dataflows/processors/set_type.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
from itertools import islice
from datapackage import Resource
from ..helpers.resource_matcher import ResourceMatcher
from .. import DataStreamProcessor, schema_validator


class set_type(DataStreamProcessor):

def __init__(self, name, **options):
def __init__(self, name, resources=-1, **options):
super(set_type, self).__init__()
self.name = name
self.options = options
self.resource = None
self.resource_count = None
self.resources = resources

def process_resources(self, resources):
resources = super(set_type, self).process_resources(resources)
yield from islice(resources, self.resource_count - 1)
yield schema_validator(self.resource, next(resources))
for res in resources:
if self.matcher.match(res.res.name):
yield schema_validator(res.res, res)
else:
yield res

def process_datapackage(self, dp):
dp = super(set_type, self).process_datapackage(dp)
self.matcher = ResourceMatcher(self.resources, dp)
added = False
for field in dp.descriptor['resources'][-1]['schema']['fields']:
if field['name'] == self.name:
field.update(self.options)
added = True
break
self.resource = Resource(dp.descriptor['resources'][-1])
self.resource_count = len(dp.descriptor['resources'])
for res in dp.descriptor['resources']:
if self.matcher.match(res['name']):
for field in res['schema']['fields']:
if field['name'] == self.name:
field.update(self.options)
added = True
break
assert added, 'Failed to find field {} in schema'.format(self.name)
return dp
14 changes: 8 additions & 6 deletions dataflows/processors/sort_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ def _sorter(rows, key_calc, reverse, batch_size):


def sort_rows(key, resources=None, reverse=False, batch_size=1000):
matcher = ResourceMatcher(resources)
key_calc = KeyCalc(key)

def func(rows):
if matcher.match(rows.res.name):
yield from _sorter(rows, key_calc, reverse, batch_size)
else:
yield from rows
def func(package):
matcher = ResourceMatcher(resources, package.pkg)
yield package.pkg
for rows in package:
if matcher.match(rows.res.name):
yield _sorter(rows, key_calc, reverse, batch_size)
else:
yield rows

return func
3 changes: 1 addition & 2 deletions dataflows/processors/unpivot.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ def unpivot_rows(rows, fields_to_unpivot, fields_to_keep, extra_value):

def unpivot(unpivot_fields, extra_keys, extra_value, resources=None):

matcher = ResourceMatcher(resources)

def func(package):

matcher = ResourceMatcher(resources, package.pkg)
unpivot_fields_without_regex = []
for resource in package.pkg.descriptor['resources']:
name = resource['name']
Expand Down
9 changes: 4 additions & 5 deletions dataflows/processors/update_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@
from dataflows.helpers.resource_matcher import ResourceMatcher


def update_resource(resource, **props):

resources = ResourceMatcher(resource)
def update_resource(resources, **props):

def func(package: PackageWrapper):
matcher = ResourceMatcher(resources, package.pkg)
for resource in package.pkg.descriptor['resources']:
if resources.match(resource['name']):
if matcher.match(resource['name']):
resource.update(props)
yield package.pkg

res_iter = iter(package)
for r in res_iter:
if resources.match(r.res.name):
if matcher.match(r.res.name):
yield r.it
else:
yield r
Expand Down
19 changes: 19 additions & 0 deletions tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,22 @@ def test_update_resource():
assert dp.descriptor['resources'][0]['source'] == 'thewild'
assert dp.descriptor['resources'][2]['source'] == 'thewild'
assert dp.descriptor['resources'][4]['source'] == 'thewild'

def test_set_type_resources():
    """Verify that set_type's `resources` argument selects which resource to
    modify: by regex, by explicit name list, and by integer index (the
    default, -1, targets the last resource in the package)."""
    from dataflows import Flow, set_type, validate

    f = Flow(
        [dict(a=str(i)) for i in range(10)],
        [dict(b=str(i)) for i in range(10)],
        [dict(c=str(i)) for i in range(10)],
        set_type('a', resources='res_[1]', type='integer'),  # regex matching resource names
        set_type('b', resources=['res_2'], type='integer'),  # explicit list of resource names
        set_type('c', resources=-1, type='integer'),         # index of the resource in the package
        validate()
    )
    results, dp, stats = f.results()
    print(dp.descriptor)
    # Each column was cast from str to int only in its targeted resource.
    assert results[0][1]['a'] == 1
    assert results[1][3]['b'] == 3
    assert results[2][8]['c'] == 8

0 comments on commit 2d88781

Please sign in to comment.