Commit 1578f3d
Merge 1cd136b into 30f627e
roll committed Jun 21, 2020
2 parents 30f627e + 1cd136b commit 1578f3d
Showing 6 changed files with 29 additions and 4 deletions.
12 changes: 11 additions & 1 deletion README.md
@@ -32,6 +32,8 @@ Here's an example of a `pipeline-spec.yaml` file:
worldbank-co2-emissions:
  title: CO2 emission data from the World Bank
  description: Data per year, provided in metric tons per capita.
  environment:
    DEBUG: true
  pipeline:
    -
      run: update_package
@@ -61,12 +63,20 @@ worldbank-co2-emissions:

In this example we see one pipeline called `worldbank-co2-emissions`. It consists of 4 steps:

- `metadata`: This is a library processor (see below), which modifies the data-package's descriptor (in our case: the initial, empty descriptor) - adding `name`, `title` and other properties to the datapackage.
- `update_package`: This is a library processor (see below), which modifies the data-package's descriptor (in our case: the initial, empty descriptor) - adding `name`, `title` and other properties to the datapackage.
- `load`: This is another library processor, which loads data into the data-package.
This resource has a `name` and a `from` property, pointing to the remote location of the data.
- `set_types`: This processor assigns data types to fields in the data. In this example, field headers looking like years will be assigned the `number` type.
- `dump_to_zip`: Creates a zipped and validated datapackage with the provided file name. (A condensed sketch combining all four steps follows this list.)
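
Putting these together, here is a condensed sketch of what such a full spec could look like. It is illustrative only: the source URL, resource names and parameter values below are hypothetical, not the repository's actual example.

```yaml
worldbank-co2-emissions:
  title: CO2 emission data from the World Bank
  description: Data per year, provided in metric tons per capita.
  environment:
    DEBUG: true
  pipeline:
    - run: update_package
      parameters:
        name: co2-emissions                 # descriptor properties to set
        title: CO2 emissions (metric tons per capita)
    - run: load
      parameters:
        from: https://example.org/co2-emissions.xls   # hypothetical remote source
        name: global-data
    - run: set_types
      parameters:
        types:
          "[12][0-9]{3}":                   # field names that look like years
            type: number
    - run: dump_to_zip
      parameters:
        out-file: co2-emissions-wb.zip
```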

Also, we have provided some metadata:

- `title`: The title of the pipeline
- `description`: A description of the pipeline
- `environment`: A dictionary of environment variables to be set for all of the pipeline's steps. For example, it can be used to change the behaviour of the underlying `requests` library (see the sketch below) - https://requests.readthedocs.io/en/master/user/advanced/#ssl-cert-verification
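
As a concrete illustration of the `environment` option, here is a hedged sketch that points `requests` at a custom CA bundle. `REQUESTS_CA_BUNDLE` is an environment variable honoured by `requests` for SSL certificate verification; the file path and URL below are hypothetical.

```yaml
worldbank-co2-emissions:
  environment:
    REQUESTS_CA_BUNDLE: /etc/ssl/certs/internal-ca.pem   # hypothetical CA bundle
  pipeline:
    - run: load
      parameters:
        from: https://internal.example.org/data.csv      # hypothetical HTTPS source
        name: data
```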

> The full JSON Schema of the `pipeline-spec.yaml` file can be found [here](https://github.com/frictionlessdata/datapackage-pipelines/blob/master/datapackage_pipelines/specs/schemas/pipeline-spec.schema.json)

### Mechanics

An important aspect of how the pipelines are run is that data is passed in streams from one processor to another. Technically, each processor runs in its own dedicated process, reading the datapackage from its `stdin` and writing its output to its `stdout`. The important consequence is that no processor holds the entire data set at any point.
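
To make that streaming contract concrete, here is a minimal sketch. It is not the library's actual processor API (real processors are typically written against the library's wrapper helpers); it only illustrates the stdin/stdout row-at-a-time model described above.

```python
import sys
import json


def transform(row):
    # A real processor would modify the row here.
    return row


# The datapackage descriptor arrives first on stdin and is passed on
# downstream; rows then follow, one JSON object per line, so the full
# data set is never held in memory at once.
descriptor = json.loads(sys.stdin.readline())
sys.stdout.write(json.dumps(descriptor) + '\n')

for line in sys.stdin:
    row = transform(json.loads(line))
    sys.stdout.write(json.dumps(row) + '\n')
```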
3 changes: 3 additions & 0 deletions datapackage_pipelines/manager/runner.py
@@ -224,6 +224,9 @@ def run_pipelines(pipeline_id_pattern,
            continue

        if slave:
            # Set environment variables for the pipeline
            for key, value in spec.environment.items():
                os.environ[key] = str(value)
            ps = status_manager.get(spec.pipeline_id)
            ps.init(spec.pipeline_details,
                    spec.source_details,
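Setting `os.environ` in the runner works because environment variables set in a parent process are inherited by child processes spawned afterwards - which is how each processor step runs. A small self-contained sketch (not the runner's actual spawning code):

```python
import os
import subprocess
import sys

# A variable set on os.environ in the parent, as run_pipelines does above,
# is visible to any subprocess spawned afterwards.
os.environ['DEBUG'] = 'true'
subprocess.run(
    [sys.executable, '-c', "import os; print(os.environ.get('DEBUG'))"],
    check=True,
)  # prints: true
```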
4 changes: 3 additions & 1 deletion datapackage_pipelines/specs/parsers/base_parser.py
@@ -7,7 +7,8 @@ def __init__(self,
                 validation_errors=None,
                 dependencies=None,
                 cache_hash='',
                 schedule=None,
                 environment=None):
        self.path = path
        self.pipeline_id = pipeline_id
        self.pipeline_details = pipeline_details
@@ -16,6 +17,7 @@ def __init__(self,
        self.dependencies = [] if dependencies is None else dependencies
        self.cache_hash = cache_hash
        self.schedule = schedule
        self.environment = environment

    def __str__(self):
        return 'PipelineSpec({}, validation_errors={}, ' \
5 changes: 4 additions & 1 deletion datapackage_pipelines/specs/schemas/pipeline-spec.schema.json
@@ -11,6 +11,9 @@
"description": {
"type": "string"
},
"environment": {
"type": "object"
},
"schedule": {
"type": "object",
"properties": {
@@ -69,4 +72,4 @@
}
}
}
}
}
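
Note that `environment` is declared as a plain `object`, so any mapping of names to values validates. A sketch of checking a spec against this schema with the `jsonschema` package (the file paths here are assumptions):

```python
import json

import yaml
from jsonschema import validate

# Paths are assumptions for the sketch.
with open('pipeline-spec.schema.json') as f:
    schema = json.load(f)
with open('pipeline-spec.yaml') as f:
    spec = yaml.safe_load(f)

for pipeline_id, details in spec.items():
    validate(instance=details, schema=schema)  # raises ValidationError if invalid
    print(pipeline_id, 'ok')
```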
7 changes: 7 additions & 0 deletions datapackage_pipelines/specs/specs.py
@@ -37,6 +37,12 @@ def process_schedules(spec: PipelineSpec):
    spec.schedule = schedule


def process_environment(spec: PipelineSpec):
    if spec.environment is None:
        environment = spec.pipeline_details.get('environment', {})
        spec.environment = environment


def find_specs(root_dir='.') -> PipelineSpec:
    for dirpath, dirnames, filenames in dirtools.Dir(root_dir,
                                                     exclude_file='.dpp_spec_ignore',
@@ -83,6 +89,7 @@ def pipelines(prefixes=None, ignore_missing_deps=False, root_dir='.', status_man

        resolve_processors(spec)
        process_schedules(spec)
        process_environment(spec)

        try:
            hasher.calculate_hash(spec, status_manager, ignore_missing_deps)
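A sketch of the defaulting behaviour that `process_environment` adds. The constructor arguments are assumptions inferred from the attribute assignments in `base_parser.py` above:

```python
# Hypothetical usage; argument names follow the attributes seen above.
spec = PipelineSpec(path='.', pipeline_id='my-pipeline',
                    pipeline_details={'pipeline': []})  # no 'environment' key

process_environment(spec)
assert spec.environment == {}  # falls back to an empty dict

spec.pipeline_details['environment'] = {'DEBUG': 'true'}
spec.environment = None  # force re-resolution for the sketch
process_environment(spec)
assert spec.environment == {'DEBUG': 'true'}
```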
2 changes: 1 addition & 1 deletion pylama.ini
@@ -1,6 +1,6 @@
[pylama]
linters = pyflakes,mccabe,pep8
ignore = E128,E301
ignore = E128,E301,E741

[pylama:pep8]
max_line_length = 120
