From 400b96f3bbaff8092f847e1eaa04ac34db42e031 Mon Sep 17 00:00:00 2001 From: Adam Kariv Date: Mon, 23 Aug 2021 21:39:36 +0300 Subject: [PATCH] Fix full-outer join adding extraneous fields (#165) * Fix issue with join and differently named keys * Add test * Fix full-outer join adding extraneous fields Co-authored-by: Conrad S --- dataflows/processors/join.py | 11 +++++++---- tests/test_edge_cases.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/dataflows/processors/join.py b/dataflows/processors/join.py index 37dbd00..48d7794 100644 --- a/dataflows/processors/join.py +++ b/dataflows/processors/join.py @@ -214,8 +214,7 @@ def indexer(resource): elif field not in current: current[field] = None if mode == 'full-outer': - for field in source_key.key_list: - current[field] = row.get(field) + current['__key__'] = [row.get(field) for field in source_key.key_list] db.set(key, current) db_keys_usage.set(key, False) yield row @@ -258,11 +257,15 @@ def process_target(resource): # Creates extra by key def create_extra_by_key(key): extra = db.get(key) - extra.update(dict( + key = extra.pop('__key__', None) + extra = dict( (k, AGGREGATORS[fields[k]['aggregate']].finaliser(v)) for k, v in extra.items() if k in fields - )) + ) + if key: + for k, v in zip(target_key.key_list, key): + extra[k] = v return extra # Yields the new resources diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index 3632a61..9b92dcc 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -100,3 +100,36 @@ def func(): assert str(excinfo.value.cause) == 'custom-iterable-error' assert excinfo.value.processor_name == 'iterable_loader' assert excinfo.value.processor_position == 1 + +def test_fullouter_join_dump_different_keys(): + from dataflows import Flow, join, dump_to_path + from decimal import Decimal + + data1 = [ + {"col1": 1.531, "col2": "hello"}, + {"col1": 1.132, "col2": "goodbye"}, + ] + data2 = [ + {"colA": 1.531, "colB": "123"}, + {"colA": 1.132, "colB": 1.132}, + ] + f = Flow( + data1, + data2, + join( + "res_1", + ["col1"], + "res_2", + ["colA"], + {"col2": {"name": "col2", "aggregate": "first"}}, + mode="full-outer" + ), + dump_to_path(out_path='out/test_join_dump'), + ) + results = f.results()[0][0] + assert results == [ + {'colA': Decimal('1.531'), 'col2': 'hello', 'colB': '123'}, + {'colA': Decimal('1.132'), 'col2': 'goodbye', 'colB': 1.132}, + ] + +