Commit

Merge 9143bc2 into 8998aa7
roll committed Feb 18, 2020
2 parents 8998aa7 + 9143bc2 commit 35fde15
Showing 4 changed files with 152 additions and 0 deletions.
4 changes: 4 additions & 0 deletions PROCESSORS.md
@@ -76,6 +76,10 @@ Relevant only when _not_ loading data from a datapackage:
- `load.CAST_DO_NOTHING` - Data will be passed as-is without modifications or validation
- `load.CAST_WITH_SCHEMA` - Data will be parsed and cast using the schema, and an error will be raised in case of faulty data
- `override_schema` - Provided dictionary will be merged into the inferred schema. If the `fields` key is set, its contents will fully replace the inferred fields array. The same behavior applies to all other nested structures.
- `extract_missing_values (bool|dict)` - If `True`, missing values defined in the schema will be extracted and placed into a new field called `missingValues` of type `object`, in the form `{field1: value1, field2: value2}`. If a row has no missing values, the field will be an empty object. This option can also be a hash with three optional keys `source`, `target` and `values` (see the usage sketch after this diff), where:
  - `source (str|str[])` - a field or list of fields from which to extract missing values (default: all fields)
  - `target (str)` - the field in which to place the missing values mapping (default: `missingValues`)
  - `values (str[])` - an alternative list of missing values (default: `schema['missingValues']`)
- `override_fields` - Provided mapping will patch the inferred `schema.fields` array. In the mapping, keys must be field names and values must be dictionaries intended to be merged into the corresponding field's metadata.
- `deduplicate_headers` - (default `False`) If there are duplicate headers and the flag is set to `True`, they will be renamed using a `header (1), header (2), etc.` scheme. If there are duplicate headers and the flag is set to `False`, an error will be raised.
- `on_error` - Dictates how `load` will behave in case of a validation error.
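For reference, a minimal usage sketch of the new option, mirroring the tests added in this commit (the schema and the `data/missing_values.csv` fixture come from the files below):

```python
from dataflows import Flow, load

# Schema declaring which raw cell values count as missing.
schema = {
    'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
    'fields': [
        {'name': 'col1', 'type': 'number', 'format': 'default'},
        {'name': 'col2', 'type': 'number', 'format': 'default'},
    ],
}

# With extract_missing_values=True, each row gains a 'missingValues' object field,
# e.g. {'col1': 'err1'} for a row whose col1 cell held the string 'err1'.
data, package, stats = Flow(
    load('data/missing_values.csv', override_schema=schema, extract_missing_values=True),
).results()
```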
8 changes: 8 additions & 0 deletions data/missing_values.csv
@@ -0,0 +1,8 @@
col1,col2
1,1
err1,2
3,3
4,err2
5,5
mis1,mis2
7,7
39 changes: 39 additions & 0 deletions dataflows/processors/load.py
@@ -108,6 +108,7 @@ class load(DataStreamProcessor):
    def __init__(self, load_source, name=None, resources=None, strip=True, limit_rows=None,
                 infer_strategy=None, cast_strategy=None,
                 override_schema=None, override_fields=None,
                 extract_missing_values=None,
                 deduplicate_headers=False,
                 on_error=raise_exception,
                 **options):
@@ -123,6 +124,18 @@ def __init__(self, load_source, name=None, resources=None, strip=True, limit_row
        self.override_fields = override_fields
        self.deduplicate_headers = deduplicate_headers

        # Extract missing values
        self.extract_missing_values = None
        if extract_missing_values is not None:
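            # A boolean is normalized to an empty dict so the defaults below apply;
            # a dict may override 'source', 'target' and 'values'.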
            if isinstance(extract_missing_values, bool):
                extract_missing_values = {}
            extract_missing_values.setdefault('source', None)
            extract_missing_values.setdefault('target', 'missingValues')
            extract_missing_values.setdefault('values', [])
            if isinstance(extract_missing_values.get('source'), str):
                extract_missing_values['source'] = [extract_missing_values['source']]
            self.extract_missing_values = extract_missing_values

        self.load_dp = None
        self.resource_descriptors = []
        self.iterators = []
@@ -223,6 +236,16 @@ def safe_process_datapackage(self, dp: Package):
                fields = schema.get('fields', [])
                for field in fields:
                    field.update(self.override_fields.get(field['name'], {}))
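                # Append the extract_missing_values target field (an 'object' field) to the schema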
                if self.extract_missing_values:
                    missing_values = schema.get('missingValues', [])
                    if not self.extract_missing_values['values']:
                        self.extract_missing_values['values'] = missing_values
                    schema['fields'].append({
                        'name': self.extract_missing_values['target'],
                        'type': 'object',
                        'format': 'default',
                        'values': self.extract_missing_values['values'],
                    })
                descriptor['schema'] = schema
                descriptor['format'] = self.options.get('format', stream.format)
                descriptor['path'] += '.{}'.format(stream.format)
@@ -252,9 +275,25 @@ def stringer(self, iterator):
                for k, v in r.items()
            )

    def missing_values_extractor(self, iterator):
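        # Runs before the caster (see process_resources below), so raw cell values
        # can be matched against the configured missing values and collected per row.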
        source = self.extract_missing_values['source']
        target = self.extract_missing_values['target']
        values = self.extract_missing_values['values']
        for row in iterator:
            mapping = {}
            if values:
                for key, value in row.items():
                    if not source or key in source:
                        if value in values:
                            mapping[key] = value
            row[target] = mapping
            yield row

    def process_resources(self, resources):
        yield from super(load, self).process_resources(resources)
        for descriptor, it in zip(self.resource_descriptors, self.iterators):
            if self.extract_missing_values:
                it = self.missing_values_extractor(it)
            it = self.caster(descriptor, it)
            if self.strip:
                it = self.stripper(it)
101 changes: 101 additions & 0 deletions tests/test_lib.py
@@ -1520,3 +1520,104 @@ def test_force_temporal_format():
            'time': datetime.time(8, 10, 4),
        }
    ]]


# Extract missing values

def test_extract_missing_values():
    from dataflows import load
    schema = {
        'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
        'fields': [
            {'name': 'col1', 'type': 'number', 'format': 'default'},
            {'name': 'col2', 'type': 'number', 'format': 'default'},
        ]
    }
    flow = Flow(
        load('data/missing_values.csv', override_schema=schema, extract_missing_values=True),
    )
    data, package, stats = flow.results()
    assert package.descriptor['resources'][0]['schema']['fields'][0] == schema['fields'][0]
    assert package.descriptor['resources'][0]['schema']['fields'][1] == schema['fields'][1]
    assert package.descriptor['resources'][0]['schema']['fields'][2] == {
        'name': 'missingValues',
        'type': 'object',
        'format': 'default',
        'values': schema['missingValues'],
    }
    assert data == [[
        {'col1': 1, 'col2': 1, 'missingValues': {}},
        {'col1': None, 'col2': 2, 'missingValues': {'col1': 'err1'}},
        {'col1': 3, 'col2': 3, 'missingValues': {}},
        {'col1': 4, 'col2': None, 'missingValues': {'col2': 'err2'}},
        {'col1': 5, 'col2': 5, 'missingValues': {}},
        {'col1': None, 'col2': None, 'missingValues': {'col1': 'mis1', 'col2': 'mis2'}},
        {'col1': 7, 'col2': 7, 'missingValues': {}},
    ]]

def test_extract_missing_values_options():
    from dataflows import load
    schema = {
        'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
        'fields': [
            {'name': 'col1', 'type': 'number', 'format': 'default'},
            {'name': 'col2', 'type': 'number', 'format': 'default'},
        ]
    }
    flow = Flow(
        load('data/missing_values.csv', override_schema=schema, extract_missing_values={
            'source': 'col1',
            'target': 'notes'
        }),
    )
    data, package, stats = flow.results()
    assert package.descriptor['resources'][0]['schema']['fields'][0] == schema['fields'][0]
    assert package.descriptor['resources'][0]['schema']['fields'][1] == schema['fields'][1]
    assert package.descriptor['resources'][0]['schema']['fields'][2] == {
        'name': 'notes',
        'type': 'object',
        'format': 'default',
        'values': schema['missingValues'],
    }
    assert data == [[
        {'col1': 1, 'col2': 1, 'notes': {}},
        {'col1': None, 'col2': 2, 'notes': {'col1': 'err1'}},
        {'col1': 3, 'col2': 3, 'notes': {}},
        {'col1': 4, 'col2': None, 'notes': {}},
        {'col1': 5, 'col2': 5, 'notes': {}},
        {'col1': None, 'col2': None, 'notes': {'col1': 'mis1'}},
        {'col1': 7, 'col2': 7, 'notes': {}},
    ]]

def test_extract_missing_values_options_source_is_list():
    from dataflows import load
    schema = {
        'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
        'fields': [
            {'name': 'col1', 'type': 'number', 'format': 'default'},
            {'name': 'col2', 'type': 'number', 'format': 'default'},
        ]
    }
    flow = Flow(
        load('data/missing_values.csv', override_schema=schema, extract_missing_values={
            'source': ['col1', 'col2'],
        }),
    )
    data, package, stats = flow.results()
    assert package.descriptor['resources'][0]['schema']['fields'][0] == schema['fields'][0]
    assert package.descriptor['resources'][0]['schema']['fields'][1] == schema['fields'][1]
    assert package.descriptor['resources'][0]['schema']['fields'][2] == {
        'name': 'missingValues',
        'type': 'object',
        'format': 'default',
        'values': schema['missingValues'],
    }
    assert data == [[
        {'col1': 1, 'col2': 1, 'missingValues': {}},
        {'col1': None, 'col2': 2, 'missingValues': {'col1': 'err1'}},
        {'col1': 3, 'col2': 3, 'missingValues': {}},
        {'col1': 4, 'col2': None, 'missingValues': {'col2': 'err2'}},
        {'col1': 5, 'col2': 5, 'missingValues': {}},
        {'col1': None, 'col2': None, 'missingValues': {'col1': 'mis1', 'col2': 'mis2'}},
        {'col1': 7, 'col2': 7, 'missingValues': {}},
    ]]
