Skip to content

Commit

Permalink
Merge 5638d43 into 61f1aac
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed May 23, 2019
2 parents 61f1aac + 5638d43 commit fe765c5
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 14 deletions.
24 changes: 21 additions & 3 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin
Loads data from various source types (local files, remote URLS, Google Spreadsheets, databases...)

```python
def load(source, name=None, resources=None, strip=True, limit_rows=None,
         infer_strategy=None, cast_strategy=None, on_error=raise_exception,
         **options):
    pass
```

Expand All @@ -60,10 +62,26 @@ def load(source, name=None, resources=None, validate=False, strip=True, force_st
- `options` - based on the loaded file, extra options (e.g. `sheet` for Excel files etc., see the link to tabulator above)

Relevant only when _not_ loading data from a datapackage:
- `force_strings` - Don't infer data types, assume everything is a string.
- `validate` - Attempt to cast data to the inferred data-types.
- `strip` - Should string values be stripped of surrounding whitespace characters.
- `limit_rows` - If provided, will limit the number of rows fetched from the source. Takes an integer value which specifies how many rows of the source to stream.
- `infer_strategy` - Dictates if and how `load` will try to guess the datatypes in the source data:
- `load.INFER_STRINGS` - All columns will get a `string` datatype
- `load.INFER_PYTHON_TYPES` - All columns will get a datatype matching their python type
- `load.INFER_FULL` - All columns will get a datatype matching their python type, except strings which will be attempted to be parsed (e.g. `"1" -> 1` etc.) (default)
- `cast_strategy` - Dictates if and how `load` will parse and validate the data against the datatypes in the inferred schema:
- `load.CAST_TO_STRINGS` - All data will be cast to strings (regardless of how it's stored in the source file) and won't be validated using the schema.
- `load.CAST_DO_NOTHING` - Data will be passed as-is without modifications or validation
- `load.CAST_WITH_SCHEMA` - Data will be parsed and cast using the schema and will error in case of faulty data
- `on_error` - Dictates how `load` will behave in case of a validation error.
Options are identical to `on_error` in `set_type` and `validate`


Some deprecated options:
- `force_strings` - Don't infer data types, assume everything is a string.
(equivalent to `infer_strategy = INFER_STRINGS, cast_strategy = CAST_TO_STRINGS`)
- `validate` - Attempt to cast data to the inferred data-types.
(equivalent to `cast_strategy = CAST_WITH_SCHEMA, on_error = raise_exception`)


#### printer
Just prints whatever it sees. Good for debugging.
Expand Down
4 changes: 3 additions & 1 deletion dataflows/processors/dumpers/dumper_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, options={}):
self.resource_hash = counters.get('resource-hash', 'hash')
self.add_filehash_to_path = options.get('add_filehash_to_path', False)
self.pretty_descriptor = options.get('pretty_descriptor', True)
self.schema_validator_options = options.get('validator_options', {})

@staticmethod
def get_attr(obj, prop, default=None):
Expand Down Expand Up @@ -81,7 +82,8 @@ def process_resources(self, resources):
ret = self.process_resource(
ResourceWrapper(
resource.res,
schema_validator(resource.res, resource)
schema_validator(resource.res, resource,
**self.schema_validator_options)
)
)
ret = self.row_counter(resource, ret)
Expand Down
88 changes: 78 additions & 10 deletions dataflows/processors/load.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import os
import warnings
import datetime

from datapackage import Package
from tabulator import Stream
from tabulator.parser import Parser
from tabulator.helpers import reset_stream
from tableschema.schema import Schema
from .. import DataStreamProcessor
from ..base.schema_validator import schema_validator
from ..base.schema_validator import schema_validator, ignore, drop, raise_exception
from ..helpers.resource_matcher import ResourceMatcher


Expand Down Expand Up @@ -65,25 +67,86 @@ def __iter_extended_rows(self):
yield (row_number, list(keys), list(values))


class StringsGuesser():
    """Type guesser that types every column as `string`, whatever the value.

    Used as a `guesser_cls` for schema inference when the `INFER_STRINGS`
    strategy is selected.
    """

    def cast(self, value):
        """Return the single candidate (type, format, confidence) for any value."""
        candidate = ('string', 'default', 0)
        return [candidate]


class TypesGuesser():
    """Type guesser that maps each value's Python type to a Table Schema type.

    Used as a `guesser_cls` for schema inference when the `INFER_PYTHON_TYPES`
    strategy is selected. `cast` returns candidate (type, format, confidence)
    tuples: 'any' always matches as a low-confidence fallback, and a more
    specific type is added when the value's Python type has a known mapping.
    """

    def cast(self, value):
        # Exact-type lookup (not isinstance), so bool maps to 'boolean' even
        # though bool subclasses int, and datetime maps to 'datetime' even
        # though it subclasses date.
        jts_type = {
            str: 'string',
            int: 'integer',
            float: 'number',
            list: 'array',
            dict: 'object',
            tuple: 'array',
            bool: 'boolean',
            datetime.datetime: 'datetime',
            datetime.date: 'date',
        }.get(type(value))
        ret = [('any', 'default', 0)]
        if jts_type is not None:
            # Bug fix: append the resolved type name, not the literal string
            # 'jts_type' (the original appended 'jts_type' for every value).
            ret.append((jts_type, 'default', 1))
        return ret


class load(DataStreamProcessor):
    """Load data from a source (file, URL, datapackage, ...) into the stream.

    The class attributes below name the supported inference, casting and
    error-handling strategies (see PROCESSORS.md for their semantics).

    This body removes stale lines left over from the diff merge: the old
    `__init__` signature, `self.validate = validate` (which raises NameError
    since `validate` is no longer a parameter), and the unused
    `self.force_strings` assignment whose only consumer was removed.
    """

    # How field types are inferred from the sample data:
    INFER_STRINGS = 'strings'        # every field becomes `string`
    INFER_PYTHON_TYPES = 'pytypes'   # field types mirror the Python value types
    INFER_FULL = 'full'              # full inference, incl. parsing strings (default)

    # How row values are cast/validated against the inferred schema:
    CAST_TO_STRINGS = 'strings'      # stringify everything, no validation
    CAST_DO_NOTHING = 'nothing'      # pass data through untouched (default)
    CAST_WITH_SCHEMA = 'schema'      # parse & validate rows using the schema

    # Behaviour on validation errors (aliases of schema_validator handlers):
    ERRORS_IGNORE = ignore
    ERRORS_DROP = drop
    ERRORS_RAISE = raise_exception

    def __init__(self, load_source, name=None, resources=None, strip=True, limit_rows=None,
                 infer_strategy=None, cast_strategy=None, on_error=raise_exception,
                 **options):
        """Configure the loader.

        :param load_source: path / URL / datapackage descriptor to load from
        :param name: optional name for the loaded resource
        :param resources: resource matcher (relevant when loading a datapackage)
        :param strip: strip surrounding whitespace from string values
        :param limit_rows: if set, stop after this many rows per resource
        :param infer_strategy: one of the INFER_* constants (default INFER_FULL)
        :param cast_strategy: one of the CAST_* constants (default CAST_DO_NOTHING)
        :param on_error: validation-error handler (ignore / drop / raise_exception)
        :param options: extra tabulator Stream options; also accepts the
                        deprecated `force_strings` and `validate` flags
        """
        super(load, self).__init__()
        self.load_source = load_source

        self.name = name
        self.strip = strip
        self.limit_rows = limit_rows
        self.options = options

        self.resources = resources

        self.load_dp = None
        self.resource_descriptors = []
        self.iterators = []

        # Deprecated flags arrive via **options and are translated to the
        # new strategy arguments.
        # NOTE(review): the deprecated keys remain inside self.options and so
        # get forwarded to tabulator's Stream — confirm Stream tolerates them.
        if 'force_strings' in options:
            warnings.warn('force_strings is being deprecated, use infer_strategy & cast_strategy instead',
                          DeprecationWarning)
            if options['force_strings']:
                infer_strategy = self.INFER_STRINGS
                cast_strategy = self.CAST_TO_STRINGS

        if 'validate' in options:
            warnings.warn('validate is being deprecated, use cast_strategy & on_error instead',
                          DeprecationWarning)
            if options['validate']:
                cast_strategy = self.CAST_WITH_SCHEMA

        # Resolve the strategies to concrete implementations. None as a
        # guesser means "use tableschema's default (full) inference".
        self.guesser = {
            self.INFER_FULL: None,
            self.INFER_PYTHON_TYPES: TypesGuesser,
            self.INFER_STRINGS: StringsGuesser,
        }[infer_strategy or self.INFER_FULL]

        self.caster = {
            self.CAST_DO_NOTHING: lambda res, it: it,
            self.CAST_WITH_SCHEMA: lambda res, it: schema_validator(res, it, on_error=on_error),
            self.CAST_TO_STRINGS: lambda res, it: self.stringer(it)
        }[cast_strategy or self.CAST_DO_NOTHING]

def process_datapackage(self, dp: Package):

# If loading from datapackage & resource iterator:
Expand Down Expand Up @@ -129,10 +192,9 @@ def process_datapackage(self, dp: Package):
self.options.setdefault('ignore_blank_headers', True)
self.options.setdefault('headers', 1)
stream: Stream = Stream(self.load_source, **self.options).open()
descriptor['schema'] = Schema().infer(stream.sample, headers=stream.headers, confidence=1)
if self.force_strings:
for f in descriptor['schema']['fields']:
f['type'] = 'string'
descriptor['schema'] = \
Schema().infer(stream.sample, headers=stream.headers, confidence=1,
guesser_cls=self.guesser)
descriptor['format'] = self.options.get('format', stream.format)
descriptor['path'] += '.{}'.format(stream.format)
self.iterators.append(stream.iter(keyed=True))
Expand All @@ -154,11 +216,17 @@ def limiter(self, iterator):
if count >= self.limit_rows:
break

def stringer(self, iterator):
for r in iterator:
yield dict(
(k, str(v)) if not isinstance(v, str) else (k, v)
for k, v in r.items()
)

def process_resources(self, resources):
yield from super(load, self).process_resources(resources)
for descriptor, it in zip(self.resource_descriptors, self.iterators):
if self.validate:
it = schema_validator(descriptor, it)
it = self.caster(descriptor, it)
if self.strip:
it = self.stripper(it)
if self.limit_rows:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def read(*paths):
NAME = PACKAGE.replace('_', '-')
INSTALL_REQUIRES = [
'datapackage>=1.5.0',
'tableschema>=1.5',
'kvfile>=0.0.6',
'click',
'jinja2',
Expand Down

0 comments on commit fe765c5

Please sign in to comment.