Skip to content

Commit

Permalink
support es7 for hubdb
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Sep 11, 2021
1 parent 65c5e3f commit 21679fe
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 102 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ config.py
/config
/config_
!/tests/web/handlers/config.py
!/tests/hub/hubdb/config.py
.hubdb
249 changes: 147 additions & 102 deletions biothings/utils/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,150 +827,195 @@ def get_last_command():
return None


# ES7+ FOR HUB_DB
# IS *NOT* DESIGNED FOR
# MANAGING HEAVY WORKLOADS

class Database(IDatabase):

CONFIG = None # will be set by bt.utils.hub_db.setup()

def __init__(self):
super(Database, self).__init__()
self.name = self.CONFIG.DATA_HUB_DB_DATABASE
self.es_host = self.CONFIG.HUB_DB_BACKEND["host"]
self.cols = {}
self.host = self.CONFIG.HUB_DB_BACKEND["host"]
self.client = get_es(self.host)

if not self.client.indices.exists(index=self.name):
self.client.indices.create(index=self.name, body={
'settings': {
'number_of_shards': 1,
"number_of_replicas": 0,
},
'mappings': {
"enabled": False
}
})

@property
def address(self):
return self.es_host
return self.host

# ES API OPS

def setup(self):
pass
def _exists(self, _id):
return self.client.exists(self.name, _id)

def get_conn(self):
return get_es(self.es_host)
def _read(self, _id):
doc = self.client.get(self.name, _id)
return doc["_source"]

def _write(self, _id, doc):
assert doc.pop("_id", None) in (_id, None)
return self.client.index(self.name, doc, id=_id)

def _modify(self, _id, func):
doc = self._read(_id)
doc = func(doc) or doc
return self._write(_id, doc)

# HUB_DB ABSTRACTION

def create_collection(self, colname):
return self[colname]

def create_if_needed(self, colname):
conn = self.get_conn()
# add dot to make it a special index so it's hidden by default in ES gui
idxcolname = "%s_%s" % (self.name, colname)
# it's not usefull to scale internal hubdb
body = {
'settings': {
'number_of_shards': 1,
"number_of_replicas": 0,
}
}
if not conn.indices.exists(index=idxcolname):
conn.indices.create(index=idxcolname, body=body)
conn.indices.put_mapping(doc_type=colname, body={"dynamic": True}, index=idxcolname)

def __getitem__(self, colname):
if colname not in self.cols:
self.create_if_needed(colname)
self.cols[colname] = Collection(colname, self)
return self.cols[colname]
return Collection(colname, self)


class Collection(object):

def __init__(self, colname, db):
self.colname = colname
self.name = colname
self.db = db

def get_conn(self):
return self.db.get_conn()
if not self.db._exists(colname):
self._write({})

@property
def dbname(self):
return "%s_%s" % (self.db.name, self.colname)
# COLLECTION OPS

@property
def name(self):
return self.colname
def _read(self):
return self.db._read(self.name)

@property
def database(self):
return self.db
def _write(self, col):
return self.db._write(self.name, col)

# COLLECTION DOC OPS

def _modify(self, doc):
def func(collection):
collection[doc["_id"]] = doc
return self.db._modify(self.name, func)

# HUB_DB ABSTRACTION

def find_one(self, *args, **kwargs):
return self.find(*args, find_one=True)
results = self.find(*args, **kwargs)
if results:
return results[0]
return None

def find(self, filter=None, *args, **kwargs):

if args or kwargs:
raise NotImplementedError()

if filter and any("." in k for k in filter):
raise NotImplementedError()

def find(self, *args, **kwargs):
results = []
query = {}
# if args and len(args) == 1 and isinstance(args[0], dict) and len(args[0]) > 0:
if args and len(args) == 1 and isinstance(args[0], dict) and args[0]:
query = {"query": {"match": args[0]}}
# it's key/value search, let's iterate
res = self.get_conn().search(self.dbname, self.colname, query, size=10000)
for _src in res["hits"]["hits"]:
doc = {"_id": _src["_id"]}
doc.update(_src["_source"])
if "find_one" in kwargs:
return doc
else:
for doc in self._read().values():
for k, v in (filter or {}).items():
if doc.get(k) != v:
break
else: # no break
results.append(doc)
if "find_one" in kwargs:
# we didn't find it if we get there
return None

return results

def insert_one(self, doc, check_unique=True):
assert "_id" in doc
_id = doc.pop("_id")
res = self.get_conn().index(self.dbname, self.colname, doc, id=_id, refresh=True)
if check_unique and not res["result"] == "created":
raise Exception("Couldn't insert document '%s'" % doc)

def update_one(self, query, what, upsert=False):
assert (len(what) == 1 and ("$set" in what or "$unset" in what or "$push" in what)), \
"$set/$unset/$push operators not found"
doc = self.find_one(query)
def insert_one(self, document, *args, **kwargs):
if args or kwargs:
raise NotImplementedError()

assert "_id" in document
collection = self._read()
if document["_id"] in collection:
raise ValueError("Already exists.")

collection[document["_id"]] = document
self._write(collection)

