Skip to content

Commit

Permalink
Fix lint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Jun 8, 2018
1 parent 554e2be commit 5f8b303
Show file tree
Hide file tree
Showing 20 changed files with 32 additions and 60 deletions.
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ dist: trusty
sudo: required
language: python
python:
- 3.4
- 3.5
- 3.6
- 3.7
env:
global:
- TOXENV="py${PYTHON_VERSION//./}"
Expand Down
3 changes: 1 addition & 2 deletions dataflows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from .base import DataStream, DataStreamProcessor, schema_validator
from .base import ResourceWrapper, PackageWrapper
from .base import Flow
from .processors import *

from .processors import * # noqa
12 changes: 5 additions & 7 deletions dataflows/base/datastream_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import itertools
import collections
from copy import deepcopy

from datapackage import Package
from tableschema.exceptions import CastError
Expand Down Expand Up @@ -44,19 +43,19 @@ def _process(self):
self.datapackage = self.process_datapackage(datastream.dp)
self.datapackage.commit()
res_iter = datastream.res_iter

def get_res(name):
ret = self.datapackage.get_resource(name)
if ret is None:
ret = dp_copy.get_resource(name)
assert ret is not None
return ret

res_iter = (ResourceWrapper(get_res(rw.res.name), rw.it)
for rw in res_iter)
res_iter = self.process_resources(res_iter)
res_iter = (it if isinstance(it, ResourceWrapper) else ResourceWrapper(res, it)
for res, it
res_iter = (it if isinstance(it, ResourceWrapper) else ResourceWrapper(res, it)
for res, it
in itertools.zip_longest(self.datapackage.resources, res_iter))

return DataStream(self.datapackage, res_iter, datastream.stats + [self.stats])
Expand All @@ -70,12 +69,11 @@ def process(self):
for err in e.errors:
logging.error('%s', err)
return ds.dp, ds.merge_stats()

def results(self):
ds = self._process()
results = [
list(schema_validator(res.res, res))
for res in ds.res_iter
]
return results, ds.dp, ds.merge_stats()

3 changes: 2 additions & 1 deletion dataflows/base/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from .datastream_processor import DataStreamProcessor


class Flow:
def __init__(self, *args):
self.chain = args
Expand Down Expand Up @@ -37,4 +38,4 @@ def _chain(self):
elif isinstance(link, Iterable):
ds = iterable_loader(link)(ds)

return ds
return ds
1 change: 0 additions & 1 deletion dataflows/base/schema_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@ def schema_validator(resource: Resource, iterator):
logging.warning('Encountered field %r, not in schema', k)

yield row

12 changes: 6 additions & 6 deletions dataflows/helpers/iterable_loader.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import itertools
import itertools
import decimal
import datetime

Expand All @@ -7,6 +7,7 @@

from .. import DataStreamProcessor


class iterable_storage(Storage):

def __init__(self, iterable):
Expand Down Expand Up @@ -38,7 +39,7 @@ def field_type(self, value):
return 'datetime'
elif isinstance(value, datetime.date):
return 'date'
assert 'Unknown Python type: %r' % value
assert 'Unknown Python type: %r' % value

def describe(self, _, descriptor=None):
if descriptor is not None:
Expand All @@ -53,10 +54,10 @@ def describe(self, _, descriptor=None):
for name, value in rec.items()
]
)
except:
except Exception:
self.schema = dict(fields=[])
return self.schema

def iter(self, _):
return self.iterable

Expand All @@ -81,7 +82,7 @@ def handle_iterable(self):
yield dict(zip(('col{}'.format(i) for i in range(len(x))), x))

