Skip to content

Commit

Permalink
Merge branch '0.10.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry committed Sep 2, 2021
2 parents 336b14e + 194f0e0 commit 6242f97
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 35 deletions.
42 changes: 22 additions & 20 deletions biothings/hub/dataindex/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,20 @@ class Step(abc.ABC):
method: property(abc.abstractmethod(lambda _: ...))
catelog = dict()

@staticmethod
def order(steps):
if isinstance(steps, str):
return (yield from Step.order([steps]))
for _step in ("pre", "index", "post"):
if _step in steps:
yield _step

def __init__(self, indexer):
    """Bind this step to *indexer* and open its state registrar.

    NOTE: ``self.state`` initially resolves to a class-level
    attribute (a registrar class on each Step subclass); calling it
    here deliberately shadows that attribute with a per-instance
    registrar object tracking this run in the src_build collection.
    """
    self.indexer = indexer
    self.state = self.state(
        get_src_build(),
        indexer.build_name,
        indexer.es_index_name,
        logfile=indexer.logfile)

@classmethod
Expand Down Expand Up @@ -338,14 +347,9 @@ def index(self, job_manager, **kwargs):
mode = kwargs.setdefault("mode", "index")
ids = kwargs.setdefault("ids", None)

if isinstance(steps, str):
steps = [steps]

assert job_manager
assert all(isinstance(_id, str) for _id in ids) if ids else True
assert 500 <= batch_size <= 10000, '"batch_size" out-of-range'
assert isinstance(steps, (list, tuple)), 'bad argument "steps"'
assert isinstance(mode, str), 'bad argument "mode"'

# the batch size here controls only the task partitioning
# it does not affect how the elasticsearch python client
Expand All @@ -356,7 +360,7 @@ def index(self, job_manager, **kwargs):
# inefficient, amplifying the scheduling overhead.

x = IndexerCumulativeResult()
for step in steps:
for step in Step.order(steps):
step = Step.dispatch(step)(self)
self.logger.info(step)
step.state.started()
Expand All @@ -372,9 +376,7 @@ def index(self, job_manager, **kwargs):
merge(x.data, dx.data)
self.logger.info(dx)
self.logger.info(x)
step.state.succeed({
self.es_index_name: x.data
})
step.state.succeed(x.data)

return x

Expand Down Expand Up @@ -520,12 +522,12 @@ def post_index(self, *args, **kwargs):


class ColdHotIndexer():
    """MongoDB to Elasticsearch 2-pass Indexer.

    Works with two mongo collections to create a single index:
    (
        1st pass: <MongoDB Cold Collection>,  # static premerge data, never updated
        2nd pass: <MongoDB Hot Collection>    # regularly updated data
    ) =>
        <Elasticsearch Index>

    The index is first populated from the cold collection; documents
    from the hot collection are then merged in by fetching each doc
    from the index, updating it, and writing it back.
    """

# "ColdHotIndexer" is not a subclass of the "Indexer".
Expand All @@ -545,20 +547,20 @@ def __init__(self, build_doc, indexer_env, index_name):
def index(self,
          job_manager,
          batch_size=10000,
          steps=("pre", "index", "post"),
          ids=None, mode=None,
          **kwargs):
    """Index the cold collection, then merge in the hot collection.

    Runs two Indexer.index passes against the same ES index: the
    cold pass is limited to the "pre" and "index" steps, the hot
    pass to the "index" and "post" steps, each further restricted
    to the steps actually requested in *steps*.

    *mode* is forwarded as-is to the cold pass and defaults to
    "merge" for the hot pass so hot documents update (rather than
    overwrite) what the cold pass indexed.

    Returns a two-element list with the result of each pass.
    """
    result = []

    cold_task = self.cold.index(
        job_manager, steps=set(Step.order(steps)) & {"pre", "index"},
        batch_size=batch_size, ids=ids, mode=mode)
    result.append((yield from cold_task))

    hot_task = self.hot.index(
        job_manager, steps=set(Step.order(steps)) & {"index", "post"},
        batch_size=batch_size, ids=ids, mode=mode or "merge")
    result.append((yield from hot_task))

    return result
