Skip to content

Commit

Permalink
- sharded and "normal" builders
Browse files Browse the repository at this point in the history
- temp. deactivate some custom commands
- migrate config to new biothings 0.6.0+
  • Loading branch information
sirloon committed Feb 5, 2020
1 parent 6e50a31 commit c835714
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 77 deletions.
64 changes: 33 additions & 31 deletions src/bin/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import biothings.hub.databuild.differ as differ
import biothings.hub.databuild.syncer as syncer

from hub.databuild.builder import MyVariantDataBuilder
from hub.databuild.builder import MyVariantDataBuilder, MyVariantShardedDataBuilder
from hub.dataindex.indexer import MyVariantIndexerManager
from hub.databuild.differ import MyVariantDifferManager
from hub.databuild.mapper import TagObserved, TagObservedAndSkipLongId
Expand Down Expand Up @@ -108,8 +108,10 @@ def do(srcs,tgt):
def configure_build_manager(self):
    """Create, configure and register the BuilderManager.

    Two builder classes are registered: the standard MyVariantDataBuilder
    and the sharded MyVariantShardedDataBuilder, both pre-bound with the
    "observed" tag mappers. The configured manager is stored under
    self.managers["build_manager"].
    """
    observed = TagObserved(name="observed")
    observed_skipidtoolong = TagObservedAndSkipLongId(name="observed_skipidtoolong")
    # Pre-bind the mappers so BuilderManager can instantiate either
    # builder class without knowing about them.
    mvbuilder = partial(MyVariantDataBuilder, mappers=[observed, observed_skipidtoolong])
    sharded_mvbuilder = partial(MyVariantShardedDataBuilder, mappers=[observed, observed_skipidtoolong])
    # NOTE: the stale duplicate `builder_class=partial(MyVariantDataBuilder,...)`
    # keyword (a leftover of the previous version) was removed — two
    # `builder_class=` keywords in one call is a SyntaxError.
    build_manager = builder.BuilderManager(
        builder_class=[mvbuilder, sharded_mvbuilder],
        job_manager=self.managers["job_manager"])
    build_manager.configure()
    self.managers["build_manager"] = build_manager
