diff --git a/.gitignore b/.gitignore index c284b9ff..7e2e4bde 100644 --- a/.gitignore +++ b/.gitignore @@ -95,3 +95,6 @@ tests/env/dummy/type-tests-output2 tests/cli/.code .dpp .coverage.* +.code/ +.vscode/ + diff --git a/README.md b/README.md index 90b6c90a..8e6e7ce7 100644 --- a/README.md +++ b/README.md @@ -34,21 +34,18 @@ worldbank-co2-emissions: description: Data per year, provided in metric tons per capita. pipeline: - - run: add_metadata + run: update_package parameters: name: 'co2-emissions' title: 'CO2 emissions (metric tons per capita)' homepage: 'http://worldbank.org/' - - run: add_resource + run: load parameters: + from: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel" name: 'global-data' - url: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel" format: xls headers: 4 - - - run: stream_remote_resources - cache: True - run: set_types parameters: @@ -57,7 +54,7 @@ worldbank-co2-emissions: "[12][0-9]{3}": type: number - - run: dump.to_zip + run: dump_to_zip parameters: out-file: co2-emissions-wb.zip ``` @@ -65,11 +62,10 @@ worldbank-co2-emissions: In this example we see one pipeline called `worldbank-co2-emissions`. Its pipeline consists of 4 steps: - `metadata`: This is a library processor (see below), which modifies the data-package's descriptor (in our case: the initial, empty descriptor) - adding `name`, `title` and other properties to the datapackage. -- `add_resource`: This is another library processor, which adds a single resource to the data-package. - This resource has a `name` and a `url`, pointing to the remote location of the data. -- `stream_remote_resources`: This processor will stream data from resources (like the one we defined in the 1st step) into the pipeline, on to processors further down the pipeline (see more about streaming below). +- `load`: This is another library processor, which loads data into the data-package. + This resource has a `name` and a `from` property, pointing to the remote location of the data. - `set_types`: This processor assigns data types to fields in the data. In this example, field headers looking like years will be assigned the `number` type. -- `dump.to_zip`: Create a zipped and validated datapackage with the provided file name. +- `dump_to_zip`: Create a zipped and validated datapackage with the provided file name. 
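While developing a pipeline like this one, it can help to temporarily add the library's `printer` processor (documented below) just before the dump step, to preview rows as they stream through the pipeline. A minimal sketch of such a step, assuming the default preview settings are acceptable:

```yaml
- run: printer
  parameters:
    num_rows: 5   # preview a handful of rows sampled from different points in the stream
```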
### Mechanics @@ -94,23 +90,37 @@ $ dpp Available Pipelines: - ./worldbank-co2-emissions (*) -$ dpp run ./worldbank-co2-emissions -INFO :Main:RUNNING ./worldbank-co2-emissions -INFO :Main:- lib/add_metadata.py -INFO :Main:- lib/add_resource.py -INFO :Main:- lib/stream_remote_resources.py -INFO :Main:- lib/dump/to_zip.py -INFO :Main:DONE lib/add_metadata.py -INFO :Main:DONE lib/add_resource.py -INFO :Main:stream_remote_resources: OPENING http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel -INFO :Main:stream_remote_resources: TOTAL 264 rows -INFO :Main:stream_remote_resources: Processed 264 rows -INFO :Main:DONE lib/stream_remote_resources.py -INFO :Main:dump.to_zip: INFO :Main:Processed 264 rows -INFO :Main:DONE lib/dump/to_zip.py -INFO :Main:RESULTS: -INFO :Main:SUCCESS: ./worldbank-co2-emissions - {'dataset-name': 'co2-emissions', 'total_row_count': 264} +$ $ dpp run --verbose ./worldbank-co2-emissions +RUNNING ./worldbank-co2-emissions +Collecting dependencies +Running async task +Waiting for completion +Async task starting +Searching for existing caches +Building process chain: +- update_package +- load +- set_types +- dump_to_zip +- (sink) +DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/update_package.py +load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80 +load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736 +load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736 +load: DEBUG :Starting new HTTP connection (1): api.worldbank.org:80 +load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736 +load: DEBUG :http://api.worldbank.org:80 "GET /v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel HTTP/1.1" 200 308736 +set_types: INFO :(,) +load: INFO :Processed 264 rows +set_types: INFO :Processed 264 rows +DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/load.py +DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/set_types.py +dump_to_zip: INFO :Processed 264 rows +DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/manager/../lib/internal/sink.py +DONE /Users/adam/code/dhq/specstore/dpp_repo/datapackage_pipelines/specs/../lib/dump_to_zip.py +DONE V ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'} +INFO :RESULTS: +INFO :SUCCESS: ./worldbank-co2-emissions {'bytes': 692741, 'count_of_rows': 264, 'dataset_name': 'co2-emissions', 'hash': '4dd18effcdfbf5fc267221b4ffc28fa4'} ``` Alternatively, you could use our [Docker](https://www.docker.com/) image: @@ -218,7 +228,7 @@ Pythonic interface to running pipelines. You can integrate dataflows within pipe instead of `run`. 
For example, given the following flow file, saved under `my-flow.py`: ``` -from dataflows import Flow, dump_to_path, load, add_metadata +from dataflows import Flow, dump_to_path, load, update_package def flow(parameters, datapackage, resources, stats): stats['multiplied_fields'] = 0 @@ -229,8 +239,7 @@ def flow(parameters, datapackage, resources, stats): stats['multiplied_fields'] += 1 return step - return Flow(add_metadata(name='my-datapackage'), - load((datapackage, resources), + return Flow(update_package(name='my-datapackage'), multiply('my-field', 2)) ``` @@ -244,7 +253,7 @@ my-flow: url: http://example.com/my-datapackage/datapackage.json resource: my-resource - flow: my-flow - - run: dump.to_path + - run: dump_to_path ``` You can run the pipeline using `dpp run my-flow`. @@ -253,7 +262,7 @@ You can run the pipeline using `dpp run my-flow`. A few built in processors are provided with the library. -### ***`add_metadata`*** +### ***`update_package`*** Adds meta-data to the data-package. @@ -264,7 +273,7 @@ Any allowed property (according to the [spec]([http://specs.frictionlessdata.io/ *Example*: ```yaml -- run: add_metadata +- run: update_package parameters: name: routes-to-mordor license: CC-BY-SA-4 @@ -273,77 +282,36 @@ Any allowed property (according to the [spec]([http://specs.frictionlessdata.io/ - samwise gamgee ``` -### ***`add_resource`*** +### ***`load`*** -Adds a new external tabular resource to the data-package. +Loads data into the package, infers the schema and optionally casts values. _Parameters_: - -You should provide a `name` and `url` attributes, and other optional attributes as defined in the [spec]([http://specs.frictionlessdata.io/data-packages/#resource-information). - -`url` indicates where the data for this resource resides. Later on, when `stream_remote_resources` runs, it will use the `url` (which is stored in the resource in the `dpp:streamedFrom` property) to read the data rows and push them into the pipeline. - -Note that `url` also supports `env://`, which indicates that the resource url should be fetched from the indicated environment variable. This is useful in case you are supplying a string with sensitive information (such as an SQL connection string for streaming from a database table). - -Parameters are basically arguments that are passed to a `tabulator.Stream` instance (see the [API](https://github.com/frictionlessdata/tabulator-py#api-reference)). -Other than those, you can pass a `constants` parameter which should be a mapping of headers to string values. -When used in conjunction with `stream_remote_resources`, these constant values will be added to each generated row -(as well as to the default schema). - -You may also provide a schema here, or use the default schema generated by the `stream_remote_resources` processor. -In case `path` is specified, it will be used. If not, the `stream_remote_resources` processor will assign a `path` for you with a `csv` extension. - -*Example*: - -```yaml -- run: add_resource - parameters: - url: http://example.com/my-excel-file.xlsx - sheet: 1 - headers: 2 -- run: add_resource - parameters: - url: http://example.com/my-csv-file.csv - encoding: "iso-8859-2" -``` - -### ***`stream_remote_resources`*** - -Converts external resources to streamed resources. - -External resources are ones that link to a remote data source (url or file path), but are not processed by the pipeline and are kept as-is. 
- -Streamed resources are ones that can be processed by the pipeline, and their output is saved as part of the resulting datapackage. - -In case a resource has no schema, a default one is generated automatically here by creating a `string` field from each column in the data source. +- `from` - location of the data that is to be loaded. This can be either: + - a local path (e.g. /path/to/the/data.csv) + - a remote URL (e.g. https://path.to/the/data.csv) + - Other supported links, based on the current support of schemes and formats in [tabulator](https://github.com/frictionlessdata/tabulator-py#schemes) + - a local path or remote URL to a datapackage.json file (e.g. https://path.to/data_package/datapackage.json) + - a reference to an environment variable containing the source location, in the form of `env://ENV_VAR` + - a tuple containing (datapackage_descriptor, resources_iterator) +- `resources` - optional, relevant only if source points to a datapackage.json file or datapackage/resource tuple. Value should be one of the following: + - Name of a single resource to load + - A regular expression matching resource names to load + - A list of resource names to load + - 'None' indicates to load all resources + - The index of the resource in the package +- `validate` - Should data be casted to the inferred data-types or not. Relevant only when not loading data from datapackage. +- other options - based on the loaded file, extra options (e.g. sheet for Excel files etc., see the link to tabulator above) + +### ***`printer`*** + +Just prints whatever it sees. Good for debugging. _Parameters_: - -- `resources` - Which resources to stream. Can be: - - - List of strings, interpreted as resource names to stream - - String, interpreted as a regular expression to be used to match resource names - - If omitted, all resources in datapackage are streamed. - -- `ignore-missing` - if true, then missing resources won't raise an error but will be treated as 'empty' (i.e. with zero rows). - Resources with empty URLs will be treated the same (i.e. will generate an 'empty' resource). - -- `limit-rows` - if provided, will limit the number of rows fetched from the source. Takes an integer value which specifies how many rows of the source to stream. - -*Example*: - -```yaml -- run: stream_remote_resources - parameters: - resources: ['2014-data', '2015-data'] -- run: stream_remote_resources - parameters: - resources: '201[67]-data' -``` - -This processor also supports loading plain-text resources (e.g. html pages) and handling them as tabular data - split into rows with a single "data" column. -To enable this behavior, add the following attribute to the resource: `"format": "txt"`. +- `num_rows` - modify the number of rows to preview, printer will print multiple samples of this number of rows from different places in the stream +- `last_rows` - how many of the last rows in the stream to print. optional, defaults to the value of num_rows +- `fields` - optional, list of field names to preview +- `resources` - optional, allows to limit the printed resources, same semantics as load processor resources argument ### ***`set_types`*** @@ -1015,7 +983,7 @@ parameters: direction: \\2 # Second member of group from above ``` -### ***`dump.to_sql`*** +### ***`dump_to_sql`*** Saves the datapackage to an SQL database. @@ -1042,6 +1010,134 @@ _Parameters_: - `false` - row was inserted - `updated_id_column` - Optional name of a column that will be added to the spewed data and contain the id of the updated row in DB. 
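Putting the parameters above together, a `dump_to_sql` step in a pipeline spec could look like the following sketch (the engine string is read from the `DPP_DB_ENGINE` environment variable by default; the table, resource and key names here are only illustrative):

```yaml
- run: dump_to_sql
  parameters:
    engine: env://DPP_DB_ENGINE        # SQLAlchemy connection string taken from an environment variable
    tables:
      co2_emissions:                   # target table name
        resource-name: global-data     # resource whose rows are written to this table
        mode: update                   # update matching rows instead of rewriting the whole table
        update_keys: [country, year]   # illustrative key columns used to match existing rows
```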
+### ***`dump_to_path`*** + +Saves the datapackage to a filesystem path. + +_Parameters_: + +- `out-path` - Name of the output path where `datapackage.json` will be stored. + + This path will be created if it doesn't exist, as well as internal data-package paths. + + If omitted, then `.` (the current directory) will be assumed. + +- `force-format` - Specifies whether to force all output files to be generated with the same format + - if `True` (the default), all resources will use the same format + - if `False`, format will be deduced from the file extension. Resources with unknown extensions will be discarded. +- `format` - Specifies the type of output files to be generated (if `force-format` is true): `csv` (the default) or `json` +- `add-filehash-to-path`: Specifies whether to include file md5 hash into the resource path. Defaults to `False`. If `True` Embeds hash in path like so: + - If original path is `path/to/the/file.ext` + - Modified path will be `path/to/the/HASH/file.ext` +- `counters` - Specifies whether to count rows, bytes or md5 hash of the data and where it should be stored. An object with the following properties: + - `datapackage-rowcount`: Where should a total row count of the datapackage be stored (default: `count_of_rows`) + - `datapackage-bytes`: Where should a total byte count of the datapackage be stored (default: `bytes`) + - `datapackage-hash`: Where should an md5 hash of the datapackage be stored (default: `hash`) + - `resource-rowcount`: Where should a total row count of each resource be stored (default: `count_of_rows`) + - `resource-bytes`: Where should a total byte count of each resource be stored (default: `bytes`) + - `resource-hash`: Where should an md5 hash of each resource be stored (default: `hash`) + Each of these attributes could be set to null in order to prevent the counting. + Each property could be a dot-separated string, for storing the data inside a nested object (e.g. `stats.rowcount`) +- `pretty-descriptor`: Specifies how datapackage descriptor (`datapackage.json`) file will look like: + - `False` (default) - descriptor will be written in one line. + - `True` - descriptor will have indents and new lines for each key, so it becomes more human-readable. + +### ***`dump_to_zip`*** + +Saves the datapackage to a zipped archive. + +_Parameters_: + +- `out-file` - Name of the output file where the zipped data will be stored +- `force-format` and `format` - Same as in `dump_to_path` +- `add-filehash-to-path` - Same as in `dump_to_path` +- `counters` - Same as in `dump_to_path` +- `pretty-descriptor` - Same as in `dump_to_path` + +## Deprecated Processors + +These processors will be removed in the next major version. + +### ***`add_metadata`*** + +Alias for `update_package`, is kept for backward compatibility reasons. + +### ***`add_resource`*** + +Adds a new external tabular resource to the data-package. + +_Parameters_: + +You should provide a `name` and `url` attributes, and other optional attributes as defined in the [spec]([http://specs.frictionlessdata.io/data-packages/#resource-information). + +`url` indicates where the data for this resource resides. Later on, when `stream_remote_resources` runs, it will use the `url` (which is stored in the resource in the `dpp:streamedFrom` property) to read the data rows and push them into the pipeline. + +Note that `url` also supports `env://`, which indicates that the resource url should be fetched from the indicated environment variable. 
This is useful in case you are supplying a string with sensitive information (such as an SQL connection string for streaming from a database table). + +Parameters are basically arguments that are passed to a `tabulator.Stream` instance (see the [API](https://github.com/frictionlessdata/tabulator-py#api-reference)). +Other than those, you can pass a `constants` parameter which should be a mapping of headers to string values. +When used in conjunction with `stream_remote_resources`, these constant values will be added to each generated row +(as well as to the default schema). + +You may also provide a schema here, or use the default schema generated by the `stream_remote_resources` processor. +In case `path` is specified, it will be used. If not, the `stream_remote_resources` processor will assign a `path` for you with a `csv` extension. + +*Example*: + +```yaml +- run: add_resource + parameters: + url: http://example.com/my-excel-file.xlsx + sheet: 1 + headers: 2 +- run: add_resource + parameters: + url: http://example.com/my-csv-file.csv + encoding: "iso-8859-2" +``` + +### ***`stream_remote_resources`*** + +Converts external resources to streamed resources. + +External resources are ones that link to a remote data source (url or file path), but are not processed by the pipeline and are kept as-is. + +Streamed resources are ones that can be processed by the pipeline, and their output is saved as part of the resulting datapackage. + +In case a resource has no schema, a default one is generated automatically here by creating a `string` field from each column in the data source. + +_Parameters_: + +- `resources` - Which resources to stream. Can be: + + - List of strings, interpreted as resource names to stream + - String, interpreted as a regular expression to be used to match resource names + + If omitted, all resources in datapackage are streamed. + +- `ignore-missing` - if true, then missing resources won't raise an error but will be treated as 'empty' (i.e. with zero rows). + Resources with empty URLs will be treated the same (i.e. will generate an 'empty' resource). + +- `limit-rows` - if provided, will limit the number of rows fetched from the source. Takes an integer value which specifies how many rows of the source to stream. + +*Example*: + +```yaml +- run: stream_remote_resources + parameters: + resources: ['2014-data', '2015-data'] +- run: stream_remote_resources + parameters: + resources: '201[67]-data' +``` + +This processor also supports loading plain-text resources (e.g. html pages) and handling them as tabular data - split into rows with a single "data" column. +To enable this behavior, add the following attribute to the resource: `"format": "txt"`. + +### ***`dump.to_sql`*** + +Alias for `dump_to_sql`, is kept for backward compatibility reasons. + ### ***`dump.to_path`*** Saves the datapackage to a filesystem path. @@ -1087,12 +1183,12 @@ Saves the datapackage to a zipped archive. 
_Parameters_: - `out-file` - Name of the output file where the zipped data will be stored -- `force-format` and `format` - Same as in `dump.to_path` -- `handle-non-tabular` - Same as in `dump.to_path` -- `add-filehash-to-path` - Same as in `dump.to_path` -- `counters` - Same as in `dump.to_path` -- `pretty-descriptor` - Same as in `dump.to_path` -- `file-formatters` - Same as in `dump.to_path` +- `force-format` and `format` - Same as in `dump_to_path` +- `handle-non-tabular` - Same as in `dump_to_path` +- `add-filehash-to-path` - Same as in `dump_to_path` +- `counters` - Same as in `dump_to_path` +- `pretty-descriptor` - Same as in `dump_to_path` +- `file-formatters` - Same as in `dump_to_path` #### *Note* @@ -1108,8 +1204,6 @@ For that you might need to write your own processor - here's how it's done. There are two APIs for writing processors - the high level API and the low level API. -**Important**: due to the way that pipeline execution is implemented, you **cannot** `print` from within a processor. In case you need to debug, _only_ use the `logging` module to print out anything you need. - ### High Level Processor API The high-level API is quite useful for most processor kinds: @@ -1211,32 +1305,35 @@ In some cases, the high-level API might be too restricting. In these cases you s ```python from datapackage_pipelines.wrapper import ingest, spew -parameters, datapackage, resource_iterator = ingest() +if __name__ == '__main__': + with ingest() as ctx: -# Initialisation code, if needed + # Initialisation code, if needed -# Do stuff with datapackage -# ... + # Do stuff with datapackage + # ... -stats = {} + stats = {} -# and resources: -def new_resource_iterator(resource_iterator_): - def resource_processor(resource_): - # resource_.spec is the resource descriptor - for row in resource_: - # Do something with row - # Perhaps collect some stats here as well - yield row - for resource in resource_iterator_: - yield resource_processor(resource) + # and resources: + def new_resource_iterator(resource_iterator_): + def resource_processor(resource_): + # resource_.spec is the resource descriptor + for row in resource_: + # Do something with row + # Perhaps collect some stats here as well + yield row + for resource in resource_iterator_: + yield resource_processor(resource) -spew(datapackage, new_resource_iterator(resource_iterator), stats) + spew(ctx.datapackage, + new_resource_iterator(ctx.resource_iterator), + ctx.stats) ``` The above code snippet shows the structure of most low-level processors. -We always start with calling `ingest()` - this method gives us the execution parameters, the data-package descriptor (as outputed from the previous step) and an iterator on all streamed resources' rows. +We always start with calling `ingest()` - this method gives us the context, containing the execution parameters, the data-package descriptor (as outputed from the previous step) and an iterator on all streamed resources' rows. We finish the processing by calling `spew()`, which sends the processed data to the next processor in the pipeline. 
`spew` receives: * A modified data-package descriptor; @@ -1265,9 +1362,10 @@ We'll start with the same processors from above, now implemented with the low le # Add license information from datapackage_pipelines.wrapper import ingest, spew -_, datapackage, resource_iterator = ingest() -datapackage['license'] = 'MIT' -spew(datapackage, resource_iterator) +if __name__ == '__main__': + with ingest() as ctx: + ctx.datapackage['license'] = 'MIT' + spew(ctx.datapackage, ctx.resource_iterator) ``` ```python @@ -1431,7 +1529,7 @@ class Generator(GeneratorBase): ('metadata', { 'name': dataset_name }), - ('dump.to_zip', { + ('dump_to_zip', { 'out-file': 'ckan-datapackage.zip' })]) pipeline_details = { @@ -1451,7 +1549,7 @@ data-kind: package-list Then when running `dpp` we will see an available pipeline named `./example-com-list-of-packages` -This pipeline would internally be composed of 3 steps: `ckan.scraper`, `metadata` and `dump.to_zip`. +This pipeline would internally be composed of 3 steps: `ckan.scraper`, `metadata` and `dump_to_zip`. #### Validating Source Descriptors diff --git a/datapackage_pipelines/VERSION b/datapackage_pipelines/VERSION index f8a696c8..227cea21 100644 --- a/datapackage_pipelines/VERSION +++ b/datapackage_pipelines/VERSION @@ -1 +1 @@ -1.7.2 +2.0.0 diff --git a/datapackage_pipelines/lib/add_computed_field.py b/datapackage_pipelines/lib/add_computed_field.py index 98990a5d..1f6915af 100644 --- a/datapackage_pipelines/lib/add_computed_field.py +++ b/datapackage_pipelines/lib/add_computed_field.py @@ -1,81 +1,17 @@ -import functools -import collections +from dataflows import Flow, add_computed_field +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher -parameters, datapackage, resource_iterator = ingest() +def flow(parameters): + return Flow( + add_computed_field( + parameters.get('fields', []), + parameters.get('resources') + ), + ) -resources = ResourceMatcher(parameters.get('resources')) -fields = parameters.get('fields', []) - -def modify_datapackage(datapackage_): - dp_resources = datapackage_.get('resources', []) - for resource_ in dp_resources: - if resources.match(resource_['name']): - new_fields = [ - { - 'name': f['target'], - 'type': get_type(resource_['schema']['fields'], - f.get('source', []), - f['operation']) - } for f in fields - ] - resource_['schema']['fields'] += new_fields - return datapackage_ - - -def get_type(res_fields, operation_fields, operation): - types = [f.get('type') for f in res_fields if f['name'] in operation_fields] - if 'any' in types: - return 'any' - if (operation == 'format') or (operation == 'join'): - return 'string' - if ('number' in types) or (operation == 'avg'): - return 'number' - # integers - if len(types): - return types[0] - # constant - return 'any' - - -def process_resource(rows): - for row in rows: - for field in fields: - values = [ - row.get(c) for c in field.get('source', []) if row.get(c) is not None - ] - with_ = field.get('with', '') - new_col = AGGREGATORS[field['operation']].func(values, with_, row) - row[field['target']] = new_col - yield row - - -def process_resources(resource_iterator_): - for resource in resource_iterator_: - spec = resource.spec - if not resources.match(spec['name']): - yield resource - else: - yield process_resource(resource) - - -Aggregator = collections.namedtuple('Aggregator', ['func']) - 
-AGGREGATORS = { - 'sum': Aggregator(lambda values, fstr, row: sum(values)), - 'avg': Aggregator(lambda values, fstr, row: sum(values) / len(values)), - 'max': Aggregator(lambda values, fstr, row: max(values)), - 'min': Aggregator(lambda values, fstr, row: min(values)), - 'multiply': Aggregator( - lambda values, fstr, row: functools.reduce(lambda x, y: x*y, values)), - 'constant': Aggregator(lambda values, fstr, row: fstr), - 'join': Aggregator( - lambda values, fstr, row: fstr.join([str(x) for x in values])), - 'format': Aggregator(lambda values, fstr, row: fstr.format(**row)), -} - - -spew(modify_datapackage(datapackage), process_resources(resource_iterator)) +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/add_metadata.py b/datapackage_pipelines/lib/add_metadata.py index 2ce3c186..fd18cafc 100644 --- a/datapackage_pipelines/lib/add_metadata.py +++ b/datapackage_pipelines/lib/add_metadata.py @@ -1,9 +1,15 @@ -from datapackage_pipelines.wrapper import ingest, spew +import warnings -parameters, datapackage, res_iter = ingest() -if datapackage is None: - datapackage = parameters -else: - datapackage.update(parameters) +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -spew(datapackage, res_iter) +from datapackage_pipelines.lib.update_package import flow + + +if __name__ == '__main__': + warnings.warn( + 'add_metadata will be removed in the future, use "update_package" instead', + DeprecationWarning + ) + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/concatenate.py b/datapackage_pipelines/lib/concatenate.py index 8d124713..796d9aad 100644 --- a/datapackage_pipelines/lib/concatenate.py +++ b/datapackage_pipelines/lib/concatenate.py @@ -1,124 +1,25 @@ -import itertools - -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher +from dataflows import Flow, concatenate, update_resource +from datapackage_pipelines.wrapper import ingest from datapackage_pipelines.utilities.resources import PROP_STREAMING - -parameters, datapackage, resource_iterator = ingest() - -sources = ResourceMatcher(parameters.get('sources')) - -target = parameters.get('target', {}) -if 'name' not in target: - target['name'] = 'concat' -if 'path' not in target: - target['path'] = 'data/' + target['name'] + '.csv' -target.update(dict( - mediatype='text/csv', - schema=dict(fields=[], primaryKey=[]), -)) -target[PROP_STREAMING] = True - -fields = parameters['fields'] - -# Create mapping between source field names to target field names -field_mapping = {} -for target_field, source_fields in fields.items(): - if source_fields is not None: - for source_field in source_fields: - if source_field in field_mapping: - raise RuntimeError('Duplicate appearance of %s (%r)' % (source_field, field_mapping)) - field_mapping[source_field] = target_field - - if target_field in field_mapping: - raise RuntimeError('Duplicate appearance of %s' % target_field) - - field_mapping[target_field] = target_field - -# Create the schema for the target resource -needed_fields = sorted(fields.keys()) -for resource in datapackage['resources']: - if not sources.match(resource['name']): - continue - - schema = resource.get('schema', {}) - pk = schema.get('primaryKey', []) - for field in schema.get('fields', []): - orig_name = field['name'] - if orig_name in field_mapping: - name = 
field_mapping[orig_name] - if name not in needed_fields: - continue - if orig_name in pk: - target['schema']['primaryKey'].append(name) - target['schema']['fields'].append(field) - field['name'] = name - needed_fields.remove(name) - -if len(target['schema']['primaryKey']) == 0: - del target['schema']['primaryKey'] - -for name in needed_fields: - target['schema']['fields'].append(dict( - name=name, type='string' - )) - -# Update resources in datapackage (make sure they are consecutive) -prefix = True -suffix = False -num_concatenated = 0 -new_resources = [] -for resource in datapackage['resources']: - name = resource['name'] - match = sources.match(name) - if prefix: - if match: - prefix = False - num_concatenated += 1 - else: - new_resources.append(resource) - elif suffix: - assert not match - new_resources.append(resource) - else: - if not match: - suffix = True - new_resources.append(target) - new_resources.append(resource) - else: - num_concatenated += 1 -if not suffix: - new_resources.append(target) - - -datapackage['resources'] = new_resources - -all_target_fields = set(fields.keys()) - - -def concatenator(resources): - for resource_ in resources: - for row in resource_: - processed = dict((k, '') for k in all_target_fields) - values = [(field_mapping[k], v) for (k, v) - in row.items() - if k in field_mapping] - assert len(values) > 0 - processed.update(dict(values)) - yield processed - - -def new_resource_iterator(resource_iterator_): - while True: - resource_ = next(resource_iterator_) - if sources.match(resource_.spec['name']): - resource_chain = \ - itertools.chain([resource_], - itertools.islice(resource_iterator_, - num_concatenated-1)) - yield concatenator(resource_chain) - else: - yield resource_ - - -spew(datapackage, new_resource_iterator(resource_iterator)) +from datapackage_pipelines.utilities.flow_utils import spew_flow + + +def flow(parameters): + return Flow( + concatenate( + parameters.get('fields', {}), + parameters.get('target', {}), + parameters.get('sources') + ), + update_resource( + parameters.get('target', {}).get('name', 'concat'), + **{ + PROP_STREAMING: True + } + ) + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/delete_fields.py b/datapackage_pipelines/lib/delete_fields.py index 32ee60e3..4be25282 100644 --- a/datapackage_pipelines/lib/delete_fields.py +++ b/datapackage_pipelines/lib/delete_fields.py @@ -1,43 +1,17 @@ -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher +from dataflows import Flow, delete_fields +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -parameters, datapackage, resource_iterator = ingest() -resources = ResourceMatcher(parameters.get('resources')) -fields = parameters.get('fields', []) +def flow(parameters): + return Flow( + delete_fields( + parameters.get('fields', []), + parameters.get('resources') + ) + ) -def delete_fields(datapackage_): - dp_resources = datapackage_.get('resources', []) - for resource_ in dp_resources: - if resources.match(resource_['name']): - dp_fields = resource_['schema'].get('fields', []) - field_names = [f['name'] for f in dp_fields] - non_existings = [f for f in fields if f not in field_names] - - assert len(non_existings) == 0, \ - "Can't find following field(s): %s" % '; '.join(non_existings) - - new_fields = list( - filter(lambda x: x['name'] not in fields, dp_fields)) 
- resource_['schema']['fields'] = new_fields - return datapackage_ - - -def process_resource(rows): - for row in rows: - for field in fields: - del row[field] - yield row - - -def process_resources(resource_iterator_): - for resource in resource_iterator_: - spec = resource.spec - if not resources.match(spec['name']): - yield resource - else: - yield process_resource(resource) - - -spew(delete_fields(datapackage), process_resources(resource_iterator)) +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/dump/to_path.py b/datapackage_pipelines/lib/dump/to_path.py index eddd5b18..784d6a27 100644 --- a/datapackage_pipelines/lib/dump/to_path.py +++ b/datapackage_pipelines/lib/dump/to_path.py @@ -1,5 +1,6 @@ import os import shutil +import warnings from datapackage_pipelines.lib.dump.dumper_base import FileDumper @@ -29,4 +30,8 @@ def __makedirs(path): if __name__ == '__main__': + warnings.warn( + 'dump.to_path will be removed in the future, use "dump_to_path" instead', + DeprecationWarning + ) PathDumper()() diff --git a/datapackage_pipelines/lib/dump/to_sql.py b/datapackage_pipelines/lib/dump/to_sql.py index 1ecf7f8b..fb36f488 100644 --- a/datapackage_pipelines/lib/dump/to_sql.py +++ b/datapackage_pipelines/lib/dump/to_sql.py @@ -1,144 +1,15 @@ -import datetime -import decimal -import os -import logging +import warnings -import copy -from datapackage_pipelines.utilities.extended_json import json -from tableschema_sql import Storage -from sqlalchemy import create_engine -from sqlalchemy.exc import OperationalError +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -from datapackage_pipelines.lib.dump.dumper_base import DumperBase -from tableschema.exceptions import ValidationError +from datapackage_pipelines.lib.dump_to_sql import flow -def jsonize(obj): - return json.dumps(obj) - - -def strize(obj): - if isinstance(obj, dict): - return dict( - (k, strize(v)) - for k, v in obj.items() - ) - elif isinstance(obj, (str, int, float, bool)): - return obj - elif isinstance(obj, datetime.date): - return obj.isoformat() - elif isinstance(obj, decimal.Decimal): - return float(obj) - elif isinstance(obj, (list, set)): - return [strize(x) for x in obj] - elif obj is None: - return None - assert False, "Don't know how to handle object %r" % obj - - -OBJECT_FIXERS = { - 'sqlite': [strize, jsonize], - 'postgresql': [strize] -} - - -class SQLDumper(DumperBase): - - def initialize(self, parameters): - super(SQLDumper, self).initialize(parameters) - table_to_resource = parameters['tables'] - engine = parameters.get('engine', 'env://DPP_DB_ENGINE') - if engine.startswith('env://'): - env_var = engine[6:] - engine = os.environ.get(env_var) - assert engine is not None, \ - "Couldn't connect to DB - " \ - "Please set your '%s' environment variable" % env_var - self.engine = create_engine(engine) - try: - self.engine.connect() - except OperationalError: - logging.exception('Failed to connect to database %s', engine) - raise - - for k, v in table_to_resource.items(): - v['table-name'] = k - - self.converted_resources = \ - dict((v['resource-name'], v) for v in table_to_resource.values()) - - self.updated_column = parameters.get("updated_column") - self.updated_id_column = parameters.get("updated_id_column") - - def handle_resource(self, resource, spec, parameters, datapackage): - resource_name = spec['name'] - if resource_name not in self.converted_resources: - return resource 
- else: - converted_resource = self.converted_resources[resource_name] - mode = converted_resource.get('mode', 'rewrite') - table_name = converted_resource['table-name'] - storage = Storage(self.engine, prefix=table_name) - if mode == 'rewrite' and '' in storage.buckets: - storage.delete('') - schema = self.normalise_schema_for_engine(self.engine.dialect.name, - spec['schema']) - if '' not in storage.buckets: - logging.info('Creating DB table %s', table_name) - try: - storage.create('', schema) - except ValidationError as e: - logging.error('Error validating schema %r', spec['schema']) - for err in e.errors: - logging.error('Error validating schema: %s', err) - raise - else: - storage.describe('', schema) - - update_keys = None - if mode == 'update': - update_keys = converted_resource.get('update_keys') - if update_keys is None: - update_keys = spec['schema'].get('primaryKey', []) - logging.info('Writing to DB %s -> %s (mode=%s, keys=%s)', - resource_name, table_name, mode, update_keys) - return map(self.get_output_row, - storage.write( - '', - self.normalise_for_engine(self.engine.dialect.name, - resource, spec), - keyed=True, as_generator=True, - update_keys=update_keys - )) - - def get_output_row(self, written): - row, updated, updated_id = written.row, written.updated, written.updated_id - if self.updated_column: - row[self.updated_column] = updated - if self.updated_id_column: - row[self.updated_id_column] = updated_id - return row - - def normalise_schema_for_engine(self, dialect, schema): - schema = copy.deepcopy(schema) - for field in schema['fields']: - if dialect == 'sqlite' and field['type'] in ['object', 'array']: - field['type'] = 'string' - return schema - - def normalise_for_engine(self, dialect, resource, spec): - actions = {} - for field in spec['schema']['fields']: - if field['type'] in ['array', 'object']: - assert dialect in OBJECT_FIXERS, "Don't know how to handle %r connection dialect" % dialect - actions.setdefault(field['name'], []).extend(OBJECT_FIXERS[dialect]) - - for row in resource: - for name, action_list in actions.items(): - for action in action_list: - row[name] = action(row.get(name)) - - yield row - - -SQLDumper()() +if __name__ == '__main__': + warnings.warn( + 'dump.to_sql will be removed in the future, use "dump_to_sql" instead', + DeprecationWarning + ) + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/dump/to_zip.py b/datapackage_pipelines/lib/dump/to_zip.py index a10a8ae0..99b43849 100644 --- a/datapackage_pipelines/lib/dump/to_zip.py +++ b/datapackage_pipelines/lib/dump/to_zip.py @@ -1,3 +1,4 @@ +import warnings import zipfile from datapackage_pipelines.lib.dump.dumper_base import FileDumper @@ -19,4 +20,9 @@ def finalize(self): super(ZipDumper, self).finalize() -ZipDumper()() +if __name__ == '__main__': + warnings.warn( + 'dump.to_zip will be removed in the future, use "dump_to_zip" instead', + DeprecationWarning + ) + ZipDumper()() diff --git a/datapackage_pipelines/lib/dump_to_path.py b/datapackage_pipelines/lib/dump_to_path.py new file mode 100644 index 00000000..ee33d13b --- /dev/null +++ b/datapackage_pipelines/lib/dump_to_path.py @@ -0,0 +1,23 @@ +import os + +from dataflows import Flow, dump_to_path +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow + +from datapackage_pipelines.utilities.stat_utils import STATS_DPP_KEY, STATS_OUT_DP_URL_KEY + + +def flow(parameters: dict, stats: dict): + out_path = parameters.pop('out-path', 
'.') + stats.setdefault(STATS_DPP_KEY, {})[STATS_OUT_DP_URL_KEY] = os.path.join(out_path, 'datapackage.json') + return Flow( + dump_to_path( + out_path, + **parameters + ) + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters, ctx.stats), ctx) diff --git a/datapackage_pipelines/lib/dump_to_sql.py b/datapackage_pipelines/lib/dump_to_sql.py new file mode 100644 index 00000000..57ab6dce --- /dev/null +++ b/datapackage_pipelines/lib/dump_to_sql.py @@ -0,0 +1,19 @@ +from dataflows import Flow, dump_to_sql +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow + + +def flow(parameters): + return Flow( + dump_to_sql( + parameters['tables'], + engine=parameters.get('engine', 'env://DPP_DB_ENGINE'), + updated_column=parameters.get("updated_column"), + updated_id_column=parameters.get("updated_id_column") + ) + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/dump_to_zip.py b/datapackage_pipelines/lib/dump_to_zip.py new file mode 100644 index 00000000..01b15166 --- /dev/null +++ b/datapackage_pipelines/lib/dump_to_zip.py @@ -0,0 +1,18 @@ +from dataflows import Flow, dump_to_zip +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow + + +def flow(parameters: dict): + out_file = parameters.pop('out-file') + return Flow( + dump_to_zip( + out_file, + **parameters + ) + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/duplicate.py b/datapackage_pipelines/lib/duplicate.py index fe2d89ed..332f66f4 100644 --- a/datapackage_pipelines/lib/duplicate.py +++ b/datapackage_pipelines/lib/duplicate.py @@ -1,52 +1,20 @@ -import copy +from dataflows import Flow, duplicate +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow, load_lazy_json -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.kvstore import DB - -def saver(resource, db): - for idx, row in enumerate(resource): - key = "{:08x}".format(idx) - db.set(key, row) - yield row - - -def loader(db): - for k, value in db.items(): - yield value - - -def process_resources(resource_iterator, source): - for resource in resource_iterator: - if resource.spec['name'] == source: - db = DB() - yield saver(resource, db) - yield loader(db) - else: - yield resource - - -def process_datapackage(dp, source, target_name, target_path): - - def traverse_resources(resources): - for res in resources: - yield res - if res['name'] == source: - res = copy.deepcopy(res) - res['name'] = target_name - res['path'] = target_path - yield res - - dp['resources'] = list(traverse_resources(dp['resources'])) - return dp +def flow(parameters): + return Flow( + load_lazy_json(parameters.get('source')), + duplicate( + parameters.get('source'), + parameters.get('target-name'), + parameters.get('target-path'), + parameters.get('batch_size', 1000) + ) + ) if __name__ == '__main__': - parameters, datapackage, resource_iterator = ingest() - - source = parameters['source'] - target_name = parameters['target-name'] - target_path = parameters['target-path'] - - spew(process_datapackage(datapackage, source, target_name, target_path), - process_resources(resource_iterator, source)) + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git 
a/datapackage_pipelines/lib/filter.py b/datapackage_pipelines/lib/filter.py index 5c502028..41cdd73b 100644 --- a/datapackage_pipelines/lib/filter.py +++ b/datapackage_pipelines/lib/filter.py @@ -1,38 +1,18 @@ -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher +from dataflows import Flow, filter_rows +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -import operator -parameters, datapackage, resource_iterator = ingest() +def flow(parameters): + return Flow( + filter_rows( + equals=parameters.get('in', []), + not_equals=parameters.get('out', []), + resources=parameters.get('resources'), + ) + ) -resources = ResourceMatcher(parameters.get('resources')) -equals = parameters.get('in', []) -not_equals = parameters.get('out', []) -conditions = [ - (operator.eq, k, v) - for o in equals - for k, v in o.items() -] + [ - (operator.ne, k, v) - for o in not_equals - for k, v in o.items() -] - - -def process_resource(rows): - for row in rows: - if any(func(row[k], v) for func, k, v in conditions): - yield row - - -def process_resources(resource_iterator_): - for resource in resource_iterator_: - spec = resource.spec - if not resources.match(spec['name']): - yield resource - else: - yield process_resource(resource) - - -spew(datapackage, process_resources(resource_iterator)) +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/find_replace.py b/datapackage_pipelines/lib/find_replace.py index 148911cb..e427fe65 100644 --- a/datapackage_pipelines/lib/find_replace.py +++ b/datapackage_pipelines/lib/find_replace.py @@ -1,32 +1,17 @@ -import re +from dataflows import Flow, find_replace +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher -parameters, datapackage, resource_iterator = ingest() +def flow(parameters): + return Flow( + find_replace( + parameters.get('fields', []), + resources=parameters.get('resources') + ) + ) -resources = ResourceMatcher(parameters.get('resources')) -fields = parameters.get('fields', []) - -def process_resource(rows): - for row in rows: - for field in fields: - for pattern in field.get('patterns', []): - row[field['name']] = re.sub( - str(pattern['find']), - str(pattern['replace']), - str(row[field['name']])) - yield row - - -def process_resources(resource_iterator_): - for resource in resource_iterator_: - spec = resource.spec - if not resources.match(spec['name']): - yield resource - else: - yield process_resource(resource) - - -spew(datapackage, process_resources(resource_iterator)) +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/flow.py b/datapackage_pipelines/lib/flow.py index 13ebe807..ec6f4ac8 100644 --- a/datapackage_pipelines/lib/flow.py +++ b/datapackage_pipelines/lib/flow.py @@ -1,38 +1,15 @@ -import logging import sys -from contextlib import redirect_stderr, redirect_stdout from importlib import import_module -from datapackage_pipelines.wrapper import ingest, spew +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -class StdoutWriter: +with ingest() as ctx: + parameters, datapackage, resources = ctx + stats 
= {} - def write(self, message): - message = message.strip() - if message: - logging.info(message) + sys.path.append(parameters.pop('__path')) + flow_module = import_module(parameters.pop('__flow')) + flow = flow_module.flow(parameters, datapackage, resources, ctx.stats) - def flush(self): - pass - - -class StderrWriter: - - def write(self, message): - message = message.strip() - if (message): - logging.error(message) - - def flush(self): - pass - - -parameters, datapackage, resources, stats = ingest() + ({},) - -with redirect_stderr(StderrWriter()): - with redirect_stdout(StdoutWriter()): - sys.path.append(parameters.pop('__path')) - flow_module = import_module(parameters.pop('__flow')) - datastream = flow_module.flow(parameters, datapackage, resources, stats).datastream() - -spew(datastream.dp.descriptor, datastream.res_iter, stats) + spew_flow(flow, ctx) diff --git a/datapackage_pipelines/lib/join.py b/datapackage_pipelines/lib/join.py index acd5f712..561fc19a 100644 --- a/datapackage_pipelines/lib/join.py +++ b/datapackage_pipelines/lib/join.py @@ -1,297 +1,32 @@ -import copy -import os -import collections -import logging - -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.kvstore import KVStore +from dataflows import Flow, join, update_resource +from datapackage_pipelines.wrapper import ingest from datapackage_pipelines.utilities.resources import PROP_STREAMING - -db = KVStore() - - -class KeyCalc(object): - - def __init__(self, key_spec): - if isinstance(key_spec, list): - key_spec = ':'.join('{%s}' % key for key in key_spec) - self.key_spec = key_spec - - def __call__(self, row): - return self.key_spec.format(**row) - - -def identity(x): - return x - - -def median(values): - if values is None: - return None - ll = len(values) - mid = int(ll/2) - values = sorted(values) - if ll % 2 == 0: - return (values[mid - 1] + values[mid])/2 - else: - return values[mid] - - -def update_counter(curr, new): - if new is None: - return curr - if curr is None: - curr = collections.Counter() - if isinstance(new, str): - new = [new] - curr.update(new) - return curr - - -Aggregator = collections.namedtuple('Aggregator', - ['func', 'finaliser', 'dataType', 'copyProperties']) -AGGREGATORS = { - 'sum': Aggregator(lambda curr, new: - new + curr if curr is not None else new, - identity, - None, - False), - 'avg': Aggregator(lambda curr, new: - (curr[0] + 1, new + curr[1]) - if curr is not None - else (1, new), - lambda value: value[1] / value[0], - None, - False), - 'median': Aggregator(lambda curr, new: - curr + [new] if curr is not None else [new], - median, - None, - True), - 'max': Aggregator(lambda curr, new: - max(new, curr) if curr is not None else new, - identity, - None, - False), - 'min': Aggregator(lambda curr, new: - min(new, curr) if curr is not None else new, - identity, - None, - False), - 'first': Aggregator(lambda curr, new: - curr if curr is not None else new, - identity, - None, - True), - 'last': Aggregator(lambda curr, new: new, - identity, - None, - True), - 'count': Aggregator(lambda curr, new: - curr+1 if curr is not None else 1, - identity, - 'integer', - False), - 'any': Aggregator(lambda curr, new: new, - identity, - None, - True), - 'set': Aggregator(lambda curr, new: - curr.union({new}) if curr is not None else {new}, - lambda value: list(value) if value is not None else [], - 'array', - False), - 'array': Aggregator(lambda curr, new: - curr + [new] if curr is not None else [new], - lambda value: value if value is not None else [], - 
'array', - False), - 'counters': Aggregator(lambda curr, new: - update_counter(curr, new), - lambda value: - list(collections.Counter(value).most_common()) if value is not None else [], - 'array', - False), -} - -parameters, datapackage, resource_iterator = ingest() - -source = parameters['source'] -source_name = source['name'] -source_key = KeyCalc(source['key']) -source_delete = source.get('delete', False) - -target = parameters['target'] -target_name = target['name'] -if target['key'] is not None: - target_key = KeyCalc(target['key']) -else: - target_key = None -deduplication = target_key is None - - -def fix_fields(fields_): - for field in sorted(fields_.keys()): - spec = fields_[field] - if spec is None: - fields_[field] = spec = {} - if 'name' not in spec: - spec['name'] = field - if 'aggregate' not in spec: - spec['aggregate'] = 'any' - return fields_ - - -fields = fix_fields(parameters['fields']) -full = parameters.get('full', True) - - -def indexer(resource): - for row in resource: - key = source_key(row) - try: - current = db[key] - except KeyError: - current = {} - for field, spec in fields.items(): - name = spec['name'] - curr = current.get(field) - agg = spec['aggregate'] - if agg != 'count': - new = row.get(name) - else: - new = '' - if new is not None: - current[field] = AGGREGATORS[agg].func(curr, new) - elif field not in current: - current[field] = None - db[key] = current - yield row - - -def process_target(resource): - if deduplication: - # just empty the iterable - collections.deque(indexer(resource), maxlen=0) - for key in db.keys(): - row = dict( - (f, None) for f in fields.keys() - ) - row.update(dict( - (k, AGGREGATORS[fields[k]['aggregate']].finaliser(v)) - for k, v in db.db.get(key).items() - )) - yield row - else: - for row in resource: - key = target_key(row) - try: - extra = db[key] - extra = dict( - (k, AGGREGATORS[fields[k]['aggregate']].finaliser(v)) - for k, v in extra.items() - ) - except KeyError: - if not full: - continue - extra = dict( - (k, row.get(k)) - for k in fields.keys() - ) - row.update(extra) - yield row - - -def new_resource_iterator(resource_iterator_): - has_index = False - for resource in resource_iterator_: - name = resource.spec['name'] - if name == source_name: - has_index = True - if source_delete: - # just empty the iterable - collections.deque(indexer(resource), maxlen=0) - else: - yield indexer(resource) - if deduplication: - yield process_target(resource) - elif name == target_name: - assert has_index - yield process_target(resource) - else: - yield resource - - -def process_target_resource(source_spec, resource): - target_fields = \ - resource.setdefault('schema', {}).setdefault('fields', []) - added_fields = sorted(fields.keys()) - for field in added_fields: - spec = fields[field] - agg = spec['aggregate'] - data_type = AGGREGATORS[agg].dataType - copy_properties = AGGREGATORS[agg].copyProperties - to_copy = {} - if data_type is None: - try: - source_field = \ - next(filter(lambda f, spec_=spec: - f['name'] == spec_['name'], - source_spec['schema']['fields'])) - except StopIteration: - logging.error('Failed to find field with name %s in resource %s', spec['name'], source_spec['name']) - raise - if copy_properties: - to_copy = copy.deepcopy(source_field) - data_type = source_field['type'] - try: - existing_field = next(iter(filter( - lambda f: f['name'] == field, - target_fields))) - assert existing_field['type'] == data_type, \ - 'Reusing %s but with different data types: %s != %s' % (field, existing_field['type'], data_type) - 
except StopIteration: - to_copy.update({ - 'name': field, - 'type': data_type - }) - target_fields.append(to_copy) - return resource - - -def process_datapackage(datapackage_): - - new_resources = [] - source_spec = None - - for resource in datapackage_['resources']: - - if resource['name'] == source_name: - source_spec = resource - if not source_delete: - new_resources.append(resource) - if deduplication: - resource = process_target_resource( - source_spec, - { - 'name': target_name, - PROP_STREAMING: True, - 'path': os.path.join('data', target_name + '.csv') - }) - new_resources.append(resource) - - elif resource['name'] == target_name: - assert isinstance(source_spec, dict), \ - "Source resource must appear before target resource" - resource = process_target_resource(source_spec, resource) - new_resources.append(resource) - - else: - new_resources.append(resource) - - datapackage_['resources'] = new_resources - return datapackage_ - - -spew(process_datapackage(datapackage), - new_resource_iterator(resource_iterator)) +from datapackage_pipelines.utilities.flow_utils import spew_flow, load_lazy_json + + +def flow(parameters): + source = parameters['source'] + target = parameters['target'] + return Flow( + load_lazy_json(source['name']), + join( + source['name'], + source['key'], + target['name'], + target['key'], + parameters['fields'], + parameters.get('full', True), + source.get('delete', False) + ), + update_resource( + target['name'], + **{ + PROP_STREAMING: True + } + ) + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/load.py b/datapackage_pipelines/lib/load.py new file mode 100644 index 00000000..88755b97 --- /dev/null +++ b/datapackage_pipelines/lib/load.py @@ -0,0 +1,38 @@ +from dataflows import Flow, load +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow +from datapackage_pipelines.utilities.resources import PROP_STREAMING, PROP_STREAMED_FROM + + +def flow(parameters): + _from = parameters.pop('from') + + num_resources = 0 + + def count_resources(): + def func(package): + global num_resources + num_resources = len(package.pkg.resources) + yield package.pkg + yield from package + return func + + def mark_streaming(_from): + def func(package): + for i in range(num_resources, len(package.pkg.resources)): + package.pkg.descriptor['resources'][i][PROP_STREAMING] = True + package.pkg.descriptor['resources'][i][PROP_STREAMED_FROM] = _from + yield package.pkg + yield from package + return func + + return Flow( + count_resources(), + load(_from, **parameters), + mark_streaming(_from), + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/load_resource.py b/datapackage_pipelines/lib/load_resource.py index 9b49f560..bb576b99 100644 --- a/datapackage_pipelines/lib/load_resource.py +++ b/datapackage_pipelines/lib/load_resource.py @@ -4,8 +4,9 @@ import datapackage +from dataflows.helpers.resource_matcher import ResourceMatcher + from datapackage_pipelines.wrapper import ingest, spew, get_dependency_datapackage_url -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher from datapackage_pipelines.utilities.resources import tabular, PROP_STREAMING, \ PROP_STREAMED_FROM @@ -42,7 +43,11 @@ def __call__(self): assert resources resource_index = None resource = list(resources.keys()) - name_matcher = ResourceMatcher(resource) if 
isinstance(resource, (str, list)) else None + name_matcher = ( + ResourceMatcher(resource, self.dp) + if isinstance(resource, (str, list)) + else None + ) selected_resources = [] found = False diff --git a/datapackage_pipelines/lib/printer.py b/datapackage_pipelines/lib/printer.py new file mode 100644 index 00000000..1988398d --- /dev/null +++ b/datapackage_pipelines/lib/printer.py @@ -0,0 +1,14 @@ +from dataflows import Flow, printer +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow + + +def flow(parameters): + return Flow( + printer(), + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/set_types.py b/datapackage_pipelines/lib/set_types.py index ad6eba29..34210ce6 100644 --- a/datapackage_pipelines/lib/set_types.py +++ b/datapackage_pipelines/lib/set_types.py @@ -1,74 +1,26 @@ -import re -import logging - -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher -import tableschema - -parameters, datapackage, resource_iterator = ingest() - -resources = ResourceMatcher(parameters.get('resources')) -types = parameters.get('types', {}) - - -def match_fields(field_name_re, expected): - def filt(field): - return (field_name_re.fullmatch(field['name']) is not None) is expected - return filt - - -def process_datapackage(datapackage_): - for resource in datapackage_['resources']: - name = resource['name'] - if not resources.match(name): - continue - - if 'schema' not in resource: - continue - - fields = resource.setdefault('schema', {}).get('fields', []) - for field_name, field_definition in types.items(): - field_name_re = re.compile(field_name) - if field_definition is not None: - filtered_fields = list( - filter(match_fields(field_name_re, True), fields) - ) - for field in filtered_fields: - field.update(field_definition) - assert len(filtered_fields) > 0, \ - "No field found matching %r" % field_name - else: - fields = list( - filter(match_fields(field_name_re, False), fields) - ) - - resource['schema']['fields'] = fields - - -def process_resource(spec, rows): - schema = spec['schema'] - jts = tableschema.Schema(schema) - field_names = list(map(lambda f: f['name'], schema['fields'])) - for row in rows: - flattened_row = [row.get(name) for name in field_names] - try: - flattened_row = jts.cast_row(flattened_row) - except Exception: - logging.error('Failed to cast row %r', flattened_row) - raise - row = dict(zip(field_names, flattened_row)) - yield row - - -def process_resources(resource_iterator_): - for resource in resource_iterator_: - spec = resource.spec - if not resources.match(spec['name']): - yield resource - else: - yield process_resource(spec, resource) - - -process_datapackage(datapackage) - -spew(datapackage, process_resources(resource_iterator)) +from dataflows import Flow, set_type, validate, delete_fields +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow + + +def flow(parameters): + resources = parameters.get('resources') + if 'types' in parameters: + return Flow( + *[ + set_type(name, resources=resources, **options) + if options is not None else + delete_fields([name], resources=resources) + for name, options in parameters['types'].items() + ] + ) + else: + return Flow( + validate() + ) + + +if __name__ == '__main__': + with ingest() as ctx: + print(flow(ctx.parameters).chain) + 
spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/sort.py b/datapackage_pipelines/lib/sort.py index e8fea101..fd84d3cd 100644 --- a/datapackage_pipelines/lib/sort.py +++ b/datapackage_pipelines/lib/sort.py @@ -1,39 +1,19 @@ -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.kvstore import KVStore -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher +from dataflows import Flow, sort_rows +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow, load_lazy_json -class KeyCalc(object): +def flow(parameters): + return Flow( + load_lazy_json(parameters.get('resources')), + sort_rows( + parameters['sort-by'], + resources=parameters.get('resources'), + reverse=parameters.get('reverse') + ) + ) - def __init__(self, key_spec): - self.key_spec = key_spec - def __call__(self, row): - return self.key_spec.format(**row) - - -parameters, datapackage, resource_iterator = ingest() - -resources = ResourceMatcher(parameters['resources']) -key_calc = KeyCalc(parameters['sort-by']) -reverse = parameters.get('reverse', False) - - -def sorter(resource): - db = KVStore() - for row_num, row in enumerate(resource): - key = key_calc(row) + "{:08x}".format(row_num) - db[key] = row - for key in db.keys(reverse=reverse): - yield db[key] - - -def new_resource_iterator(resource_iterator_): - for resource in resource_iterator_: - if resources.match(resource.spec['name']): - yield sorter(resource) - else: - yield resource - - -spew(datapackage, new_resource_iterator(resource_iterator)) +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/stream_remote_resources.py b/datapackage_pipelines/lib/stream_remote_resources.py index 0d5f60fe..2f228629 100644 --- a/datapackage_pipelines/lib/stream_remote_resources.py +++ b/datapackage_pipelines/lib/stream_remote_resources.py @@ -10,11 +10,12 @@ from tableschema import Schema +from dataflows.helpers.resource_matcher import ResourceMatcher + from datapackage_pipelines.wrapper import ingest, spew from datapackage_pipelines.utilities.resources import streamable, PATH_PLACEHOLDER, get_path, \ PROP_STREAMED_FROM, PROP_STREAMING, streaming from datapackage_pipelines.utilities.extended_json import json -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher from datapackage_pipelines.utilities.tabulator_txt_parser import TXTParser @@ -216,7 +217,7 @@ def opener(): parameters, datapackage, resource_iterator = ingest() -resources = ResourceMatcher(parameters.get('resources')) +resources = ResourceMatcher(parameters.get('resources'), datapackage) ignore_missing = parameters.get('ignore-missing', False) limit_rows = parameters.get('limit-rows', -1) diff --git a/datapackage_pipelines/lib/unpivot.py b/datapackage_pipelines/lib/unpivot.py index 784a8b74..5bba7a47 100644 --- a/datapackage_pipelines/lib/unpivot.py +++ b/datapackage_pipelines/lib/unpivot.py @@ -1,83 +1,19 @@ -import copy -import re +from dataflows import Flow, unpivot +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow -from datapackage_pipelines.wrapper import ingest, spew -from datapackage_pipelines.utilities.resource_matcher import ResourceMatcher -parameters, datapackage, resource_iterator = ingest() +def flow(parameters): + return Flow( + unpivot( + parameters.get('unpivot'), + parameters.get('extraKeyFields'), + 
parameters.get('extraValueField'), + resources=parameters.get('resources') + ) + ) -resources = ResourceMatcher(parameters.get('resources')) -unpivot_fields = parameters.get('unpivot') -extra_keys = parameters.get('extraKeyFields') -extra_value = parameters.get('extraValueField') - -def match_fields(field_name_re, expected): - def filt(field): - return (field_name_re.fullmatch(field['name']) is not None) is expected - return filt - - -def process_datapackage(datapackage_): - unpivot_fields_without_regex = [] - for resource in datapackage_['resources']: - name = resource['name'] - if not resources.match(name): - continue - - if 'schema' not in resource: - continue - - fields = resource['schema'].get('fields', []) - - for u_field in unpivot_fields: - field_name_re = re.compile(u_field['name']) - fields_to_pivot = (list( - filter(match_fields(field_name_re, True), fields) - )) - fields = list( - filter(match_fields(field_name_re, False), fields) - ) - - # handle with regex - for field_to_pivot in fields_to_pivot: - original_key_values = u_field['keys'] # With regex - new_key_values = {} - for key in original_key_values: - new_val = original_key_values[key] - if isinstance(new_val, str): - new_val = re.sub( - u_field['name'], new_val, field_to_pivot['name']) - new_key_values[key] = new_val - field_to_pivot['keys'] = new_key_values - unpivot_fields_without_regex.append(field_to_pivot) - - fields_to_keep = [f['name'] for f in fields] - fields.extend(extra_keys) - fields.append(extra_value) - resource['schema']['fields'] = fields - return unpivot_fields_without_regex, fields_to_keep - - -def unpivot(rows, fields_to_unpivot_, fields_to_keep_): - for row in rows: - for unpivot_field in fields_to_unpivot_: - new_row = copy.deepcopy(unpivot_field['keys']) - for field in fields_to_keep_: - new_row[field] = row[field] - new_row[extra_value['name']] = row.get(unpivot_field['name']) - yield new_row - - -def process_resources(resource_iterator_, fields_to_unpivot, fields_to_keep): - for resource in resource_iterator_: - spec = resource.spec - if not resources.match(spec['name']): - yield resource - else: - yield unpivot(resource, fields_to_unpivot, fields_to_keep) - - -old_fields, keep_fields = process_datapackage(datapackage) - -spew(datapackage, process_resources(resource_iterator, old_fields, keep_fields)) +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/lib/update_package.py b/datapackage_pipelines/lib/update_package.py new file mode 100644 index 00000000..7f309191 --- /dev/null +++ b/datapackage_pipelines/lib/update_package.py @@ -0,0 +1,14 @@ +from dataflows import Flow, update_package +from datapackage_pipelines.wrapper import ingest +from datapackage_pipelines.utilities.flow_utils import spew_flow + + +def flow(parameters): + return Flow( + update_package(**parameters) + ) + + +if __name__ == '__main__': + with ingest() as ctx: + spew_flow(flow(ctx.parameters), ctx) diff --git a/datapackage_pipelines/utilities/flow_utils.py b/datapackage_pipelines/utilities/flow_utils.py new file mode 100644 index 00000000..6bb5113f --- /dev/null +++ b/datapackage_pipelines/utilities/flow_utils.py @@ -0,0 +1,49 @@ +from dataflows import Flow, load, update_package +from dataflows.helpers.resource_matcher import ResourceMatcher + +from datapackage_pipelines.wrapper import ProcessorContext +from datapackage_pipelines.utilities.extended_json import LazyJsonLine + + +def load_lazy_json(resources): + + def func(package): + matcher = 
ResourceMatcher(resources, package.pkg) + yield package.pkg + for rows in package: + if matcher.match(rows.res.name): + yield ( + row.inner + if isinstance(row, LazyJsonLine) + else row + for row in rows + ) + else: + yield rows + + return func + + +class MergeableStats(): + def __init__(self, ds_stats, ctx_stats): + self.ds_stats = ds_stats + self.ctx_stats = ctx_stats + + def __iter__(self): + if self.ds_stats is not None: + for x in self.ds_stats: + yield from x.items() + if self.ctx_stats is not None: + yield from self.ctx_stats.items() + + +def spew_flow(flow, ctx: ProcessorContext): + flow = Flow( + update_package(**ctx.datapackage), + load((ctx.datapackage, ctx.resource_iterator)), + flow, + ) + datastream = flow.datastream() + ctx.datapackage = datastream.dp.descriptor + ctx.resource_iterator = datastream.res_iter + ctx.stats = MergeableStats(datastream.stats, ctx.stats) diff --git a/datapackage_pipelines/utilities/kvstore.py b/datapackage_pipelines/utilities/kvstore.py deleted file mode 100644 index 57ed2294..00000000 --- a/datapackage_pipelines/utilities/kvstore.py +++ /dev/null @@ -1,118 +0,0 @@ -import cachetools -import tempfile -import logging - -from .extended_json import json - -try: - import plyvel as DB_ENGINE - logging.info('Using leveldb for joining') - db_kind = 'LevelDB' -except ImportError: - import sqlite3 as DB_ENGINE - logging.info('Using sqlite for joining') - db_kind = 'sqlite' - - -class SqliteDB(object): - - def __init__(self): - self.tmpfile = tempfile.NamedTemporaryFile() - self.db = DB_ENGINE.connect(self.tmpfile.name) - self.cursor = self.db.cursor() - self.cursor.execute('''CREATE TABLE d (key text, value text)''') - self.cursor.execute('''CREATE UNIQUE INDEX i ON d (key)''') - - def get(self, key): - ret = self.cursor.execute('''SELECT value FROM d WHERE key=?''', - (key,)).fetchone() - if ret is None: - raise KeyError() - else: - return json.loads(ret[0]) - - def set(self, key, value): - value = json.dumps(value) - try: - self.get(key) - self.cursor.execute('''UPDATE d SET value=? 
WHERE key=?''', - (value, key)) - except KeyError: - self.cursor.execute('''INSERT INTO d VALUES (?, ?)''', - (key, value)) - self.db.commit() - - def keys(self, reverse=False): - cursor = self.db.cursor() - direction = 'DESC' if reverse else 'ASC' - keys = cursor.execute('''SELECT key FROM d ORDER BY key ''' + direction) - for key, in keys: - yield key - - def items(self): - cursor = self.db.cursor() - items = cursor.execute('''SELECT key, value FROM d ORDER BY key ASC''') - for key, value in items: - yield key, json.loads(value) - - -class LevelDB(object): - - def __init__(self): - self.tmpdir = tempfile.TemporaryDirectory() - self.db = DB_ENGINE.DB(self.tmpdir.name, create_if_missing=True) - - def get(self, key): - ret = self.db.get(key.encode('utf8')) - if ret is None: - raise KeyError() - else: - return json.loads(ret.decode('utf8')) - - def set(self, key, value): - value = json.dumps(value).encode('utf8') - key = key.encode('utf8') - self.db.put(key, value) - - def keys(self, reverse=False): - for key, value in self.db.iterator(reverse=reverse): - yield key.decode('utf8') - - def items(self): - for key, value in self.db: - yield (key.decode('utf8'), json.loads(value.decode('utf8'))) - - -DB = LevelDB if db_kind == 'LevelDB' else SqliteDB - - -class CachedDB(cachetools.LRUCache): - - def __init__(self): - super(CachedDB, self).__init__(1024, self._dbget) - self.db = DB() - - def popitem(self): - key, value = super(CachedDB, self).popitem() - self._dbset(key, value) - return key, value - - def _dbget(self, key): - value = self.db.get(key) - return value - - def _dbset(self, key, value): - assert value is not None - self.db.set(key, value) - - def sync(self): - for key in iter(self): - value = cachetools.Cache.__getitem__(self, key) - self._dbset(key, value) - - def keys(self, reverse=False): - self.sync() - return self.db.keys(reverse=reverse) - - -KVStore = CachedDB diff --git a/datapackage_pipelines/utilities/lib_test_helpers.py b/datapackage_pipelines/utilities/lib_test_helpers.py index 16d3a77f..5be6426e 100644 --- a/datapackage_pipelines/utilities/lib_test_helpers.py +++ b/datapackage_pipelines/utilities/lib_test_helpers.py @@ -74,7 +74,13 @@ def _run_processor(self, processor, parameters, data_in, env): parameters, 'False', ''], input=data_in, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + print('\nProcessor output:\n') + for line in process.stderr.decode('utf8').split('\n'): + print(f'OUT> {line}') + if process.returncode != 0: + raise Exception(f'processor execution failed with {process.returncode}') return process.stdout.decode('utf8') def _test_single_fixture(self, processor, parameters, data_in, diff --git a/datapackage_pipelines/utilities/resource_matcher.py b/datapackage_pipelines/utilities/resource_matcher.py deleted file mode 100644 index 1b8b139d..00000000 --- a/datapackage_pipelines/utilities/resource_matcher.py +++ /dev/null @@ -1,23 +0,0 @@ -import re - - -class ResourceMatcher(object): - - def __init__(self, resources): - self.resources = resources - if resources is None: - self.resources = None - elif isinstance(self.resources, str): - self.resources = re.compile('^' + self.resources + '$') - self.re = True - else: - assert isinstance(self.resources, list) - self.re = False - - def match(self, name): - if self.resources is None: - return True - if self.re: - return self.resources.match(name) is not None - else: - return name in self.resources diff --git a/datapackage_pipelines/wrapper/__init__.py b/datapackage_pipelines/wrapper/__init__.py index 
30075f4e..f1ee3e47 100644 --- a/datapackage_pipelines/wrapper/__init__.py +++ b/datapackage_pipelines/wrapper/__init__.py @@ -1 +1,2 @@ -from .wrapper import ingest, spew, process, get_dependency_datapackage_url +from .wrapper import ingest, spew, process, \ + get_dependency_datapackage_url, ProcessorContext diff --git a/datapackage_pipelines/wrapper/wrapper.py b/datapackage_pipelines/wrapper/wrapper.py index dd344618..b6e9b175 100644 --- a/datapackage_pipelines/wrapper/wrapper.py +++ b/datapackage_pipelines/wrapper/wrapper.py @@ -2,6 +2,7 @@ import sys import os import logging +from contextlib import ExitStack, redirect_stderr, redirect_stdout from tableschema.exceptions import CastError @@ -18,6 +19,7 @@ cache = '' first = True +stdout = sys.stdout dependency_datapackage_urls = {} @@ -26,7 +28,9 @@ def get_dependency_datapackage_url(pipeline_id): return dependency_datapackage_urls.get(pipeline_id) -def ingest(debug=False): +# ## LOW LEVEL INTERFACE + +def _ingest(debug=False): global cache global first params = None @@ -44,7 +48,7 @@ def ingest(debug=False): def spew(dp, resources_iterator, stats=None, finalizer=None): - files = [sys.stdout] + files = [stdout] cache_filename = '' if len(cache) > 0: @@ -111,7 +115,7 @@ def spew(dp, resources_iterator, stats=None, finalizer=None): sys.stderr.flush() sys.exit(1) - sys.stdout.flush() + stdout.flush() if row_count > 0: logging.info('Processed %d rows', row_count) @@ -120,7 +124,7 @@ def spew(dp, resources_iterator, stats=None, finalizer=None): for f in files: f.write('\n') # Signal to other processors that we're done - if f == sys.stdout: + if f == stdout: # Can't close sys.stdout, otherwise any subsequent # call to print() will throw an exception f.flush() @@ -131,6 +135,58 @@ def spew(dp, resources_iterator, stats=None, finalizer=None): os.rename(cache_filename+'.ongoing', cache_filename) +class StdoutWriter: + + def write(self, message): + message = message.strip() + if message: + logging.info(message) + + def flush(self): + pass + + +class StderrWriter: + + def write(self, message): + message = message.strip() + if message: + logging.error(message) + + def flush(self): + pass + + +class ProcessorContext(ExitStack): + + def __init__(self, parameters, datapackage, resource_iterator): + super().__init__() + self.parameters = parameters + self.datapackage = datapackage + self.resource_iterator = resource_iterator + self.stats = {} + + def __iter__(self): + return iter((self.parameters, self.datapackage, self.resource_iterator)) + + def __enter__(self): + super().__enter__() + self.enter_context(redirect_stdout(StdoutWriter())) + self.enter_context(redirect_stderr(StderrWriter())) + return self + + def __exit__(self, *args, **kw): + spew(self.datapackage, self.resource_iterator, stats=self.stats) + super().__exit__(*args, **kw) + + +def ingest(debug=False): + params, datapackage, resource_iterator = _ingest(debug=debug) + return ProcessorContext(params, datapackage, resource_iterator) + + +# ## HIGH LEVEL INTERFACE + def generic_process_resource(rows, spec, resource_index, @@ -162,16 +218,13 @@ def generic_process_resources(resource_iterator, def process(modify_datapackage=None, process_row=None, debug=False): - stats = {} - parameters, datapackage, resource_iterator = ingest(debug=debug) - if modify_datapackage is not None: - datapackage = modify_datapackage(datapackage, parameters, stats) - - if process_row is not None: - new_iter = generic_process_resources(resource_iterator, - parameters, - stats, - process_row) - spew(datapackage, 
new_iter, stats) - else: - spew(datapackage, resource_iterator, stats) + with ingest(debug=debug) as ctx: + if modify_datapackage is not None: + ctx.datapackage = modify_datapackage(ctx.datapackage, ctx.parameters, ctx.stats) + + if process_row is not None: + ctx.resource_iterator = \ + generic_process_resources(ctx.resource_iterator, + ctx.parameters, + ctx.stats, + process_row) diff --git a/samples/pipeline-spec.yaml b/samples/pipeline-spec.yaml index 00a44752..b79bc0c8 100644 --- a/samples/pipeline-spec.yaml +++ b/samples/pipeline-spec.yaml @@ -3,29 +3,18 @@ worldbank-co2-emissions: crontab: '0 * * * *' pipeline: - - run: add_metadata + run: update_package parameters: name: 'co2-emissions' title: 'CO2 emissions [metric tons per capita]' homepage: 'http://worldbank.org/' - - run: add_resource + run: load parameters: + from: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel" name: 'global-data' -# url: /Users/adam/Downloads/API_EN.ATM.CO2E.KT_DS2_en_csv_v2.csv -# headers: 5 headers: 4 - url: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel" format: xls - - - run: add_resource - parameters: - name: 'global-data-json' - headers: 4 - url: "http://api.worldbank.org/v2/en/indicator/EN.ATM.CO2E.PC?downloadformat=excel" - format: xls - - - run: stream_remote_resources - run: set_types parameters: @@ -34,49 +23,6 @@ worldbank-co2-emissions: "[12][0-9]{3}": type: number - - run: add_constant - parameters: - column-name: the_constant - value: the value - - - run: dump.to_zip - parameters: - out-file: co2-emisonss-wb.zip - force-format: false - - - run: dump.to_path - parameters: - out-path: co2-emisonss-wb - force-format: false - - - run: dump.to_sql - parameters: - tables: - co2_emisonss_wb: - resource-name: global-data - -worldbank-co2-emissions-isr: - dependencies: - - pipeline: ./worldbank-co2-emissions - pipeline: - - - run: add_metadata - parameters: - name: 'co2-emissions-israel' - title: 'CO2 emissions for Israel' - homepage: 'http://worldbank.org/' - - - run: load_resource - parameters: - url: dependency://./worldbank-co2-emissions - resource: 0 - - - run: filter - parameters: - in: - - {'Country Code': 'ISR'} - - - run: dump.to_path + run: dump_to_zip parameters: - out-path: co2-emisonss-wb-israel - force-format: false + out-file: co2-emissions-wb.zip diff --git a/setup.py b/setup.py index cc9a1b7c..b7ba07a3 100644 --- a/setup.py +++ b/setup.py @@ -23,15 +23,14 @@ def read(*paths): INSTALL_REQUIRES = [ 'celery', 'requests', - 'datapackage>=1.2.2', - 'tableschema>=1.0.12', + 'datapackage>=1.5.1', + 'tableschema>=1.2.5', 'tableschema-sql>=0.10.0', 'pyyaml', 'ujson', 'mistune', 'redis', - 'sqlalchemy', - 'click<7.0', + 'click<8.0', 'awesome-slugify', 'flask<1.0.0', 'flask-cors', @@ -39,16 +38,17 @@ def read(*paths): 'flask-basicauth', 'cachetools', 'tabulator>=1.14.0', - 'dataflows', + 'dataflows>=0.0.29', ] SPEEDUP_REQUIRES = [ - 'plyvel<1', + 'dataflows[speedup]', ] LINT_REQUIRES = [ 'pylama', ] TESTS_REQUIRE = [ 'tox', + 'sqlalchemy', ] README = read('README.md') VERSION = read(PACKAGE, 'VERSION') diff --git a/tests/cli/pipeline-spec.yaml b/tests/cli/pipeline-spec.yaml index e836723b..26da3def 100644 --- a/tests/cli/pipeline-spec.yaml +++ b/tests/cli/pipeline-spec.yaml @@ -90,6 +90,6 @@ dataflows: - flow: test_flow parameters: attr: foo - - run: dump.to_path + - run: dump_to_path parameters: out-path: test_flow_data diff --git a/tests/cli/test_flow.py b/tests/cli/test_flow.py index 2eadf8ea..4aef5f41 100644 --- 
a/tests/cli/test_flow.py +++ b/tests/cli/test_flow.py @@ -1,4 +1,4 @@ -from dataflows import Flow, dump_to_path, PackageWrapper, load, add_metadata +from dataflows import Flow, dump_to_path, PackageWrapper, load, update_package def hello_dataflows(package: PackageWrapper): @@ -20,8 +20,7 @@ def add_foo_value(row): row[parameters['attr']] = 'foo' stats['foo_values'] += 1 - return Flow(add_metadata(name='_'), + return Flow(update_package(name='_'), hello_dataflows, - load((datapackage, resources)), add_foo_field, add_foo_value) diff --git a/tests/docker/pipeline-spec.yaml b/tests/docker/pipeline-spec.yaml index 3b5ba077..6d61ba5b 100644 --- a/tests/docker/pipeline-spec.yaml +++ b/tests/docker/pipeline-spec.yaml @@ -1,7 +1,7 @@ test: pipeline: - run: test - - run: dump.to_path + - run: dump_to_path parameters: out-path: data @@ -17,6 +17,6 @@ test-package: - run: test parameters: test-package: true - - run: dump.to_path + - run: dump_to_path parameters: out-path: data/test_package diff --git a/tests/docker/test.py b/tests/docker/test.py index 6f60d37b..6d00432a 100644 --- a/tests/docker/test.py +++ b/tests/docker/test.py @@ -2,7 +2,7 @@ from datapackage_pipelines.utilities.resources import PROP_STREAMING import datetime -parameters, datapackage, resources, stats = ingest() + ({},) +parameters, datapackage, resources, stats = tuple(ingest()) + ({},) if parameters.get('test-package'): diff --git a/tests/env/dummy/pipeline-spec.yaml b/tests/env/dummy/pipeline-spec.yaml index def9a877..604cc75f 100644 --- a/tests/env/dummy/pipeline-spec.yaml +++ b/tests/env/dummy/pipeline-spec.yaml @@ -1,7 +1,7 @@ pipeline-test-stream-remote-resources-nones: pipeline: - - run: add_metadata + run: update_package parameters: name: test_nulls - @@ -31,14 +31,14 @@ pipeline-test-stream-remote-resources-nones: constraints: required: true - - run: dump.to_path + run: dump_to_path parameters: out-path: nulls-test pipeline-test-basic: pipeline: - - run: add_metadata + run: update_package parameters: name: 'al-treasury-spending' title: 'Albania Treasury Service' @@ -93,9 +93,11 @@ pipeline-test-basic: - run: ..common.pipeline-common - - run: dump.to_zip + run: dump_to_zip parameters: out-file: dump.zip + - + run: printer pipeline-test-big-outputs: @@ -107,16 +109,17 @@ pipeline-test-big-outputs: pipeline-test-hooks: pipeline: - - run: add_metadata + run: update_package parameters: name: 'hook-tests' - - run: add_resource + run: load parameters: + from: "https://raw.githubusercontent.com/openspending/fiscal-data-package-demos/master/al-treasury-spending/data/treasury.csv" name: "treasury" - url: "https://raw.githubusercontent.com/openspending/fiscal-data-package-demos/master/al-treasury-spending/data/treasury.csv" + quotechar: '"' - - run: dump.to_path + run: dump_to_path parameters: out-path: hooks-outputs @@ -126,7 +129,7 @@ pipeline-test-hooks: pipeline-test-datatypes: pipeline: - - run: add_metadata + run: update_package parameters: name: 'type-tests' - @@ -169,7 +172,7 @@ pipeline-test-datatypes: geojson: type: geojson - - run: dump.to_path + run: dump_to_path parameters: out-path: type-tests-output @@ -178,7 +181,7 @@ pipeline-test-datatypes2: - pipeline: ./tests/env/dummy/pipeline-test-datatypes pipeline: - - run: add_metadata + run: update_package parameters: name: 'type-tests' - @@ -187,7 +190,7 @@ pipeline-test-datatypes2: url: dependency://./tests/env/dummy/pipeline-test-datatypes resource: types - - run: dump.to_path + run: dump_to_path parameters: out-path: type-tests-output2 @@ -204,13 +207,13 @@ 
pipeline-test-code: run: code code: | from datapackage_pipelines.wrapper import ingest, spew - parameters, datapackage, resources, stats = ingest() + ({},) + parameters, datapackage, resources, stats = tuple(ingest()) + ({},) def get_resource(): for descriptor, resource in zip(datapackage["resources"], resources): for row in resource: yield row spew(datapackage, [get_resource()], stats) - - run: dump.to_path + run: dump_to_path parameters: out-path: code-test-output diff --git a/tests/stdlib/fixtures/add_resource_existent_env b/tests/stdlib/fixtures/add_resource_existent_env index 1cf23176..0fa5e538 100644 --- a/tests/stdlib/fixtures/add_resource_existent_env +++ b/tests/stdlib/fixtures/add_resource_existent_env @@ -16,7 +16,7 @@ add_resource "resources": [ { "name": "my-env-resource", - "dpp:streamedFrom": "file://tests/data/sample.csv", + "dpp:streamedFrom": "tests/data/sample.csv", "path": "_" } ] diff --git a/tests/stdlib/fixtures/dump_to_sql_update_mode__insert b/tests/stdlib/fixtures/dump_to_sql_update_mode__insert index 5d794f6a..ca13332f 100644 --- a/tests/stdlib/fixtures/dump_to_sql_update_mode__insert +++ b/tests/stdlib/fixtures/dump_to_sql_update_mode__insert @@ -1,4 +1,4 @@ -dump.to_sql +dump_to_sql -- { "tables": { @@ -33,11 +33,13 @@ dump.to_sql -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "id", "type": "integer"}, @@ -53,4 +55,4 @@ dump.to_sql -- {"id": 1, "mystring":"a", "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}} -{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "f9bb6e9b25d2c3e2d1764a8287107bb4"} +{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "5dad5b7c7fb3fecb7478b4f34fabbd23"} diff --git a/tests/stdlib/fixtures/dump_to_sql_update_mode__update b/tests/stdlib/fixtures/dump_to_sql_update_mode__update index 5d794f6a..ca13332f 100644 --- a/tests/stdlib/fixtures/dump_to_sql_update_mode__update +++ b/tests/stdlib/fixtures/dump_to_sql_update_mode__update @@ -1,4 +1,4 @@ -dump.to_sql +dump_to_sql -- { "tables": { @@ -33,11 +33,13 @@ dump.to_sql -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "id", "type": "integer"}, @@ -53,4 +55,4 @@ dump.to_sql -- {"id": 1, "mystring":"a", "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}} -{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "f9bb6e9b25d2c3e2d1764a8287107bb4"} +{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "5dad5b7c7fb3fecb7478b4f34fabbd23"} diff --git a/tests/stdlib/fixtures/dump_to_sql_with_updated_data b/tests/stdlib/fixtures/dump_to_sql_with_updated_data index d521d281..397a3a8a 100644 --- a/tests/stdlib/fixtures/dump_to_sql_with_updated_data +++ b/tests/stdlib/fixtures/dump_to_sql_with_updated_data @@ -1,4 +1,4 @@ -dump.to_sql +dump_to_sql -- { "tables": { @@ -33,11 +33,13 @@ dump.to_sql -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "mystring", "type": "string"}, @@ -50,6 +52,6 @@ dump.to_sql ] } -- -{"mystring":"a", "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}, "updated": 
false, "updated_id": null} +{"mystring":"a", "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}, "myinteger": null, "updated": false, "updated_id": null} -{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "1ae286a22a1dd3c17aa78a047dd778ef"} +{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "c1c867cd9711aedd5c94a16ce4590ece"} diff --git a/tests/stdlib/fixtures/load_existent_env b/tests/stdlib/fixtures/load_existent_env new file mode 100644 index 00000000..49fecc6f --- /dev/null +++ b/tests/stdlib/fixtures/load_existent_env @@ -0,0 +1,51 @@ +load +-- +{ + "from": "env://EXISTENT_ENV", + "name": "my-env-resource", + "validate": true +} +-- +{ + "name": "test", + "resources": [] +} +-- +-- +{ + "name": "test", + "profile": "data-package", + "resources": [ + { + "dpp:streamedFrom": "env://EXISTENT_ENV", + "dpp:streaming": true, + "encoding": "utf-8", + "format": "csv", + "mediatype": "text/csv", + "name": "my-env-resource", + "path": "my-env-resource.csv", + "profile": "tabular-data-resource", + "schema": { + "fields": [ + {"format": "default", "name": "first_name", "type": "string"}, + {"format": "default", "name": "last_name", "type": "string"}, + {"format": "default", "name": "house", "type": "string"}, + {"format": "default", "name": "age", "type": "integer"} + ], + "missingValues": [""] + } + } + ] +} +-- +{"age": 27, "first_name": "Tyrion", "house": "Lannister", "last_name": "Lannister"} +{"age": 34, "first_name": "Jaime", "house": "Lannister", "last_name": "Lannister"} +{"age": 34, "first_name": "Cersei", "house": "Lannister", "last_name": "Lannister"} +{"age": 17, "first_name": "Jon", "house": "Stark", "last_name": "Snow"} +{"age": 14, "first_name": "Sansa", "house": "Stark", "last_name": "Stark"} +{"age": 11, "first_name": "Arya", "house": "Stark", "last_name": "Stark"} +{"age": 10, "first_name": "Bran", "house": "Stark", "last_name": "Stark"} +{"age": 5, "first_name": "Rickon", "house": "Stark", "last_name": "Stark"} +{"age": 16, "first_name": "Daenerys", "house": "Targaryen", "last_name": "Targaryen"} + +{} diff --git a/tests/stdlib/fixtures/obj_fix_dump_to_sql b/tests/stdlib/fixtures/obj_fix_dump_to_sql index f2bc30b6..134b762d 100644 --- a/tests/stdlib/fixtures/obj_fix_dump_to_sql +++ b/tests/stdlib/fixtures/obj_fix_dump_to_sql @@ -1,4 +1,4 @@ -dump.to_sql +dump_to_sql -- { "comment": [ @@ -36,11 +36,13 @@ dump.to_sql -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "myarray", "type": "array"}, @@ -55,4 +57,4 @@ dump.to_sql -- {"myarray": "[\"2016-12-31\", \"2016-11-10T12:34:56\"]", "mydate": {"type{date}": "2016-12-31"}, "mynumber": {"type{decimal}": "2"}, "myobject": "{\"n1\": {\"n2\": 78.99}}"} -{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "06a152f7eaf28a2d12e1cba3b0675a46"} +{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "bed26992ae39b43e8b58c0190e8a52e5"} diff --git a/tests/stdlib/fixtures/reverse_sort b/tests/stdlib/fixtures/reverse_sort index b2a4bd08..650e3c0a 100644 --- a/tests/stdlib/fixtures/reverse_sort +++ b/tests/stdlib/fixtures/reverse_sort @@ -58,11 +58,13 @@ sort -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "concat-a1", "dpp:streaming": true, "path": "concat-a1.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "a1", "type": "string"}, {"name": "a2", 
"type": "string"}, @@ -73,6 +75,7 @@ sort "name": "concat-a2", "dpp:streaming": true, "path": "concat-a2.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "a1", "type": "string"}, {"name": "a2", "type": "string"}, @@ -83,6 +86,7 @@ sort "name": "concat-c", "dpp:streaming": true, "path": "concat-c.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "c1", "type": "string"}, {"name": "c2", "type": "string"}, diff --git a/tests/stdlib/fixtures/simple_add_computed_field b/tests/stdlib/fixtures/simple_add_computed_field index 93e1beab..e20bb3b3 100644 --- a/tests/stdlib/fixtures/simple_add_computed_field +++ b/tests/stdlib/fixtures/simple_add_computed_field @@ -77,11 +77,13 @@ add_computed_field -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "salaries", "dpp:streaming": true, "path": "salaries.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "first_name", "type": "string"}, {"name": "last_name", "type": "string"}, diff --git a/tests/stdlib/fixtures/simple_concat b/tests/stdlib/fixtures/simple_concat index 4729da7e..c17417d5 100644 --- a/tests/stdlib/fixtures/simple_concat +++ b/tests/stdlib/fixtures/simple_concat @@ -62,12 +62,14 @@ concatenate -- { "name": "test", - "resources": [ + "profile": "data-package", + "resources": [ { "name": "target", "dpp:streaming": true, "path": "data/target.csv", "mediatype": "text/csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "t1", "type": "string"}, {"name": "t2", "type": "string"}, diff --git a/tests/stdlib/fixtures/simple_delete_fields b/tests/stdlib/fixtures/simple_delete_fields index c44dcdde..c2835f6b 100644 --- a/tests/stdlib/fixtures/simple_delete_fields +++ b/tests/stdlib/fixtures/simple_delete_fields @@ -51,11 +51,13 @@ delete_fields -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "got-characters", "dpp:streaming": true, "path": "characters.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "first_name", "type": "string"}, {"name": "house", "type": "string"} @@ -65,6 +67,7 @@ delete_fields "name": "got-houses", "dpp:streaming": true, "path": "houses.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "house", "type": "string"} ]} diff --git a/tests/stdlib/fixtures/simple_dump_dot_to_zip b/tests/stdlib/fixtures/simple_dump_dot_to_zip new file mode 100644 index 00000000..7a46e282 --- /dev/null +++ b/tests/stdlib/fixtures/simple_dump_dot_to_zip @@ -0,0 +1,58 @@ +dump.to_zip +-- +{ + "out-file": "my-spiffy-resource.zip" +} +-- +{ + "name": "test", + "resources": [ + { + "name": "my-spiffy-resource", + "dpp:streaming": true, + "path": "data/my-data.csv", + "schema": { + "fields": [ + {"name": "mystring", "type": "string"}, + {"name": "myinteger", "type": "integer"}, + {"name": "mynumber", "type": "number"}, + {"name": "mydate", "type": "date"} + ] + } + } + ] +} +-- +{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} +-- +{ + "name": "test", + "resources": [ + { + "name": "my-spiffy-resource", + "dpp:streaming": true, + "path": "data/my-data.csv", + "encoding": "utf-8", + "format": "csv", + "dialect": { + "delimiter": ",", + "doubleQuote": true, + "lineTerminator": "\r\n", + "quoteChar": "\"", + "skipInitialSpace": false + }, + "schema": { + "fields": [ + {"name": "mystring", "type": "string"}, + {"name": "myinteger", "type": "integer"}, + {"name": "mynumber", "type": "number", "groupChar": "", "decimalChar": "."}, + {"format": "%Y-%m-%d", "name": "mydate", "type": 
"date"} + ] + } + } + ] +} +-- +{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} + +{"bytes": 703, "count_of_rows": 1, "dataset_name": "test", "hash": "a730863e99517930eab15f55036d309f"} diff --git a/tests/stdlib/fixtures/simple_dump_dot_to_zip_with_hash b/tests/stdlib/fixtures/simple_dump_dot_to_zip_with_hash new file mode 100644 index 00000000..1d665398 --- /dev/null +++ b/tests/stdlib/fixtures/simple_dump_dot_to_zip_with_hash @@ -0,0 +1,59 @@ +dump.to_zip +-- +{ + "out-file": "my-spiffy-resource.zip", + "add-filehash-to-path": true +} +-- +{ + "name": "test", + "resources": [ + { + "name": "my-spiffy-resource", + "dpp:streaming": true, + "path": "data/my-data.csv", + "schema": { + "fields": [ + {"name": "mystring", "type": "string"}, + {"name": "myinteger", "type": "integer"}, + {"name": "mynumber", "type": "number"}, + {"name": "mydate", "type": "date"} + ] + } + } + ] +} +-- +{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} +-- +{ + "name": "test", + "resources": [ + { + "name": "my-spiffy-resource", + "dpp:streaming": true, + "path": "data/my-data.csv", + "encoding": "utf-8", + "format": "csv", + "dialect": { + "delimiter": ",", + "doubleQuote": true, + "lineTerminator": "\r\n", + "quoteChar": "\"", + "skipInitialSpace": false + }, + "schema": { + "fields": [ + {"name": "mystring", "type": "string"}, + {"name": "myinteger", "type": "integer"}, + {"name": "mynumber", "type": "number", "groupChar": "", "decimalChar": "."}, + {"format": "%Y-%m-%d", "name": "mydate", "type": "date"} + ] + } + } + ] +} +-- +{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} + +{"bytes": 736, "count_of_rows": 1, "dataset_name": "test", "hash": "24b55bb6b0ecacdadbc8a1dc1fd9dab9"} diff --git a/tests/stdlib/fixtures/simple_dump_dot_to_zip_with_hash_&_pretty_descriptor b/tests/stdlib/fixtures/simple_dump_dot_to_zip_with_hash_&_pretty_descriptor new file mode 100644 index 00000000..83cd7349 --- /dev/null +++ b/tests/stdlib/fixtures/simple_dump_dot_to_zip_with_hash_&_pretty_descriptor @@ -0,0 +1,60 @@ +dump.to_zip +-- +{ + "out-file": "my-spiffy-resource.zip", + "add-filehash-to-path": true, + "pretty-descriptor": true +} +-- +{ + "name": "test", + "resources": [ + { + "name": "my-spiffy-resource", + "dpp:streaming": true, + "path": "data/my-data.csv", + "schema": { + "fields": [ + {"name": "mystring", "type": "string"}, + {"name": "myinteger", "type": "integer"}, + {"name": "mynumber", "type": "number"}, + {"name": "mydate", "type": "date"} + ] + } + } + ] +} +-- +{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} +-- +{ + "name": "test", + "resources": [ + { + "name": "my-spiffy-resource", + "dpp:streaming": true, + "path": "data/my-data.csv", + "encoding": "utf-8", + "format": "csv", + "dialect": { + "delimiter": ",", + "doubleQuote": true, + "lineTerminator": "\r\n", + "quoteChar": "\"", + "skipInitialSpace": false + }, + "schema": { + "fields": [ + {"name": "mystring", "type": "string"}, + {"name": "myinteger", "type": "integer"}, + {"name": "mynumber", "type": "number", "groupChar": "", "decimalChar": "."}, + {"format": "%Y-%m-%d", "name": "mydate", "type": "date"} + ] + } + } + ] +} +-- +{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} + +{"bytes": 1110, "count_of_rows": 1, "dataset_name": "test", "hash": "174d14a56ce3c798b369d1716488ca75"} diff --git a/tests/stdlib/fixtures/simple_dump_to_sql b/tests/stdlib/fixtures/simple_dump_to_sql index dc066e0c..bf279122 100644 --- 
a/tests/stdlib/fixtures/simple_dump_to_sql +++ b/tests/stdlib/fixtures/simple_dump_to_sql @@ -1,4 +1,4 @@ -dump.to_sql +dump_to_sql -- { "tables": { @@ -31,11 +31,13 @@ dump.to_sql -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "mystring", "type": "string"}, @@ -48,6 +50,6 @@ dump.to_sql ] } -- -{"mystring":"a", "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}} +{"mystring":"a", "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}, "myinteger": null} -{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "1ae286a22a1dd3c17aa78a047dd778ef"} +{"bytes": null, "count_of_rows": 1, "dataset_name": "test", "hash": "c1c867cd9711aedd5c94a16ce4590ece"} diff --git a/tests/stdlib/fixtures/simple_dump_to_zip b/tests/stdlib/fixtures/simple_dump_to_zip index 7a46e282..a6fee236 100644 --- a/tests/stdlib/fixtures/simple_dump_to_zip +++ b/tests/stdlib/fixtures/simple_dump_to_zip @@ -1,4 +1,4 @@ -dump.to_zip +dump_to_zip -- { "out-file": "my-spiffy-resource.zip" @@ -27,32 +27,25 @@ dump.to_zip -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", - "encoding": "utf-8", - "format": "csv", - "dialect": { - "delimiter": ",", - "doubleQuote": true, - "lineTerminator": "\r\n", - "quoteChar": "\"", - "skipInitialSpace": false - }, + "profile": "data-resource", "schema": { "fields": [ {"name": "mystring", "type": "string"}, {"name": "myinteger", "type": "integer"}, - {"name": "mynumber", "type": "number", "groupChar": "", "decimalChar": "."}, - {"format": "%Y-%m-%d", "name": "mydate", "type": "date"} + {"name": "mynumber", "type": "number"}, + {"name": "mydate", "type": "date"} ] } } ] } -- -{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} +{"mystring":"a", "myinteger": null, "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}} -{"bytes": 703, "count_of_rows": 1, "dataset_name": "test", "hash": "a730863e99517930eab15f55036d309f"} +{"bytes": 749, "count_of_rows": 1, "dataset_name": "test", "hash": "6dac7a946f31150cc099f7463c233dfc"} diff --git a/tests/stdlib/fixtures/simple_dump_to_zip_with_hash b/tests/stdlib/fixtures/simple_dump_to_zip_with_hash index 1d665398..b525d02e 100644 --- a/tests/stdlib/fixtures/simple_dump_to_zip_with_hash +++ b/tests/stdlib/fixtures/simple_dump_to_zip_with_hash @@ -1,4 +1,4 @@ -dump.to_zip +dump_to_zip -- { "out-file": "my-spiffy-resource.zip", @@ -28,32 +28,25 @@ dump.to_zip -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", - "encoding": "utf-8", - "format": "csv", - "dialect": { - "delimiter": ",", - "doubleQuote": true, - "lineTerminator": "\r\n", - "quoteChar": "\"", - "skipInitialSpace": false - }, + "profile": "data-resource", "schema": { "fields": [ {"name": "mystring", "type": "string"}, {"name": "myinteger", "type": "integer"}, - {"name": "mynumber", "type": "number", "groupChar": "", "decimalChar": "."}, - {"format": "%Y-%m-%d", "name": "mydate", "type": "date"} + {"name": "mynumber", "type": "number"}, + {"name": "mydate", "type": "date"} ] } } ] } -- -{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} +{"mystring":"a", "myinteger": null, "mynumber": {"type{decimal}": "2"}, "mydate": 
{"type{date}": "2016-12-31"}} -{"bytes": 736, "count_of_rows": 1, "dataset_name": "test", "hash": "24b55bb6b0ecacdadbc8a1dc1fd9dab9"} +{"bytes": 749, "count_of_rows": 1, "dataset_name": "test", "hash": "6dac7a946f31150cc099f7463c233dfc"} diff --git a/tests/stdlib/fixtures/simple_dump_to_zip_with_hash_&_pretty_descriptor b/tests/stdlib/fixtures/simple_dump_to_zip_with_hash_&_pretty_descriptor index 83cd7349..0385b33d 100644 --- a/tests/stdlib/fixtures/simple_dump_to_zip_with_hash_&_pretty_descriptor +++ b/tests/stdlib/fixtures/simple_dump_to_zip_with_hash_&_pretty_descriptor @@ -1,4 +1,4 @@ -dump.to_zip +dump_to_zip -- { "out-file": "my-spiffy-resource.zip", @@ -8,11 +8,13 @@ dump.to_zip -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "mystring", "type": "string"}, @@ -29,32 +31,25 @@ dump.to_zip -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "my-spiffy-resource", "dpp:streaming": true, "path": "data/my-data.csv", - "encoding": "utf-8", - "format": "csv", - "dialect": { - "delimiter": ",", - "doubleQuote": true, - "lineTerminator": "\r\n", - "quoteChar": "\"", - "skipInitialSpace": false - }, + "profile": "data-resource", "schema": { "fields": [ {"name": "mystring", "type": "string"}, {"name": "myinteger", "type": "integer"}, - {"name": "mynumber", "type": "number", "groupChar": "", "decimalChar": "."}, - {"format": "%Y-%m-%d", "name": "mydate", "type": "date"} + {"name": "mynumber", "type": "number"}, + {"name": "mydate", "type": "date"} ] } } ] } -- -{"mystring":"a", "mynumber": 2.0, "mydate": {"type{date}": "2016-12-31"}} +{"mystring":"a", "myinteger": null, "mynumber": {"type{decimal}": "2"}, "mydate": {"type{date}": "2016-12-31"}} -{"bytes": 1110, "count_of_rows": 1, "dataset_name": "test", "hash": "174d14a56ce3c798b369d1716488ca75"} +{"bytes": 749, "count_of_rows": 1, "dataset_name": "test", "hash": "6dac7a946f31150cc099f7463c233dfc"} diff --git a/tests/stdlib/fixtures/simple_filter b/tests/stdlib/fixtures/simple_filter index 0cc7d92a..38526980 100644 --- a/tests/stdlib/fixtures/simple_filter +++ b/tests/stdlib/fixtures/simple_filter @@ -62,11 +62,13 @@ filter -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "concat-a1", "dpp:streaming": true, "path": "concat-a1.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "a1", "type": "string"}, {"name": "a2", "type": "string"}, @@ -77,6 +79,7 @@ filter "name": "concat-a2", "dpp:streaming": true, "path": "concat-a2.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "a1", "type": "string"}, {"name": "a2", "type": "string"}, @@ -87,6 +90,7 @@ filter "name": "concat-c", "dpp:streaming": true, "path": "concat-c.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "c1", "type": "string"}, {"name": "c2", "type": "string"}, diff --git a/tests/stdlib/fixtures/simple_find_replace b/tests/stdlib/fixtures/simple_find_replace index 14da89dd..d7a8b2ad 100644 --- a/tests/stdlib/fixtures/simple_find_replace +++ b/tests/stdlib/fixtures/simple_find_replace @@ -66,11 +66,13 @@ find_replace -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "dates", "dpp:streaming": true, "path": "dates.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "year", "type": "string"}, diff --git a/tests/stdlib/fixtures/simple_join b/tests/stdlib/fixtures/simple_join index 
47b9ee5a..f27c84cc 100644 --- a/tests/stdlib/fixtures/simple_join +++ b/tests/stdlib/fixtures/simple_join @@ -83,11 +83,13 @@ join -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "got-houses", "dpp:streaming": true, "path": "houses.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "house", "type": "string"}, {"name": "avg_age", "type": "number"}, diff --git a/tests/stdlib/fixtures/simple_load b/tests/stdlib/fixtures/simple_load new file mode 100644 index 00000000..acce5dbe --- /dev/null +++ b/tests/stdlib/fixtures/simple_load @@ -0,0 +1,51 @@ +load +-- +{ + "from": "https://raw.githubusercontent.com/frictionlessdata/datapackage-pipelines/master/tests/data/sample.csv", + "name": "my-spiffy-resource", + "validate": true +} +-- +{ + "name": "test", + "resources": [] +} +-- +-- +{ + "name": "test", + "profile": "data-package", + "resources": [ + { + "dpp:streaming": true, + "encoding": "utf-8", + "format": "csv", + "mediatype": "text/csv", + "name": "my-spiffy-resource", + "path": "my-spiffy-resource.csv", + "profile": "tabular-data-resource", + "dpp:streamedFrom": "https://raw.githubusercontent.com/frictionlessdata/datapackage-pipelines/master/tests/data/sample.csv", + "schema": { + "fields": [ + {"format": "default", "name": "first_name", "type": "string"}, + {"format": "default", "name": "last_name", "type": "string"}, + {"format": "default", "name": "house", "type": "string"}, + {"format": "default", "name": "age", "type": "integer"} + ], + "missingValues": [""] + } + } + ] +} +-- +{"age": 27, "first_name": "Tyrion", "house": "Lannister", "last_name": "Lannister"} +{"age": 34, "first_name": "Jaime", "house": "Lannister", "last_name": "Lannister"} +{"age": 34, "first_name": "Cersei", "house": "Lannister", "last_name": "Lannister"} +{"age": 17, "first_name": "Jon", "house": "Stark", "last_name": "Snow"} +{"age": 14, "first_name": "Sansa", "house": "Stark", "last_name": "Stark"} +{"age": 11, "first_name": "Arya", "house": "Stark", "last_name": "Stark"} +{"age": 10, "first_name": "Bran", "house": "Stark", "last_name": "Stark"} +{"age": 5, "first_name": "Rickon", "house": "Stark", "last_name": "Stark"} +{"age": 16, "first_name": "Daenerys", "house": "Targaryen", "last_name": "Targaryen"} + +{} diff --git a/tests/stdlib/fixtures/simple_resource_duplication b/tests/stdlib/fixtures/simple_resource_duplication index 5b50e09c..a4918884 100644 --- a/tests/stdlib/fixtures/simple_resource_duplication +++ b/tests/stdlib/fixtures/simple_resource_duplication @@ -36,11 +36,13 @@ duplicate -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "original", "dpp:streaming": true, "path": "data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "year", "type": "integer"}, {"name": "data", "type": "string"} @@ -50,6 +52,7 @@ duplicate "name": "the-dup", "dpp:streaming": true, "path": "the-dup.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "year", "type": "integer"}, {"name": "data", "type": "string"} diff --git a/tests/stdlib/fixtures/simple_set_types b/tests/stdlib/fixtures/simple_set_types index aa01611e..0c6178fb 100644 --- a/tests/stdlib/fixtures/simple_set_types +++ b/tests/stdlib/fixtures/simple_set_types @@ -27,11 +27,13 @@ set_types -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "concat-a", "dpp:streaming": true, "path": "concat-a.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "t1", "type": "number", "groupChar": ","} ]} diff --git 
a/tests/stdlib/fixtures/simple_sort b/tests/stdlib/fixtures/simple_sort index 664eb6fb..39c72901 100644 --- a/tests/stdlib/fixtures/simple_sort +++ b/tests/stdlib/fixtures/simple_sort @@ -57,11 +57,13 @@ sort -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "concat-a1", "dpp:streaming": true, "path": "concat-a1.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "a1", "type": "string"}, {"name": "a2", "type": "string"}, @@ -72,6 +74,7 @@ sort "name": "concat-a2", "dpp:streaming": true, "path": "concat-a2.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "a1", "type": "string"}, {"name": "a2", "type": "string"}, @@ -82,6 +85,7 @@ sort "name": "concat-c", "dpp:streaming": true, "path": "concat-c.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "c1", "type": "string"}, {"name": "c2", "type": "string"}, diff --git a/tests/stdlib/fixtures/simple_unpivot b/tests/stdlib/fixtures/simple_unpivot index 004bc96e..e9ddd7b2 100644 --- a/tests/stdlib/fixtures/simple_unpivot +++ b/tests/stdlib/fixtures/simple_unpivot @@ -74,11 +74,13 @@ unpivot -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "balance", "dpp:streaming": true, "path": "balance.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "company", "type": "string"}, {"name": "year", "type": "integer"}, diff --git a/tests/stdlib/fixtures/simple_add_metadata b/tests/stdlib/fixtures/simple_update_package similarity index 85% rename from tests/stdlib/fixtures/simple_add_metadata rename to tests/stdlib/fixtures/simple_update_package index 17bfffd1..66985c59 100644 --- a/tests/stdlib/fixtures/simple_add_metadata +++ b/tests/stdlib/fixtures/simple_update_package @@ -1,4 +1,4 @@ -add_metadata +update_package -- { "title": "Moshe", @@ -16,6 +16,7 @@ add_metadata { "name": "test", "title": "Moshe", + "profile": "data-package", "sources": { "web": "http://google.com" }, diff --git a/tests/stdlib/fixtures/sort_with_duplicate_keys b/tests/stdlib/fixtures/sort_with_duplicate_keys index 6a6a2407..4fd9c364 100644 --- a/tests/stdlib/fixtures/sort_with_duplicate_keys +++ b/tests/stdlib/fixtures/sort_with_duplicate_keys @@ -35,11 +35,13 @@ sort -- { "name": "test", + "profile": "data-package", "resources": [ { "name": "data", "dpp:streaming": true, "path": "data.csv", + "profile": "data-resource", "schema": { "fields": [ {"name": "year", "type": "integer"}, {"name": "data", "type": "string"} diff --git a/tests/stdlib/test_stdlib.py b/tests/stdlib/test_stdlib.py index 37033f5e..92d742dc 100644 --- a/tests/stdlib/test_stdlib.py +++ b/tests/stdlib/test_stdlib.py @@ -7,7 +7,7 @@ ENV = os.environ.copy() ENV['PYTHONPATH'] = ROOT_PATH -ENV['EXISTENT_ENV'] = 'file://tests/data/sample.csv' +ENV['EXISTENT_ENV'] = 'tests/data/sample.csv' DEFAULT_TEST_DB = "sqlite://" ENV['DPP_DB_ENGINE'] = os.environ.get("OVERRIDE_TEST_DB", DEFAULT_TEST_DB) diff --git a/tests/test_main.py b/tests/test_main.py index fa9c1189..10246707 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -53,7 +53,7 @@ def test_pipeline(): {"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "start"}, {"pipeline_id": "./tests/env/dummy/pipeline-test-hooks", "event": "finish", "success": True, 'stats': {'.dpp': {'out-datapackage-url': 'hooks-outputs/datapackage.json'}, - 'bytes': 258, 'count_of_rows': None, - 'dataset_name': 'hook-tests', 'hash': '1871cf2829406983b5785b03bde91aa9'}} + 'bytes': 15504, 'count_of_rows': 40, + 'dataset_name': 'hook-tests', 'hash': 
'273045aa562baa25ce28085745e34865'}} ] assert progresses >= 1 \ No newline at end of file diff --git a/tests/wrapper/test_wrapper.py b/tests/wrapper/test_wrapper.py index aefdf5a9..b5f0031f 100644 --- a/tests/wrapper/test_wrapper.py +++ b/tests/wrapper/test_wrapper.py @@ -10,13 +10,13 @@ def test_spew_finalizer_runs_before_we_signal_that_were_done(self): to STDOUT. The finalizer parameter to spew() must be executed before that, as there can be processors that depend on us finishing our processing before they're able to run. For example, a processor that depends on - `dump.to_zip` must wait until it has finished writing to the local + `dump_to_zip` must wait until it has finished writing to the local filesystem. ''' datapackage = {} resources_iterator = iter([]) - with mock.patch('sys.stdout') as stdout_mock: + with mock.patch('datapackage_pipelines.wrapper.wrapper.stdout') as stdout_mock: def finalizer(): last_call_args = stdout_mock.write.call_args_list[-1] assert last_call_args != mock.call('\n')