Skip to content

Commit

Permalink
Complete exception work (#137)
Browse files Browse the repository at this point in the history
* Added additional information to exceptions (#134)

* Added additional information to exceptions

* Updated implementation

Co-authored-by: Adam Kariv <akariv@users.noreply.github.com>

* Complete exceptions work on other dsp kinds

* Fix tests

Co-authored-by: roll <roll@users.noreply.github.com>
  • Loading branch information
akariv and roll committed May 26, 2020
1 parent 99f5215 commit 13a8489
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 60 deletions.
2 changes: 1 addition & 1 deletion dataflows/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.74
0.1.0
3 changes: 2 additions & 1 deletion dataflows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import DataStream, DataStreamProcessor, schema_validator, ValidationError
from .base import ResourceWrapper, PackageWrapper
from .base import exceptions
from .base import Flow
from .processors import * # noqa
from .processors import * # noqa
1 change: 1 addition & 0 deletions dataflows/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import exceptions
from .datastream import DataStream
from .datastream_processor import DataStreamProcessor
from .resource_wrapper import ResourceWrapper
Expand Down
50 changes: 36 additions & 14 deletions dataflows/base/datastream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datapackage import Package
from tableschema.exceptions import CastError

from . import exceptions
from .datastream import DataStream
from .resource_wrapper import ResourceWrapper
from .schema_validator import schema_validator
Expand All @@ -26,11 +27,13 @@ def __init__(self):
self.stats = {}
self.source = None
self.datapackage = None
self.position = None

def __call__(self, source=None):
def __call__(self, source=None, position=None):
if source is None:
source = DataStream()
self.source = source
self.position = position
return self

def process_resource(self, resource: ResourceWrapper):
Expand Down Expand Up @@ -71,28 +74,47 @@ def func():
def _process(self):
    """Build this processor's output DataStream from its source.

    The diff overlay duplicated the old (un-wrapped) body before the new
    try/except version; this is the post-commit version only.

    Any exception raised while copying/processing/committing the
    datapackage descriptor is wrapped via raise_exception so callers see
    a ProcessorError carrying the processor name and position.
    """
    # Upstream errors are raised (and presumably already wrapped) by the
    # upstream processor's own _process — hence outside our try block.
    datastream = self.source._process()

    try:
        self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor))
        self.datapackage = self.process_datapackage(self.datapackage)
        self.datapackage.commit()

        return DataStream(self.datapackage,
                          LazyIterator(self.get_iterator(datastream)),
                          datastream.stats + [self.stats])
    except Exception as exception:
        # Always raises — wraps non-ProcessorError causes, re-raises the rest.
        self.raise_exception(exception)

def raise_exception(self, cause):
    """Re-raise *cause*, wrapping it in a ProcessorError if needed.

    Already-wrapped ProcessorErrors propagate untouched so the innermost
    failing processor's details are preserved; anything else is wrapped
    with this processor's name, object and flow position, chained to the
    original via ``raise ... from``.
    """
    if isinstance(cause, exceptions.ProcessorError):
        raise cause
    wrapped = exceptions.ProcessorError(
        cause,
        processor_name=type(self).__name__,
        processor_object=self,
        processor_position=self.position,
    )
    raise wrapped from cause

def process(self):
    """Run the flow to completion, discarding row data.

    The diff overlay left a stale duplicate ``ds = self._process()`` line
    before the try block; this is the post-commit version only.

    Each resource iterator is drained (deque with maxlen=0 consumes at C
    speed without storing rows). CastErrors are logged per-error and
    swallowed; any other exception is wrapped via raise_exception.

    :returns: tuple of (datapackage, merged stats dict)
    """
    try:
        ds = self._process()
        for res in ds.res_iter:
            collections.deque(res, maxlen=0)
    except CastError as e:
        for err in e.errors:
            logging.error('%s', err)
    except Exception as exception:
        # Always raises, so `ds` being unbound below cannot be reached.
        self.raise_exception(exception)
    return ds.dp, ds.merge_stats()

def results(self, on_error=None):
    """Run the flow and materialize every resource's rows.

    The diff overlay duplicated the old (un-wrapped) body before the new
    try/except version; this is the post-commit version only.

    :param on_error: handler passed through to schema_validator for rows
        failing schema validation
    :returns: tuple of (list of row-lists, datapackage, merged stats)
    :raises exceptions.ProcessorError: wrapping any failure, with the
        originating processor's name and position attached
    """
    try:
        ds = self._process()
        results = [
            list(schema_validator(res.res, res, on_error=on_error))
            for res in ds.res_iter
        ]
    except Exception as exception:
        # Always raises, so `results`/`ds` below are only reached on success.
        self.raise_exception(exception)
    return results, ds.dp, ds.merge_stats()
