Skip to content

Commit

Permalink
snapshot is aware of missing repository
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Aug 19, 2021
1 parent 6f24f2a commit 2c198f2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 35 deletions.
43 changes: 30 additions & 13 deletions biothings/hub/dataindex/snapshooter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import copy
import json
import time
from collections import UserDict
from collections import UserDict, UserString
from dataclasses import dataclass
from datetime import datetime
from functools import partial
Expand Down Expand Up @@ -90,6 +90,17 @@ def __str__(self):
f">"
)

class _UserString(UserString):

def __str__(self):
return f"{type(self).__name__}({self.data})"

class TemplateStr(_UserString):
...

class RenderedStr(_UserString):
...

class RepositoryConfig(UserDict):
"""
{
Expand Down Expand Up @@ -120,11 +131,16 @@ def format(self, doc=None):
where "_meta.build_version" value is taken from doc in
dot field notation, and the current year replaces "$(Y)".
"""
template = json.dumps(self.data)
string = template_out(template, doc or {})
template = TemplateStr(json.dumps(self.data))
string = RenderedStr(template_out(template.data, doc or {}))
if "%" in string:
logging.error(template)
logging.error(string)
raise ValueError("Failed to template.")
return RepositoryConfig(json.loads(string))
if template != string:
logging.debug(template)
logging.debug(string)
return RepositoryConfig(json.loads(string.data))


class _SnapshotResult(UserDict):
Expand Down Expand Up @@ -165,6 +181,7 @@ def snapshot(self, index, snapshot=None):
def _snapshot(snapshot):
x = CumulativeResult()
build_doc = self._doc(index)
cfg = self.repcfg.format(build_doc)
for step in ("pre", "snapshot", "post"):
state = registrar.dispatch(step) # _TaskState Class
state = state(get_src_build(), build_doc.get("_id"))
Expand All @@ -175,8 +192,7 @@ def _snapshot(snapshot):
self.pinfo.get_pinfo(step, snapshot),
partial(
getattr(self, state.func),
self.repcfg.format(build_doc),
index, snapshot
cfg, index, snapshot
))
try:
dx = yield from job
Expand Down Expand Up @@ -210,7 +226,7 @@ def pre_snapshot(self, cfg, index, snapshot):
if not bucket.exists():
bucket.create(cfg.get("acl", "private"))
logging.info(bucket)
repo.create(**cfg["settings"])
repo.create(**cfg)
logging.info(repo)

return {
Expand Down Expand Up @@ -242,11 +258,12 @@ def _snapshot(self, cfg, index, snapshot):

if state == "FAILED":
raise ValueError(state)
elif state == "IN_PROGRESS":
time.sleep(self.wtime)
elif state == "SUCCESS":
break

# Wait "IN_PROGRESS"
time.sleep(self.wtime)
else: # PARTIAL/MISSING/N/A
raise ValueError(state)

return {
"replaced": _replace,
Expand Down Expand Up @@ -322,7 +339,7 @@ def configure(self, conf):
dx = envdict["indexer"]

if isinstance(dx, str): # {"indexer": "prod"}
return dict(name=dx) # . ↓
dx = dict(name=dx) # .
if not isinstance(dx, dict): # {"indexer": {"name": "prod"}}
raise TypeError(dx)

Expand All @@ -345,13 +362,13 @@ def poll(self, state, func):
# Features
# -----------

def snapshot(self, snapshot_env, index, snapshot=None, **kwargs):
def snapshot(self, snapshot_env, index, snapshot=None):
"""
Create a snapshot named "snapshot" (or, by default, same name as the index)
from "index" according to environment definition (repository, etc...) "env".
"""
env = self.register[snapshot_env]
return env.snapshot(index, snapshot, **kwargs)
return env.snapshot(index, snapshot)

def snapshot_build(self, build_doc):
"""
Expand Down
10 changes: 5 additions & 5 deletions biothings/hub/dataindex/snapshot_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,26 @@ def __init__(self, client, repository):
# /_snapshot/<repository>

self.client = client
self.repository = repository
self.name = repository

def exists(self):
try:
self.client.snapshot.get_repository(self.repository)
self.client.snapshot.get_repository(self.name)
except elasticsearch.exceptions.NotFoundError:
return False
return True

def create(self, **body):
# https://www.elastic.co/guide/en/elasticsearch/plugins/current/repository-s3-client.html
return self.client.snapshot.create_repository(self.repository, **body)
return self.client.snapshot.create_repository(self.name, body=body)

def delete(self):
self.client.snapshot.delete_repository(self.repository)
self.client.snapshot.delete_repository(self.name)

def __str__(self):
return (
f"<Repository {'READY' if self.exists() else 'MISSING'}"
f" name='{self.repository}'"
f" name='{self.name}'"
f" client={self.client}"
f">"
)
Expand Down
38 changes: 21 additions & 17 deletions biothings/hub/dataindex/snapshot_task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from biothings.hub.dataindex.snapshot_repo import Repository

class Snapshot():

Expand All @@ -6,45 +7,48 @@ def __init__(self, client, repository, snapshot):
# /_snapshot/<repository>/<snapshot>

self.client = client
self.repository = repository
self.snapshot = snapshot
self.repository = Repository(client, repository)
self.name = snapshot

def exists(self):
return bool(self.client.snapshot.get(
self.repository, self.snapshot,
ignore_unavailable=True
)["snapshots"])
if self.repository.exists():
return bool(self.client.snapshot.get(
self.repository.name, self.name,
ignore_unavailable=True
)["snapshots"])
return False

def create(self, indices):
self.client.snapshot.create(
self.repository, self.snapshot,
self.repository.name, self.name,
{
"indices": indices,
"include_global_state": False
}
)

def state(self):
snapshots = self.client.snapshot.get(
self.repository, self.snapshot,
ignore_unavailable=True
)["snapshots"]
if self.repository.exists():
snapshots = self.client.snapshot.get(
self.repository.name, self.name,
ignore_unavailable=True
)["snapshots"]

if snapshots: # [{...}]
return snapshots[0]["state"]
if snapshots: # [{...}]
return snapshots[0]["state"]
return "MISSING"

return "N/A"

def delete(self):
self.client.snapshot.delete(
self.repository, self.snapshot)
self.repository.name, self.name)

def __str__(self):
return (
f"<Snapshot {self.state()}"
f" repository='{self.repository}'"
f" snapshot='{self.snapshot}'"
f" client={self.client}"
f" name='{self.name}'"
f" repository={self.repository}"
f">"
)

Expand Down

0 comments on commit 2c198f2

Please sign in to comment.