Commit 6da22e1
elasticsearch/util.py
lzy7071 committed Sep 27, 2019
1 parent 75aaf8f
Showing 3 changed files with 67 additions and 17 deletions.
24 changes: 12 additions & 12 deletions karr_lab_aws_manager/elasticsearch/batch_load.py
@@ -71,53 +71,53 @@ def make_action_and_metadata(self, _id):
         action_and_metadata = {'index': { "_index" : self.index, "_id" : _id }}
         return action_and_metadata
 
-    def data_to_es_bulk(self, count, cursor, bulk_size=100,
+    def data_to_es_bulk(self, count, cursor, bulk_size=100, _id='uniprot_id',
                         headers={ "Content-Type": "application/json" }):
         ''' Load data into elasticsearch service
             Args:
                 count (:obj: `int`): cursor size
                 cursor (:obj: `pymongo.Cursor` or :obj: `iter`): documents to be PUT to es
-                action_and_metadata (:obj: `dict`): elasticsearch action_and_metadata information for bulk operations
-                e.g. {"index": { "_index": "test", "_type" : "_doc"}}
                 bulk_size (:obj: `int`): number of documents in one PUT
                 headers (:obj: `dict`): http header
+                _id (:obj: `str`): unique id for identification
             Return:
                 status_code (:obj: `set`): set of status codes
         '''
         url = self.es_endpoint + '/_bulk'
         status_code = {201}
         bulk_file = ''
         tot_rounds = math.ceil(count/bulk_size)
-        def gen_bulk_file(i, bulk_file):
-            action_and_metadata = self.make_action_and_metadata(i)
+        def gen_bulk_file(_index, bulk_file):
+            action_and_metadata = self.make_action_and_metadata(_index)
             bulk_file += json.dumps(action_and_metadata) + '\n'
             bulk_file += json.dumps(doc) + '\n'
             return bulk_file
 
         for i, doc in enumerate(cursor):
             if i == self.max_entries:
                 break
-            if self.verbose:
+            if self.verbose and i % bulk_size == 0:
                 print("Processing bulk {} out of {} ...".format(math.floor(i/bulk_size)+1, tot_rounds))
 
             if i == count - 1: # last entry
-                bulk_file = gen_bulk_file(i, bulk_file)
+                bulk_file = gen_bulk_file(doc[_id], bulk_file)
                 # print('commit last entry')
                 # print(bulk_file)
                 r = requests.post(url, auth=self.awsauth, data=bulk_file, headers=headers)
                 status_code.add(r.status_code)
                 return status_code
             elif i % bulk_size != 0 or i == 0: # bulk_size*(n-1) + 1 --> bulk_size*n - 1
-                bulk_file = gen_bulk_file(i, bulk_file)
+                bulk_file = gen_bulk_file(doc[_id], bulk_file)
                 # print('building in between')
                 # print(bulk_file)
             else: # bulk_size * n
                 # print('commit endpoint')
                 # print(bulk_file)
                 r = requests.post(url, auth=self.awsauth, data=bulk_file, headers=headers)
                 status_code.add(r.status_code)
-                bulk_file = gen_bulk_file(i, '') # reset bulk_file
+                bulk_file = gen_bulk_file(doc[_id], '') # reset bulk_file
 
-    def data_to_es_single(self, count, cursor, headers={ "Content-Type": "application/json" }):
+    def data_to_es_single(self, count, cursor, _id='uniprot_id', headers={ "Content-Type": "application/json" }):
         ''' Load data into elasticsearch service
             Args:
                 count (:obj: `int`): cursor size
@@ -134,7 +134,7 @@ def data_to_es_single(self, count, cursor, headers={ "Content-Type": "application/json" }):
                 break
             if i % 20 == 0 and self.verbose:
                 print("Processing doc {} out of {}...".format(i, min(count, self.max_entries)))
-            url = url_root + str(i)
+            url = url_root + doc[_id]
             r = requests.post(url, auth=self.awsauth, json=doc, headers=headers)
             status_code.add(r.status_code)
         return status_code
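
With this change, each document is indexed under its uniprot_id rather than the enumeration counter, so re-running the loader updates existing documents instead of creating duplicates. A minimal sketch of the resulting request, assuming url_root is assembled as es_endpoint + '/' + index + '/_doc/' (the endpoint and document below are placeholders, not values from the commit; note the id is concatenated into the URL, so non-string ids would need str() first):

    import requests

    # Placeholder endpoint and document, for illustration only;
    # the real loader also passes auth=self.awsauth to sign the request.
    es_endpoint = 'https://search-example.us-east-1.es.amazonaws.com'
    url_root = es_endpoint + '/protein/_doc/'
    doc = {'uniprot_id': 'P12345', 'mock_key': 'mock_value_0'}

    url = url_root + doc['uniprot_id']  # .../protein/_doc/P12345
    r = requests.post(url, json=doc, headers={'Content-Type': 'application/json'})
    print(r.status_code)  # 201 on create, 200 on update of an existing doc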
@@ -147,7 +147,7 @@ def main():
     server = conf.SERVER
     authDB = conf.AUTHDB
     db = 'datanator'
-    manager = MongoToES()
+    manager = MongoToES(verbose=True)
 
     # data from "protein" collection
     count, docs = manager.data_from_mongo_protein(server, db, username, password, authSource=authDB)
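
For context, the body that gen_bulk_file accumulates is newline-delimited JSON: each document is preceded by an action/metadata line, the format the Elasticsearch _bulk endpoint expects. A minimal sketch of the payload for two documents, with placeholder field values (real documents come from the pymongo cursor):

    import json

    # Placeholder documents keyed by uniprot_id, as in the new default.
    docs = [{'uniprot_id': 'P12345', 'mock_key': 'a'},
            {'uniprot_id': 'P67890', 'mock_key': 'b'}]

    bulk_file = ''
    for doc in docs:
        # One metadata line per document, mirroring make_action_and_metadata
        action_and_metadata = {'index': {'_index': 'protein', '_id': doc['uniprot_id']}}
        bulk_file += json.dumps(action_and_metadata) + '\n'
        bulk_file += json.dumps(doc) + '\n'

    print(bulk_file)
    # {"index": {"_index": "protein", "_id": "P12345"}}
    # {"uniprot_id": "P12345", "mock_key": "a"}
    # {"index": {"_index": "protein", "_id": "P67890"}}
    # {"uniprot_id": "P67890", "mock_key": "b"}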
49 changes: 49 additions & 0 deletions karr_lab_aws_manager/elasticsearch/util.py
@@ -0,0 +1,49 @@
+from datanator_query_python.config import config as config_mongo
+from karr_lab_aws_manager.config import config
+import requests
+from requests_aws4auth import AWS4Auth
+
+
+class EsUtil:
+
+    def __init__(self, profile_name='karrlab-zl', credential_path='.wc/third_party/aws_credentials',
+                 config_path='.wc/third_party/aws_config', elastic_path='.wc/third_party/elasticsearch.ini',
+                 cache_dir=None, service_name='es', index='protein', max_entries=float('inf'), verbose=False):
+        '''
+        Args:
+            profile_name (:obj: `str`): AWS profile to use for authentication
+            credential_path (:obj: `str`): directory for aws credentials file
+            config_path (:obj: `str`): directory for aws config file
+            elastic_path (:obj: `str`): directory for file containing aws elasticsearch service variables
+            cache_dir (:obj: `str`): temp directory to store json for bulk upload
+            service_name (:obj: `str`): aws service to be used
+        '''
+        session = config.establishES(config_path=config_path, profile_name=profile_name,
+                                     elastic_path=elastic_path, service_name=service_name)
+        self.verbose = verbose
+        self.max_entries = max_entries
+        self.cache_dir = cache_dir
+        self.client = session.client
+        self.es_endpoint = session.es_endpoint
+        self.awsauth = AWS4Auth(session.access_key, session.secret_key,
+                                session.region, service_name)
+        self.index = index
+
+    def delete_index(self, index, _id=None):
+        ''' Delete elasticsearch index
+        Args:
+            index (:obj: `str`): name of index in es
+            _id (:obj: `str`): id of the doc in index (optional; concatenated into the request URL, so it must be a string)
+        '''
+        if _id is None:
+            url = self.es_endpoint + '/' + index
+        else:
+            url = self.es_endpoint + '/' + index + '/_doc/' + _id
+        requests.delete(url, auth=self.awsauth)
+
+def main():
+    manager = EsUtil()
+    manager.delete_index('protein')
+
+if __name__ == "__main__":
+    main()
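
EsUtil delegates session setup to config.establishES and signs plain requests calls with AWS Signature Version 4. A rough sketch of the same pattern using boto3 directly (the profile name and endpoint are placeholders, and establishES may differ in detail):

    import boto3
    import requests
    from requests_aws4auth import AWS4Auth

    # Placeholder profile and endpoint; mirrors what EsUtil reads via establishES.
    session = boto3.Session(profile_name='karrlab-zl')
    credentials = session.get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       session.region_name, 'es',
                       session_token=credentials.token)

    es_endpoint = 'https://search-example.us-east-1.es.amazonaws.com'
    # The same request EsUtil.delete_index('protein') issues
    r = requests.delete(es_endpoint + '/protein', auth=awsauth)
    print(r.status_code)  # 200 if the index existed and was deleted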
11 changes: 6 additions & 5 deletions tests/elasticsearch/test_batch_load.py
@@ -40,14 +40,15 @@ def test_make_action_and_metadata(self):
         self.assertEqual(result, {'index': { "_index" : self.src.index, "_id" : _index }})
 
     def test_data_to_es_bulk(self):
-        cursor = [{'number': 0, 'mock_key_bulk': 'mock_value_0'},
-                  {'number': 1, 'mock_key_bulk': 'mock_value_1'},
-                  {'number': 2, 'mock_key_bulk': 'mock_value_2'}]
+        cursor = [{'number': 0, 'mock_key_bulk': 'mock_value_0', 'uniprot_id': 0},
+                  {'number': 1, 'mock_key_bulk': 'mock_value_1', 'uniprot_id': 1},
+                  {'number': 2, 'mock_key_bulk': 'mock_value_2', 'uniprot_id': 2},
+                  {'number': 3, 'mock_key_bulk': 'mock_value_4', 'uniprot_id': 3}]
         result = self.src.data_to_es_bulk(len(cursor), cursor, bulk_size=1)
         self.assertTrue(result <= {201, 200})
 
     def test_data_to_es_single(self):
-        cursor = [{'mock_key': 'mock_value_0', 'another_mock_key': 'another_value_0'},
-                  {'mock_key': 'mock_value_1', 'another_mock_key': 'another_value_0'}]
+        cursor = [{'mock_key': 'mock_value_0', 'another_mock_key': 'another_value_0', 'uniprot_id': 0},
+                  {'mock_key': 'mock_value_1', 'another_mock_key': 'another_value_0', 'uniprot_id': 1}]
         result = self.src.data_to_es_single(len(cursor), cursor)
         self.assertTrue(result <= {201, 200})
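
The assertions rely on set subset comparison: result <= {201, 200} holds only if every request in the batch returned 200 or 201. A quick illustration of the semantics:

    status_code = {201}               # data_to_es_bulk seeds the set with 201
    status_code.add(200)
    print(status_code <= {200, 201})  # True: every response was 200/201
    status_code.add(403)
    print(status_code <= {200, 201})  # False: at least one request failed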
