Adding the join processor (#18)
* Adding the join processor

* Documentation

* Missing file

* Fix tests
akariv committed Sep 13, 2018
1 parent a76f030 commit a9a94aa
Showing 9 changed files with 465 additions and 11 deletions.
70 changes: 70 additions & 0 deletions PROCESSORS.md
@@ -378,3 +378,73 @@ def duplicate(source=None, target_name=None, target_path=None):
- `target_name` - Name of the new, duplicated resource.
- `target_path` - Path for the new, duplicated resource.

#### join.py
Joins two streamed resources.

"Joining" in our case means taking the *target* resource, and adding fields to each of its rows by looking up data in the _source_ resource.

A special case of the join operation occurs when there is no target stream, and all unique rows from the source are used to create it.
This is called _deduplication_ mode: the target resource is created, and de-duplicated rows from the source are added to it (see the `join_self` sketch after this section).

```python
def join(source_name, source_key, target_name, target_key, fields={}, full=True, source_delete=True):
pass

def join_self(source_name, source_key, target_name, fields):
pass
```

- `source_name` - name of the _source_ resource
- `source_key` - One of
    - List of field names which should be used as the lookup key
    - String, which will be interpreted as a Python format string used to form the key (e.g. `{field_name_1}:{field_name_2}`)
- `source_delete` - delete the source resource from the data-package after joining (`True` by default)

- `target_name` - name of the _target_ resource to hold the joined data.
- `target_key` - as in `source_key`

- `fields` - mapping of fields from the source resource to the target resource.
  Keys should be field names in the target resource.
  Values can define two attributes:
    - `name` - field name in the source (by default, the same as the target field name)
    - `aggregate` - aggregation strategy (how to handle multiple _source_ rows with the same key). Can take the following options:
        - `sum` - sum the aggregated values.
          For numeric values this is the arithmetic sum, for strings the concatenation of the values; other types will raise an error.
        - `avg` - calculate the average of the aggregated values.
          For numeric values this is the arithmetic mean; other types will raise an error.
        - `max` - calculate the maximum of the aggregated values.
          For numeric values this is the numeric maximum, for strings the lexicographic (dictionary-order) maximum; other types will raise an error.
        - `min` - calculate the minimum of the aggregated values.
          For numeric values this is the numeric minimum, for strings the lexicographic (dictionary-order) minimum; other types will raise an error.
        - `first` - take the first value encountered
        - `last` - take the last value encountered
        - `count` - count the number of occurrences of a specific key.
          For this method, specifying `name` is not required. If it is specified, `count` will count the number of non-null values for that source field.
        - `counters` - count the number of occurrences of distinct values.
          Returns an array of 2-tuples of the form `[value, count-of-value]`.
        - `set` - collect all distinct values of the aggregated field, unordered
        - `array` - collect all values of the aggregated field, in order of appearance
        - `any` - pick any value.

      By default, `aggregate` takes the `any` value.

  If neither `name` nor `aggregate` needs to be specified, the mapping can map to the empty object `{}` or to `null`.
  See the usage sketch after this section.
- `full` - Boolean:
    - If `True` (the default), failed lookups in the source will result in `null` values in the target.
    - If `False`, failed lookups in the source will result in dropping the row from the target.

_Important: the "source" resource **must** appear before the "target" resource in the data-package._
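
To make these parameters concrete, here is a minimal usage sketch of both modes. The file names, resource names and fields (`orders.csv`, `customers.csv`, `customer_id`, `amount` and so on) are hypothetical and serve only the illustration:

```python
from dataflows import Flow, load, join, join_self, dump_to_path

# Regular mode: enrich each customer row with aggregates over their orders.
Flow(
    load('orders.csv', name='orders'),        # hypothetical: customer_id, amount
    load('customers.csv', name='customers'),  # hypothetical: id, name
    join('orders', ['customer_id'],    # source resource and lookup key
         'customers', ['id'],          # target resource and key
         fields={
             'total_amount': {'name': 'amount', 'aggregate': 'sum'},
             'num_orders': {'name': 'amount', 'aggregate': 'count'},
         },
         full=True,            # keep customers that have no orders
         source_delete=True),  # drop 'orders' from the package once joined
    dump_to_path('out')
).process()

# Deduplication mode: no separate target - unique rows from the source
# (by the given key) form the new 'unique_names' resource.
Flow(
    load('customers.csv', name='customers'),
    join_self('customers', ['name'], 'unique_names', {'name': {}})
).process()
```

Note how `full=True` keeps target rows even when the lookup fails, while the `fields` mapping controls which source values are carried over and how duplicates are aggregated.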
39 changes: 39 additions & 0 deletions TUTORIAL.md
@@ -367,13 +367,52 @@ In the next example we're removing an entire resource in a package processor - t

# -->
# double_winners/academy.csv contains:
#
# Year,Ceremony,Award,Winner,Name,Film
# 1931/1932,5,Actress,1,Helen Hayes,The Sin of Madelon Claudet
# 1932/1933,6,Actress,1,Katharine Hepburn,Morning Glory
# 1935,8,Actress,1,Bette Davis,Dangerous
# 1938,11,Actress,1,Bette Davis,Jezebel
# ...
```

This was a bit complicated, but luckily we have the `join`, `concatenate` and `filter_rows` processors, which make such combinations a snap:

```python
from dataflows import Flow, load, dump_to_path, join, concatenate, filter_rows

f = Flow(
    # Emmy award nominees and winners
    load('emmy.csv', name='emmies'),
    filter_rows(equals=[dict(winner=1)]),
    concatenate(dict(
            emmy_nominee=['nominee'],
        ),
        dict(name='emmies_filtered'),
        resources='emmies'),
    # Academy award nominees and winners
    load('academy.csv', encoding='utf8', name='oscars'),
    join('emmies_filtered', ['emmy_nominee'],  # Source resource
         'oscars', ['Name'],                   # Target resource
         full=False                            # Don't add new fields, remove unmatched rows
    ),
    filter_rows(equals=[dict(Winner='1')]),
    dump_to_path('double_winners')
)
_ = f.process()

# -->
# double_winners/oscars.csv contains:
#
# Year,Ceremony,Award,Winner,Name,Film
# 1931/1932,5,Actress,1,Helen Hayes,The Sin of Madelon Claudet
# 1932/1933,6,Actress,1,Katharine Hepburn,Morning Glory
# 1935,8,Actress,1,Bette Davis,Dangerous
# 1938,11,Actress,1,Bette Davis,Jezebel
# ...
```


## Builtin Processors

DataFlows comes with a few built-in processors which do most of the heavy lifting in many common scenarios, leaving you to implement only the minimal code specific to your own problem.
2 changes: 1 addition & 1 deletion dataflows/__init__.py
@@ -1,4 +1,4 @@
-from .base import DataStream, DataStreamProcessor, schema_validator
+from .base import DataStream, DataStreamProcessor, schema_validator, ValidationError
from .base import ResourceWrapper, PackageWrapper
from .base import Flow
from .processors import * # noqa
2 changes: 1 addition & 1 deletion dataflows/base/__init__.py
@@ -3,4 +3,4 @@
from .resource_wrapper import ResourceWrapper
from .package_wrapper import PackageWrapper
from .flow import Flow
-from .schema_validator import schema_validator
+from .schema_validator import schema_validator, ValidationError
21 changes: 16 additions & 5 deletions dataflows/base/schema_validator.py
@@ -1,24 +1,35 @@

import logging

from datapackage import Resource
from tableschema import Schema
from tableschema.exceptions import CastError


+class ValidationError(Exception):
+
+    def __init__(self, resource_name, row, index, cast_error):
+        msg = '\nROW: %r\n----\n' % row
+        msg += '\n'.join('%d) %s' % (i+1, err)
+                         for i, err
+                         in enumerate(cast_error.errors))
+        super().__init__(msg)
+        self.resource_name = resource_name
+        self.row = row
+        self.index = index
+        self.cast_error = cast_error
+
+
def schema_validator(resource: Resource, iterator):
    schema: Schema = resource.schema
    field_names = [f.name for f in schema.fields]
    warned_fields = set()
-    for row in iterator:
+    for i, row in enumerate(iterator):
        to_cast = [row.get(f) for f in field_names]
        try:
            casted = schema.cast_row(to_cast)
            row = dict(zip(field_names, casted))
        except CastError as e:
-            msg = '\nROW: %r\n----\n' % row
-            msg += '\n'.join('%d) %s' % (i+1, err) for i, err in enumerate(e.errors))
-            raise ValueError(msg) from None
+            raise ValidationError(resource.name, row, i, e)

        for k in set(row.keys()) - set(field_names):
            if k not in warned_fields:
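Since `ValidationError` is now exported from the package root (see the `dataflows/__init__.py` change above), calling code can catch it and inspect the failure. A minimal, hypothetical sketch: the `data.csv` input and its schema are assumed, and validation is expected to run as part of the flow's processing:

```python
from dataflows import Flow, load, dump_to_path, ValidationError

try:
    Flow(
        load('data.csv'),     # hypothetical input with a declared schema
        dump_to_path('out'),
    ).process()
except ValidationError as e:
    # The exception carries the failing resource name, the offending row,
    # its index in the stream, and the underlying tableschema CastError.
    print('Row #%d of resource %r failed to validate: %r'
          % (e.index, e.resource_name, e.row))
```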
3 changes: 2 additions & 1 deletion dataflows/processors/__init__.py
@@ -6,11 +6,12 @@

from .add_computed_field import add_computed_field
from .add_metadata import add_metadata
+from .cache import cache
from .concatenate import concatenate
from .delete_fields import delete_fields
from .duplicate import duplicate
from .filter_rows import filter_rows
from .find_replace import find_replace
+from .join import join, join_self
from .sort_rows import sort_rows
from .unpivot import unpivot
-from .cache import cache