Commit

Merge 7d6fdc4 into 9e4433b

akariv committed Oct 18, 2018
2 parents 9e4433b + 7d6fdc4 commit 6d7871e
Showing 69 changed files with 940 additions and 1,316 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -95,3 +95,6 @@ tests/env/dummy/type-tests-output2
tests/cli/.code
.dpp
.coverage.*
.code/
.vscode/

93 changes: 47 additions & 46 deletions README.md
@@ -34,7 +34,7 @@ worldbank-co2-emissions:
description: Data per year, provided in metric tons per capita.
pipeline:
-
run: add_metadata
run: update_package
parameters:
name: 'co2-emissions'
title: 'CO2 emissions (metric tons per capita)'
@@ -57,7 +57,7 @@ worldbank-co2-emissions:
"[12][0-9]{3}":
type: number
-
run: dump.to_zip
run: dump_to_zip
parameters:
out-file: co2-emissions-wb.zip
```
@@ -69,7 +69,7 @@ In this example we see one pipeline called `worldbank-co2-emissions`. Its pipeli
This resource has a `name` and a `url`, pointing to the remote location of the data.
- `stream_remote_resources`: This processor will stream data from resources (like the one we defined in the 1st step) into the pipeline, on to processors further down the pipeline (see more about streaming below).
- `set_types`: This processor assigns data types to fields in the data. In this example, field headers looking like years will be assigned the `number` type.
- `dump.to_zip`: Create a zipped and validated datapackage with the provided file name.
- `dump_to_zip`: Create a zipped and validated datapackage with the provided file name.

### Mechanics

@@ -96,17 +96,17 @@ Available Pipelines:

$ dpp run ./worldbank-co2-emissions
INFO :Main:RUNNING ./worldbank-co2-emissions
INFO :Main:- lib/add_metadata.py
INFO :Main:- lib/update_package.py
INFO :Main:- lib/add_resource.py
INFO :Main:- lib/stream_remote_resources.py
INFO :Main:- lib/dump/to_zip.py
INFO :Main:DONE lib/add_metadata.py
INFO :Main:DONE lib/update_package.py
INFO :Main:DONE lib/add_resource.py
INFO :Main:stream_remote_resources: OPENING http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel
INFO :Main:stream_remote_resources: TOTAL 264 rows
INFO :Main:stream_remote_resources: Processed 264 rows
INFO :Main:DONE lib/stream_remote_resources.py
INFO :Main:dump.to_zip: INFO :Main:Processed 264 rows
INFO :Main:dump_to_zip: INFO :Main:Processed 264 rows
INFO :Main:DONE lib/dump/to_zip.py
INFO :Main:RESULTS:
INFO :Main:SUCCESS: ./worldbank-co2-emissions
@@ -218,7 +218,7 @@ Pythonic interface to running pipelines. You can integrate dataflows within pipe
instead of `run`. For example, given the following flow file, saved under `my-flow.py`:

```
from dataflows import Flow, dump_to_path, load, add_metadata
from dataflows import Flow, dump_to_path, load, update_package
def flow(parameters, datapackage, resources, stats):
stats['multiplied_fields'] = 0
@@ -229,8 +229,7 @@ def flow(parameters, datapackage, resources, stats):
stats['multiplied_fields'] += 1
return step
return Flow(add_metadata(name='my-datapackage'),
load((datapackage, resources),
return Flow(update_package(name='my-datapackage'),
multiply('my-field', 2))
```

@@ -244,7 +243,7 @@ my-flow:
url: http://example.com/my-datapackage/datapackage.json
resource: my-resource
- flow: my-flow
- run: dump.to_path
- run: dump_to_path
```

You can run the pipeline using `dpp run my-flow`.
@@ -253,7 +252,7 @@ You can run the pipeline using `dpp run my-flow`.

A few built in processors are provided with the library.

### ***`add_metadata`***
### ***`update_package`***

Adds meta-data to the data-package.

Expand All @@ -264,7 +263,7 @@ Any allowed property (according to the [spec]([http://specs.frictionlessdata.io/
*Example*:

```yaml
- run: add_metadata
- run: update_package
parameters:
name: routes-to-mordor
license: CC-BY-SA-4
@@ -1015,7 +1014,7 @@ parameters:
direction: \\2 # Second member of group from above
```

### ***`dump.to_sql`***
### ***`dump_to_sql`***

Saves the datapackage to an SQL database.

@@ -1042,7 +1041,7 @@ _Parameters_:
- `false` - row was inserted
- `updated_id_column` - Optional name of a column that will be added to the output data and will contain the id of the updated row in the DB.
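
For orientation, a pipeline step using this processor might look like the sketch below; the connection string, table name and resource name are illustrative assumptions, not values taken from this diff:

```yaml
- run: dump_to_sql
  parameters:
    # assumption: connection string read from an environment variable
    engine: env://DPP_DB_ENGINE
    tables:
      co2_emissions:                  # target table name (made up)
        resource-name: co2-emissions  # resource to write into that table
        mode: rewrite                 # replace any existing table contents
```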

### ***`dump.to_path`***
### ***`dump_to_path`***

Saves the datapackage to a filesystem path.

@@ -1080,23 +1079,23 @@ _Parameters_:
- Note that such changes may make the resulting datapackage incompatible with the frictionlessdata specs and may cause interoperability problems.
- Example usage: [pipeline-spec.yaml](tests/cli/pipeline-spec.yaml) (under the `custom-formatters` pipeline), [XLSXFormat class](tests/cli/custom_formatters/xlsx_format.py)
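
A minimal step using this processor might look like the sketch below, assuming the conventional `out-path` parameter (the directory name is made up):

```yaml
- run: dump_to_path
  parameters:
    out-path: out/co2-emissions   # directory where datapackage.json and the data files are written
```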

### ***`dump.to_zip`***
### ***`dump_to_zip`***

Saves the datapackage to a zipped archive.

_Parameters_:

- `out-file` - Name of the output file where the zipped data will be stored
- `force-format` and `format` - Same as in `dump.to_path`
- `handle-non-tabular` - Same as in `dump.to_path`
- `add-filehash-to-path` - Same as in `dump.to_path`
- `counters` - Same as in `dump.to_path`
- `pretty-descriptor` - Same as in `dump.to_path`
- `file-formatters` - Same as in `dump.to_path`
- `force-format` and `format` - Same as in `dump_to_path`
- `handle-non-tabular` - Same as in `dump_to_path`
- `add-filehash-to-path` - Same as in `dump_to_path`
- `counters` - Same as in `dump_to_path`
- `pretty-descriptor` - Same as in `dump_to_path`
- `file-formatters` - Same as in `dump_to_path`
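
A typical step, mirroring the example at the top of this README:

```yaml
- run: dump_to_zip
  parameters:
    out-file: co2-emissions-wb.zip   # name of the resulting zip archive
```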

#### *Note*

`dump.to_path` and `dump.to_zip` processors will handle non-tabular resources as well.
`dump_to_path` and `dump_to_zip` processors will handle non-tabular resources as well.
These resources must have both `url` and `path` properties, and _must not_ contain a `schema` property.
In such cases, the file will be downloaded from the `url` and placed in the provided `path`.
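
As an illustration, such a non-tabular resource's descriptor might look like the sketch below (name, URL and path are made up); because it carries `url` and `path` but no `schema`, the dump processors simply download the file and store it:

```yaml
# illustrative resource descriptor inside the datapackage
name: annual-report
url: http://example.com/annual-report.pdf   # the file is downloaded from here
path: data/annual-report.pdf                # and stored at this path in the dump
# no `schema` property - this is what marks the resource as non-tabular
```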

@@ -1108,8 +1107,6 @@ For that you might need to write your own processor - here's how it's done.

There are two APIs for writing processors - the high level API and the low level API.

**Important**: due to the way that pipeline execution is implemented, you **cannot** `print` from within a processor. In case you need to debug, _only_ use the `logging` module to print out anything you need.

### High Level Processor API

The high-level API is quite useful for most processor kinds:
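
The library's own example is collapsed in this diff view; a minimal high-level processor, assuming the `process` helper from `datapackage_pipelines.wrapper`, looks roughly like this sketch:

```python
# Sketch of a high-level processor (illustrative, not the collapsed example itself).
from datapackage_pipelines.wrapper import process


def modify_datapackage(datapackage, parameters, stats):
    # Adjust the datapackage descriptor and return it
    datapackage['title'] = parameters.get('title', datapackage.get('title'))
    return datapackage


def process_row(row, row_index, spec, resource_index, parameters, stats):
    # Called for every streamed row; return the (possibly modified) row
    stats['rows-seen'] = stats.get('rows-seen', 0) + 1
    return row


if __name__ == '__main__':
    process(modify_datapackage=modify_datapackage, process_row=process_row)
```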
@@ -1211,32 +1208,35 @@ In some cases, the high-level API might be too restricting. In these cases you s
```python
from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
if __name__ == '__main__':
with ingest() as ctx:

# Initialisation code, if needed
# Initialisation code, if needed

# Do stuff with datapackage
# ...
# Do stuff with datapackage
# ...

stats = {}
stats = {}

# and resources:
def new_resource_iterator(resource_iterator_):
def resource_processor(resource_):
# resource_.spec is the resource descriptor
for row in resource_:
# Do something with row
# Perhaps collect some stats here as well
yield row
for resource in resource_iterator_:
yield resource_processor(resource)
# and resources:
def new_resource_iterator(resource_iterator_):
def resource_processor(resource_):
# resource_.spec is the resource descriptor
for row in resource_:
# Do something with row
# Perhaps collect some stats here as well
yield row
for resource in resource_iterator_:
yield resource_processor(resource)

spew(datapackage, new_resource_iterator(resource_iterator), stats)
spew(ctx.datapackage,
new_resource_iterator(ctx.resource_iterator),
ctx.stats)
```

The above code snippet shows the structure of most low-level processors.

We always start by calling `ingest()` - this method gives us the execution parameters, the data-package descriptor (as output from the previous step) and an iterator over all streamed resources' rows.
We always start by calling `ingest()` - this method gives us the context, containing the execution parameters, the data-package descriptor (as output from the previous step) and an iterator over all streamed resources' rows.

We finish the processing by calling `spew()`, which sends the processed data to the next processor in the pipeline. `spew` receives:
* A modified data-package descriptor;
@@ -1265,9 +1265,10 @@ We'll start with the same processors from above, now implemented with the low le
# Add license information
from datapackage_pipelines.wrapper import ingest, spew

_, datapackage, resource_iterator = ingest()
datapackage['license'] = 'MIT'
spew(datapackage, resource_iterator)
if __name__ == '__main__':
with ingest() as ctx:
ctx.datapackage['license'] = 'MIT'
spew(ctx.datapackage, ctx.resource_iterator)
```

```python
@@ -1431,7 +1432,7 @@ class Generator(GeneratorBase):
('metadata', {
'name': dataset_name
}),
('dump.to_zip', {
('dump_to_zip', {
'out-file': 'ckan-datapackage.zip'
})])
pipeline_details = {
@@ -1451,7 +1452,7 @@ data-kind: package-list

Then, when running `dpp`, we will see an available pipeline named `./example-com-list-of-packages`.

This pipeline would internally be composed of 3 steps: `ckan.scraper`, `metadata` and `dump.to_zip`.
This pipeline would internally be composed of 3 steps: `ckan.scraper`, `metadata` and `dump_to_zip`.

#### Validating Source Descriptors

2 changes: 1 addition & 1 deletion datapackage_pipelines/VERSION
@@ -1 +1 @@
1.7.2
2.0.0
90 changes: 13 additions & 77 deletions datapackage_pipelines/lib/add_computed_field.py
@@ -1,81 +1,17 @@
import functools
import collections
from dataflows import Flow, add_computed_field
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow

from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher

parameters, datapackage, resource_iterator = ingest()
def flow(parameters):
return Flow(
add_computed_field(
parameters.get('fields', []),
parameters.get('resources')
),
)

resources = ResourceMatcher(parameters.get('resources'))
fields = parameters.get('fields', [])


def modify_datapackage(datapackage_):
dp_resources = datapackage_.get('resources', [])
for resource_ in dp_resources:
if resources.match(resource_['name']):
new_fields = [
{
'name': f['target'],
'type': get_type(resource_['schema']['fields'],
f.get('source', []),
f['operation'])
} for f in fields
]
resource_['schema']['fields'] += new_fields
return datapackage_


def get_type(res_fields, operation_fields, operation):
types = [f.get('type') for f in res_fields if f['name'] in operation_fields]
if 'any' in types:
return 'any'
if (operation == 'format') or (operation == 'join'):
return 'string'
if ('number' in types) or (operation == 'avg'):
return 'number'
# integers
if len(types):
return types[0]
# constant
return 'any'


def process_resource(rows):
for row in rows:
for field in fields:
values = [
row.get(c) for c in field.get('source', []) if row.get(c) is not None
]
with_ = field.get('with', '')
new_col = AGGREGATORS[field['operation']].func(values, with_, row)
row[field['target']] = new_col
yield row


def process_resources(resource_iterator_):
for resource in resource_iterator_:
spec = resource.spec
if not resources.match(spec['name']):
yield resource
else:
yield process_resource(resource)


Aggregator = collections.namedtuple('Aggregator', ['func'])

AGGREGATORS = {
'sum': Aggregator(lambda values, fstr, row: sum(values)),
'avg': Aggregator(lambda values, fstr, row: sum(values) / len(values)),
'max': Aggregator(lambda values, fstr, row: max(values)),
'min': Aggregator(lambda values, fstr, row: min(values)),
'multiply': Aggregator(
lambda values, fstr, row: functools.reduce(lambda x, y: x*y, values)),
'constant': Aggregator(lambda values, fstr, row: fstr),
'join': Aggregator(
lambda values, fstr, row: fstr.join([str(x) for x in values])),
'format': Aggregator(lambda values, fstr, row: fstr.format(**row)),
}


spew(modify_datapackage(datapackage), process_resources(resource_iterator))
if __name__ == '__main__':
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
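
For reference, a step using this processor might be written as in the sketch below (resource and field names are made up); the operation names come from the aggregator table removed by this refactoring:

```yaml
- run: add_computed_field
  parameters:
    resources: my-resource        # optional: limit to matching resources
    fields:
      - target: total             # name of the field to add
        operation: sum            # sum, avg, max, min, multiply, constant, join or format
        source: [price, tax]      # fields fed into the operation
```
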
20 changes: 13 additions & 7 deletions datapackage_pipelines/lib/add_metadata.py
@@ -1,9 +1,15 @@
from datapackage_pipelines.wrapper import ingest, spew
import warnings

parameters, datapackage, res_iter = ingest()
if datapackage is None:
datapackage = parameters
else:
datapackage.update(parameters)
from datapackage_pipelines.wrapper import ingest
from datapackage_pipelines.utilities.flow_utils import spew_flow

spew(datapackage, res_iter)
from datapackage_pipelines.lib.update_package import flow


if __name__ == '__main__':
warnings.warn(
'add_metadata will be removed in the future, use "update_package" instead',
DeprecationWarning
)
with ingest() as ctx:
spew_flow(flow(ctx.parameters), ctx)
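
In practice this shim keeps old specs working: a step written as `run: add_metadata` still executes but emits a `DeprecationWarning`; the forward-compatible spelling is simply:

```yaml
# before (still works, but warns):
- run: add_metadata
  parameters:
    name: my-datapackage
# after:
- run: update_package
  parameters:
    name: my-datapackage
```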
