diff --git a/PROCESSORS.md b/PROCESSORS.md index 095c39d..85ebd3e 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -635,7 +635,7 @@ This mode is called _deduplication_ mode - The target resource will be created a def join(source_name, source_key, target_name, target_key, fields={}, full=True, source_delete=True): pass -def join_self(source_name, source_key, target_name, fields): +def join_with_self(resource_name, join_key, fields): pass ``` @@ -646,10 +646,12 @@ def join_self(source_name, source_key, target_name, fields): - `source_delete` - delete source from data-package after joining (`True` by default) - `target_name` - name of the _target_ resource to hold the joined data. -- `target_key` - as in `source_key` +- `target_key`, `join_key` - as in `source_key` - `fields` - mapping of fields from the source resource to the target resource. - Keys should be field names in the target resource. + Keys should be field names in the target resource. + You can use the special catchall key `*` which will apply for all fields in the source which were not specifically mentioned. + Values can define two attributes: - `name` - field name in the source (by default is the same as the target field name) @@ -693,3 +695,101 @@ def join_self(source_name, source_key, target_name, fields): - if `False`, failed lookups in the source will result in dropping the row from the target. _Important: the "source" resource **must** appear before the "target" resource in the data-package._ + +**Examples:** + +With these two sources: + +`characters`: +first_name | house |last_name |age +------------|----------|-----------|---------- +Jaime |Lannister |Lannister |34 +Tyrion |Lannister |Lannister |27 +Cersei |Lannister |Lannister |34 +Jon |Stark |Snow |17 +Sansa |Stark |Stark |14 +Rickon |Stark |Stark |5 +Arya |Stark |Stark |11 +Bran |Stark |Stark |10 +Daenerys |Targaryen |Targaryen |16 + +`houses`: +|house +|------------------ +|House of Lannister +|House of Greyjoy +|House of Stark +|House of Targaryen +|House of Martell +|House of Tyrell + +*Joining two resources*: +```python +Flow(#... + join( + 'characters', + 'House of {house}', # Note we need to format the join keys so they match + 'houses', + '{house}', + dict( + max_age={ + 'name': 'age', + 'aggregate': 'max' + }, + avg_age={ + 'name': 'age', + 'aggregate': 'avg' + }, + representative={ + 'name': 'first_name', + 'aggregate': 'last' + }, + representative_age={ + 'name': 'age' + }, + number_of_characters={ + 'aggregate': 'count' + }, + last_names={ + 'name': 'last_name', + 'aggregate': 'counters' + } + ), + False, # Don't do a full join (i.e. discard houses which have no characters) + True # Remove the source=characters resource from the output + ) +) +``` + +Output: +house | avg_age | last_names | max_age | number_of_characters | representative | representative_age +--------------------|----------|---------------------------|----------|----------------------|----------------|-------------------- +House of Lannister | 31.6667 | [('Lannister', 3)] | 34 | 3 | Cersei | 34 +House of Stark | 11.4 | [('Stark', 4), ('Snow', 1)] | 17 | 5 | Bran | 10 +House of Targaryen | 16 | [('Targaryen', 1)] | 16 | 1 | Daenerys | 16 + +*Self-Joining a resource (find the youngest member of each house)*: +```python +Flow(#... + sort_rows('{age:02}'), + join_with_self( + 'characters', + '{house}', + { + 'the_house': { + 'name': 'house' + }, + '*': { + 'aggregate': 'first' + }, + } + ), +) +``` + +Output: +age|first_name |last_name |the_house +----------|------------|-----------|----------- +27|Tyrion |Lannister |Lannister +5|Rickon |Stark |Stark +16|Daenerys |Targaryen |Targaryen \ No newline at end of file diff --git a/dataflows/VERSION b/dataflows/VERSION index a24809a..50f402a 100644 --- a/dataflows/VERSION +++ b/dataflows/VERSION @@ -1 +1 @@ -0.0.48 +0.0.49 diff --git a/dataflows/processors/__init__.py b/dataflows/processors/__init__.py index 2b28afb..7d78245 100644 --- a/dataflows/processors/__init__.py +++ b/dataflows/processors/__init__.py @@ -12,7 +12,7 @@ from .duplicate import duplicate from .filter_rows import filter_rows from .find_replace import find_replace -from .join import join, join_self +from .join import join, join_self, join_with_self from .select_fields import select_fields from .set_primary_key import set_primary_key from .sort_rows import sort_rows diff --git a/dataflows/processors/join.py b/dataflows/processors/join.py index 48df4a3..9e0f6b0 100644 --- a/dataflows/processors/join.py +++ b/dataflows/processors/join.py @@ -130,6 +130,17 @@ def fix_fields(fields): return fields +def expand_fields(fields, schema_fields): + if '*' in fields: + existing_names = set(f['name'] for f in fields.values()) + spec = fields.pop('*') + for sf in schema_fields: + sf_name = sf['name'] + if sf_name not in existing_names: + fields[sf_name] = copy.deepcopy(spec) + fields[sf_name]['name'] = sf_name + + def concatenator(resources, all_target_fields, field_mapping): for resource_ in resources: for row in resource_: @@ -241,8 +252,7 @@ def process_target_resource(source_spec, resource): if data_type is None: try: source_field = \ - next(filter(lambda f, spec_=spec: - f['name'] == spec_['name'], + next(filter(lambda f: f['name'] == spec['name'], source_spec['schema']['fields'])) except StopIteration: raise KeyError('Failed to find field with name %s in resource %s' % @@ -282,6 +292,7 @@ def process_datapackage(datapackage): if resource['name'] == source_name: source_spec = resource + expand_fields(fields, source_spec.get('schema', {}).get('fields', [])) if not source_delete: new_resources.append(resource) if deduplication: @@ -317,5 +328,10 @@ def join(source_name, source_key, target_name, target_key, fields={}, full=True, return join_aux(source_name, source_key, source_delete, target_name, target_key, fields, full) +def join_with_self(resource_name, join_key, fields): + return join_aux(resource_name, join_key, True, resource_name, None, fields, True) + + def join_self(source_name, source_key, target_name, fields): + raise DeprecationWarning('join_self is being deprecated, use join_with_self instead') return join_aux(source_name, source_key, True, target_name, None, fields, True) diff --git a/tests/test_lib.py b/tests/test_lib.py index 49700fd..62eeb36 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -133,15 +133,17 @@ def test_unpivot(): data, unpivot( [ - dict(name='x', - keys=dict( - field='x-value' - ) + dict( + name='x', + keys=dict( + field='x-value' + ) ), - dict(name='y', - keys=dict( - field='y-value' - ) + dict( + name='y', + keys=dict( + field='y-value' + ) ), ], [ @@ -158,7 +160,7 @@ def test_unpivot(): ) results, _, _ = f.results() assert results[0] == [ - dict(zip(['field', 'the-value'], r)) + dict(zip(['field', 'the-value'], r)) for r in [ ['x-value', 1], @@ -259,7 +261,7 @@ def test_unpivot_any_resources(): def test_concatenate(): from dataflows import concatenate - + f = Flow( [ {'a': 1, 'b': 2}, @@ -302,14 +304,14 @@ def test_filter_rows(): filter_rows(not_equals=[dict(b=3)]), ) results, _, _ = f.results() - assert results[0][0] == dict(a = 1, b = 4) + assert results[0][0] == dict(a=1, b=4) assert len(results[0]) == 1 assert len(results) == 1 def test_sort_rows(): from dataflows import sort_rows - + f = Flow( [ {'a': 1, 'b': 3}, @@ -332,7 +334,7 @@ def test_sort_reverse_many_rows(): from dataflows import sort_rows f = Flow( - ({'a': i, 'b': i%5} for i in range(1000)), + ({'a': i, 'b': i % 5} for i in range(1000)), sort_rows(key='{b}{a}', reverse=True, batch_size=0), ) results, _, _ = f.results() @@ -343,7 +345,7 @@ def test_sort_reverse_many_rows(): def test_duplicate(): from dataflows import duplicate - + a = [ {'a': 1, 'b': 3}, {'a': 2, 'b': 3}, @@ -415,7 +417,7 @@ def test_load_from_package(): def test_load_from_env_var(): import os from dataflows import load, dump_to_path - + Flow( [{'foo': 'bar'}], dump_to_path('data/load_from_package') @@ -447,7 +449,6 @@ def test_load_from_package_resource_matching(): assert [list(res) for res in ds.res_iter] == [[{'foo': 'baz'}]] - def test_load_from_package_resources(): from dataflows import load @@ -513,7 +514,6 @@ def test_load_from_checkpoint(): ).results()[0] == [[{'foo': 'bar'}]] - def test_update_resource(): from dataflows import Flow, printer, update_resource @@ -545,7 +545,7 @@ def test_set_type_resources(): validate() ) results, dp, stats = f.results() - print(dp.descriptor) + print(dp.descriptor) assert results[0][1]['a'] == 1 assert results[1][3]['b'] == 3 assert results[2][8]['c'] == 8.0 @@ -615,7 +615,7 @@ def test_dump_to_path_use_titles(): def test_load_dates(): - from dateutil.tz import tzutc + # from dateutil.tz import tzutc from dataflows import Flow, dump_to_path, load, set_type, ValidationError import datetime @@ -672,11 +672,11 @@ def test_load_dates_timezones(): results = Flow( checkpoint('test_load_dates_timezones') ).results() - + assert list(map(lambda x: x['date'], results[0][0])) == \ - list(map(lambda x: x.date(), dates)) + list(map(lambda x: x.date(), dates)) assert list(map(lambda x: x['datetime'], results[0][0])) == \ - list(map(lambda x: x, dates)) + list(map(lambda x: x, dates)) def test_add_field(): @@ -689,86 +689,105 @@ def test_add_field(): ) results, dp, _ = f.results() assert results == [[ - {'a': 0, 'b': 'b', 'c': None, 'd': None}, - {'a': 1, 'b': 'b', 'c': None, 'd': None}, + {'a': 0, 'b': 'b', 'c': None, 'd': None}, + {'a': 1, 'b': 'b', 'c': None, 'd': None}, {'a': 2, 'b': 'b', 'c': None, 'd': None} ]] assert dp.descriptor == \ - {'profile': 'data-package', - 'resources': [{'name': 'res_1', - 'path': 'res_1.csv', - 'profile': 'tabular-data-resource', - 'schema': {'fields': [{'format': 'default', - 'name': 'a', - 'type': 'integer'}, - {'format': 'default', - 'name': 'b', - 'type': 'string'}, - {'format': 'default', - 'name': 'c', - 'type': 'number'}, - {'format': 'default', - 'name': 'd', - 'title': 'mybool', - 'type': 'boolean'}], - 'missingValues': ['']}}]} + { + 'profile': 'data-package', + 'resources': [ + { + 'name': 'res_1', + 'path': 'res_1.csv', + 'profile': 'tabular-data-resource', + 'schema': { + 'fields': [ + { + 'format': 'default', + 'name': 'a', + 'type': 'integer' + }, + { + 'format': 'default', + 'name': 'b', + 'type': 'string' + }, + { + 'format': 'default', + 'name': 'c', + 'type': 'number' + }, + { + 'format': 'default', + 'name': 'd', + 'title': 'mybool', + 'type': 'boolean' + } + ], + 'missingValues': [''] + } + } + ] + } def test_load_empty_headers(): - from dataflows import Flow, load, printer + from dataflows import Flow, load def ensure_type(t): def func(row): assert isinstance(row['a'], t) return func - results, dp, stats = Flow(load('data/empty_headers.csv'), + results, dp, stats = Flow(load('data/empty_headers.csv'), ensure_type(str)).results() assert results[0] == [ - {'a': 1, 'b': 2}, - {'a': 2, 'b': 3}, - {'a': 3, 'b': 4}, + {'a': 1, 'b': 2}, + {'a': 2, 'b': 3}, + {'a': 3, 'b': 4}, {'a': 5, 'b': 6} ] assert len(dp.resources[0].schema.fields) == 2 - results, dp, stats = Flow(load('data/empty_headers.csv', validate=True), + results, dp, stats = Flow(load('data/empty_headers.csv', validate=True), ensure_type(int)).results() assert results[0] == [ - {'a': 1, 'b': 2}, - {'a': 2, 'b': 3}, - {'a': 3, 'b': 4}, + {'a': 1, 'b': 2}, + {'a': 2, 'b': 3}, + {'a': 3, 'b': 4}, {'a': 5, 'b': 6} ] - results, dp, stats = Flow(load('data/empty_headers.csv', force_strings=True), + results, dp, stats = Flow(load('data/empty_headers.csv', force_strings=True), ensure_type(str)).results() assert results[0] == [ - {'a': '1', 'b': '2'}, - {'a': '2', 'b': '3'}, - {'a': '3', 'b': '4'}, + {'a': '1', 'b': '2'}, + {'a': '2', 'b': '3'}, + {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'} ] assert len(dp.resources[0].schema.fields) == 2 - results, dp, stats = Flow(load('data/empty_headers.csv', force_strings=True, validate=True), + results, dp, stats = Flow(load('data/empty_headers.csv', force_strings=True, validate=True), ensure_type(str)).results() assert results[0] == [ - {'a': '1', 'b': '2'}, - {'a': '2', 'b': '3'}, - {'a': '3', 'b': '4'}, + {'a': '1', 'b': '2'}, + {'a': '2', 'b': '3'}, + {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'} ] assert len(dp.resources[0].schema.fields) == 2 + def test_load_xml(): from dataflows import Flow, load results, dp, stats = Flow(load('data/sample.xml')).results() assert results[0] == [ - {'publication-year': 1954, 'title': 'The Fellowship of the Ring'}, - {'publication-year': 1954, 'title': 'The Two Towers'}, + {'publication-year': 1954, 'title': 'The Fellowship of the Ring'}, + {'publication-year': 1954, 'title': 'The Two Towers'}, {'publication-year': 1955, 'title': 'The Return of the King'} ] @@ -788,7 +807,7 @@ def test_save_load_dates(): load('data/test_save_load_dates/datapackage.json'), printer() ).results() - + def test_stream_simple(): from dataflows import stream, unstream @@ -881,3 +900,136 @@ def __call__(self, name, row, i, e): assert len(res[0]) == 3 assert handler.bad_row == {'a': 4, 'b': 'a'} assert handler.bad_index == 3 + + +def test_join(): + from dataflows import Flow, join, join_with_self, set_type, sort_rows + from decimal import Decimal + + characters = [ + {'first_name': 'Jaime', 'house': 'Lannister', 'last_name': 'Lannister', 'age': 34}, + {'first_name': 'Tyrion', 'house': 'Lannister', 'last_name': 'Lannister', 'age': 27}, + {'first_name': 'Cersei', 'house': 'Lannister', 'last_name': 'Lannister', 'age': 34}, + {'first_name': 'Jon', 'house': 'Stark', 'last_name': 'Snow', 'age': 17}, + {'first_name': 'Sansa', 'house': 'Stark', 'last_name': 'Stark', 'age': 14}, + {'first_name': 'Rickon', 'house': 'Stark', 'last_name': 'Stark', 'age': 5}, + {'first_name': 'Arya', 'house': 'Stark', 'last_name': 'Stark', 'age': 11}, + {'first_name': 'Bran', 'house': 'Stark', 'last_name': 'Stark', 'age': 10}, + {'first_name': 'Daenerys', 'house': 'Targaryen', 'last_name': 'Targaryen', 'age': 16}, + ] + + houses = [ + {'house': 'House of Lannister'}, + {'house': 'House of Greyjoy'}, + {'house': 'House of Stark'}, + {'house': 'House of Targaryen'}, + {'house': 'House of Martell'}, + {'house': 'House of Tyrell'}, + ] + + res, _, _ = Flow( + characters, + set_type('age', type='number'), + houses, + join( + 'res_1', + 'House of {house}', + 'res_2', + '{house}', + dict( + max_age={ + 'name': 'age', + 'aggregate': 'max' + }, + avg_age={ + 'name': 'age', + 'aggregate': 'avg' + }, + representative={ + 'name': 'first_name', + 'aggregate': 'last' + }, + representative_age={ + 'name': 'age' + }, + number_of_characters={ + 'aggregate': 'count' + }, + last_names={ + 'name': 'last_name', + 'aggregate': 'counters' + } + ), False, True + ) + ).results() + + assert res[0] == [ + { + 'avg_age': Decimal('31.66666666666666666666666667'), + 'house': 'House of Lannister', + 'max_age': Decimal(34), + 'number_of_characters': 3, + 'representative': 'Cersei', + 'representative_age': Decimal(34), + 'last_names': [('Lannister', 3)] + }, + { + 'avg_age': Decimal('11.4'), + 'house': 'House of Stark', + 'max_age': Decimal(17), + 'number_of_characters': 5, + 'representative': 'Bran', + 'representative_age': Decimal(10), + 'last_names': [('Stark', 4), ('Snow', 1)] + }, + { + 'avg_age': Decimal(16), + 'house': 'House of Targaryen', + 'max_age': Decimal(16), + 'number_of_characters': 1, + 'representative': 'Daenerys', + 'representative_age': Decimal(16), + 'last_names': [('Targaryen', 1)] + }, + ] + + # Find youngest of each house + res, _, _ = Flow( + characters, + set_type('age', type='number'), + sort_rows('{age:02}'), + join_with_self( + 'res_1', + '{house}', + { + 'the_house': { + 'name': 'house' + }, + '*': { + 'aggregate': 'first' + }, + } + ), + sort_rows('{the_house}') + ).results() + + assert res[0] == [ + { + 'the_house': 'Lannister', + 'first_name': 'Tyrion', + 'last_name': 'Lannister', + 'age': Decimal('27') + }, + { + 'the_house': 'Stark', + 'first_name': 'Rickon', + 'last_name': 'Stark', + 'age': Decimal('5') + }, + { + 'the_house': 'Targaryen', + 'first_name': 'Daenerys', + 'last_name': 'Targaryen', + 'age': Decimal('16') + } + ]