Skip to content

Commit

Permalink
Merge af7200d into eb19b32
Browse files Browse the repository at this point in the history
  • Loading branch information
akariv committed Mar 30, 2019
2 parents eb19b32 + af7200d commit c53c38a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 27 deletions.
2 changes: 1 addition & 1 deletion dataflows/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.45
0.0.46
51 changes: 33 additions & 18 deletions dataflows/base/datastream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@
from .schema_validator import schema_validator


class LazyIterator:
    """Iterable wrapper that defers building its iterator until iteration.

    Each ``iter()`` call invokes the supplied factory, so the wrapped
    stream is produced lazily (and freshly) per traversal.
    """

    def __init__(self, get_iterator):
        # Zero-argument callable returning a new iterator on each call.
        self.get_iterator = get_iterator

    def __iter__(self):
        factory = self.get_iterator
        return factory()


class DataStreamProcessor:

def __init__(self):
Expand Down Expand Up @@ -38,31 +47,37 @@ def process_row(self, row):
def process_datapackage(self, dp: Package):
    """Hook for transforming the datapackage descriptor.

    Default implementation returns ``dp`` unchanged; subclasses are
    expected to override this to modify the package.
    """
    return dp

def get_res(self, current_dp, name):
    """Resolve a resource by name.

    Prefers this processor's own datapackage; falls back to the
    upstream ``current_dp`` when the name is absent there.
    """
    resource = self.datapackage.get_resource(name)
    if resource is None:
        # Not (re)declared by this processor — take the upstream one.
        resource = current_dp.get_resource(name)
    assert resource is not None
    return resource

def get_iterator(self, datastream):
    """Build a zero-argument factory for the processed resource stream.

    The returned callable, when invoked, re-wraps each upstream resource
    with the resolved resource metadata, runs it through
    ``process_resources``, and pairs the results with this processor's
    datapackage resources (extra items padded via ``zip_longest``).
    """
    upstream_dp = datastream.dp
    upstream_res_iter = datastream.res_iter

    def make_iterator():
        # Re-bind each incoming resource wrapper to the resolved resource.
        rebound = (
            ResourceWrapper(self.get_res(upstream_dp, rw.res.name), rw.it)
            for rw in upstream_res_iter
        )
        processed = self.process_resources(rebound)
        # Ensure every emitted item is a ResourceWrapper, matched against
        # this processor's declared resources.
        return (
            item if isinstance(item, ResourceWrapper) else ResourceWrapper(res, item)
            for res, item in itertools.zip_longest(self.datapackage.resources, processed)
        )

    return make_iterator

def _process(self):
    """Process the upstream datastream and return the resulting DataStream.

    Pulls the source's stream, derives (deep-copies, transforms and
    commits) this processor's datapackage, and returns a DataStream
    whose resource iterator is built lazily via ``get_iterator`` wrapped
    in ``LazyIterator`` — resources are only materialized on iteration.

    Note: the span previously contained the pre-merge implementation
    (an inline ``get_res`` closure and an eager ``res_iter`` pipeline)
    left as unreachable duplicate code after the first ``return``; that
    dead residue is removed here. The surviving logic lives in
    ``get_res``/``get_iterator``.
    """
    datastream = self.source._process()

    # Derive this step's datapackage from the upstream descriptor.
    self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor))
    self.datapackage = self.process_datapackage(self.datapackage)
    self.datapackage.commit()

    return DataStream(self.datapackage,
                      LazyIterator(self.get_iterator(datastream)),
                      datastream.stats + [self.stats])

def process(self):
ds = self._process()
Expand Down
2 changes: 1 addition & 1 deletion dataflows/helpers/iterable_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def handle_iterable(self):
try:
for x in self.iterable:
if mode is None:
assert isinstance(x, (dict, list))
assert isinstance(x, (dict, list)), 'Bad item %r' % x
mode = dict if isinstance(x, dict) else list
assert isinstance(x, mode)
if mode == dict:
Expand Down
8 changes: 5 additions & 3 deletions dataflows/processors/delete_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

def process_resource(rows, fields):
    """Yield a copy of each row with the named fields removed.

    The span contained both the pre-merge body (in-place ``del`` then
    ``yield row``) and the post-merge body, which flattened would yield
    every row twice — once mutated. Only the committed, non-mutating
    version is kept: input rows are never modified; each emitted row is
    a fresh dict without the dropped keys.
    """
    drop = set(fields)  # O(1) membership test per key
    for row in rows:
        yield {k: v for k, v in row.items() if k not in drop}


def delete_fields(fields, resources=None):
Expand Down
8 changes: 4 additions & 4 deletions dataflows/processors/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ def func(rows):
x = 1

for i, row in enumerate(rows):
yield row

index = i + 1
row = [index] + [truncate_cell(row[f], max_cell_size) for f in field_names]
prow = [index] + [truncate_cell(row[f], max_cell_size) for f in field_names]
yield row

if index - x == (num_rows + 1):
x *= num_rows
Expand All @@ -71,9 +71,9 @@ def func(rows):
last.clear()
if toprint and toprint[-1][0] != index - 1:
toprint.append(['...'])
toprint.append(row)
toprint.append(prow)
else:
last.append(row)
last.append(prow)
if len(last) > (last_rows or num_rows):
last = last[1:]

Expand Down

0 comments on commit c53c38a

Please sign in to comment.