Skip to content

Commit

Permalink
add snapshot_cleanup command
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Sep 5, 2021
1 parent 6242f97 commit a8dabd1
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 76 deletions.
3 changes: 2 additions & 1 deletion biothings/hub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,8 @@ def configure_commands(self):
self.commands["index"] = self.managers["index_manager"].index
self.commands["index_cleanup"] = self.managers["index_manager"].cleanup
if self.managers.get("snapshot_manager"):
self.commands["snapshot"] = self.managers[ "snapshot_manager"].snapshot
self.commands["snapshot"] = self.managers["snapshot_manager"].snapshot
self.commands["snapshot_cleanup"] = self.managers["snapshot_manager"].cleanup
# data release commands
if self.managers.get("release_manager"):
self.commands["create_release_note"] = self.managers[
Expand Down
29 changes: 29 additions & 0 deletions biothings/hub/dataindex/snapshooter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from config import logger as logging

from . import snapshot_cleanup as cleaner
from . import snapshot_registrar as registrar
from .snapshot_repo import Repository
from .snapshot_task import Snapshot
Expand Down Expand Up @@ -423,3 +424,31 @@ def _():

def snapshot_info(self, env=None, remote=False):
return self.snapshot_config

def cleanup(
self, env=None, # a snapshot environment describing a repository
keep=3, # the number of most recent snapshots to keep in one group
group_by="build_config", # the attr of which its values form groups
dryrun=True, # display the snapshots to be deleted without deleting them
**filters # a set of criterions to limit which snapshots are to be cleaned
):
""" Delete past snapshots and keep only the most recent ones.
Examples:
>>> snapshot_cleanup()
>>> snapshot_cleanup("s3_outbreak")
>>> snapshot_cleanup("s3_outbreak", keep=0)
"""

snapshots = cleaner.find( # filters support dotfield.
get_src_build(), env, keep, group_by, **filters)

if dryrun:
return '\n'.join((
"-" * 75, str(snapshots), "-" * 75,
"DRYRUN ONLY - APPLY THE ACTIONS WITH:",
" > snapshot_cleanup(..., dryrun=False)"
))

# return the number of snapshots successfully deleted
return cleaner.delete(get_src_build(), snapshots, self)
164 changes: 89 additions & 75 deletions biothings/hub/dataindex/snapshot_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,52 @@
import logging
import xml.dom.minidom
from typing import NamedTuple
from xml.etree import ElementTree

from elasticsearch import Elasticsearch

class Cleaner():

def __init__(self, collection, snapenvs, indexers, logger=None):
class _Ele(NamedTuple): # Cleanup Element
tag: str
attrs: dict
elems: list

self.collection = collection # pymongo.collection.Collection
self.snapenvs = snapenvs # hub.dataindex.snapshooter.SnapshotManager
self.indexers = indexers # hub.dataindex.indexer.IndexManager
self.logger = logger or logging.getLogger(__name__)
@classmethod
def ment(cls, tag, attrs, content): # _Ele.ment(..) :)
return _Ele(tag, attrs, [_Ele.ment(*e) for e in content])

def to_xml(self):
attrs = self.attrs.copy()

def find(collection, env=None, keep=3, group_by="build_config", **filters):
if isinstance(group_by, str):
group_by = "$" + group_by
if self.tag in ("CleanUps", "Remove", "Keep"):
attrs["size"] = str(len(self.elems))

if self.tag == "Snapshot":
attrs = {
"_id": attrs["_id"],
"build_name": attrs["build_name"],
"created_at": str(attrs["created_at"]),
"env": attrs.get("environment") or "N/A"
}

root = ElementTree.Element(self.tag, attrs)
for elem in self.elems:
root.append(elem.to_xml())
return root

def __str__(self):
ets = ElementTree.tostring(self.to_xml())
dom = xml.dom.minidom.parseString(ets)
return dom.toprettyxml(indent=" "*2)


def find(collection, env=None, keep=3, group_by=None, **filters):
if isinstance(group_by, (str, type(None))):
group_by = "$" + (group_by or "build_config")
elif isinstance(group_by, (list, tuple)):
group_by = {k.replace('.', '_'): "$" + k for k in group_by}

results = list(collection.aggregate([
groups = list(collection.aggregate([
{'$project': {
'build_config': '$build_config._id',
'snapshot': {'$objectToArray': '$snapshot'}}},
Expand All @@ -33,23 +61,23 @@ def find(collection, env=None, keep=3, group_by="build_config", **filters):
{'$group': {'_id': group_by, 'items': {"$push": "$$ROOT"}}}
]))

return (
return _Ele.ment(
"CleanUps", {},
[("Group", _expand(doc["_id"], group_by),
[("Group", _expand(group["_id"], group_by),
[("Remove", {},
[("Snapshot", _asattr(_doc, group_by), [])
for _doc in _remove(doc, keep)]),
[("Snapshot", _doc, [])
for _doc in _remove(group, keep)]),
("Keep", {},
[("Snapshot", _asattr(_doc, group_by), [])
for _doc in _keep(doc, keep)])]
) for doc in results
[("Snapshot", _doc, [])
for _doc in _keep(group, keep)])]
) for group in groups
])

def _expand(doc_id, group_by):
if isinstance(group_by, str):
return _asattr({group_by.strip("$"): doc_id})
if isinstance(group_by, dict):
return _asattr(doc_id)
def _expand(group_id, group_by):
if isinstance(group_id, str):
return {group_by.strip("$"): group_id}
if isinstance(group_id, dict):
return group_id
raise TypeError()

def _keep(doc, keep):
Expand All @@ -58,47 +86,42 @@ def _keep(doc, keep):
def _remove(doc, keep):
return doc["items"][:-keep or len(doc["items"])]

def _asattr(doc, filters=None):
_doc = {}
for k, v in doc.items():

# firstly, remove irrelevant information
# -----------------------------------------
# snapshot step result, only indicate step success.
if k in ("pre", "snapshot", "post"):
continue
# this key, if present, is a duplicate in conf key.
if k == "repository":
continue
# no need to repeat filters, they must be the same.
if isinstance(filters, dict):
if k in filters:
continue
elif isinstance(filters, str):
if k == filters.strip("$"):
continue

# secondly, make special objects concise
# ------------------------------------------
if v or not filters:
if isinstance(v, dict):
v = "{...}"
elif v is None:
v = ""
else: # like datetime ...
v = str(v)
_doc[k] = v
return _doc


def to_xml(x):
x[1]["len"] = str(len(x[2]))
root = ElementTree.Element(x[0], x[1])
for _x in x[2]:
root.append(to_xml(_x))
return root

# the operations below are not made async
# because SnapshotEnv.client is not async

def delete(collection, element, envs):
cnt = 0
assert element.tag == "CleanUps"
for group in element.elems:
for catagory in group.elems:
if catagory.tag == "Remove":
for snapshot in catagory.elems:
_delete(collection, snapshot, envs)
cnt += 1
return cnt


def _delete(collection, snapshot, envs):
assert snapshot.tag == "Snapshot"

if "environment" in snapshot.attrs:
env = snapshot.attrs["environment"]
client = envs[env].client
else: # legacy format
env = snapshot.attrs["conf"]["indexer"]["env"]
env = envs.index_manager[env]
client = Elasticsearch(**env["args"])

client.snapshot.delete(
snapshot.attrs["conf"]["repository"]["name"],
snapshot.attrs["_id"])

collection.update_one(
{"_id": snapshot.attrs["build_name"]},
{"$unset": {f"snapshot.{snapshot.attrs['_id']}": 1}}
)

def test_find():
from pymongo import MongoClient
logging.basicConfig(level="DEBUG")
Expand All @@ -110,26 +133,17 @@ def test_find():

client = MongoClient("su06")
collection = client["outbreak_hubdb"]["src_build"]
# cleaner = Cleaner(collection, {"local": {"args": {}}}, {})
obj = find(collection)
printxml(to_xml(obj))
# print(obj)
# return cleaner, obj

print(find(collection))


def test_print():
printxml(to_xml((
print(_Ele.ment(
"A", {}, [
("AA", {}, []),
("AB", {"ABC": "D"}, [])
]
)))

def printxml(element):
import xml.dom.minidom
dom = xml.dom.minidom.parseString(ElementTree.tostring(element))
with open("output.xml", "w") as file:
file.write(dom.toprettyxml(indent=" "*2))
# print(dom.toprettyxml(indent=" "*2))
))


if __name__ == '__main__':
Expand Down

0 comments on commit a8dabd1

Please sign in to comment.