diff --git a/dataflows/VERSION b/dataflows/VERSION index 31c01c9..1df5b7e 100644 --- a/dataflows/VERSION +++ b/dataflows/VERSION @@ -1 +1 @@ -0.0.45 +0.0.46 diff --git a/dataflows/base/datastream_processor.py b/dataflows/base/datastream_processor.py index 28af4d3..d5525a0 100644 --- a/dataflows/base/datastream_processor.py +++ b/dataflows/base/datastream_processor.py @@ -11,6 +11,15 @@ from .schema_validator import schema_validator +class LazyIterator: + + def __init__(self, get_iterator): + self.get_iterator = get_iterator + + def __iter__(self): + return self.get_iterator() + + class DataStreamProcessor: def __init__(self): @@ -38,31 +47,37 @@ def process_row(self, row): def process_datapackage(self, dp: Package): return dp + def get_res(self, current_dp, name): + ret = self.datapackage.get_resource(name) + if ret is None: + ret = current_dp.get_resource(name) + assert ret is not None + return ret + + def get_iterator(self, datastream): + current_dp = datastream.dp + res_iter_ = datastream.res_iter + + def func(): + res_iter = (ResourceWrapper(self.get_res(current_dp, rw.res.name), rw.it) + for rw in res_iter_) + res_iter = self.process_resources(res_iter) + res_iter = (it if isinstance(it, ResourceWrapper) else ResourceWrapper(res, it) + for res, it + in itertools.zip_longest(self.datapackage.resources, res_iter)) + return res_iter + return func + def _process(self): datastream = self.source._process() - current_dp = datastream.dp self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor)) self.datapackage = self.process_datapackage(self.datapackage) self.datapackage.commit() - res_iter = datastream.res_iter - - def get_res(name): - ret = self.datapackage.get_resource(name) - if ret is None: - ret = current_dp.get_resource(name) - assert ret is not None - return ret - - res_iter = (ResourceWrapper(get_res(rw.res.name), rw.it) - for rw in res_iter) - res_iter = self.process_resources(res_iter) - res_iter = (it if isinstance(it, ResourceWrapper) else ResourceWrapper(res, it) - for res, it - in itertools.zip_longest(self.datapackage.resources, res_iter)) - - return DataStream(self.datapackage, res_iter, datastream.stats + [self.stats]) + return DataStream(self.datapackage, + LazyIterator(self.get_iterator(datastream)), + datastream.stats + [self.stats]) def process(self): ds = self._process() diff --git a/dataflows/helpers/iterable_loader.py b/dataflows/helpers/iterable_loader.py index ad6d9cf..570ab9a 100644 --- a/dataflows/helpers/iterable_loader.py +++ b/dataflows/helpers/iterable_loader.py @@ -77,7 +77,7 @@ def handle_iterable(self): try: for x in self.iterable: if mode is None: - assert isinstance(x, (dict, list)) + assert isinstance(x, (dict, list)), 'Bad item %r' % x mode = dict if isinstance(x, dict) else list assert isinstance(x, mode) if mode == dict: diff --git a/dataflows/processors/delete_fields.py b/dataflows/processors/delete_fields.py index 0927979..2461408 100644 --- a/dataflows/processors/delete_fields.py +++ b/dataflows/processors/delete_fields.py @@ -3,9 +3,11 @@ def process_resource(rows, fields): for row in rows: - for field in fields: - del row[field] - yield row + yield dict( + (k, v) + for k, v in row.items() + if k not in fields + ) def delete_fields(fields, resources=None): diff --git a/dataflows/processors/printer.py b/dataflows/processors/printer.py index e9a9cda..749a7b4 100644 --- a/dataflows/processors/printer.py +++ b/dataflows/processors/printer.py @@ -59,10 +59,10 @@ def func(rows): x = 1 for i, row in enumerate(rows): - yield row index = i + 1 - row = [index] + [truncate_cell(row[f], max_cell_size) for f in field_names] + prow = [index] + [truncate_cell(row[f], max_cell_size) for f in field_names] + yield row if index - x == (num_rows + 1): x *= num_rows @@ -71,9 +71,9 @@ def func(rows): last.clear() if toprint and toprint[-1][0] != index - 1: toprint.append(['...']) - toprint.append(row) + toprint.append(prow) else: - last.append(row) + last.append(prow) if len(last) > (last_rows or num_rows): last = last[1:]