Skip to content

Commit

Permalink
v0.0.51 Load to use tabulator directly
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed May 2, 2019
1 parent 2c5e5e0 commit b61f3fe
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 46 deletions.
2 changes: 1 addition & 1 deletion dataflows/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.50
0.0.51
8 changes: 6 additions & 2 deletions dataflows/base/schema_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ def drop(res_name, row, i, e):
return False


def schema_validator(resource: Resource, iterator,
def schema_validator(resource, iterator,
field_names=None, on_error=None):
if on_error is None:
on_error = raise_exception
schema: Schema = resource.schema
if isinstance(resource, Resource):
schema: Schema = resource.schema
assert schema is not None
else:
schema: Schema = Schema(resource.get('schema', {}))
if field_names is None:
field_names = [f.name for f in schema.fields]
schema_fields = [f for f in schema.fields if f.name in field_names]
Expand Down
90 changes: 47 additions & 43 deletions dataflows/processors/load.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os

from datapackage import Package, Resource
from datapackage import Package
from tabulator import Stream
from tabulator.parser import Parser
from tabulator.helpers import reset_stream
from tableschema.schema import Schema
from .. import DataStreamProcessor
from ..base.schema_validator import schema_validator
from ..helpers.resource_matcher import ResourceMatcher
Expand Down Expand Up @@ -68,62 +70,72 @@ class load(DataStreamProcessor):
def __init__(self, load_source, name=None, resources=None, validate=False, strip=True, **options):
super(load, self).__init__()
self.load_source = load_source
self.options = options

self.name = name
self.resources = resources
self.load_dp = None
self.validate = validate
self.strip = strip
self.options = options

self.resources = resources

self.load_dp = None
self.force_strings = options.get('force_strings') is True
self.resource_descriptors = []
self.iterators = []

def process_datapackage(self, dp: Package):

# If loading from datapackage & resource iterator:
if isinstance(self.load_source, tuple):
datapackage_descriptor, _ = self.load_source
dp.descriptor.setdefault('resources', [])
self.resource_matcher = ResourceMatcher(self.resources, datapackage_descriptor)
datapackage_descriptor, resource_iterator = self.load_source
resources = datapackage_descriptor['resources']
resource_matcher = ResourceMatcher(self.resources, datapackage_descriptor)
for resource_descriptor in datapackage_descriptor['resources']:
if self.resource_matcher.match(resource_descriptor['name']):
dp.add_resource(resource_descriptor)
else: # load_source is string:
if resource_matcher.match(resource_descriptor['name']):
self.resource_descriptors.append(resource_descriptor)
self.iterators = (resource for resource, descriptor in zip(resource_iterator, resources)
if resource_matcher.match(descriptor['name']))

# If load_source is string:
else:
# Handle Environment vars if necessary:
if self.load_source.startswith('env://'):
env_var = self.load_source[6:]
self.load_source = os.environ.get(env_var)
if self.load_source is None:
raise ValueError(f"Couldn't find value for env var '{env_var}'")

# Loading from datapackage:
if os.path.basename(self.load_source) == 'datapackage.json':
self.load_dp = Package(self.load_source)
self.resource_matcher = ResourceMatcher(self.resources, self.load_dp)
dp.descriptor.setdefault('resources', [])
resource_matcher = ResourceMatcher(self.resources, self.load_dp)
for resource in self.load_dp.resources:
if self.resource_matcher.match(resource.name):
dp.add_resource(resource.descriptor)
if resource_matcher.match(resource.name):
self.resource_descriptors.append(resource.descriptor)
self.iterators.append(resource.iter(keyed=True, cast=False))

# Loading for any other source
else:
if os.path.exists(self.load_source):
base_path = os.path.dirname(self.load_source) or '.'
self.load_source = os.path.basename(self.load_source)
else:
base_path = None
descriptor = dict(path=self.load_source,
path = os.path.basename(self.load_source)
path = os.path.splitext(path)[0]
descriptor = dict(path=path,
profile='tabular-data-resource')
descriptor['format'] = self.options.get('format')
self.resource_descriptors.append(descriptor)
descriptor['name'] = self.name or path
if 'encoding' in self.options:
descriptor['encoding'] = self.options['encoding']
if descriptor['format'] == 'xml' or self.load_source.endswith('.xml'):
self.options.setdefault('custom_parsers', {})['xml'] = XMLParser
self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
self.options.setdefault('ignore_blank_headers', True)
self.options.setdefault('headers', 1)
self.res = Resource(descriptor,
base_path=base_path,
**self.options)
self.res.infer(confidence=1, limit=1000)
if self.name is not None:
self.res.descriptor['name'] = self.name
stream: Stream = Stream(self.load_source, **self.options).open()
descriptor['schema'] = Schema().infer(stream.sample, headers=stream.headers, confidence=1)
if self.force_strings:
for f in self.res.descriptor['schema']['fields']:
for f in descriptor['schema']['fields']:
f['type'] = 'string'
self.res.commit()
self.res.descriptor['path'] = '{name}.{format}'.format(**self.res.descriptor)
dp.add_resource(self.res.descriptor)
descriptor['format'] = self.options.get('format', stream.format)
descriptor['path'] += '.{}'.format(stream.format)
self.iterators.append(stream.iter(keyed=True))
dp.descriptor.setdefault('resources', []).extend(self.resource_descriptors)
return dp

def stripper(self, iterator):
Expand All @@ -135,17 +147,9 @@ def stripper(self, iterator):

def process_resources(self, resources):
yield from super(load, self).process_resources(resources)
if isinstance(self.load_source, tuple):
datapackage_descriptor, resources = self.load_source
yield from (resource for resource, descriptor in zip(resources, datapackage_descriptor['resources'])
if self.resource_matcher.match(descriptor['name']))
elif self.load_dp is not None:
yield from (resource.iter(keyed=True) for resource in self.load_dp.resources
if self.resource_matcher.match(resource.name))
else:
it = self.res.iter(keyed=True, cast=False)
for descriptor, it in zip(self.resource_descriptors, self.iterators):
if self.validate:
it = schema_validator(self.res, it)
it = schema_validator(descriptor, it)
if self.strip:
it = self.stripper(it)
yield it
2 changes: 2 additions & 0 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def lowerData(row):

# [[{'data': 'hello'}, {'data': 'world'}]]


def test_example_2():
from dataflows import Flow, load

Expand All @@ -34,6 +35,7 @@ def titleName(row):

print(data)


def country_population():
from xml.etree import ElementTree
from urllib.request import urlopen
Expand Down

0 comments on commit b61f3fe

Please sign in to comment.