Skip to content

Commit

Permalink
v0.0.49 - Improves join docs, adds '*' catchall
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Apr 28, 2019
1 parent d2455df commit 6c29d7d
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 69 deletions.
106 changes: 103 additions & 3 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ This mode is called _deduplication_ mode - The target resource will be created a
def join(source_name, source_key, target_name, target_key, fields={}, full=True, source_delete=True):
pass

def join_self(source_name, source_key, target_name, fields):
def join_with_self(resource_name, join_key, fields):
pass
```

Expand All @@ -646,10 +646,12 @@ def join_self(source_name, source_key, target_name, fields):
- `source_delete` - delete source from data-package after joining (`True` by default)

- `target_name` - name of the _target_ resource to hold the joined data.
- `target_key` - as in `source_key`
- `target_key`, `join_key` - as in `source_key`

- `fields` - mapping of fields from the source resource to the target resource.
Keys should be field names in the target resource.
Keys should be field names in the target resource.
You can use the special catchall key `*` which will apply for all fields in the source which were not specifically mentioned.

Values can define two attributes:
- `name` - field name in the source (by default is the same as the target field name)

Expand Down Expand Up @@ -693,3 +695,101 @@ def join_self(source_name, source_key, target_name, fields):
- if `False`, failed lookups in the source will result in dropping the row from the target.

_Important: the "source" resource **must** appear before the "target" resource in the data-package._

**Examples:**

With these two sources:

`characters`:
first_name | house |last_name |age
------------|----------|-----------|----------
Jaime |Lannister |Lannister |34
Tyrion |Lannister |Lannister |27
Cersei |Lannister |Lannister |34
Jon |Stark |Snow |17
Sansa |Stark |Stark |14
Rickon |Stark |Stark |5
Arya |Stark |Stark |11
Bran |Stark |Stark |10
Daenerys |Targaryen |Targaryen |16

`houses`:
|house
|------------------
|House of Lannister
|House of Greyjoy
|House of Stark
|House of Targaryen
|House of Martell
|House of Tyrell

*Joining two resources*:
```python
Flow(#...
join(
'characters',
'House of {house}', # Note we need to format the join keys so they match
'houses',
'{house}',
dict(
max_age={
'name': 'age',
'aggregate': 'max'
},
avg_age={
'name': 'age',
'aggregate': 'avg'
},
representative={
'name': 'first_name',
'aggregate': 'last'
},
representative_age={
'name': 'age'
},
number_of_characters={
'aggregate': 'count'
},
last_names={
'name': 'last_name',
'aggregate': 'counters'
}
),
False, # Don't do a full join (i.e. discard houses which have no characters)
True # Remove the source=characters resource from the output
)
)
```

Output:
house | avg_age | last_names | max_age | number_of_characters | representative | representative_age
--------------------|----------|---------------------------|----------|----------------------|----------------|--------------------
House of Lannister | 31.6667 | [('Lannister', 3)] | 34 | 3 | Cersei | 34
House of Stark | 11.4 | [('Stark', 4), ('Snow', 1)] | 17 | 5 | Bran | 10
House of Targaryen | 16 | [('Targaryen', 1)] | 16 | 1 | Daenerys | 16

*Self-Joining a resource (find the youngest member of each house)*:
```python
Flow(#...
sort_rows('{age:02}'),
join_with_self(
'characters',
'{house}',
{
'the_house': {
'name': 'house'
},
'*': {
'aggregate': 'first'
},
}
),
)
```

Output:
age|first_name |last_name |the_house
----------|------------|-----------|-----------
27|Tyrion |Lannister |Lannister
5|Rickon |Stark |Stark
16|Daenerys |Targaryen |Targaryen
2 changes: 1 addition & 1 deletion dataflows/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.48
0.0.49
2 changes: 1 addition & 1 deletion dataflows/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .duplicate import duplicate
from .filter_rows import filter_rows
from .find_replace import find_replace
from .join import join, join_self
from .join import join, join_self, join_with_self
from .select_fields import select_fields
from .set_primary_key import set_primary_key
from .sort_rows import sort_rows
Expand Down
20 changes: 18 additions & 2 deletions dataflows/processors/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ def fix_fields(fields):
return fields


def expand_fields(fields, schema_fields):
if '*' in fields:
existing_names = set(f['name'] for f in fields.values())
spec = fields.pop('*')
for sf in schema_fields:
sf_name = sf['name']
if sf_name not in existing_names:
fields[sf_name] = copy.deepcopy(spec)
fields[sf_name]['name'] = sf_name


def concatenator(resources, all_target_fields, field_mapping):
for resource_ in resources:
for row in resource_:
Expand Down Expand Up @@ -241,8 +252,7 @@ def process_target_resource(source_spec, resource):
if data_type is None:
try:
source_field = \
next(filter(lambda f, spec_=spec:
f['name'] == spec_['name'],
next(filter(lambda f: f['name'] == spec['name'],
source_spec['schema']['fields']))
except StopIteration:
raise KeyError('Failed to find field with name %s in resource %s' %
Expand Down Expand Up @@ -282,6 +292,7 @@ def process_datapackage(datapackage):

if resource['name'] == source_name:
source_spec = resource
expand_fields(fields, source_spec.get('schema', {}).get('fields', []))
if not source_delete:
new_resources.append(resource)
if deduplication:
Expand Down Expand Up @@ -317,5 +328,10 @@ def join(source_name, source_key, target_name, target_key, fields={}, full=True,
return join_aux(source_name, source_key, source_delete, target_name, target_key, fields, full)


def join_with_self(resource_name, join_key, fields):
return join_aux(resource_name, join_key, True, resource_name, None, fields, True)


def join_self(source_name, source_key, target_name, fields):
raise DeprecationWarning('join_self is being deprecated, use join_with_self instead')
return join_aux(source_name, source_key, True, target_name, None, fields, True)
Loading

0 comments on commit 6c29d7d

Please sign in to comment.