Skip to content

Commit

Permalink
Join on row number (#132)
Browse files Browse the repository at this point in the history
* Implemented POC

* Bump tabulator to fix tests

* Upgrade to tabulator@1.38.4

* Fixed linting/restart Travis

* Added to processors.md

* Added an example
  • Loading branch information
roll committed May 26, 2020
1 parent 3bdc37f commit 99f5215
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 8 deletions.
33 changes: 32 additions & 1 deletion PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ def join_with_self(resource_name, join_key, fields):
- `source_name` - name of the _source_ resource
- `source_key` - One of
- List of field names which should be used as the lookup key
- String, which would be interpreted as a Python format string used to form the key (e.g. `{<field_name_1>}:{field_name_2}`)
  - String, which would be interpreted as a Python format string used to form the key (e.g. `{field_name_1}:{field_name_2}`). It's possible to use `#` as a special field name to include a row number (starting from the first row after the headers row), e.g. `{#}:{field_name_2}`.
- `source_delete` - delete source from data-package after joining (`True` by default)

- `target_name` - name of the _target_ resource to hold the joined data.
Expand Down Expand Up @@ -934,3 +934,34 @@ age|first_name |last_name |the_house
27|Tyrion |Lannister |Lannister
5|Rickon |Stark |Stark
16|Daenerys |Targaryen |Targaryen

*Joining using row numbers*:
`source`:
| values |
|--------|
| value1 |
| value2 |

`target`:
| id | names |
|----|-------|
| 01 | name1 |
| 02 | name2 |

```python
Flow(#...
join(
source_name='source',
source_key=['#'],
target_name='target',
target_key=['#'],
fields={'values': {'name': 'values'}}
),
)
```

Output:
| id | names | values |
|----|-------|--------|
| 01 | name1 | value1 |
| 02 | name2 | value2 |
4 changes: 4 additions & 0 deletions data/cities_comment.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
city,comment
paris,city with population in row 2
london,city with population in row 1
rome,city with population in row 3
3 changes: 3 additions & 0 deletions data/names.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id,names
01,name1
02,name2
3 changes: 3 additions & 0 deletions data/values.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
values
value1
value2
12 changes: 6 additions & 6 deletions dataflows/processors/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(self, key_spec):
self.key_spec = key_spec
self.key_list = key_list

def __call__(self, row):
return self.key_spec.format(**row)
def __call__(self, row, row_number):
return self.key_spec.format(**{**row, '#': row_number})


# Aggregator helpers
Expand Down Expand Up @@ -183,8 +183,8 @@ def join_aux(source_name, source_key, source_delete, # noqa: C901

# Indexes the source data
def indexer(resource):
for row in resource:
key = source_key(row)
for row_number, row in enumerate(resource, start=1):
key = source_key(row, row_number)
try:
current = db.get(key)
except KeyError:
Expand Down Expand Up @@ -223,8 +223,8 @@ def process_target(resource):
))
yield row
else:
for row in resource:
key = target_key(row)
for row_number, row in enumerate(resource, start=1):
key = target_key(row, row_number)
try:
extra = create_extra_by_key(key)
db_keys_usage.set(key, True)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def read(*paths):
PACKAGE = 'dataflows'
NAME = PACKAGE.replace('_', '-')
INSTALL_REQUIRES = [
'tabulator>=1.23.0',
'tabulator>=1.38.4',
'datapackage>=1.5.0',
'tableschema>=1.5',
'kvfile>=0.0.8',
Expand Down
64 changes: 64 additions & 0 deletions tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,7 @@ def test_load_override_schema_and_fields():
{'name': None, 'age': '22'},
]]


def test_delete_fields_regex():
from dataflows import load, delete_fields
flow = Flow(
Expand All @@ -1433,6 +1434,7 @@ def test_delete_fields_regex():
{'city': 'rome'},
]]


def test_join_full_outer():
from dataflows import load, set_type, join
flow = Flow(
Expand All @@ -1456,6 +1458,68 @@ def test_join_full_outer():
]]


def test_join_row_number():
    """Positional join: the special '#' key matches source/target rows by row number."""
    # NOTE: dropped the unused `set_type` import from the original.
    from dataflows import load, join
    flow = Flow(
        load('data/population.csv'),
        load('data/cities.csv'),
        join(
            source_name='population',
            source_key=['#'],
            target_name='cities',
            target_key=['#'],
            fields={'population': {'name': 'population'}},
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'id': 1, 'city': 'london', 'population': 8},
        {'id': 2, 'city': 'paris', 'population': 2},
        {'id': 3, 'city': 'rome', 'population': 3},
    ]]


def test_join_row_number_readme_example():
    """Mirror the PROCESSORS.md row-number example: join values onto names by position."""
    # NOTE: dropped the unused `set_type` import from the original.
    from dataflows import load, join
    flow = Flow(
        load('data/values.csv'),
        load('data/names.csv'),
        join(
            source_name='values',
            source_key=['#'],
            target_name='names',
            target_key=['#'],
            fields={'values': {'name': 'values'}},
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'id': 1, 'names': 'name1', 'values': 'value1'},
        {'id': 2, 'names': 'name2', 'values': 'value2'},
    ]]


def test_join_row_number_format_string():
    """The '#' placeholder also works inside a format-string key spec.

    The source key 'city with population in row {#}' must match the
    target's literal `comment` field, so rows join across files even
    though their physical order differs.
    """
    # NOTE: dropped the unused `set_type` import from the original.
    from dataflows import load, join
    flow = Flow(
        load('data/population.csv'),
        load('data/cities_comment.csv'),
        join(
            source_name='population',
            source_key='city with population in row {#}',
            target_name='cities_comment',
            target_key='{comment}',
            fields={'population': {'name': 'population'}},
        ),
    )
    data = flow.results()[0]
    assert data == [[
        {'city': 'paris', 'population': 2, 'comment': 'city with population in row 2'},
        {'city': 'london', 'population': 8, 'comment': 'city with population in row 1'},
        {'city': 'rome', 'population': 3, 'comment': 'city with population in row 3'},
    ]]


def test_load_duplicate_headers():
from dataflows import load
flow = Flow(
Expand Down

0 comments on commit 99f5215

Please sign in to comment.