Skip to content

Commit

Permalink
first cli command
Browse files Browse the repository at this point in the history
  • Loading branch information
lzy7071 committed Oct 23, 2019
1 parent 19e9838 commit 920ad76
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 9 deletions.
67 changes: 66 additions & 1 deletion karr_lab_aws_manager/__main__.py
Expand Up @@ -9,6 +9,8 @@
import cement
import karr_lab_aws_manager
import karr_lab_aws_manager.core
from karr_lab_aws_manager.config.config import establishES
from karr_lab_aws_manager.elasticsearch_kl import util as es_util


class BaseController(cement.Controller):
Expand Down Expand Up @@ -64,6 +66,65 @@ def _default(self):
args.opt_arg_4


class EsBulkUpload(cement.Controller):
    """ Karrlab elasticsearch bulk upload cli """

    class Meta:
        label = 'es-bulk-upload'
        description = 'Bulk loading data into karrlab hosted elasticsearch service'
        stacked_on = 'base'
        stacked_type = 'nested'
        arguments = [
            (['count'], dict(
                type=int, help='Cursor/file size')),
            (['cursor'], dict(
                type=str, help='Pymongo.Cursor/directory to files to be loaded')),
            (['index'], dict(
                type=str, help='Name of unique key to be used as index for es')),
            (['id'], dict(
                type=str, help='Key in mongo collection for identification')),
            (['--bulk_size', '-bz'], dict(
                type=int, default=100, help='Number of documents in one PUT')),
            (['--profile_name', '-pn'], dict(
                type=str, default='es-poweruser',
                help='AWS profile to use for authentication')),
            (['--credential_path', '-cr'], dict(
                type=str, default='~/.wc/third_party/aws_credentials',
                help='Directory for aws credentials file')),
            (['--config_path', '-cp'], dict(
                type=str, default='~/.wc/third_party/aws_config',
                help='Directory for aws config file')),
            (['--elastic_path', '-ep'], dict(
                type=str, default='~/.wc/third_party/elasticsearch.ini',
                help='Directory for file containing aws elasticsearch service variables')),
            # ``type=dict`` cannot convert a command-line string, so the value
            # is accepted as text and decoded as JSON in ``_default``. The
            # default is not a string, so argparse passes it through untouched.
            (['--headers'], dict(
                type=str, default={"Content-Type": "application/json"},
                help='Http header, given as a JSON object string')),
        ]

    @cement.ex(hide=True)
    def _default(self):
        ''' Load data into elasticsearch service

        Reads the parsed command line arguments from ``self.app.pargs``,
        builds an ``es_util.EsUtil`` and calls its ``data_to_es_bulk`` method.
        The result of that call is not returned.

        Args (taken from the command line):
            count (:obj:`int`): cursor size
            cursor (:obj:`str`): documents to be PUT/POST to es
            index (:obj:`str`): name of unique key to be used as index for es
            id (:obj:`str`): key in mongo collection for identification
            bulk_size (:obj:`int`): number of documents in one PUT
            profile_name (:obj:`str`): AWS profile to use for authentication
            credential_path (:obj:`str`): aws credentials file
            config_path (:obj:`str`): aws config file
            elastic_path (:obj:`str`): file with elasticsearch service variables
            headers (:obj:`str` or :obj:`dict`): http header

        Raises:
            ValueError: if ``--headers`` is given and is not valid JSON
        '''
        import json

        args = self.app.pargs

        # A value typed on the command line arrives as a string; the built-in
        # default is already a dict and is used as is.
        headers = args.headers
        if isinstance(headers, str):
            headers = json.loads(headers)

        # argparse stores each option under its long name (``--profile_name``
        # becomes ``profile_name``); the short aliases are not attributes.
        uploader = es_util.EsUtil(
            profile_name=args.profile_name,
            credential_path=args.credential_path,
            config_path=args.config_path,
            elastic_path=args.elastic_path)
        uploader.data_to_es_bulk(
            args.count, args.cursor, args.index, bulk_size=args.bulk_size,
            _id=args.id, headers=headers)


class App(cement.App):
""" Command line application """
class Meta:
Expand All @@ -72,8 +133,12 @@ class Meta:
handlers = [
BaseController,
Command3WithArgumentsController,
EsBulkUpload
]

def main():
    """Run the command line application once.

    ``App`` is used as a context manager around a single ``app.run()`` call,
    so the selected command is dispatched exactly one time.
    """
    with App() as app:
        app.run()

if __name__=='__main__':
main()
@@ -1,10 +1,10 @@
from datanator_query_python.query import (query_protein, query_metabolites,
query_metabolites_meta)
from datanator_query_python.config import config as config_mongo
from karr_lab_aws_manager.elasticsearch import util
from karr_lab_aws_manager.elasticsearch_kl import util as es_util


class MongoToES(util.EsUtil):
class MongoToES(es_util.EsUtil):

