Skip to content

Commit

Permalink
fixed Issue#184
Browse files Browse the repository at this point in the history
  • Loading branch information
erikyao committed Aug 29, 2021
1 parent 38fbf09 commit 336b14e
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions biothings/utils/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, index, doc_type='_doc', es_host='localhost:9200',
# if index is actually an alias, resolve the alias to
# the real underlying index
try:
res = self._es.indices.get_alias(index)
res = self._es.indices.get_alias(index=index)
# this was an alias
assert len(res) == 1, "Expecing '%s' to be an alias, but got nothing..." % index
self._index = list(res.keys())[0]
Expand Down Expand Up @@ -148,7 +148,7 @@ def mexists(self, bid_list):
@wrapper
def count(self, q=None, raw=False):
try:
_res = self._es.count(self._index, self._doc_type, q)
_res = self._es.count(index=self._index, doc_type=self._doc_type, body=q)
return _res if raw else _res['count']
except NotFoundError:
return None
Expand All @@ -173,7 +173,7 @@ def count_src(self, src):

@wrapper
def create_index(self, mapping=None, extra_settings=None):
if not self._es.indices.exists(self._index):
if not self._es.indices.exists(index=self._index):
body = {
'settings': {
'number_of_shards': self.number_of_shards,
Expand Down Expand Up @@ -202,13 +202,13 @@ def _populate_es_version(self):

@wrapper
def exists_index(self):
return self._es.indices.exists(self._index)
return self._es.indices.exists(index=self._index)

def index(self, doc, id=None, action="index"): # pylint: disable=redefined-builtin
'''add a doc to the index. If id is not None, the existing doc will be
updated.
'''
self._es.index(self._index, self._doc_type, doc, id=id, params={"op_type": action})
self._es.index(index=self._index, doc_type=self._doc_type, body=doc, id=id, params={"op_type": action})

def index_bulk(self, docs, step=None, action='index'):

Expand Down Expand Up @@ -236,7 +236,7 @@ def _get_bulk(doc):

def delete_doc(self, id): # pylint: disable=redefined-builtin
'''delete a doc from the index based on passed id.'''
return self._es.delete(self._index, self._doc_type, id)
return self._es.delete(index=self._index, doc_type=self._doc_type, id=id)

def delete_docs(self, ids, step=None):
'''delete a list of docs in bulk.'''
Expand All @@ -256,7 +256,7 @@ def _get_bulk(_id):
return helpers.bulk(self._es, actions, chunk_size=step, stats_only=True, raise_on_error=False)

def delete_index(self):
self._es.indices.delete(self._index)
self._es.indices.delete(index=self._index)

def update(self, id, extra_doc, upsert=True): # pylint: disable=redefined-builtin
'''update an existing doc with extra_doc.
Expand All @@ -265,7 +265,7 @@ def update(self, id, extra_doc, upsert=True): # pylint: disable=redefin
body = {'doc': extra_doc}
if upsert:
body['doc_as_upsert'] = True
return self._es.update(self._index, self._doc_type, id, body)
return self._es.update(index=self._index, doc_type=self._doc_type, id=id, body=body)

def update_docs(self, partial_docs, upsert=True, step=None, **kwargs):
'''update a list of partial_docs in bulk.
Expand Down Expand Up @@ -333,7 +333,7 @@ def build_index(self, collection, verbose=True, query=None, bulk=True, update=Fa
"auto_expand_replicas": "0-all",
}
}
self._es.indices.put_settings(body, index_name)
self._es.indices.put_settings(body=body, index=index_name)
try:
self._build_index_sequential(collection, verbose, query=query, bulk=bulk, update=update, allow_upsert=True)
finally:
Expand All @@ -343,7 +343,7 @@ def build_index(self, collection, verbose=True, query=None, bulk=True, update=Fa
"refresh_interval": "1s" # default settings
}
}
self._es.indices.put_settings(body, index_name)
self._es.indices.put_settings(body=body, index=index_name)

try:
self._es.indices.flush()
Expand Down Expand Up @@ -486,7 +486,7 @@ def doc_feeder(self, step=None, verbose=True, query=None, scroll='10m', only_sou
# if verbose:
# t1 = time.time()

res = self._es.search(self._index, self._doc_type, body=q,
res = self._es.search(index=self._index, doc_type=self._doc_type, body=q,
size=_size, search_type='scan', scroll=scroll, **kwargs)
# double check initial scroll request returns no hits
# assert len(res['hits']['hits']) == 0
Expand All @@ -495,7 +495,7 @@ def doc_feeder(self, step=None, verbose=True, query=None, scroll='10m', only_sou
while True:
# if verbose:
# t1 = time.time()
res = self._es.scroll(res['_scroll_id'], scroll=scroll)
res = self._es.scroll(scroll_id=res['_scroll_id'], scroll=scroll)
# if len(res['hits']['hits']) == 0:
if not res['hits']['hits']:
break
Expand Down Expand Up @@ -584,9 +584,9 @@ def restore(self, repo_name, snapshot_name, index_name=None, purge=False, body=N
index_name = index_name or snapshot_name
if purge:
try:
self._es.indices.get(index_name)
self._es.indices.get(index=index_name)
# if we get there, it exists, delete it
self._es.indices.delete(index_name)
self._es.indices.delete(index=index_name)
except NotFoundError:
# no need to delete it,
pass
Expand Down Expand Up @@ -621,7 +621,7 @@ def get_snapshot_status(self, repo, snapshot):

def get_restore_status(self, index_name=None):
index_name = index_name or self._index
recov = self._es.indices.recovery(index_name)
recov = self._es.indices.recovery(index=index_name)
if index_name not in recov:
return {"status": "INIT", "progress": "0%"}
shards = recov[index_name]["shards"]
Expand Down Expand Up @@ -861,9 +861,9 @@ def create_if_needed(self, colname):
"number_of_replicas": 0,
}
}
if not conn.indices.exists(idxcolname):
conn.indices.create(idxcolname, body=body)
conn.indices.put_mapping(colname, {"dynamic": True}, index=idxcolname)
if not conn.indices.exists(index=idxcolname):
conn.indices.create(index=idxcolname, body=body)
conn.indices.put_mapping(doc_type=colname, body={"dynamic": True}, index=idxcolname)

def __getitem__(self, colname):
if colname not in self.cols:
Expand Down

0 comments on commit 336b14e

Please sign in to comment.