Commit 55d0fe6
Control validation on load, don't auto cast
akariv committed Oct 17, 2018
1 parent 8f8d1c7
Showing 3 changed files with 41 additions and 4 deletions.
3 changes: 2 additions & 1 deletion PROCESSORS.md
@@ -37,7 +37,7 @@ DataFlows comes with a few built-in processors which do most of the heavy lifting
 Loads data from various source types (local files, remote URLs, Google Spreadsheets, databases...)
 
 ```python
-def load(source, name=None, resources=None, **options):
+def load(source, name=None, resources=None, validate=False, **options):
     pass
 ```
 
@@ -55,6 +55,7 @@ def load(source, name=None, resources=None, **options):
   - A list of resource names to load
   - `None` indicates to load all resources
   - The index of the resource in the package
+- `validate` - whether the data should be cast to the inferred data types or not
 - `options` - based on the loaded file, extra options (e.g. `sheet` for Excel files etc., see the link to tabulator above)
 
 #### printer
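
Taken together, the documented contract is that casting is now opt-in: `load` reads values as-is by default, and only `validate=True` casts them to the inferred schema. A minimal usage sketch (the file name `mydata.csv` is illustrative, not part of the repository):

```python
from dataflows import Flow, load, printer

# Default after this commit: values are loaded uncast
# (e.g. CSV cells flow through as strings), no validation.
Flow(load('mydata.csv'), printer()).process()

# Opt in: rows are validated and cast to the data types
# inferred for the resource's schema.
Flow(load('mydata.csv', validate=True), printer()).process()
```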
13 changes: 10 additions & 3 deletions dataflows/processors/load.py
@@ -2,18 +2,20 @@
 
 from datapackage import Package, Resource
 from .. import DataStreamProcessor
+from ..base.schema_validator import schema_validator
 from ..helpers.resource_matcher import ResourceMatcher
 
 
 class load(DataStreamProcessor):
 
-    def __init__(self, load_source, name=None, resources=None, **options):
+    def __init__(self, load_source, name=None, resources=None, validate=False, **options):
         super(load, self).__init__()
         self.load_source = load_source
         self.options = options
         self.name = name
         self.resources = resources
         self.load_dp = None
+        self.validate = validate
 
     def process_datapackage(self, dp: Package):
         if isinstance(self.load_source, tuple):
@@ -43,9 +45,11 @@ def process_datapackage(self, dp: Package):
         else:
             base_path = None
             descriptor = dict(path=self.load_source,
                               profile='tabular-data-resource')
             if 'format' in self.options:
                 descriptor['format'] = self.options['format']
+            self.options.setdefault('ignore_blank_headers', True)
+            self.options.setdefault('headers', 1)
             self.res = Resource(descriptor,
                                 base_path=base_path,
                                 **self.options)
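
The two `setdefault` calls give `tabulator` sensible defaults: skip unnamed columns (`ignore_blank_headers`) and read headers from the first row (`headers=1`), while still letting explicit keyword arguments win, since `setdefault` only fills in missing keys. A small illustration of that precedence:

```python
# setdefault never overwrites a value the caller already supplied
options = {'headers': 2}                          # caller wants headers on row 2
options.setdefault('headers', 1)                  # no effect, caller's value kept
options.setdefault('ignore_blank_headers', True)  # filled in as a default
assert options == {'headers': 2, 'ignore_blank_headers': True}
```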
@@ -66,4 +70,7 @@ def process_resources(self, resources):
             yield from (resource.iter(keyed=True) for resource in self.load_dp.resources
                         if self.resource_matcher.match(resource.name))
         else:
-            yield self.res.iter(keyed=True)
+            it = self.res.iter(keyed=True, cast=False)
+            if self.validate:
+                it = schema_validator(self.res, it)
+            yield it
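
The behavioral core of the commit is in `process_resources`: rows are now always iterated with `cast=False`, and `schema_validator` is applied only on request. The validator's implementation is not part of this diff; a minimal sketch of the call shape it must satisfy (`schema_validator(resource, iterator)` returning an iterator of keyed rows), assuming the `tableschema` casting API, could look like:

```python
def schema_validator_sketch(resource, iterator):
    # Illustrative stand-in for dataflows/base/schema_validator.py,
    # which this commit imports but does not show.
    schema = resource.schema  # tableschema.Schema of the resource
    field_names = [field.name for field in schema.fields]
    for row in iterator:
        # cast_row raises if a value cannot be coerced to its field's
        # declared type, so bad data fails loudly instead of slipping through
        values = schema.cast_row([row[name] for name in field_names])
        yield dict(zip(field_names, values))
```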
29 changes: 29 additions & 0 deletions tests/test_lib.py
@@ -469,3 +469,32 @@ def test_add_field():
                   'title': 'mybool',
                   'type': 'boolean'}],
         'missingValues': ['']}}]}
+
+
+def test_load_empty_headers():
+    from dataflows import Flow, load, printer
+
+    def ensure_type(t):
+        def func(row):
+            assert isinstance(row['a'], t)
+        return func
+
+    results, dp, stats = Flow(load('data/empty_headers.csv'),
+                              ensure_type(str)).results()
+    assert results[0] == [
+        {'a': 1, 'b': 2},
+        {'a': 2, 'b': 3},
+        {'a': 3, 'b': 4},
+        {'a': 5, 'b': 6}
+    ]
+    assert len(dp.resources[0].schema.fields) == 2
+
+    results, dp, stats = Flow(load('data/empty_headers.csv', validate=True),
+                              ensure_type(int)).results()
+    assert results[0] == [
+        {'a': 1, 'b': 2},
+        {'a': 2, 'b': 3},
+        {'a': 3, 'b': 4},
+        {'a': 5, 'b': 6}
+    ]
+    assert len(dp.resources[0].schema.fields) == 2

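The fixture `data/empty_headers.csv` is not included in this commit, but the assertions pin down its shape: two usable columns `a` and `b`, four rows, plus a blank header that `ignore_blank_headers` (now on by default) must drop for the schema to end up with exactly two fields. A hypothetical reconstruction:

```python
import os

# Hypothetical fixture matching the test's assertions; the real
# data/empty_headers.csv is not part of this diff. The trailing comma
# creates a blank third header, which ignore_blank_headers discards.
os.makedirs('data', exist_ok=True)
with open('data/empty_headers.csv', 'w') as f:
    f.write('a,b,\n1,2,\n2,3,\n3,4,\n5,6,\n')
```

The two halves of the test then exercise the new contract: without `validate`, rows travel through the flow uncast, so `ensure_type(str)` holds mid-flow; with `validate=True`, rows pass through `schema_validator` and `ensure_type(int)` holds.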