def process_datapackage(self, dp: Package):
name = self.name
name = self.name
if name is None:
name = 'res_{}'.format(len(dp.resources) + 1)
self.res = Resource(dict(
Expand All @@ -95,4 +96,3 @@ def process_datapackage(self, dp: Package):
def process_resources(self, resources):
yield from super(iterable_loader, self).process_resources(resources)
yield self.res.iter(keyed=True)

4 changes: 2 additions & 2 deletions dataflows/processors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from .load import load
from .printer import printer
from .set_type import set_type
from .dumpers import *
from .dumpers import dump_to_path, dump_to_zip

from .add_computed_field import add_computed_field
from .add_metadata import add_metadata
from .concatenate import concatenate
from .delete_fields import delete_fields
from .filter_rows import filter_rows
from .find_replace import find_replace
from .unpivot import unpivot
from .unpivot import unpivot
6 changes: 2 additions & 4 deletions dataflows/processors/add_computed_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ def process_resource(fields, rows):


def add_computed_field(fields, resources=None):

matcher = ResourceMatcher(resources)

def func(package):
for resource in package.pkg.descriptor['resources']:
if matcher.match(resource['name']):
descriptor = resource#.descriptor
descriptor = resource
new_fields = [
{
'name': f['target'],
Expand All @@ -72,5 +72,3 @@ def func(package):
yield process_resource(fields, resource)

return func


3 changes: 1 addition & 2 deletions dataflows/processors/concatenate.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,4 @@ def func(package):
else:
yield resource


return func
return func
1 change: 0 additions & 1 deletion dataflows/processors/delete_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ def func(package):
yield process_resource(resource, fields)

return func

13 changes: 1 addition & 12 deletions dataflows/processors/dumpers/dumper_base.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
import os
import tempfile
import logging
import hashlib
import copy
import json

import requests

from datapackage import Resource
from tableschema import Schema
from tableschema.exceptions import CastError

from ... import DataStreamProcessor, ResourceWrapper, schema_validator

from .file_formats import CSVFormat, JSONFormat


class DumperBase(DataStreamProcessor):

Expand Down Expand Up @@ -91,7 +80,7 @@ def process_resources(self, resources):
for resource in resources:
ret = self.process_resource(
ResourceWrapper(
resource.res,
resource.res,
schema_validator(resource.res, resource)
)
)
Expand Down
5 changes: 1 addition & 4 deletions dataflows/processors/dumpers/file_dumper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
import copy
import json
import tempfile
import hashlib
Expand All @@ -9,6 +8,7 @@
from .dumper_base import DumperBase
from .file_formats import CSVFormat, JSONFormat


class FileDumper(DumperBase):

def __init__(self, options):
Expand Down Expand Up @@ -88,9 +88,6 @@ def process_resource(self, resource):
schema = resource.res.schema

temp_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
fields = schema.fields
headers = list(map(lambda field: field.name, fields))

writer = self.file_formatters[resource.res.name](temp_file, schema)

return self.rows_processor(resource,
Expand Down
1 change: 0 additions & 1 deletion dataflows/processors/dumpers/file_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def prepare_resource(cls, resource):
)
super(CSVFormat, cls).prepare_resource(resource)


def write_transformed_row(self, transformed_row):
self.writer.writerow(transformed_row)

Expand Down
2 changes: 0 additions & 2 deletions dataflows/processors/dumpers/to_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,3 @@ def write_file_to_output(self, filename, path):
def finalize(self):
self.zip_file.close()
super(ZipDumper, self).finalize()


3 changes: 2 additions & 1 deletion dataflows/processors/filter_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from ..helpers.resource_matcher import ResourceMatcher


def process_resource(rows, conditions):
for row in rows:
if any(func(row[k], v) for func, k, v in conditions):
Expand All @@ -28,4 +29,4 @@ def func(rows):
else:
return rows

return func
return func
2 changes: 1 addition & 1 deletion dataflows/processors/find_replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ def func(rows):
else:
yield from rows

return func
return func
1 change: 0 additions & 1 deletion dataflows/processors/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,3 @@ def process_datapackage(self, dp: Package):
def process_resources(self, resources):
yield from super(load, self).process_resources(resources)
yield self.res.iter(keyed=True)

3 changes: 1 addition & 2 deletions dataflows/processors/printer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datapackage import Package, Resource
from datapackage import Package
from .. import DataStreamProcessor


Expand All @@ -11,4 +11,3 @@ def process_datapackage(self, dp: Package):
# def process_row(self, row):
# print(row)
# return row

4 changes: 2 additions & 2 deletions dataflows/processors/set_type.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from itertools import islice
from datapackage import Package, Resource
from datapackage import Resource
from .. import DataStreamProcessor, schema_validator


class set_type(DataStreamProcessor):

def __init__(self, name, **options):
super(set_type, self).__init__()
self.name = name
Expand Down
10 changes: 5 additions & 5 deletions dataflows/processors/unpivot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def unpivot_rows(rows, fields_to_unpivot, fields_to_keep, extra_value):


def unpivot(unpivot_fields, extra_keys, extra_value, resources=None):

matcher = ResourceMatcher(resources)

def func(package):
Expand Down Expand Up @@ -70,9 +70,9 @@ def func(package):
if not matcher.match(resource.res.name):
yield resource
else:
yield unpivot_rows(resource,
unpivot_fields_without_regex,
yield unpivot_rows(resource,
unpivot_fields_without_regex,
fields_to_keep,
extra_value)
return func

return func

0 comments on commit 5f8b303

Please sign in to comment.