Expand Down Expand Up @@ -145,38 +147,38 @@ def configure_commands(self):
self.commands["premerge"] = partial(self.managers["build_manager"].merge,steps=["merge","metadata"])
# sync
self.commands["es_sync_hg19_test"] = partial(self.managers["sync_manager_test"].sync,"es",
target_backend=(config.ES_CONFIG["env"]["test"]["host"],
config.ES_CONFIG["env"]["test"]["index"]["hg19"][0]["index"],
config.ES_CONFIG["env"]["test"]["index"]["hg19"][0]["doc_type"]))
target_backend=(config.INDEX_CONFIG["env"]["test"]["host"],
config.INDEX_CONFIG["env"]["test"]["index"]["hg19"][0]["index"],
config.INDEX_CONFIG["env"]["test"]["index"]["hg19"][0]["doc_type"]))
self.commands["es_sync_hg38_test"] = partial(self.managers["sync_manager_test"].sync,"es",
target_backend=(config.ES_CONFIG["env"]["test"]["host"],
config.ES_CONFIG["env"]["test"]["index"]["hg38"][0]["index"],
config.ES_CONFIG["env"]["test"]["index"]["hg38"][0]["doc_type"]))
target_backend=(config.INDEX_CONFIG["env"]["test"]["host"],
config.INDEX_CONFIG["env"]["test"]["index"]["hg38"][0]["index"],
config.INDEX_CONFIG["env"]["test"]["index"]["hg38"][0]["doc_type"]))
self.commands["es_sync_hg19_prod"] = partial(self.managers["sync_manager"].sync,"es",
target_backend=(config.ES_CONFIG["env"]["prod"]["host"],
config.ES_CONFIG["env"]["prod"]["index"]["hg19"][0]["index"],
config.ES_CONFIG["env"]["prod"]["index"]["hg19"][0]["doc_type"]))
target_backend=(config.INDEX_CONFIG["env"]["prod"]["host"],
config.INDEX_CONFIG["env"]["prod"]["index"]["hg19"][0]["index"],
config.INDEX_CONFIG["env"]["prod"]["index"]["hg19"][0]["doc_type"]))
self.commands["es_sync_hg38_prod"] = partial(self.managers["sync_manager"].sync,"es",
target_backend=(config.ES_CONFIG["env"]["prod"]["host"],
config.ES_CONFIG["env"]["prod"]["index"]["hg38"][0]["index"],
config.ES_CONFIG["env"]["prod"]["index"]["hg38"][0]["doc_type"]))
# snapshot, diff & publish
self.commands["snapshot_demo"] = partial(self.managers["index_manager"].snapshot,repository=config.SNAPSHOT_REPOSITORY + "-demo")
# override with diff type
self.commands["diff_prod"] = partial(self.managers["diff_manager"].diff,differ.ColdHotSelfContainedJsonDiffer.diff_type)
self.commands["diff_demo"] = partial(self.managers["diff_manager"].diff,differ.SelfContainedJsonDiffer.diff_type)
self.commands["publish_diff_hg19"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-hg19")
self.commands["publish_diff_hg38"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-hg38")
self.commands["publish_snapshot_hg19"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-hg19")
self.commands["publish_snapshot_hg38"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-hg38")
self.commands["publish_diff_demo_hg19"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-demo_hg19",
s3_bucket=config.S3_DIFF_BUCKET + "-demo")
self.commands["publish_diff_demo_hg38"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-demo_hg38",
s3_bucket=config.S3_DIFF_BUCKET + "-demo")
self.commands["publish_snapshot_demo_hg19"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-demo_hg19",
repository=config.READONLY_SNAPSHOT_REPOSITORY + "-demo")
self.commands["publish_snapshot_demo_hg38"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-demo_hg38",
repository=config.READONLY_SNAPSHOT_REPOSITORY + "-demo")
target_backend=(config.INDEX_CONFIG["env"]["prod"]["host"],
config.INDEX_CONFIG["env"]["prod"]["index"]["hg38"][0]["index"],
config.INDEX_CONFIG["env"]["prod"]["index"]["hg38"][0]["doc_type"]))
## snapshot, diff & publish
#self.commands["snapshot_demo"] = partial(self.managers["index_manager"].snapshot,repository=config.SNAPSHOT_REPOSITORY + "-demo")
## override with diff type
#self.commands["diff_prod"] = partial(self.managers["diff_manager"].diff,differ.ColdHotSelfContainedJsonDiffer.diff_type)
#self.commands["diff_demo"] = partial(self.managers["diff_manager"].diff,differ.SelfContainedJsonDiffer.diff_type)
#self.commands["publish_diff_hg19"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-hg19")
#self.commands["publish_diff_hg38"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-hg38")
#self.commands["publish_snapshot_hg19"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-hg19")
#self.commands["publish_snapshot_hg38"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-hg38")
#self.commands["publish_diff_demo_hg19"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-demo_hg19",
# s3_bucket=config.S3_DIFF_BUCKET + "-demo")
#self.commands["publish_diff_demo_hg38"] = partial(self.managers["diff_manager"].publish_diff,config.S3_APP_FOLDER + "-demo_hg38",
# s3_bucket=config.S3_DIFF_BUCKET + "-demo")
#self.commands["publish_snapshot_demo_hg19"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-demo_hg19",
# repository=config.READONLY_SNAPSHOT_REPOSITORY + "-demo")
#self.commands["publish_snapshot_demo_hg38"] = partial(self.managers["index_manager"].publish_snapshot,s3_folder=config.S3_APP_FOLDER + "-demo_hg38",
# repository=config.READONLY_SNAPSHOT_REPOSITORY + "-demo")


import hub.dataload
Expand Down
140 changes: 103 additions & 37 deletions src/config_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@
CMD_COLLECTION = 'cmd' # for launched/running commands in shell
EVENT_COLLECTION = 'event' # for launched/running commands in shell

DATA_TARGET_MASTER_COLLECTION = 'db_master'

# Redis config to cache IDs when doing cold/hot merge
REDIS_CONNECTION_PARAMS = {}

# where to store info about processes launched by the hub
RUN_DIR = '/tmp/run'

# define valid sources to get chrom from, and for each, name of the chrom field
CHROM_FIELDS = {'cadd':'chrom', 'clinvar':'chrom', 'cosmic':'chrom', 'dbnsfp':'chrom',
'dbsnp':'chrom', 'docm':'chrom', 'evs':'chrom', 'exac':'chrom'}
Expand All @@ -40,14 +32,6 @@
# max length for _id field
MAX_ID_LENGTH = 512

# ES s3 repository to use snapshot/restore (must be pre-configured in ES)
SNAPSHOT_REPOSITORY = "variant_repository"
# ES snapshot name accessible (usually using a URL)
# These two snapshot configs should point to
# the same location in a way. The difference is that the first
# uses an access controller to write data, while the second is read-only
READONLY_SNAPSHOT_REPOSITORY ="variant_url"

# cache file format ("": ascii/text uncompressed, or "gz|zip|xz")
CACHE_FORMAT = "xz"

Expand All @@ -69,12 +53,6 @@
# as any pending job will consume some memory).
MAX_QUEUED_JOBS = os.cpu_count() * 4

# when creating a snapshot, how long should we wait before querying ES
# to check snapshot status/completion ? (in seconds)
# Since myvariant's indices are pretty big, a whole snaphost won't happen in few secs,
# let's just monitor the status every 5min
MONITOR_SNAPSHOT_DELAY = 5 * 60

# Hub environment (like, prod, dev, ...)
# Used to generate remote metadata file, like "latest.json", "versions.json"
# If non-empty, this constant will be used to generate those url, as a prefix
Expand All @@ -87,15 +65,9 @@
HUB_ICON = "http://biothings.io/static/img/myvariant-logo-shiny.svg"
HUB_VERSION = "0.2"

# S3 bucket, root of all biothings releases information
S3_RELEASE_BUCKET = "biothings-releases"
# S3 bucket, root of all biothings diffs
S3_DIFF_BUCKET = "biothings-diffs"
# what sub-folder should be used within diff bucket to upload diff files
S3_APP_FOLDER = "myvariant.info" # hg19/hg38 will be concat

# Pre-prod/test ES definitions
ES_CONFIG = {
INDEX_CONFIG = {
"build_config_key" : "assembly", # used to select proper idxr/syncer
"indexer_select": {
# default
Expand All @@ -105,7 +77,7 @@
},
"env" : {
"prod" : {
"host" : "prodserver:9200",
"host" : "<PRODSERVER>:9200",
"indexer" : {
"args" : {
"timeout" : 300,
Expand Down Expand Up @@ -136,6 +108,105 @@
},
}

# Snapshot environment configuration
SNAPSHOT_CONFIG = {
    # Each key under "env" defines one snapshot environment.
    "env": {
        "prod": {
            "cloud": {
                "type": "aws",  # default, only one supported by now
                # None presumably falls back to the ambient AWS credential
                # chain — TODO confirm against the hub's snapshot code.
                "access_key": None,
                "secret_key": None,
            },
            # ES snapshot repository definition (created in ES if missing).
            "repository": {
                "name": "variant_repository",
                "type": "s3",
                "settings": {
                    "bucket": "<SNAPSHOT_BUCKET_NAME>",
                    "base_path": "myvariant.info/$(Y)",  # per year
                    "region": "us-west-2",
                },
                "acl": "private",
            },
            "indexer": {
                # reference to INDEX_CONFIG
                "env": "prod",
            },
            # when creating a snapshot, how long should we wait before querying ES
            # to check snapshot status/completion ? (in seconds)
            "monitor_delay": 60 * 5,
        },
        "demo": {
            "cloud": {
                "type": "aws",  # default, only one supported by now
                "access_key": None,
                "secret_key": None,
            },
            "repository": {
                "name": "variant_repository-demo",
                "type": "s3",
                "settings": {
                    "bucket": "<SNAPSHOT_DEMO_BUCKET_NAME>",
                    "base_path": "myvariant.info/$(Y)",  # per year
                    "region": "us-west-2",
                },
                # demo snapshots are publicly readable, unlike prod
                "acl": "public",
            },
            "indexer": {
                # reference to INDEX_CONFIG
                "env": "test",
            },
            # when creating a snapshot, how long should we wait before querying ES
            # to check snapshot status/completion ? (in seconds)
            "monitor_delay": 10,
        }
    }
}

# Release configuration
# Each root keys define a release environment (test, prod, ...)
RELEASE_CONFIG = {
    # Each key under "env" defines one release environment (prod, demo, ...).
    "env": {
        "prod": {
            "cloud": {
                "type": "aws",  # default, only one supported by now
                "access_key": None,
                "secret_key": None,
            },
            # where release metadata (latest.json, versions.json, ...) is published
            "release": {
                "bucket": "<RELEASES_BUCKET_NAME>",
                "region": "us-west-2",
                "folder": "myvariant.info",
                "auto": True,  # automatically generate release-note ?
            },
            # where incremental diff files are published
            "diff": {
                "bucket": "<DIFFS_BUCKET_NAME>",
                "folder": "myvariant.info",
                "region": "us-west-2",
                "auto": True,  # automatically generate diff ? Careful if lots of changes
            },
        },
        "demo": {
            "cloud": {
                "type": "aws",  # default, only one supported by now
                "access_key": None,
                "secret_key": None,
            },
            "release": {
                "bucket": "<RELEASES_BUCKET_NAME>",
                "region": "us-west-2",
                "folder": "myvariant.info-demo",
                "auto": True,  # automatically generate release-note ?
            },
            "diff": {
                "bucket": "<DIFFS_BUCKET_NAME>",
                # NOTE(review): demo diffs reuse the prod folder here while the
                # demo release uses "myvariant.info-demo" — confirm this is
                # intended and not a copy/paste slip.
                "folder": "myvariant.info",
                "region": "us-west-2",
                "auto": True,  # automatically generate diff ? Careful if lots of changes
            },
        }
    }
}

SLACK_WEBHOOK = None

# SSH port for hub console
Expand All @@ -152,9 +223,8 @@
# cached data (it None, caches won't be used at all)
CACHE_FOLDER = None

# Role, when master, hub will publish data (updates, snapshot, etc...) that
# other instances can use (production, standalones)
BIOTHINGS_ROLE = "slave"
# when publishing releases, specify the targeted (i.e. required) standalone version
STANDALONE_VERSION = "standalone_v3"

import logging
from biothings.utils.loggers import setup_default_log
Expand Down Expand Up @@ -230,14 +300,10 @@
default=ConfigurationValue("""os.path.join(DATA_ARCHIVE_ROOT,"logs")"""),
desc="Define path to folder which will contain log files")

STANDALONE_VERSION = ConfigurationError("Define standalone version targetted by this Hub")

IDS_S3_BUCKET = ConfigurationDefault(
default="myvariant-ids",
desc="Define a bucket name to upload myvariant _ids to")

STANDALONE_VERSION = ConfigurationError("Define standalone version targetted by this Hub")

# default hub logger
logger = ConfigurationDefault(
default=logging,
Expand Down
18 changes: 10 additions & 8 deletions src/hub/databuild/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,6 @@ class MyVariantDataBuilder(builder.DataBuilder):

MAX_CHROM_EX = 100000 # if chrom discrepancies found, max # of examples we keep

def __init__(self, build_name, source_backend, target_backend, *args, **kwargs):
shared_tgt_backend = partial(ShardedTargetDocMongoBackend,
target_db=partial(mongo.get_target_db))
super().__init__(build_name=build_name,
source_backend=source_backend,
target_backend=shared_tgt_backend,
*args,**kwargs)

def merge(self, sources=None, target_name=None, batch_size=50000, job_manager=None, **kwargs):
# just override default batch_size or it consumes too much mem
return super(MyVariantDataBuilder,self).merge(
Expand Down Expand Up @@ -131,6 +123,16 @@ def get_stats(self,*args,**kwargs):
# overwritten by the ones coming from the base class (see root_keys in set_chrom())
return {}

class MyVariantShardedDataBuilder(MyVariantDataBuilder):
    """MyVariant builder writing merged documents through a sharded
    (multi-collection) MongoDB target backend instead of the default one.
    """

    def __init__(self, build_name, source_backend, target_backend, *args, **kwargs):
        # Force a sharded target backend. The `target_backend` argument is
        # deliberately ignored but kept in the signature so the class stays
        # interface-compatible with the base builder / BuilderManager.
        # Fixes: local was misnamed `shared_tgt_backend` (it is *sharded*),
        # and `partial(mongo.get_target_db)` was a redundant zero-argument
        # partial — the bare callable is equivalent.
        sharded_tgt_backend = partial(ShardedTargetDocMongoBackend,
                                      target_db=mongo.get_target_db)
        super().__init__(build_name=build_name,
                         source_backend=source_backend,
                         target_backend=sharded_tgt_backend,
                         *args, **kwargs)


def get_chrom(doc):
chrom_keys = set()
Expand Down
2 changes: 1 addition & 1 deletion src/hub/dataindex/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def post_index(self, target_name, index_name, job_manager, steps=["index","post"
return update_stats(idxer,assembly)


class MyVariantIndexerManager(indexer.IndexerManager):
class MyVariantIndexerManager(indexer.IndexManager):

def post_publish(self, snapshot, index, *args, **kwargs):
# assuming build name == index name, and assuming demo index has
Expand Down

0 comments on commit c835714

Please sign in to comment.