Skip to content

Commit

Permalink
Improvements to add_computed_fields, allow custom functions as data s…
Browse files Browse the repository at this point in the history
…ource (#78)

* Add custom functions to add_computed_field (+ some better interface and beter documentation)

* Rewrite add_field with add_computed_field, allow custom callables

* Fix tests
  • Loading branch information
akariv committed Apr 6, 2019
1 parent 406de14 commit d25aec6
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 55 deletions.
42 changes: 32 additions & 10 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ def add_field(name, type, default=None, resources=None, **options):

- `name` - Name of field to add
- `type` - Type of field to add
- `default` - Default value to assign to the field
- `default` - Value to assign to the field
- can be a literal value to be added to all rows
- can be a callable which gets a row and returns a value for the newly added field
- `options` - Other properties of the newly added field
- `resources`
- A name of a resource to operate on
Expand Down Expand Up @@ -288,19 +290,22 @@ def delete_fields(fields, resources=None):
- The index of the resource in the package

#### add_computed_field
Add field(s) to streamed resources
Adds new fields whose values are based on existing columns.

`add_computed_field` accepts a list of resources and fields to add to existing resource. It will output the rows for each resource with new field(s) (columns) in it. `add_computed_field` allows to perform various operations before inserting value into targeted field.

Adds new fields whose values are based on existing columns
`add_computed_field` accepts one or more fields to add to existing resources. It will modify the schemas and add the new fields to processed resources. The content of the new fields can be a constant, based on other columns with some predefined operations or be dynamically calculated using a user-specified function.

```python
def add_computed_field(fields, resources=None):
def add_computed_field(*args, **kw, resources=None):
pass
```

- `fields` - List of actions to be performed on the targeted fields. Each list item is an object with the following keys:
- `operation`: operation to perform on values of pre-defined columns of the same row. available operations:
This function accepts a single field specification as keyword parameters, or a list of field specifications as a list of dicts in the first positional argument.

A new field specification has the following keys:
- `target` - can be the name of the new field, or a full field specification (dict with `name`, `type` and other fields)
- `operation`: operation to perform on values of pre-defined columns of the same row.
Can be a function that accepts a row and returns the value of the new field, or a string containing the name of a predefined operation.
Available operations:
- `constant` - add a constant value
- `sum` - summed value for given columns in a row.
- `avg` - average value from given columns in a row.
Expand All @@ -309,19 +314,36 @@ def add_computed_field(fields, resources=None):
- `multiply` - product of given columns in a row.
- `join` - joins two or more column values in a row.
- `format` - Python format string used to form the value Eg: `my name is {first_name}`.
- `target` - name of the new field.
- `source` - list of columns the operations should be performed on (Not required in case of `format` and `constant`).
- `with` - String passed to `constant`, `format` or `join` operations
- `with` (also accepts `with_`) - String passed to `constant`, `format` or `join` operations
- in `constant` - used as constant value
- in `format` - used as Python format string with existing column values Eg: `{first_name} {last_name}`
- in `join` - used as delimiter

- `resources`
- A name of a resource to operate on
- A regular expression matching resource names
- A list of resource names
- `None` indicates operation should be done on all resources
- The index of the resource in the package

Examples:
```python
Flow(
# ... adding single fields with built-in operations
add_computed_field(target='the-avg', operation='avg', source=['col-a', 'col-b']),
add_computed_field(target='the-sum', operation='sum', source=['col-a', 'col-b']),
# ... adding two fields in a single operation
add_computed_field([
dict(target='formatted', operation='format', with_='{col-a}-{col-b}'),
dict(target=dict(name='created', type='date'), operation='constant', with_=datetime.today()),
]),
# ... and with a custom function
add_computed_field(target=dict(name='power', type='integer'),
operation=lambda row: row['col-a'] ** row['col-b'],
resources='my-resource-name'),
)
```

#### find_replace.py
Look for specific patterns in specific fields and replace them with new data
Expand Down
65 changes: 45 additions & 20 deletions dataflows/processors/add_computed_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_type(res_fields, operation_fields, operation):
types = [f.get('type') for f in res_fields if f['name'] in operation_fields]
if 'any' in types:
return 'any'
if (operation == 'format') or (operation == 'join'):
if operation in ('format', 'join'):
return 'string'
if ('number' in types) or (operation == 'avg'):
return 'number'
Expand All @@ -37,35 +37,60 @@ def get_type(res_fields, operation_fields, operation):
def process_resource(fields, rows):
for row in rows:
for field in fields:
values = [
row.get(c)
for c in field.get('source', [])
if row.get(c) is not None
]
with_ = field.get('with', field.get('with_', ''))
new_col = AGGREGATORS[field['operation']].func(values, with_, row)
row[field['target']] = new_col
op = field['operation']
target = field['target']['name']
if isinstance(op, str):
values = [
row.get(c)
for c in field.get('source', [])
if row.get(c) is not None
]
with_ = field.get('with', field.get('with_', ''))
new_col = AGGREGATORS[op].func(values, with_, row)
row[target] = new_col
elif callable(op):
row[target] = op(row)
yield row


def add_computed_field(fields, resources=None):
def get_new_fields(resource, fields):
new_fields = []
for f in fields:
target = f['target']
if isinstance(target, str):
target = dict(
name=target,
type=get_type(resource['schema']['fields'],
f.get('source', []),
f['operation'])
)
new_fields.append(target)
elif isinstance:
new_fields.append(target)
return new_fields


def add_computed_field(*args, resources=None, **kw):

def func(package):
matcher = ResourceMatcher(resources, package.pkg)
assert len(args) < 2, 'add_computed_fields expects at most one positional argument'
if len(args) == 0:
fields = [kw]
elif len(args) == 1:
fields = args[0]

for resource in package.pkg.descriptor['resources']:
if matcher.match(resource['name']):
descriptor = resource
new_fields = [
{
'name': f['target'],
'type': get_type(descriptor['schema']['fields'],
f.get('source', []),
f['operation'])
} for f in fields
]
descriptor['schema']['fields'].extend(new_fields)
new_fields = get_new_fields(resource, fields)
resource['schema']['fields'].extend(new_fields)
yield package.pkg

for f in fields:
target = f['target']
if isinstance(target, str):
f['target'] = dict(name=target)

for resource in package:
if not matcher.match(resource.res.name):
yield resource
Expand Down
39 changes: 14 additions & 25 deletions dataflows/processors/add_field.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,17 @@
from dataflows.helpers.resource_matcher import ResourceMatcher


def column_adder(rows, k, v):
for row in rows:
row[k] = v
yield row
from .add_computed_field import add_computed_field


def add_field(name, type, default=None, resources=None, **options):

def func(package):
matcher = ResourceMatcher(resources, package.pkg)
for resource in package.pkg.descriptor['resources']:
if matcher.match(resource['name']):
resource['schema']['fields'].append(dict(
name=name,
type=type,
**options
))
yield package.pkg
for res in package:
if matcher.match(res.res.name):
yield column_adder(res, name, default)
else:
yield res

return func
return add_computed_field(
target=dict(
name=name,
type=type,
**options
),
resources=resources,
operation=(
default
if callable(default) else
(lambda row: default)
)
)
25 changes: 25 additions & 0 deletions tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ def test_add_computed_field():
assert f == ['a - 1', 'b - 2', 'c - 3']


def test_add_computed_field_func():
from dataflows import add_computed_field

data = [
dict(x=i) for i in range(3)
]

f = Flow(
data,
add_computed_field([
dict(target=dict(name='sq', type='integer'),
operation=lambda row: row['x'] ** 2),
dict(target='f', operation='format', with_='{x} - {x}')
])
)
results, *_ = f.results()
results = list(results[0])

assert results == [
dict(x=0, sq=0, f='0 - 0'),
dict(x=1, sq=1, f='1 - 1'),
dict(x=2, sq=4, f='2 - 2'),
]


def test_add_metadata():
from dataflows import add_metadata
f = Flow(
Expand Down

0 comments on commit d25aec6

Please sign in to comment.