Expand Down
24 changes: 16 additions & 8 deletions biothings/hub/dataindex/indexer_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ class Stage(Enum):
STARTED = 1
DONE = 2

# an implementation like this should be further
# generalized to replace utils.manager.BaseStatusRegisterer
def at(self, stage):
    """Check that the registrar is currently at *stage*.

    Raises RuntimeError on a stage mismatch. An explicit raise is
    used instead of a bare ``assert`` because asserts are stripped
    under ``python -O``, which would silently disable this
    state-machine guard.
    """
    if self != stage:
        raise RuntimeError(f"expected stage {stage}, currently at {self}")

# IndexJobStateRegistrar CAN be further generalized
# to replace utils.manager.BaseStatusRegisterer

class IndexJobStateRegistrar():

def __init__(self, collection, build_name, **context):
def __init__(self, collection, build_name, index_name, **context):
    """Track indexing-job state for one build in the src_build collection.

    collection: mongo collection holding the build documents.
    build_name: used as the ``_id`` of the build document to update.
    index_name: ES index name under which results are registered.
    context:    extra per-job fields (presumably merged into the
                recorded job entry, e.g. logfile — TODO confirm
                against the update logic).
    """

    self.collection = collection
    self.build_id = build_name

    self.index_name = index_name
    self.context = context

    # job lifecycle: READY -> STARTED -> DONE (see Stage)
    self.stage = Stage.READY
Expand Down Expand Up @@ -49,7 +54,7 @@ def prune(collection):

def started(self, step="index"):

assert self.stage == Stage.READY
self.stage.at(Stage.READY)
self.stage = Stage.STARTED

self.t0 = time.time()
Expand Down Expand Up @@ -77,12 +82,14 @@ def func(job, delta_build):
def succeed(self, result):
    """Mark the job successful, registering *result* under the index name.

    An empty/falsy *result* (e.g. from a pre-index step) records the
    success status without writing an "index" entry into the build
    document.
    """
    def func(job, delta_build):
        job["status"] = "success"
        # only register when there is an actual result; the stale
        # unconditional assignment left over from the old version
        # would clobber the namespaced entry below.
        if result:
            delta_build["index"] = {
                self.index_name: result}
    self._done(func)

def _done(self, func):

assert self.stage == Stage.STARTED
self.stage.at(Stage.STARTED)
self.stage = Stage.DONE

build = self.collection.find_one({'_id': self.build_id})
Expand All @@ -106,12 +113,13 @@ def started(self):
def succeed(self, result):
    # No result registration on the pre-indexing step.
    # ------------------------------------------------
    # Registration indicates the creation of the index
    # on the elasticsearch server. Failure at the
    # post-index stage thus means registration of the
    # index state up until the indexing step, but
    # success at the pre-index stage suggests no index
    # was created yet and thus no registration at all.
    super().succeed({})

class MainIndexJSR(IndexJobStateRegistrar):
Expand Down
6 changes: 3 additions & 3 deletions biothings/hub/dataindex/indexer_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ class IndexingTask():
The documents to index are specified by their ids.
"""

def __init__(self, es, mongo, ids, mode='index', logger=None, name="task"):
def __init__(self, es, mongo, ids, mode=None, logger=None, name="task"):

assert callable(es)
assert callable(mongo)

self.ids = _validate_ids(ids)
self.mode = Mode(mode)
self.mode = Mode(mode or 'index')

# these are functions to create clients,
# each also associated with an organizational
Expand Down Expand Up @@ -195,7 +195,7 @@ def index(self):
query={'_id': {
'$in': self.ids
}})
self.logger.info("%s: Indexing %d documents.", self.name, len(self.ids))
self.logger.info("%s: %d documents.", self.name, len(self.ids))
return clients.es.mindex(docs)

def merge(self):
Expand Down
4 changes: 0 additions & 4 deletions biothings/hub/dataindex/snapshot_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import logging
from datetime import datetime
from typing import Type
from xml.etree import ElementTree

from biothings.utils.dataload import file_merge


class Cleaner():

Expand Down

0 comments on commit 6242f97

Please sign in to comment.