Merge 9c26255 into 9e4433b
akariv committed Oct 23, 2018
2 parents 9e4433b + 9c26255 commit 57c113e
Showing 69 changed files with 1,119 additions and 1,455 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -95,3 +95,6 @@ tests/env/dummy/type-tests-output2
tests/cli/.code
.dpp
.coverage.*
.code/
.vscode/

360 changes: 229 additions & 131 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion datapackage_pipelines/VERSION
@@ -1 +1 @@
1.7.2
2.0.0
90 changes: 13 additions & 77 deletions datapackage_pipelines/lib/add_computed_field.py
@@ -1,81 +1,17 @@
import functools
import collections
from dataflows import Flow, add_computed_field
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow

from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher

parameters, datapackage, resource_iterator = ingest()
def flow(parameters):
return Flow(
add_computed_field(
parameters.get('fields', []),
parameters.get('resources')
),
)

resources = ResourceMatcher(parameters.get('resources'))
fields = parameters.get('fields', [])


def modify_datapackage(datapackage_):
dp_resources = datapackage_.get('resources', [])
for resource_ in dp_resources:
if resources.match(resource_['name']):
new_fields = [
{
'name': f['target'],
'type': get_type(resource_['schema']['fields'],
f.get('source', []),
f['operation'])
} for f in fields
]
resource_['schema']['fields'] += new_fields
return datapackage_


def get_type(res_fields, operation_fields, operation):
types = [f.get('type') for f in res_fields if f['name'] in operation_fields]
if 'any' in types:
return 'any'
if (operation == 'format') or (operation == 'join'):
return 'string'
if ('number' in types) or (operation == 'avg'):
return 'number'
# integers
if len(types):
return types[0]
# constant
return 'any'


def process_resource(rows):
for row in rows:
for field in fields:
values = [
row.get(c) for c in field.get('source', []) if row.get(c) is not None
]
with_ = field.get('with', '')
new_col = AGGREGATORS[field['operation']].func(values, with_, row)
row[field['target']] = new_col
yield row


def process_resources(resource_iterator_):
for resource in resource_iterator_:
spec = resource.spec
if not resources.match(spec['name']):
yield resource
else:
yield process_resource(resource)


Aggregator = collections.namedtuple('Aggregator', ['func'])

AGGREGATORS = {
'sum': Aggregator(lambda values, fstr, row: sum(values)),
'avg': Aggregator(lambda values, fstr, row: sum(values) / len(values)),
'max': Aggregator(lambda values, fstr, row: max(values)),
'min': Aggregator(lambda values, fstr, row: min(values)),
'multiply': Aggregator(
lambda values, fstr, row: functools.reduce(lambda x, y: x*y, values)),
'constant': Aggregator(lambda values, fstr, row: fstr),
'join': Aggregator(
lambda values, fstr, row: fstr.join([str(x) for x in values])),
'format': Aggregator(lambda values, fstr, row: fstr.format(**row)),
}


spew(modify_datapackage(datapackage), process_resources(resource_iterator))
if __name__ == '__main__':
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
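
The processor above is now a thin wrapper over dataflows' add_computed_field. As a rough, self-contained sketch of what that flow does to rows (the sample rows and the 'total'/'sum' field spec are illustrative, not part of this commit; assumes the dataflows package is installed):

from dataflows import Flow, add_computed_field

# Illustrative input rows; in a real pipeline these come from upstream processors.
rows = [
    {'a': 1, 'b': 2},
    {'a': 3, 'b': 4},
]

# The field spec mirrors the 'fields' parameter the processor forwards as-is.
results, datapackage, stats = Flow(
    rows,
    add_computed_field([
        {'target': 'total', 'operation': 'sum', 'source': ['a', 'b']},
    ]),
).results()

print(results[0])  # [{'a': 1, 'b': 2, 'total': 3}, {'a': 3, 'b': 4, 'total': 7}]
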
20 changes: 13 additions & 7 deletions datapackage_pipelines/lib/add_metadata.py
@@ -1,9 +1,15 @@
from datapackage_pipelines.wrapper import ingest, spew
import warnings

parameters, datapackage, res_iter = ingest()
if datapackage is None:
datapackage = parameters
else:
datapackage.update(parameters)
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow

spew(datapackage, res_iter)
from datapackage_pipelines.lib.update_package import flow


if __name__ == '__main__':
warnings.warn(
'add_metadata will be removed in the future, use "update_package" instead',
DeprecationWarning
)
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
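
add_metadata is reduced to a deprecation shim that reuses the flow from update_package. For reference, the dataflows equivalent of the old datapackage.update(parameters) behaviour is its update_package processor; a minimal sketch (the metadata keys and rows are illustrative; assumes dataflows is installed):

from dataflows import Flow, update_package

# Illustrative rows and metadata; the processor merges arbitrary
# key/value parameters into the package descriptor.
rows = [{'x': 1}]

results, package, stats = Flow(
    rows,
    update_package(name='my-package', title='My Package'),
).results()

print(package.descriptor['title'])  # 'My Package'
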
147 changes: 24 additions & 123 deletions datapackage_pipelines/lib/concatenate.py
@@ -1,124 +1,25 @@
import itertools

from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher
from dataflows import Flow, concatenate, update_resource
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.resources import PROP_STREAMING

parameters, datapackage, resource_iterator = ingest()

sources = ResourceMatcher(parameters.get('sources'))

target = parameters.get('target', {})
if 'name' not in target:
target['name'] = 'concat'
if 'path' not in target:
target['path'] = 'data/' + target['name'] + '.csv'
target.update(dict(
mediatype='text/csv',
schema=dict(fields=[], primaryKey=[]),
))
target[PROP_STREAMING] = True

fields = parameters['fields']

# Create mapping between source field names to target field names
field_mapping = {}
for target_field, source_fields in fields.items():
if source_fields is not None:
for source_field in source_fields:
if source_field in field_mapping:
raise RuntimeError('Duplicate appearance of %s (%r)' % (source_field, field_mapping))
field_mapping[source_field] = target_field

if target_field in field_mapping:
raise RuntimeError('Duplicate appearance of %s' % target_field)

field_mapping[target_field] = target_field

# Create the schema for the target resource
needed_fields = sorted(fields.keys())
for resource in datapackage['resources']:
if not sources.match(resource['name']):
continue

schema = resource.get('schema', {})
pk = schema.get('primaryKey', [])
for field in schema.get('fields', []):
orig_name = field['name']
if orig_name in field_mapping:
name = field_mapping[orig_name]
if name not in needed_fields:
continue
if orig_name in pk:
target['schema']['primaryKey'].append(name)
target['schema']['fields'].append(field)
field['name'] = name
needed_fields.remove(name)

if len(target['schema']['primaryKey']) == 0:
del target['schema']['primaryKey']

for name in needed_fields:
target['schema']['fields'].append(dict(
name=name, type='string'
))

# Update resources in datapackage (make sure they are consecutive)
prefix = True
suffix = False
num_concatenated = 0
new_resources = []
for resource in datapackage['resources']:
name = resource['name']
match = sources.match(name)
if prefix:
if match:
prefix = False
num_concatenated += 1
else:
new_resources.append(resource)
elif suffix:
assert not match
new_resources.append(resource)
else:
if not match:
suffix = True
new_resources.append(target)
new_resources.append(resource)
else:
num_concatenated += 1
if not suffix:
new_resources.append(target)


datapackage['resources'] = new_resources

all_target_fields = set(fields.keys())


def concatenator(resources):
for resource_ in resources:
for row in resource_:
processed = dict((k, '') for k in all_target_fields)
values = [(field_mapping[k], v) for (k, v)
in row.items()
if k in field_mapping]
assert len(values) > 0
processed.update(dict(values))
yield processed


def new_resource_iterator(resource_iterator_):
while True:
resource_ = next(resource_iterator_)
if sources.match(resource_.spec['name']):
resource_chain = \
itertools.chain([resource_],
itertools.islice(resource_iterator_,
num_concatenated-1))
yield concatenator(resource_chain)
else:
yield resource_


spew(datapackage, new_resource_iterator(resource_iterator))
from datapackage_pipelines.utilities.flow_utils import spew_flow


def flow(parameters):
return Flow(
concatenate(
parameters.get('fields', {}),
parameters.get('target', {}),
parameters.get('sources')
),
update_resource(
parameters.get('target', {}).get('name', 'concat'),
**{
PROP_STREAMING: True
}
)
)


if __name__ == '__main__':
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
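
concatenate.py likewise drops its hand-rolled schema merging and row remapping in favour of dataflows' concatenate, plus update_resource to mark the target as streaming. A small standalone sketch of the same field mapping (the resource contents and names chosen here are illustrative; assumes dataflows is installed):

from dataflows import Flow, concatenate

# Two illustrative source resources whose differently named columns are
# mapped onto one target field, as the 'fields' parameter describes.
people_a = [{'first_name': 'Ada'}]
people_b = [{'fname': 'Grace'}]

results, datapackage, stats = Flow(
    people_a,
    people_b,
    concatenate(
        fields={'name': ['first_name', 'fname']},
        target={'name': 'people', 'path': 'data/people.csv'},
    ),
).results()

print(results[0])  # [{'name': 'Ada'}, {'name': 'Grace'}]
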
52 changes: 13 additions & 39 deletions datapackage_pipelines/lib/delete_fields.py
@@ -1,43 +1,17 @@
from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher
from dataflows import Flow, delete_fields
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow

parameters, datapackage, resource_iterator = ingest()

resources = ResourceMatcher(parameters.get('resources'))
fields = parameters.get('fields', [])
def flow(parameters):
return Flow(
delete_fields(
parameters.get('fields', []),
parameters.get('resources')
)
)


def delete_fields(datapackage_):
dp_resources = datapackage_.get('resources', [])
for resource_ in dp_resources:
if resources.match(resource_['name']):
dp_fields = resource_['schema'].get('fields', [])
field_names = [f['name'] for f in dp_fields]
non_existings = [f for f in fields if f not in field_names]

assert len(non_existings) == 0, \
"Can't find following field(s): %s" % '; '.join(non_existings)

new_fields = list(
filter(lambda x: x['name'] not in fields, dp_fields))
resource_['schema']['fields'] = new_fields
return datapackage_


def process_resource(rows):
for row in rows:
for field in fields:
del row[field]
yield row


def process_resources(resource_iterator_):
for resource in resource_iterator_:
spec = resource.spec
if not resources.match(spec['name']):
yield resource
else:
yield process_resource(resource)


spew(delete_fields(datapackage), process_resources(resource_iterator))
if __name__ == '__main__':
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
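
delete_fields.py follows the same pattern, delegating both the schema change and the row-level deletion to dataflows' delete_fields. A minimal sketch (the rows and field name are illustrative; assumes dataflows is installed):

from dataflows import Flow, delete_fields

# Illustrative rows; 'internal_note' stands in for whatever the
# 'fields' parameter lists.
rows = [{'id': 1, 'internal_note': 'drop me'}]

results, datapackage, stats = Flow(
    rows,
    delete_fields(['internal_note']),
).results()

print(results[0])  # [{'id': 1}]
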
5 changes: 5 additions & 0 deletions datapackage_pipelines/lib/dump/to_path.py
@@ -1,5 +1,6 @@
import os
import shutil
import warnings

from datapackage_pipelines.lib.dump.dumper_base import FileDumper

@@ -29,4 +30,8 @@ def __makedirs(path):


if __name__ == '__main__':
warnings.warn(
'dump.to_path will be removed in the future, use "dump_to_path" instead',
DeprecationWarning
)
PathDumper()()
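
The only change to dump.to_path is the DeprecationWarning nudging users towards dump_to_path before the dumper runs. A quick standard-library sketch of how such a warning surfaces in a test (the helper name below is made up for illustration):

import warnings

def legacy_entry_point():
    # Mirrors the pattern added in this commit: warn, then run the dumper.
    warnings.warn(
        'dump.to_path will be removed in the future, use "dump_to_path" instead',
        DeprecationWarning
    )

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    legacy_entry_point()

assert any(issubclass(w.category, DeprecationWarning) for w in caught)
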