def _update_one(self, doc, update, *args, **kwargs):
if args or kwargs:
raise NotImplementedError()

if not ((len(update) == 1 and next(iter(update)) in ("$set", "$unset", "$push"))):
raise NotImplementedError()

if "$set" in update:
_update = parse_dot_fields(update["$set"])
doc = update_dict_recur(doc, _update)
elif "$unset" in update:
for key in update["$unset"].keys():
doc.pop(key, None)
elif "$push" in update:
for listkey, elem in update["$push"].items():
assert "." not in listkey, "$push not supported for nested keys: %s" % listkey
doc.setdefault(listkey, []).append(elem)
self._modify(doc)

def update_one(self, filter, update, upsert=False, *args, **kwargs):

doc = self.find_one(filter)
if doc:
if "$set" in what:
# parse_dot_fields uses json.dumps internally, we can to make
# sure everything is serializable first
what = json.loads(json.dumps(what, default=json_serial))
what = parse_dot_fields(what["$set"])
doc = update_dict_recur(doc, what)
elif "$unset" in what:
for keytounset in what["$unset"].keys():
doc.pop(keytounset, None)
elif "$push" in what:
for listkey, elem in what["$push"].items():
assert "." not in listkey, "$push not supported for nested keys: %s" % listkey
doc.setdefault(listkey, []).append(elem)

self.save(doc)
self._update_one(doc, update, *args, **kwargs)
elif upsert:
assert "_id" in query, "Can't upsert without _id"
assert "$set" in what, "Upsert needs $set operator (it makes sense...)"
doc = what["$set"]
doc["_id"] = query["_id"]
self.save(doc)
assert "_id" in filter, "Can't upsert without _id"
assert "$set" in update, "Upsert needs $set operator"
doc = update["$set"]
doc["_id"] = filter["_id"]
self._modify(doc)
else:
raise ValueError("No Match.")

def update(self, query, what):
docs = self.find(query)
for doc in docs:
self.update_one({"_id": doc["_id"]}, what)
def update_many(self, filter, update, *args, **kwargs):
for doc in self.find(filter):
self._update_one(doc, update, *args, **kwargs)

def save(self, doc):
return self.insert_one(doc, check_unique=False)
def replace_one(self, filter, replacement, *args, **kwargs):
if args or kwargs:
raise NotImplementedError()

def replace_one(self, query, doc, *args, **kwargs):
orig = self.find_one(query)
if orig:
self.insert_one(doc, check_unique=False)
doc = self.find_one(filter)
if doc:
replacement["_id"] = doc["_id"]
self._modify(replacement)
else:
raise ValueError("No Match.")

def __getitem__(self, _id):
return self.find_one({"_id": _id})

# DEPRECATED
# -----------------

def update(self, *args, **kwargs):
self.update_many(*args, **kwargs)

def save(self, *args, **kwargs):
raise NotImplementedError("Use insert_one() or replace_one() instead.")

def remove(self, query):
# In the future,
# Use delete_one() or delete_many() instead.

docs = self.find(query)
conn = self.get_conn()
collection = self._read()
for doc in docs:
conn.delete(self.dbname, self.colname, id=doc["_id"], refresh=True)
del collection[doc["_id"]]
self._write(collection)

def count(self):
return self.get_conn().count(self.dbname, self.colname)["count"]

def __getitem__(self, _id):
return self.find_one({"_id": _id})
# In the future,
# Use count_documents() or estimated_document_count() instead.
return len(self._read())
21 changes: 21 additions & 0 deletions tests/hub/hubdb/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
HUB_DB_BACKEND = {
"module": "biothings.utils.es",
"host": "localhost:9200"
}

# db containing the following (internal use)
DATA_HUB_DB_DATABASE = "biothings"
# for metadata of each src collections
DATA_SRC_MASTER_COLLECTION = 'src_master'
# for src data download information
DATA_SRC_DUMP_COLLECTION = 'src_dump'
# for src data build information
DATA_SRC_BUILD_COLLECTION = 'src_build'
# for src data build configuration
DATA_SRC_BUILD_CONFIG_COLLECTION = 'src_build_config'
# for data plugins information
DATA_PLUGIN_COLLECTION = 'data_plugin'
# for api information (running under hub control)
API_COLLECTION = 'api'
EVENT_COLLECTION = "event"
CMD_COLLECTION = "cmd"
18 changes: 18 additions & 0 deletions tests/hub/hubdb/es.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
(biothings) PS C:\Users\...\biothings.api\tests\hub\hubdb> python
Python 3.9.5 (tags/v3.9.5:0a7dcbd, May 3 2021, 17:27:52) [MSC v.1928 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from biothings.hub import config
>>> from biothings.utils import hub_db
>>> src_build = hub_db.get_src_build()
>>> src_build.insert_one({"_id": "one", "comment": "hi"})
>>> src_build.find_one({"comment": "hi"})
>>> src_build.replace_one({"_id": "one"}, {"extra": "ok"})
As of 9/10/2021, hub_db module relies on biothings.hub import,
this makes unit testing a single backend without a config file
difficult. The command above is the easiest approach to run it.
"""

0 comments on commit 21679fe

Please sign in to comment.