Skip to content

Commit

Permalink
Fix full-outer join adding extraneous fields (#165)
Browse files Browse the repository at this point in the history
* Fix issue with join and differently named keys

* Add test

* Fix full-outer join adding extraneous fields

Co-authored-by: Conrad S <conrad.schloer@gmail.com>
  • Loading branch information
akariv and cschloer committed Aug 23, 2021
1 parent 5c04422 commit 400b96f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
11 changes: 7 additions & 4 deletions dataflows/processors/join.py
Expand Up @@ -214,8 +214,7 @@ def indexer(resource):
elif field not in current:
current[field] = None
if mode == 'full-outer':
for field in source_key.key_list:
current[field] = row.get(field)
current['__key__'] = [row.get(field) for field in source_key.key_list]
db.set(key, current)
db_keys_usage.set(key, False)
yield row
Expand Down Expand Up @@ -258,11 +257,15 @@ def process_target(resource):
# Creates extra by key
def create_extra_by_key(key):
# NOTE(review): this span is a rendered diff whose +/- markers and indentation
# were lost, so pre-commit and post-commit lines are interleaved. The markings
# below match the "7 additions & 4 deletions" reported for join.py on this page.
#
# Purpose: fetch the entry stored in `db` under `key` and return a dict of
# finalised aggregate values for it.
extra = db.get(key)
# (removed by this commit) The old code updated `extra` in place, so any stored
# entries whose names are not in `fields` stayed in the returned dict.
extra.update(dict(
# (added) Take the saved key values out of the entry before finalising; this is
# the '__key__' list the indexer stores in full-outer mode. Note that it
# rebinds the `key` parameter.
key = extra.pop('__key__', None)
# (added) Build a fresh dict instead, so only names present in `fields` survive.
extra = dict(
(k, AGGREGATORS[fields[k]['aggregate']].finaliser(v))
for k, v in extra.items()
if k in fields
# (removed by this commit) closes the old `extra.update(dict(` call.
))
# (added) closes the new `extra = dict(` call.
)
# (added) When key values were saved, expose them under the target key's field
# names by pairing `target_key.key_list` with the saved values positionally.
if key:
for k, v in zip(target_key.key_list, key):
extra[k] = v
return extra

# Yields the new resources
Expand Down
33 changes: 33 additions & 0 deletions tests/test_edge_cases.py
Expand Up @@ -100,3 +100,36 @@ def func():
assert str(excinfo.value.cause) == 'custom-iterable-error'
assert excinfo.value.processor_name == 'iterable_loader'
assert excinfo.value.processor_position == 1

def test_fullouter_join_dump_different_keys():
    """Check a full-outer join whose source and target key fields differ in name.

    The source rows are keyed on ``col1`` and the target rows on ``colA``.
    After joining and dumping to disk, each output row is expected to hold
    ``colA``, ``colB`` and the joined ``col2`` -- and no ``col1`` field.

    NOTE(review): "res_1" / "res_2" are presumably the names given by default
    to the two inline row lists -- confirm against the dataflows docs.
    """
    from dataflows import Flow, join, dump_to_path
    from decimal import Decimal

    # Source side of the join: keyed on `col1`.
    source_rows = [
        {"col1": 1.531, "col2": "hello"},
        {"col1": 1.132, "col2": "goodbye"},
    ]
    # Target side of the join: keyed on `colA`.
    target_rows = [
        {"colA": 1.531, "colB": "123"},
        {"colA": 1.132, "colB": 1.132},
    ]
    # Only `col2` is carried over from the source, taking the first value seen.
    joined_fields = {"col2": {"name": "col2", "aggregate": "first"}}

    pipeline = Flow(
        source_rows,
        target_rows,
        join(
            "res_1",
            ["col1"],
            "res_2",
            ["colA"],
            joined_fields,
            mode="full-outer"
        ),
        dump_to_path(out_path='out/test_join_dump'),
    )

    expected_rows = [
        {'colA': Decimal('1.531'), 'col2': 'hello', 'colB': '123'},
        {'colA': Decimal('1.132'), 'col2': 'goodbye', 'colB': 1.132},
    ]

    # `results()[0]` is indexed again to get the rows of the first output resource.
    output_resources = pipeline.results()[0]
    first_resource_rows = output_resources[0]
    assert first_resource_rows == expected_rows


0 comments on commit 400b96f

Please sign in to comment.