def __init__(self, profile_name=None, credential_path=None,
config_path=None, elastic_path=None,
Expand Down
@@ -1,4 +1,4 @@
from karr_lab_aws_manager.elasticsearch import util
from karr_lab_aws_manager.elasticsearch_kl import util
import requests


Expand Down
Expand Up @@ -3,7 +3,9 @@
import requests
import json
import math
from pathlib import Path
from requests_aws4auth import AWS4Auth
import re


class EsUtil(config.establishES):
Expand Down Expand Up @@ -128,11 +130,23 @@ def data_to_es_bulk(self, count, cursor, index, bulk_size=100, _id='uniprot_id',
status_code = {201}
bulk_file = ''
tot_rounds = math.ceil(count/bulk_size)

def gen_bulk_file(_iid, bulk_file):
    """Append one bulk entry for the current document to ``bulk_file``.

    An entry is two newline-terminated JSON lines: the action/metadata line
    built by ``self.make_action_and_metadata(index, _iid)``, then the
    document itself. ``index`` and ``doc`` come from the enclosing scope.

    Args:
        _iid: identifier passed to ``make_action_and_metadata``
        bulk_file (:obj:`str`): bulk payload accumulated so far

    Returns:
        (:obj:`str`): the payload with the new entry appended
    """
    action_line = json.dumps(self.make_action_and_metadata(index, _iid))
    document_line = json.dumps(doc)
    return bulk_file + action_line + '\n' + document_line + '\n'

def mod_cursor(cursor):
    """Yield the parsed content of every ``*.json`` file under a directory.

    The directory is searched recursively. Each file is read and closed
    before its content is yielded, so no file handle stays open while the
    consumer works on a document or if iteration is abandoned early.

    Args:
        cursor (:obj:`str`): directory containing the json files

    Yields:
        the object decoded from each json file
    """
    for json_path in Path(cursor).glob('**/*.json'):
        # JSON text is UTF-8 by specification; do not depend on the locale.
        with json_path.open(encoding='utf-8') as handle:
            document = json.load(handle)
        yield document

if isinstance(cursor, str):
cursor = mod_cursor(cursor)
else:
cursor = cursor

for i, doc in enumerate(cursor):
if i == self.max_entries:
Expand Down
5 changes: 5 additions & 0 deletions setup.py
Expand Up @@ -6,6 +6,7 @@
pip._internal.main(['install', 'pkg_utils'])
import pkg_utils
import os
import sys

name = 'karr_lab_aws_manager'
dirname = os.path.dirname(__file__)
Expand Down Expand Up @@ -43,6 +44,10 @@
],
entry_points={
'console_scripts': [
'{} = {}.__main__:main'.format(name, name),
'{} = {}.__main__:main'.format(name.replace('_', '-'), name),
'{}{:d} = {}.__main__:main'.format(name, sys.version_info[0], name),
'{}{:d} = {}.__main__:main'.format(name.replace('_', '-'), sys.version_info[0], name),
],
},
)
2 changes: 1 addition & 1 deletion tests/elasticsearch/test_batch_load.py
@@ -1,5 +1,5 @@
import unittest
from karr_lab_aws_manager.elasticsearch import batch_load
from karr_lab_aws_manager.elasticsearch_kl import batch_load
from datanator_query_python.config import config
import tempfile
import shutil
Expand Down
13 changes: 12 additions & 1 deletion tests/elasticsearch/test_elasticsearch_util.py
@@ -1,10 +1,13 @@
import unittest
from karr_lab_aws_manager.elasticsearch import util
from karr_lab_aws_manager.elasticsearch_kl import util
from datanator_query_python.config import config
from pathlib import Path
import json
import tempfile
import shutil
import requests
import json


class TestMongoToES(unittest.TestCase):

Expand Down Expand Up @@ -50,13 +53,21 @@ def test_data_to_es_bulk(self):
self.assertTrue(result <= {201, 200})
result = self.src.index_settings(self.index, 0)
self.assertEqual(result.text, '{"acknowledged":true}')
for dic in cursor:
p = Path(self.cache_dir).joinpath(dic['uniprot_id'] + '.json')
with p.open(mode='w+') as f:
json.dump(dic, f)
result = self.src.data_to_es_bulk(len(cursor), self.cache_dir, self.index, bulk_size=1)
self.assertTrue(result <= {201, 200})

# @unittest.skip('reduce debugging confusion')
def test_data_to_es_single(self):
    """Load two mock documents one at a time and check the status codes.

    The set returned by ``data_to_es_single`` must contain nothing other
    than 200 and 201.
    """
    mock_docs = [
        {'mock_key': 'mock_value_{}'.format(n),
         'another_mock_key': 'another_value_0',
         'uniprot_id': 'P{}'.format(n)}
        for n in range(2)
    ]
    status_codes = self.src.data_to_es_single(len(mock_docs), mock_docs, self.index)
    self.assertTrue(status_codes <= {201, 200})

# @unittest.skip('reduce debugging confusion')
def test_delete_index(self):
cursor = [{'number': 0, 'mock_key_bulk': 'mock_value_0', 'uniprot_id': 'P0'},
{'number': 1, 'mock_key_bulk': 'mock_value_1', 'uniprot_id': 'P1'},
Expand Down
4 changes: 2 additions & 2 deletions tests/elasticsearch/test_es_query_builder.py
@@ -1,8 +1,8 @@
import unittest
import tempfile
import shutil
from karr_lab_aws_manager.elasticsearch import query_builder
from karr_lab_aws_manager.elasticsearch import util as es_util
from karr_lab_aws_manager.elasticsearch_kl import query_builder
from karr_lab_aws_manager.elasticsearch_kl import util as es_util
import requests
import time

Expand Down

0 comments on commit 920ad76

Please sign in to comment.