Skip to content

Commit

Permalink
Fix full-outer join adding extraneous fields
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Aug 23, 2021
1 parent ab6ff49 commit da5aa54
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
7 changes: 5 additions & 2 deletions dataflows/processors/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ def indexer(resource):
elif field not in current:
current[field] = None
if mode == 'full-outer':
for field in source_key.key_list:
current[field] = row.get(field)
current['__key__'] = [row.get(field) for field in source_key.key_list]
db.set(key, current)
db_keys_usage.set(key, False)
yield row
Expand Down Expand Up @@ -258,11 +257,15 @@ def process_target(resource):
# Creates extra by key
def create_extra_by_key(key):
extra = db.get(key)
key = extra.pop('__key__', None)
extra = dict(
(k, AGGREGATORS[fields[k]['aggregate']].finaliser(v))
for k, v in extra.items()
if k in fields
)
if key:
for k, v in zip(target_key.key_list, key):
extra[k] = v
return extra

# Yields the new resources
Expand Down
8 changes: 7 additions & 1 deletion tests/test_edge_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def func():

def test_fullouter_join_dump_different_keys():
from dataflows import Flow, join, dump_to_path
from decimal import Decimal

data1 = [
{"col1": 1.531, "col2": "hello"},
{"col1": 1.132, "col2": "goodbye"},
Expand All @@ -124,6 +126,10 @@ def test_fullouter_join_dump_different_keys():
),
dump_to_path(out_path='out/test_join_dump'),
)
f.results()
results = f.results()[0][0]
assert results == [
{'colA': Decimal('1.531'), 'col2': 'hello', 'colB': '123'},
{'colA': Decimal('1.132'), 'col2': 'goodbye', 'colB': 1.132},
]


0 comments on commit da5aa54

Please sign in to comment.