20 changes: 20 additions & 0 deletions dataflows/base/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class DataflowsException(Exception):
    """Root of the dataflows exception hierarchy."""


class ProcessorError(DataflowsException):
    """Wraps an exception raised inside a flow processor.

    Carries the original exception plus the processor's class name, the
    processor object itself, and its 1-based position in the flow, so
    callers can pinpoint which step failed.
    """

    def __init__(self, cause, *, processor_name, processor_object, processor_position):
        # Initialize the base Exception with the cause's message first,
        # then record the debugging context as attributes.
        super().__init__(str(cause))
        self.cause = cause
        self.processor_name = processor_name
        self.processor_object = processor_object
        self.processor_position = processor_position

    def __str__(self):
        return 'Errored in processor %s in position #%s: %s' % (
            self.processor_name, self.processor_position, self.cause)


class SourceLoadError(DataflowsException):
    # Raised by the load processor when reading a load source fails
    # (see load.py: wraps the underlying error with the source and options).
    pass
12 changes: 6 additions & 6 deletions dataflows/base/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,26 @@ def _preprocess_chain(self):
def _chain(self, ds=None):
    """Wire the flow's steps into a single DataStream pipeline.

    The diff overlay duplicated each pre-change call (without position)
    next to its post-change replacement; this is the post-commit version
    only. Each step is given its 1-based *position* in the flow so that
    ProcessorError can report where a failure occurred.

    Accepted step kinds: nested Flow, DataStreamProcessor, a one-argument
    function named row/rows/package, or a plain iterable of rows.
    """
    from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader

    for position, link in enumerate(self._preprocess_chain(), start=1):
        if isinstance(link, Flow):
            # Nested flows number their own steps internally.
            ds = link._chain(ds)
        elif isinstance(link, DataStreamProcessor):
            ds = link(ds, position=position)
        elif isfunction(link):
            # The single parameter's *name* selects the wrapper kind.
            sig = signature(link)
            params = list(sig.parameters)
            if len(params) == 1:
                if params[0] == 'row':
                    ds = row_processor(link)(ds, position=position)
                elif params[0] == 'rows':
                    ds = rows_processor(link)(ds, position=position)
                elif params[0] == 'package':
                    ds = datapackage_processor(link)(ds, position=position)
                else:
                    assert False, 'Failed to parse function signature %r' % params
            else:
                assert False, 'Failed to parse function signature %r' % params
        elif isinstance(link, Iterable):
            ds = iterable_loader(link)(ds, position=position)

    return ds
5 changes: 3 additions & 2 deletions dataflows/processors/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from tabulator.helpers import reset_stream
from tableschema.schema import Schema
from .. import DataStreamProcessor
from ..base.exceptions import SourceLoadError
from ..base.schema_validator import schema_validator, ignore, drop, raise_exception
from ..helpers.resource_matcher import ResourceMatcher

Expand Down Expand Up @@ -173,8 +174,8 @@ def process_datapackage(self, dp: Package):
try:
return self.safe_process_datapackage(dp)
except Exception as e:
raise e from Exception('Failed to run load with load source {!r} and options {!r}'
.format(self.load_source, self.options))
raise SourceLoadError('Failed to load source {!r} and options {!r}: {}'
.format(self.load_source, self.options, e)) from e

def safe_process_datapackage(self, dp: Package):

Expand Down
94 changes: 89 additions & 5 deletions tests/test_edge_cases.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,99 @@
import pytest

class MyException(Exception):
    """Custom error used across these tests to verify exception wrapping."""
    pass


def test_exception_in_generator():
    # The diff overlay mixed the old nested-MyException version with the
    # new module-level one; this is the post-commit version only.
    # An error raised inside a generator source must surface as a
    # ProcessorError whose `cause` is the original exception.
    from dataflows import Flow, printer, exceptions

    def generator():
        for i in range(5):
            # Raise before ever yielding — the flow fails on first pull.
            raise MyException()
            yield {"i": i}

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        Flow(generator(), printer()).process()
    assert isinstance(excinfo.value.cause, MyException)


def test_exception_information():
    # A failing load must raise ProcessorError carrying the wrapped
    # SourceLoadError message, the processor name, object and position.
    from dataflows import Flow, load, exceptions

    bad_load = load('data/bad-path1.csv')
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        Flow(bad_load).results()
    error = excinfo.value
    assert str(error.cause) == "Failed to load source 'data/bad-path1.csv' and options {'custom_parsers': {'xml': <class 'dataflows.processors.load.XMLParser'>}, 'ignore_blank_headers': True, 'headers': 1}: [Errno 2] No such file or directory: 'data/bad-path1.csv'"
    assert error.processor_name == 'load'
    assert error.processor_object.load_source == 'data/bad-path1.csv'
    assert error.processor_position == 1


