Skip to content

Commit

Permalink
[#9] Index datapackage in elasticsearch at theend of the pipeline
Browse files Browse the repository at this point in the history
fixes #9
  • Loading branch information
akariv committed Aug 4, 2017
1 parent f1edbe8 commit 8d0f039
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
21 changes: 19 additions & 2 deletions datapackage_pipelines_assembler/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def generate_pipeline(cls, source):
'stats': {
'rowcount': 0,
'bytes': 0,
}
},
'id': pipeline_id
}
},
{
Expand Down Expand Up @@ -113,6 +114,22 @@ def generate_pipeline(cls, source):
"resource-hash": "hash",
}
}
}
},
{
'run': 'assembler.add_indexing_resource'
},
{
'run': 'elasticsearch.dump.to_index',
'parameters': {
'indexes': {
'datasets': [
{
'resource-name': 'datasets',
'doc-type': 'dataset'
}
]
}
}
},
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os

import copy
import datapackage
import itertools

from datapackage_pipelines.wrapper import ingest, spew

parameters, dp, res_iter = ingest()


def modify_datapackage(dp):
dp['resources'].append({
'name': 'datasets',
'path': 'nonexistent',
'schema': {
'fields': [
{'name': 'id', 'type': 'string'},
{'name': 'name', 'type': 'string'},
{'name': 'title', 'type': 'string'},
{'name': 'description', 'type': 'string'},
{'name': 'resources', 'type': 'array',
'es:itemType': 'object', 'es:index': False},
{'name': 'datahub', 'type': 'object',
'es:schema': {
'fields': [
{'name': 'owner', 'type': 'string'},
{'name': 'ownerid', 'type': 'string'},
]
}
},
],
'primaryKey': ['id']
}
})
return dp


def dataset_resource(dp):
yield dp


spew(modify_datapackage(dp),
itertools.chain(res_iter, [dataset_resource(dp)]))
2 changes: 2 additions & 0 deletions datapackage_pipelines_assembler/processors/update_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@


def modify_datapackage(dp, parameters, stats):
id = parameters.pop('id')
dp['id'] = id
dp['datahub'] = parameters
return dp

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def read(*paths):
NAME = PACKAGE.replace('_', '-')
INSTALL_REQUIRES = [
'datapackage-pipelines',
'datapackage-pipelines-elasticsearch',
'psycopg2',
'tweepy',
'facebook-sdk',
Expand Down

0 comments on commit 8d0f039

Please sign in to comment.