Skip to content

Commit

Permalink
Merge 26ba493 into 120bb50
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Mar 25, 2019
2 parents 120bb50 + 26ba493 commit 20daf34
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 0 deletions.
21 changes: 21 additions & 0 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin

### Manipulate row-by-row
- **add_field** - Adds a column to the data
- **select_fields** - Selects and keeps some columns in the data
- **delete_fields** - Removes some columns from the data
- **add_computed_field** - Adds new fields whose values are based on existing columns
- **find_replace** - Look for specific patterns in specific fields and replace them with new data
Expand Down Expand Up @@ -246,6 +247,26 @@ def add_field(name, type, default=None, resources=None, **options):
- `None` indicates operation should be done on all resources
- The index of the resource in the package

#### select_fields
Select fields (columns) in streamed resources and remove all other fields. Can also be used to reorder schema fields.

`select_fields` accepts a list of resources and list of fields to remove

_Note: if multiple resources provided, all of them should contain all fields to select_

```python
def select_fields(fields, resources=None):
pass
```

- `fields` - List of field (column) names to keep
- `resources`
- A name of a resource to operate on
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package

#### delete_fields
Delete fields (columns) from streamed resources

Expand Down
1 change: 1 addition & 0 deletions dataflows/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .filter_rows import filter_rows
from .find_replace import find_replace
from .join import join, join_self
from .select_fields import select_fields
from .set_primary_key import set_primary_key
from .sort_rows import sort_rows
from .stream import stream
Expand Down
45 changes: 45 additions & 0 deletions dataflows/processors/select_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from ..helpers.resource_matcher import ResourceMatcher


def process_resource(rows, fields):
fields = set(fields)
for row in rows:
row = dict(
(k, v)
for k, v in row.items()
if k in fields
)
yield row


def select_fields(fields, resources=None):

def func(package):
matcher = ResourceMatcher(resources, package.pkg)
dp_resources = package.pkg.descriptor.get('resources', [])
for resource in dp_resources:
if matcher.match(resource['name']):
dp_fields = resource['schema'].get('fields', [])
dp_fields = dict(
(f['name'], f)
for f in dp_fields
)
non_existing = [f for f in fields if f not in dp_fields]

assert len(non_existing) == 0, \
"Can't find following field(s): %s" % '; '.join(non_existing)

new_fields = [
dp_fields.get(f)
for f in fields
]
resource['schema']['fields'] = new_fields
yield package.pkg

for resource in package:
if not matcher.match(resource.res.name):
yield resource
else:
yield process_resource(resource, fields)

return func
13 changes: 13 additions & 0 deletions tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ def test_delete_field():
[dict(name='y', type='string', format='default')]


def test_select_field():
from dataflows import select_fields
f = Flow(
data,
select_fields(['y'])
)
results, dp, _ = f.results()
for i in results[0]:
assert list(i.keys()) == ['y']
assert dp.descriptor['resources'][0]['schema']['fields'] == \
[dict(name='y', type='string', format='default')]


def test_find_replace():
from dataflows import find_replace
f = Flow(
Expand Down

0 comments on commit 20daf34

Please sign in to comment.