def test_exception_information_multiple_processors_simple():
    # With two failing loads, the error must come from the FIRST one —
    # position 1 — since the flow fails before reaching the second.
    from dataflows import Flow, load, exceptions

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        Flow(
            load('data/bad-path1.csv'),
            load('data/bad-path2.csv'),
        ).results()
    error = excinfo.value
    assert str(error.cause) == "Failed to load source 'data/bad-path1.csv' and options {'custom_parsers': {'xml': <class 'dataflows.processors.load.XMLParser'>}, 'ignore_blank_headers': True, 'headers': 1}: [Errno 2] No such file or directory: 'data/bad-path1.csv'"
    assert error.processor_name == 'load'
    assert error.processor_object.load_source == 'data/bad-path1.csv'
    assert error.processor_position == 1


def test_exception_information_multiple_processors_last_errored():
    # When only the second load fails, the reported position must be 2
    # and the processor object must be the second load instance.
    from dataflows import Flow, load, exceptions

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        Flow(
            load('data/academy.csv'),
            load('data/bad-path2.csv'),
        ).results()
    error = excinfo.value
    assert str(error.cause) == "Failed to load source 'data/bad-path2.csv' and options {'custom_parsers': {'xml': <class 'dataflows.processors.load.XMLParser'>}, 'ignore_blank_headers': True, 'headers': 1}: [Errno 2] No such file or directory: 'data/bad-path2.csv'"
    assert error.processor_name == 'load'
    assert error.processor_object.load_source == 'data/bad-path2.csv'
    assert error.processor_position == 2


def test_exception_information_multiple_processors_function_error():
    # An exception raised inside a rows-function step is wrapped with the
    # rows_processor name and the step's position in the flow.
    from dataflows import Flow, load, exceptions

    def func(rows):
        for index, row in enumerate(rows):
            if index == 1:
                raise MyException('custom-error')
            yield row

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        Flow(
            load('data/academy.csv'),
            func
        ).results()
    error = excinfo.value
    assert str(error.cause) == 'custom-error'
    assert error.processor_name == 'rows_processor'
    assert error.processor_position == 2


def test_exception_information_multiple_processors_iterable_error():
    # An exception raised by a plain-iterable source is wrapped with the
    # iterable_loader name, at position 1 (the source step itself).
    from dataflows import Flow, printer, exceptions

    def func():
        for value in range(10):
            if value == 1:
                raise MyException('custom-iterable-error')
            yield dict(a=value)

    with pytest.raises(exceptions.ProcessorError) as excinfo:
        Flow(
            func(),
            printer()
        ).results()
    error = excinfo.value
    assert str(error.cause) == 'custom-iterable-error'
    assert error.processor_name == 'iterable_loader'
    assert error.processor_position == 1
22 changes: 8 additions & 14 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

def test_example_1():
from dataflows import Flow

Expand All @@ -16,8 +18,6 @@ def lowerData(row):
)
data, *_ = f.results()

print(data)

# [[{'data': 'hello'}, {'data': 'world'}]]


Expand All @@ -33,8 +33,6 @@ def titleName(row):
)
data, *_ = f.results()

print(data)


def country_population():
from xml.etree import ElementTree
Expand Down Expand Up @@ -63,8 +61,6 @@ def test_example_3():
)
data, *_ = f.results()

print(data)

def test_example_4():
from dataflows import Flow, set_type

Expand All @@ -74,8 +70,6 @@ def test_example_4():
)
data, dp, _ = f.results()

print(data[0][:10])

def test_example_5():
from dataflows import Flow, set_type, dump_to_path

Expand Down Expand Up @@ -112,8 +106,9 @@ def filter_pythagorean_triplets(rows):
)
_ = f.process()


def test_validate():
from dataflows import Flow, validate, set_type, printer, ValidationError
from dataflows import Flow, validate, set_type, printer, ValidationError, exceptions

def adder(row):
row['a'] += 0.5
Expand All @@ -127,11 +122,10 @@ def adder(row):
validate(),
printer()
)
try:
_ = f.process()
assert False
except ValidationError:
pass

with pytest.raises(exceptions.ProcessorError) as excinfo:
f.process()
assert isinstance(excinfo.value.cause, ValidationError)


def test_example_7():
Expand Down
Loading

0 comments on commit 13a8489

Please sign in to comment.