Merge pull request #326 from biothings/0.12.x
This prepares biothings.api for the 0.12.4 release.
everaldorodrigo committed Apr 2, 2024
2 parents 230b3f3 + fe004d3 commit ecdc720
Showing 31 changed files with 739 additions and 233 deletions.
11 changes: 6 additions & 5 deletions biothings/hub/autoupdate/uploader.py
@@ -5,7 +5,7 @@
 from functools import partial
 from typing import Optional
 
-from elasticsearch import Elasticsearch, NotFoundError, RequestsHttpConnection
+from elasticsearch import Elasticsearch, NotFoundError
 from requests_aws4auth import AWS4Auth
 
 import biothings.hub.dataload.uploader as uploader
@@ -89,7 +89,7 @@ def _get_es_client(self, es_host: str, auth: Optional[dict]):
         Used by self._get_repository, self._create_repository
         """
         es_conf = {
-            "timeout": 120,
+            "request_timeout": 120,
             "max_retries": 3,
             "retry_on_timeout": False,
         }
@@ -103,7 +103,8 @@ def _get_es_client(self, es_host: str, auth: Optional[dict]):
                 "es",
             )
             es_conf["http_auth"] = AWS4Auth(*auth_args)
-            es_conf["connection_class"] = RequestsHttpConnection
+            # RequestsHttpConnection is not available in elasticsearch 8.x
+            # es_conf["connection_class"] = RequestsHttpConnection
         elif auth["type"] == "http":
             auth_args = (
                 auth["properties"]["username"],
@@ -118,7 +119,7 @@ def _get_es_client(self, es_host: str, auth: Optional[dict]):
     def _get_repository(self, es_host: str, repo_name: str, auth: Optional[dict]):
         es = self._get_es_client(es_host, auth)
         try:
-            repo = es.snapshot.get_repository(repository=repo_name)
+            repo = es.snapshot.get_repository(name=repo_name)
         except NotFoundError:
             repo = None
         return repo
@@ -128,7 +129,7 @@ def _create_repository(self, es_host: str, repo_name: str, repo_settings: dict,
         Create Elasticsearch Snapshot repository
         """
         es = self._get_es_client(es_host, auth)
-        es.snapshot.create_repository(repository=repo_name, body=repo_settings)
+        es.snapshot.create_repository(name=repo_name, body=repo_settings)
 
     async def restore_snapshot(self, build_meta, job_manager, **kwargs):
         self.logger.debug("Restoring snapshot...")
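Note: the hunks above target the elasticsearch-py 8.x client, where the request timeout kwarg is named request_timeout and the transport is chosen automatically (hence no connection_class). A minimal sketch of the resulting client construction; the host URL and values below are illustrative, not taken from this repository:

    from elasticsearch import Elasticsearch

    # Placeholder host; 8.x requires a full URL including the scheme.
    es = Elasticsearch(
        "http://localhost:9200",
        request_timeout=120,    # 8.x name for the old "timeout" kwarg
        max_retries=3,
        retry_on_timeout=False,
    )
    print(es.ping())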
4 changes: 2 additions & 2 deletions biothings/hub/databuild/syncer.py
@@ -588,7 +588,7 @@ def sync_es_jsondiff_worker(
     eskwargs = {}
     # pass optional ES Indexer args
     if hasattr(btconfig, "ES_TIMEOUT"):
-        eskwargs["timeout"] = btconfig.ES_TIMEOUT
+        eskwargs["request_timeout"] = btconfig.ES_TIMEOUT
     if hasattr(btconfig, "ES_MAX_RETRY"):
         eskwargs["max_retries"] = btconfig.ES_MAX_RETRY
     if hasattr(btconfig, "ES_RETRY"):
@@ -690,7 +690,7 @@ def sync_es_coldhot_jsondiff_worker(
     eskwargs = {}
     # pass optional ES Indexer args
     if hasattr(btconfig, "ES_TIMEOUT"):
-        eskwargs["timeout"] = btconfig.ES_TIMEOUT
+        eskwargs["request_timeout"] = btconfig.ES_TIMEOUT
     if hasattr(btconfig, "ES_MAX_RETRY"):
         eskwargs["max_retries"] = btconfig.ES_MAX_RETRY
     if hasattr(btconfig, "ES_RETRY"):
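Note: same rename as in the uploader — the sync workers forward optional hub settings into the client kwargs, where the 8.x name is request_timeout. A sketch with a stand-in config object (values illustrative):

    from types import SimpleNamespace
    from elasticsearch import Elasticsearch

    btconfig = SimpleNamespace(ES_TIMEOUT=120, ES_MAX_RETRY=3)  # stand-in for the hub config
    eskwargs = {}
    if hasattr(btconfig, "ES_TIMEOUT"):
        eskwargs["request_timeout"] = btconfig.ES_TIMEOUT
    if hasattr(btconfig, "ES_MAX_RETRY"):
        eskwargs["max_retries"] = btconfig.ES_MAX_RETRY
    es = Elasticsearch("http://localhost:9200", **eskwargs)  # placeholder URL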
19 changes: 9 additions & 10 deletions biothings/hub/dataindex/indexer.py
@@ -398,8 +398,7 @@ async def pre_index(self, *args, mode, **kwargs):
         if mode in ("index", None):
             # index MUST NOT exist
             # ----------------------
-
-            if await client.indices.exists(self.es_index_name):
+            if await client.indices.exists(index=self.es_index_name):
                 msg = (
                     "Index '%s' already exists, (use mode='purge' to "
                     "auto-delete it or mode='resume' to add more documents)"
@@ -410,7 +409,7 @@ async def pre_index(self, *args, mode, **kwargs):
             # index MUST exist
             # ------------------
 
-            if not (await client.indices.exists(self.es_index_name)):
+            if not (await client.indices.exists(index=self.es_index_name)):
                 raise IndexerException("'%s' does not exist." % self.es_index_name)
             self.logger.info(("Exists", self.es_index_name))
             return  # skip index creation
@@ -419,14 +418,14 @@ async def pre_index(self, *args, mode, **kwargs):
             # index MAY exist
             # -----------------
 
-            response = await client.indices.delete(self.es_index_name, ignore_unavailable=True)
+            response = await client.indices.delete(index=self.es_index_name, ignore_unavailable=True)
             self.logger.info(("Deleted", self.es_index_name, response))
 
         else:
             raise ValueError("Invalid mode: %s" % mode)
 
         response = await client.indices.create(
-            self.es_index_name,
+            index=self.es_index_name,
             body={
                 "settings": (await self.es_index_settings.finalize(client)),
                 "mappings": (await self.es_index_mappings.finalize(client)),
@@ -594,7 +593,7 @@ def __init__(self, *args, **kwargs):
            "host": "localhost:9200",
            "indexer": {
                "args": {
-                    "timeout": 300,
+                    "request_timeout": 300,
                    "retry_on_timeout": True,
                    "max_retries": 10,
                },
@@ -638,7 +637,7 @@ def configure(self, conf):
         # During heavy indexing, the following settings
         # significantly increase the one-pass success rate.
         esargs = {
-            "timeout": 300,
+            "request_timeout": 300,
             "retry_on_timeout": True,
             "max_retries": 10,
         }
@@ -756,7 +755,7 @@ async def _update_meta(_meta):
         async with AsyncElasticsearch(**env["args"]) as client:
             doc_type = None
             if int((await client.info())["version"]["number"].split(".")[0]) < 7:
-                mappings = client.indices.get_mapping(index_name)
+                mappings = client.indices.get_mapping(index=index_name)
                 mappings = mappings[index_name]["mappings"]
                 doc_type = next(iter(mappings.keys()))
 
@@ -785,7 +784,7 @@ async def _enhance(conf):
         for name, env in self.register.items():
             async with AsyncElasticsearch(**env["args"]) as client:
                 try:
-                    indices = await client.indices.get("*")
+                    indices = await client.indices.get(index="*")
                 except elasticsearch.exceptions.ConnectionError:
                     ...  # keep the hard-coded place-holders info
                 else:  # replace the index key with remote info
@@ -842,7 +841,7 @@ async def fetch(index_name, env_name=None, limit=None):
 
         async with AsyncElasticsearch(**env["args"]) as client:
             try:
-                indices = await client.indices.get(index_name)
+                indices = await client.indices.get(index=index_name)
             except Exception:
                 continue
             for index_name, index_data in indices.items():
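Note: the pattern throughout this file is uniform — elasticsearch-py 8.x dropped positional arguments on the API methods, so every index name must be passed as an explicit index= keyword. A runnable sketch with the async client (URL and index name are placeholders):

    import asyncio
    from elasticsearch import AsyncElasticsearch

    async def main():
        client = AsyncElasticsearch("http://localhost:9200")  # placeholder URL
        try:
            if not await client.indices.exists(index="myindex"):
                # body= is still accepted in 8.x, matching the hunks above
                await client.indices.create(index="myindex", body={"settings": {}, "mappings": {}})
            await client.indices.delete(index="myindex", ignore_unavailable=True)
        finally:
            await client.close()

    asyncio.run(main())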
2 changes: 1 addition & 1 deletion biothings/hub/dataindex/indexer_registrar.py
@@ -133,7 +133,7 @@ def test_registrar():
 
     indexer = SimpleNamespace(
         mongo_collection_name="mynews_202012280220_vsdevjdk",  # must exists in DB
-        es_client_args=dict(hosts="localhost:9200"),
+        es_client_args=dict(hosts="http://localhost:9200"),
         es_index_name="__index_name__",
         logfile="/log/file",
         conf_name="bc_news",
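Note: the scheme is added because 8.x clients refuse bare host:port strings — every host must be a full URL. For example:

    from elasticsearch import Elasticsearch

    Elasticsearch(hosts="http://localhost:9200")  # accepted by the 8.x client
    # Elasticsearch(hosts="localhost:9200")       # raises ValueError in 8.x: scheme required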
24 changes: 16 additions & 8 deletions biothings/hub/dataindex/snapshooter.py
@@ -185,15 +185,17 @@ def __init__(self, job_manager, cloud, repository, indexer, **kwargs):
         self.wtime = kwargs.get("monitor_delay", 15)
 
     def _doc(self, index):
-        doc = get_src_build().find_one({f"index.{index}.environment": self.idxenv})
+        doc = get_src_build().find_one(
+            {f"index.{index}.environment": self.idxenv})
         if not doc:  # not asso. with a build
             raise ValueError("Not a hub-managed index.")
         return doc  # TODO UNIQUENESS
 
     def setup_log(self, index):
         build_doc = self._doc(index)
         log_name = build_doc["target_name"] or build_doc["_id"]
-        log_folder = os.path.join(btconfig.LOG_FOLDER, "build", log_name, "snapshot") if btconfig.LOG_FOLDER else None
+        log_folder = os.path.join(
+            btconfig.LOG_FOLDER, "build", log_name, "snapshot") if btconfig.LOG_FOLDER else None
         self.logger, _ = get_logger(index, log_folder=log_folder, force=True)
 
     def snapshot(self, index, snapshot=None, recreate_repo=False):
@@ -211,7 +213,8 @@ async def _snapshot(snapshot):
 
             job = await self.job_manager.defer_to_thread(
                 self.pinfo.get_pinfo(step, snapshot),
-                partial(getattr(self, state.func), cfg, index, snapshot, recreate_repo=recreate_repo),
+                partial(getattr(self, state.func), cfg, index,
+                        snapshot, recreate_repo=recreate_repo),
             )
             try:
                 dx = await job
@@ -256,7 +259,8 @@ def pre_snapshot(self, cfg, index, snapshot, **kwargs):
         try:
             repo.verify(config=cfg)
         except TransportError as tex:
-            raise RepositoryVerificationFailed({"error": tex.error, "detail": tex.info["error"]})
+            raise RepositoryVerificationFailed(
+                {"error": tex.error, "detail": tex.info["error"]})
 
         return {
             "__REPLACE__": True,
@@ -330,7 +334,7 @@ class SnapshotManager(BaseManager):
            "indexer": {
                "name": "local",
                "args": {
-                    "timeout": 100,
+                    "request_timeout": 100,
                    "max_retries": 5
                }
            },
@@ -463,7 +467,9 @@ def cleanup(
         >>> snapshot_cleanup("s3_outbreak", keep=0)
         """
 
-        snapshots = cleaner.find(get_src_build(), env, keep, group_by, **filters)  # filters support dotfield.
+        # filters support dotfield.
+        snapshots = cleaner.find(get_src_build(), env,
+                                 keep, group_by, **filters)
 
         if dryrun:
             return "\n".join(
@@ -495,10 +501,12 @@ def done(f):
         jobs = []
         try:
             for environment, snapshot_names in snapshots_data.items():
-                job = self.job_manager.submit(partial(delete, environment, snapshot_names))
+                job = self.job_manager.submit(
+                    partial(delete, environment, snapshot_names))
                 jobs.append(job)
             tasks = asyncio.gather(*jobs)
             tasks.add_done_callback(done)
         except Exception as ex:
-            logging.exception("Error while deleting snapshots. error: %s", ex, extra={"notify": True})
+            logging.exception(
+                "Error while deleting snapshots. error: %s", ex, extra={"notify": True})
         return jobs
10 changes: 5 additions & 5 deletions biothings/hub/dataindex/snapshot_repo.py
@@ -13,17 +13,17 @@ def __init__(self, client, repository):
 
     def exists(self):
         try:
-            self.client.snapshot.get_repository(self.name)
+            self.client.snapshot.get_repository(name=self.name)
         except elasticsearch.exceptions.NotFoundError:
             return False
         return True
 
     def create(self, **body):
         # https://www.elastic.co/guide/en/elasticsearch/plugins/current/repository-s3-client.html
-        return self.client.snapshot.create_repository(self.name, body=body)
+        return self.client.snapshot.create_repository(name=self.name, body=body)
 
     def delete(self):
-        self.client.snapshot.delete_repository(self.name)
+        self.client.snapshot.delete_repository(name=self.name)
 
     def verify(self, config):
         """A repository is consider properly setup and working, when:
@@ -32,10 +32,10 @@ def verify(self, config):
         """
 
         # Check if the repo pass ElasticSearch's verification
-        self.client.snapshot.verify_repository(self.name)
+        self.client.snapshot.verify_repository(name=self.name)
 
         # Check if the repo's settings match with the snapshot's config
-        repo_settings = self.client.snapshot.get_repository(self.name)[self.name]
+        repo_settings = self.client.snapshot.get_repository(name=self.name)[self.name]
         incorrect_data = {}
 
         for field in ["type", "settings"]:
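Note: in 8.x the repository-management calls take the repository name as the keyword name=. A sketch of the cycle these methods wrap (repository name and settings are illustrative):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # placeholder URL
    es.snapshot.create_repository(name="my_repo", body={"type": "fs", "settings": {"location": "/backups"}})
    es.snapshot.verify_repository(name="my_repo")
    settings = es.snapshot.get_repository(name="my_repo")["my_repo"]
    es.snapshot.delete_repository(name="my_repo")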
18 changes: 9 additions & 9 deletions biothings/hub/dataindex/snapshot_task.py
@@ -14,28 +14,28 @@ def exists(self):
         if self.repository.exists():
             return bool(
                 self.client.snapshot.get(
-                    self.repository.name,
-                    self.name,
+                    repository=self.repository.name,
+                    snapshot=self.name,
                     ignore_unavailable=True,
                 )["snapshots"]
             )
         return False
 
     def create(self, indices):
         self.client.snapshot.create(
-            self.repository.name,
-            self.name,
-            {
+            repository=self.repository.name,
+            snapshot=self.name,
+            body={
                 "indices": indices,
                 "include_global_state": False,
-            },
+            }
         )
 
     def state(self):
         if self.repository.exists():
             snapshots = self.client.snapshot.get(
-                self.repository.name,
-                self.name,
+                repository=self.repository.name,
+                snapshot=self.name,
                 ignore_unavailable=True,
             )["snapshots"]
 
@@ -46,7 +46,7 @@ def state(self):
             return "N/A"
 
     def delete(self):
-        self.client.snapshot.delete(self.repository.name, self.name)
+        self.client.snapshot.delete(repository=self.repository.name, snapshot=self.name)
 
     def __str__(self):
         return f"<Snapshot {self.state()} name='{self.name}' repository={self.repository}>"
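Note: likewise, the snapshot-lifecycle calls now take repository= and snapshot= keywords in 8.x. A sketch (names are illustrative):

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # placeholder URL
    es.snapshot.create(
        repository="my_repo",
        snapshot="snap-1",
        body={"indices": "myindex", "include_global_state": False},
    )
    info = es.snapshot.get(repository="my_repo", snapshot="snap-1", ignore_unavailable=True)
    es.snapshot.delete(repository="my_repo", snapshot="snap-1")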
6 changes: 3 additions & 3 deletions biothings/hub/default_config.py
@@ -231,8 +231,8 @@
     },
     "env": {
         "localhub": {
-            "host": "localhost:9200",
-            "indexer": {"args": {"timeout": 300, "retry_on_timeout": True, "max_retries": 10}},
+            "host": "http://localhost:9200",
+            "indexer": {"args": {"request_timeout": 300, "retry_on_timeout": True, "max_retries": 10}},
         }
     },
 }
@@ -327,7 +327,7 @@
 STANDALONE_CONFIG = {
     # default config
     "_default": {
-        "es_host": "localhost:9200",
+        "es_host": "http://localhost:9200",
         "index": "biothings_current",
     },
     # # custom definition
2 changes: 1 addition & 1 deletion biothings/tests/web.py
@@ -272,7 +272,7 @@ def _setup_elasticsearch(self):
         s.mount(
             "http://", adapter=requests.adapters.HTTPAdapter(max_retries=urllib3.Retry(total=5, backoff_factor=3.0))
         )  # values seem reasonable
-        es_host = "http://" + self.config.ES_HOST
+        es_host = self.config.ES_HOST
 
         server_info = s.get(es_host).json()
         version_info = tuple(int(v) for v in server_info["version"]["number"].split("."))
5 changes: 5 additions & 0 deletions biothings/utils/common.py
@@ -496,6 +496,11 @@ def list2dict(a_list, keyitem, alwayslist=False):
     return _dict
 
 
+def list_trim(a_list, idx_to_remove):
+    """Remove items IN-PLACE from a list by index."""
+    for i in sorted(idx_to_remove, reverse=True):
+        del a_list[i]
+
 def filter_dict(d, keys):
     """
     Remove keys from dict "d". "keys" is a list
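Note: the new helper deletes by index in descending order, so earlier deletions cannot shift the positions still pending removal. Usage (list values are illustrative):

    from biothings.utils.common import list_trim

    fruits = ["apple", "banana", "cherry", "date"]
    list_trim(fruits, [0, 2])  # deletes index 2 first, then index 0
    assert fruits == ["banana", "date"]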
(diffs for the remaining changed files not shown)
