sqlite3 internal/config backend done + convert existing mongo
sirloon committed Jun 26, 2017
1 parent 1be8b81 commit ecb98dd
Showing 13 changed files with 175 additions and 252 deletions.
9 changes: 9 additions & 0 deletions biothings/__init__.py
@@ -15,6 +15,15 @@ def config_for_app(config_mod):
# (but "import biothings.config" won't b/c not a real module within biothings
globals()["config"] = config_mod
config.APP_PATH = app_path
if not hasattr(config_mod,"CONFIG_BACKEND"):
import biothings.utils.mongo
config.internal_backend = biothings.utils.mongo
else:
import importlib
config.internal_backend = importlib.import_module(config.CONFIG_BACKEND["module"])
import biothings.utils.internal_backend
biothings.utils.internal_backend.setup()


def get_loop(max_workers=None):
    loop = asyncio.get_event_loop()
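Note: with this change an app's config module can opt into a non-mongo internal backend; when CONFIG_BACKEND is absent, config_for_app() falls back to biothings.utils.mongo. A minimal sketch of the config side (only the CONFIG_BACKEND["module"] shape is established by this diff; the module path below is hypothetical):

# in the application's config.py
CONFIG_BACKEND = {
    "module": "biothings.utils.sqlite3_backend",  # hypothetical module path
}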
5 changes: 4 additions & 1 deletion biothings/databuild/backend.py
@@ -7,6 +7,7 @@
from biothings.utils.backend import DocBackendBase, DocMongoBackend, DocESBackend
from biothings.utils.es import ESIndexer
import biothings.utils.mongo as mongo
import biothings.utils.internal_backend as ib

# Source specific backend (deals with build config, master docs, etc...)
class SourceDocBackendBase(DocBackendBase):
@@ -127,7 +128,9 @@ def get_src_metadata(self,src_filter=[]):
        srcs = []
        if self.sources_accessed:
            for src in self.sources_accessed:
                doc = self.dump.find_one({"$where":"function() {if(this.upload) {for(var index in this.upload.jobs) {if(this.upload.jobs[index].step == \"%s\") return this;}}}" % src})
                fullname = ib.get_source_fullname(src)
                main_name = fullname.split(".")[0]
                doc = self.dump.find_one({"_id":main_name})
                srcs.append(doc["_id"])
            srcs = list(set(srcs))
        else:
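The javascript $where scan is replaced by ib.get_source_fullname(), which resolves a collection name to its dotted "main_source.sub_source" form. Its implementation isn't shown in this diff; a sketch of what the mongo flavor could look like, inferred from the $where query it replaces:

def get_source_fullname(col_name):
    # src_dump docs are keyed by main source name; their upload.jobs
    # sub-documents are keyed by collection (sub-source) name
    src_dump = get_src_dump()
    for doc in src_dump.find():
        if col_name in doc.get("upload", {}).get("jobs", {}):
            return "%s.%s" % (doc["_id"], col_name)
    return None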
29 changes: 15 additions & 14 deletions biothings/databuild/builder.py
@@ -20,6 +20,7 @@
from biothings.utils.manager import BaseManager, ManagerError
from biothings.utils.dataload import update_dict_recur
import biothings.utils.mongo as mongo
import biothings.utils.internal_backend as ib
import biothings.databuild.backend as backend
from biothings.databuild.backend import TargetDocMongoBackend
from biothings import config as btconfig
@@ -185,12 +186,13 @@ def check_ready(self,force=False):
        _cfg = src_build_config.find_one({'_id': self.build_config['_id']})
        # check if all resources are uploaded
        for src_name in _cfg["sources"]:
            # "sources" in config is a list of collection names. src_dump _id is the name of the
            # resource but can have sub-resources with different collection names. We need
            # to query inner keys upload.jobs.*.step, which always contains the collection name
            src_doc = src_dump.find_one({"$where":"function() {if(this.upload) {for(var index in this.upload.jobs) {if(this.upload.jobs[index].step == \"%s\") return this;}}}" % src_name})
            fullname = ib.get_source_fullname(src_name)
            if not fullname:
                raise ResourceNotReady("Can't find source '%s'" % src_name)
            main_name = fullname.split(".")[0]
            src_doc = src_dump.find_one({"_id":main_name})
            if not src_doc:
                raise ResourceNotReady("Missing information for source '%s' to start upload" % src_name)
                raise ResourceNotReady("Missing information for source '%s' to start merging" % src_name)
            if not src_doc.get("upload",{}).get("jobs",{}).get(src_name,{}).get("status") == "success":
                raise ResourceNotReady("No successful upload found for resource '%s'" % src_name)

@@ -693,7 +695,6 @@ def post_merge(self, source_names, batch_size, job_manager):


from biothings.utils.backend import DocMongoBackend
import biothings.utils.mongo as mongo

def merger_worker(col_name,dest_name,ids,mapper,upsert,batch_num):
    try:
@@ -719,7 +720,7 @@ def merger_worker(col_name,dest_name,ids,mapper,upsert,batch_num):


def set_pending_to_build(conf_name=None):
    src_build_config = mongo.get_src_build_config()
    src_build_config = ib.get_src_build_config()
    qfilter = {}
    if conf_name:
        qfilter = {"_id":conf_name}
@@ -744,14 +745,14 @@ def __init__(self,source_backend_factory=None,
        same arguments as the base DataBuilder
        """
        super(BuilderManager,self).__init__(*args,**kwargs)
        self.src_build_config = mongo.get_src_build_config()
        self.src_build_config = ib.get_src_build_config()
        self.source_backend_factory = source_backend_factory
        self.target_backend_factory = target_backend_factory
        self.builder_class = builder_class
        self.poll_schedule = poll_schedule
        self.setup_log()
        # check if src_build exists and create it as necessary
        if not self.src_build_config.name in self.src_build_config.database.collection_names():
        if self.src_build_config.count() == 0:
            logging.debug("Creating '%s' collection (one-time)" % self.src_build_config.name)
            self.src_build_config.database.create_collection(self.src_build_config.name)
        # this is dummy configuration, used as a template
@@ -773,10 +774,10 @@ def create(build_name):
            from biothings import config
            source_backend = self.source_backend_factory and self.source_backend_factory() or \
                             partial(backend.SourceDocMongoBackend,
                                     build_config=partial(mongo.get_src_build_config),
                                     build=partial(mongo.get_src_build),
                                     master=partial(mongo.get_src_master),
                                     dump=partial(mongo.get_src_dump),
                                     build_config=partial(ib.get_src_build_config),
                                     build=partial(ib.get_src_build),
                                     master=partial(ib.get_src_master),
                                     dump=partial(ib.get_src_dump),
                                     sources=partial(mongo.get_src_db))

            # declare target backend
@@ -853,7 +854,7 @@ def clean_temp_collections(self,build_name,date=None,prefix=''):
    def poll(self):
        if not self.poll_schedule:
            raise ManagerError("poll_schedule is not defined")
        src_build_config = mongo.get_src_build_config()
        src_build_config = ib.get_src_build_config()
        @asyncio.coroutine
        def check_pending_to_build():
            confs = [src['_id'] for src in src_build_config.find({'pending_to_build': True}) if type(src['_id']) == str]
3 changes: 2 additions & 1 deletion biothings/databuild/syncer.py
@@ -12,7 +12,8 @@
from elasticsearch.exceptions import NotFoundError, ConflictError

from biothings.utils.common import timesofar, iter_n, loadobj, dump
from biothings.utils.mongo import doc_feeder, get_target_db, get_src_build, invalidate_cache
from biothings.utils.mongo import doc_feeder, get_target_db, invalidate_cache
from biothings.utils.internal_backend import get_src_build
from biothings.utils.loggers import get_logger, HipchatHandler
from biothings import config as btconfig
from biothings.utils.manager import BaseManager, ManagerError
5 changes: 3 additions & 2 deletions biothings/dataindex/indexer.py
@@ -8,6 +8,7 @@
from functools import partial

import biothings.utils.mongo as mongo
import biothings.utils.internal_backend as ib
import biothings.utils.aws as aws
from biothings.utils.common import timesofar
from biothings.utils.loggers import HipchatHandler, get_logger
@@ -29,7 +30,7 @@ class IndexerManager(BaseManager):
    def __init__(self, pindexer, *args, **kwargs):
        super(IndexerManager,self).__init__(*args, **kwargs)
        self.pindexer = pindexer
        self.src_build = mongo.get_src_build()
        self.src_build = ib.get_src_build()
        self.target_db = mongo.get_target_db()
        self.t0 = time.time()
        self.prepared = False
@@ -432,7 +433,7 @@ def get_build_version(self):
    def load_build(self, target_name=None):
        '''Load build info from src_build collection.'''
        target_name = target_name or self.target_name
        src_build = mongo.get_src_build()
        src_build = ib.get_src_build()
        self.build_doc = src_build.find_one({'_id': target_name})
        assert self.build_doc, "Can't find build document associated to '%s'" % target_name
        _cfg = self.build_doc.get("build_config")
2 changes: 1 addition & 1 deletion biothings/dataload/dumper.py
@@ -4,7 +4,7 @@
import asyncio
from functools import partial

from biothings.utils.mongo import get_src_dump
from biothings.utils.internal_backend import get_src_dump
from biothings.utils.common import timesofar
from biothings.utils.loggers import HipchatHandler
from config import logger as logging, HIPCHAT_CONFIG, LOG_FOLDER
14 changes: 8 additions & 6 deletions biothings/dataload/uploader.py
@@ -5,7 +5,8 @@
from functools import wraps, partial

from biothings.utils.common import get_timestamp, get_random_string, timesofar, iter_n
from biothings.utils.mongo import get_src_conn, get_src_dump
from biothings.utils.internal_backend import get_src_dump, get_src_master
from biothings.utils.mongo import get_src_conn
from biothings.utils.dataload import merge_struct
from biothings.utils.manager import BaseSourceManager, \
ManagerError, ResourceNotFound
@@ -149,6 +150,7 @@ def prepare(self,state={}):
self._state["db"] = self.conn[self.__class__.__database__]
self._state["collection"] = self.db[self.collection_name]
self._state["src_dump"] = self.prepare_src_dump()
self._state["src_master"] = get_src_master()
self._state["logger"] = self.setup_log()
self.data_folder = self.src_doc.get("data_folder")
# flag ready
@@ -165,6 +167,7 @@ def unprepare(self):
"conn" : self._state["conn"],
"collection" : self._state["collection"],
"src_dump" : self._state["src_dump"],
"src_master" : self._state["src_master"],
"logger" : self._state["logger"]
}
for k in state:
@@ -293,13 +296,12 @@ def update_master(self):
        self.save_doc_src_master(_doc)

    def save_doc_src_master(self,_doc):
        coll = self.conn[DocSourceMaster.__database__][DocSourceMaster.__collection__]
        dkey = {"_id": _doc["_id"]}
        prev = coll.find_one(dkey)
        prev = self.src_master.find_one(dkey)
        if prev:
            coll.replace_one(dkey, _doc)
            self.src_master.replace_one(dkey, _doc)
        else:
            coll.insert_one(_doc)
            self.src_master.insert_one(_doc)

    def register_status(self,status,**extra):
        """
@@ -676,7 +678,7 @@ def done(f):
            return jobs
        except Exception as e:
            self.register_status(src,"failed",err=str(e))
            self.logger.exception("Error while uploading '%s': %s" % (src,e),extra={"notify":True})
            logging.exception("Error while uploading '%s': %s" % (src,e),extra={"notify":True})
            raise

@asyncio.coroutine
15 changes: 14 additions & 1 deletion biothings/utils/backend.py
@@ -1,4 +1,6 @@
''' Backend access class. '''
from functools import partial

from biothings.utils.es import ESIndexer
from biothings import config as btconfig
from elasticsearch.exceptions import NotFoundError
@@ -223,7 +225,18 @@ class DocESBackend(DocBackendBase):

    def __init__(self, esidxer=None):
        """esidxer is an instance of utils.es.ESIndexer class."""
        self.target_esidxer = esidxer
        if type(esidxer) == partial:
            self._target_esidxer_provider = esidxer
            self._target_esidxer = None
        else:
            self._target_esidxer_provider = None
            self._target_esidxer = esidxer

    @property
    def target_esidxer(self):
        if not self._target_esidxer:
            self._target_esidxer = self._target_esidxer_provider()
        return self._target_esidxer

    @property
    def target_name(self):
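Usage sketch for the lazy behavior added above: passing a functools.partial defers building the indexer until first access, useful when the backend is constructed in one process but used in another (the ESIndexer arguments here are illustrative, not taken from this diff):

from functools import partial
backend = DocESBackend(partial(ESIndexer, index="mybuild", doc_type="gene"))
# no ES connection is opened yet; the partial is resolved and cached on first access
idxer = backend.target_esidxer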
2 changes: 1 addition & 1 deletion biothings/utils/dataload.py
@@ -520,7 +520,7 @@ def updated_dict(_dict, attrs):
def update_dict_recur(d,u):
    """
    Update dict d with dict u's values, recursively
    (so existing values d but not in u are kept even if nested)
    (so existing values in d but not in u are kept even if nested)
    """
    for k, v in u.items():
        if isinstance(v, collections.Mapping):
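The corrected docstring's behavior, illustrated:

d = {"a": {"x": 1, "y": 2}, "b": 3}
u = {"a": {"y": 20}}
update_dict_recur(d, u)
assert d == {"a": {"x": 1, "y": 20}, "b": 3}  # nested "x" is kept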
52 changes: 11 additions & 41 deletions biothings/utils/internal_backend.py
@@ -1,33 +1,37 @@
from biothings import config


get_src_conn = None
get_config_conn = None
get_src_dump = None
get_src_master = None
get_src_build = None
get_src_build_config = None
get_source_fullname = None


def setup():
    global get_src_conn
    global get_config_conn
    global get_src_dump
    global get_src_master
    global get_src_build
    global get_src_build_config
    get_src_conn = config.internal_backend.get_src_conn
    global get_source_fullname
    get_config_conn = config.internal_backend.get_config_conn
    get_src_dump = config.internal_backend.get_src_dump
    get_src_master = config.internal_backend.get_src_master
    get_src_build = config.internal_backend.get_src_build
    get_src_build_config = config.internal_backend.get_src_build_config
    get_source_fullname = config.internal_backend.get_source_fullname


class Connection(object):
class IConnection(object):
    """
    This class mimicks / is a mock for mongokit.Connection class,
    used to keep used interface (registering document model for instance)
    This class declares an interface and partially implements some of it,
    mimicking mongokit.Connection class. It's used to keep the existing
    interface (e.g. registering document models).
    Any internal backend should implement (derive from) this interface.
    """
    def __init__(self, *args, **kwargs):
        super(Connection,self).__init__(*args,**kwargs)
        super(IConnection,self).__init__(*args,**kwargs)
        self._registered_documents = {}
    def register(self, obj):
        self._registered_documents[obj.__name__] = obj
@@ -41,37 +45,3 @@ def __getattr__(self,key):
        except Exception:
            raise AttributeError(key)


def get_cache_filename(col_name):
    cache_folder = getattr(config,"CACHE_FOLDER",None)
    if not cache_folder:
        return # we don't even use cache, forget it
    cache_format = getattr(config,"CACHE_FORMAT",None)
    cache_file = os.path.join(config.CACHE_FOLDER,col_name)
    cache_file = cache_format and (cache_file + ".%s" % cache_format) or cache_file
    return cache_file


def invalidate_cache(col_name,col_type="src"):
    if col_type == "src":
        src_dump = get_src_dump()
        if not "." in col_name:
            fullname = get_source_fullname(col_name)
        assert fullname, "Can't resolve source '%s' (does it exist ?)" % col_name

        main,sub = fullname.split(".")
        doc = src_dump.find_one({"_id":main})
        assert doc, "No such source '%s'" % main
        assert doc.get("upload",{}).get("jobs",{}).get(sub), "No such sub-source '%s'" % sub
        # this will make the cache too old
        doc["upload"]["jobs"][sub]["started_at"] = datetime.datetime.now()
        src_dump.update_one({"_id":main},{"$set" : {"upload.jobs.%s.started_at" % sub:datetime.datetime.now()}})
    elif col_type == "target":
        # just delete the cache file
        cache_file = get_cache_filename(col_name)
        if cache_file:
            try:
                os.remove(cache_file)
            except FileNotFoundError:
                pass
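Taken together, setup() defines the contract any pluggable backend module must honor. A skeleton of what the sqlite3 backend (not shown in this diff) would need to export (only the names are fixed by setup() above):

# hypothetical backend module, e.g. biothings/utils/sqlite3_backend.py
def get_config_conn(): raise NotImplementedError
def get_src_dump(): raise NotImplementedError
def get_src_master(): raise NotImplementedError
def get_src_build(): raise NotImplementedError
def get_src_build_config(): raise NotImplementedError
def get_source_fullname(col_name): raise NotImplementedError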

4 changes: 2 additions & 2 deletions biothings/utils/manager.py
@@ -8,7 +8,7 @@
from biothings import config
logger = config.logger

from biothings.utils.mongo import get_src_conn
from biothings.utils.internal_backend import get_config_conn
from biothings.utils.common import timesofar, get_random_string, sizeof_fmt
from biothings.utils.hub import find_process

@@ -166,7 +166,7 @@ class BaseSourceManager(BaseManager):

    def __init__(self, job_manager, datasource_path="dataload.sources", *args, **kwargs):
        super(BaseSourceManager,self).__init__(job_manager,*args,**kwargs)
        self.conn = get_src_conn()
        self.conn = get_config_conn()
        self.default_src_path = datasource_path

    def filter_class(self,klass):
