Combine adding indexing resource with custom s3 dumper
fixes #8
fixes #9
akariv committed Aug 5, 2017
1 parent fed5feb · commit 260ed86
Showing 3 changed files with 20 additions and 8 deletions.
7 changes: 2 additions & 5 deletions datapackage_pipelines_assembler/generator.py
@@ -4,7 +4,7 @@
 from datapackage_pipelines.generators import (
     GeneratorBase,
 )
-from .processors.add_indexing_resource import create_index
+from .processors.dump_to_s3 import create_index
 
 import logging
 log = logging.getLogger(__name__)
@@ -106,7 +106,7 @@ def generate_pipeline(cls, source):
             #     }
             # },
             {
-                'run': 'aws.dump.to_s3',
+                'run': 'assembler.dump_to_s3',
                 'parameters': {
                     'force-format': False,
                     'handle-non-tabular': True,
@@ -122,9 +122,6 @@
                     }
                 }
             },
-            {
-                'run': 'assembler.add_indexing_resource'
-            },
             {
                 'run': 'elasticsearch.dump.to_index',
                 'parameters': {
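Taken together, the generator.py changes swap the stock aws.dump.to_s3 processor for the repo's own assembler.dump_to_s3 and drop the separate assembler.add_indexing_resource step, since the custom dumper now does both jobs in one pass. A rough sketch of the resulting step list (abridged; parameters outside the visible hunks are elided, and only the literal values shown above are real):

pipeline_steps = [
    # ... preceding steps ...
    {
        'run': 'assembler.dump_to_s3',   # custom dumper, defined in processors/dump_to_s3.py below
        'parameters': {
            'force-format': False,
            'handle-non-tabular': True,
            # ... remaining dump parameters not visible in this diff ...
        }
    },
    # no separate 'assembler.add_indexing_resource' step anymore
    {
        'run': 'elasticsearch.dump.to_index',
        'parameters': {
            # ...
        }
    },
]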
20 changes: 17 additions & 3 deletions datapackage_pipelines_assembler/processors/dump_to_s3.py
@@ -1,8 +1,11 @@
 import itertools
 
+import copy
 from tableschema_elasticsearch import Storage
 
 from datapackage_pipelines.wrapper import ingest, spew
+from datapackage_pipelines_aws.processors.dump.to_s3 import S3Dumper
+
 
 SCHEMA = {
     'fields': [
@@ -50,11 +53,22 @@ def dataset_resource(dp):
             'datahub'
         ]
     )
+    dp = copy.deepcopy(dp)
+    dp['resources'].pop()
     ret['datapackage'] = dp
     yield ret
 
 
+class MyS3Dumper(S3Dumper):
+
+    def prepare_datapackage(self, datapackage, params):
+        datapackage = super(MyS3Dumper, self).prepare_datapackage(datapackage, params)
+        return modify_datapackage(datapackage)
+
+    def handle_resources(self, datapackage, resource_iterator, parameters, stats):
+        yield from super(MyS3Dumper, self).handle_resources(datapackage, resource_iterator, parameters, stats)
+        yield [dataset_resource(datapackage)]
+
+
 if __name__ == "__main__":
-    parameters, dp, res_iter = ingest()
-    spew(modify_datapackage(dp),
-         itertools.chain(res_iter, [dataset_resource(dp)]))
+    MyS3Dumper()()
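The __main__ change is the crux: instead of an explicit ingest()/spew() processor that chained an extra resource onto the stream, the file now subclasses the S3 dumper from datapackage-pipelines-aws, and MyS3Dumper()() instantiates and invokes it. The two overridden methods are the extension points: prepare_datapackage rewrites the descriptor once, and handle_resources wraps the resource stream. A minimal self-contained sketch of that hook pattern, with a stub standing in for the real S3Dumper (which additionally uploads the datapackage and every resource to S3):

class StubDumper:
    # Stand-in for datapackage_pipelines_aws's S3Dumper, reduced to the
    # two hooks overridden above (no actual S3 I/O here).
    def prepare_datapackage(self, datapackage, params):
        return datapackage

    def handle_resources(self, datapackage, resource_iterator, parameters, stats):
        yield from resource_iterator


class CombinedDumper(StubDumper):

    def prepare_datapackage(self, datapackage, params):
        # Let the base class run first, then patch the descriptor --
        # the job modify_datapackage() does in the real processor.
        datapackage = super().prepare_datapackage(datapackage, params)
        datapackage.setdefault('resources', []).append(
            {'name': 'dataset', 'schema': {'fields': []}}  # illustrative descriptor
        )
        return datapackage

    def handle_resources(self, datapackage, resource_iterator, parameters, stats):
        # Pass every original resource through untouched, then emit one
        # extra single-row resource -- what dataset_resource() builds above.
        yield from super().handle_resources(datapackage, resource_iterator, parameters, stats)
        yield [{'id': datapackage.get('name', 'dataset')}]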
1 change: 1 addition & 0 deletions setup.py
@@ -20,6 +20,7 @@ def read(*paths):
 INSTALL_REQUIRES = [
     'datapackage-pipelines',
     'datapackage-pipelines-elasticsearch>=0.0.3',
+    'datapackage-pipelines-aws>=0.0.6',
     'psycopg2',
     'tweepy',
     'facebook-sdk',
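The one-line setup.py change supplies the base class that dump_to_s3.py now imports; without datapackage-pipelines-aws installed, the processor fails at import time, and the >=0.0.6 pin presumably marks the first release exposing S3Dumper at this path. A quick sanity check (the import path is the one used in the processor above):

# Verify the new dependency is importable before running the pipeline.
from datapackage_pipelines_aws.processors.dump.to_s3 import S3Dumper

assert callable(S3Dumper)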
