Skip to content

Commit

Permalink
Merge branch '0.10.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Sep 15, 2021
2 parents 17d2a42 + ed574f7 commit 0282161
Showing 1 changed file with 46 additions and 4 deletions.
50 changes: 46 additions & 4 deletions biothings/utils/es.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import datetime
import itertools
import json
import re
Expand All @@ -7,6 +8,8 @@
from elasticsearch import (Elasticsearch, NotFoundError, RequestError,
TransportError, ElasticsearchException)
from elasticsearch import helpers
from elasticsearch.serializer import JSONSerializer
from importlib import import_module

from biothings.utils.common import iter_n, splitstr, nan, inf, merge, traverse
from biothings.utils.dataload import dict_walk
Expand Down Expand Up @@ -823,7 +826,7 @@ def __init__(self):
super(Database, self).__init__()
self.name = self.CONFIG.DATA_HUB_DB_DATABASE
self.host = self.CONFIG.HUB_DB_BACKEND["host"]
self.client = get_es(self.host)
self.client = Elasticsearch(self.host, serializer=_HubDBEncoder())

if not self.client.indices.exists(index=self.name):
self.client.indices.create(index=self.name, body={
Expand Down Expand Up @@ -890,7 +893,7 @@ def _write(self, col):
# COLLECTION DOC OPS

def _exists_one(self, _id):
return _id in self._read()
return str(_id) in self._read()

def _write_one(self, doc):
def func(collection):
Expand All @@ -917,12 +920,12 @@ def find(self, filter=None, projection=None, *args, **kwargs):
_doc.update(dict(traverse(doc, True)))
for k, v in (filter or {}).items():
if isinstance(v, dict) and "$exists" in v:
logging.error("Ingored filter: %s:%s", k, v)
logging.error("Ingored filter: {'%s': %s}", k, v)
continue
if _doc.get(k) != v:
break
else: # no break
results.append(doc)
results.append(_pyobj(doc))

if projection: # used by BuildManager.build_info
logging.error("Ignored projection: %s", projection)
Expand Down Expand Up @@ -1054,3 +1057,42 @@ def count(self):
# In the future,
# Use count_documents() or estimated_document_count() instead.
return len(self._read())


def _pyobj(doc): # ES doc -> Python object

for _, _doc in traverse(doc):
if isinstance(_doc, dict):
for k, v in list(_doc.items()):
_doc[k] = _eval(v)
elif isinstance(_doc, list):
_doc[:] = map(_eval, _doc)

return doc


_PY_REPR = re.compile(r"([\w.]+)\(.*\)")


def _eval(v):
try:
match = _PY_REPR.match(v)
if match:
clsstr = match.group(1)
modstr = clsstr.rsplit(".", 1)[0]
return eval(v, {modstr: import_module(modstr)})
except:
...

return v


class _HubDBEncoder(JSONSerializer):
TYPES = (
datetime.datetime,
# ...
)
def default(self, obj):
if isinstance(obj, self.TYPES):
return repr(obj)
return super().default(obj)

0 comments on commit 0282161

Please sign in to comment.