Skip to content

Commit

Permalink
Various optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
hexylena committed Jul 31, 2017
1 parent 11ac796 commit d453231
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 44 deletions.
160 changes: 116 additions & 44 deletions scripts/grt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import yaml
import logging
# logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

from collections import defaultdict

Expand All @@ -38,7 +39,7 @@ def dumper(obj):
return str(obj)


def _init(config):
def _init(config, need_app=False):
if config.startswith('/'):
config_file = os.path.abspath(config)
else:
Expand All @@ -50,6 +51,11 @@ def _init(config):
if not config.database_connection:
logging.warning("The database connection is empty. If you are using the default value, please uncomment that in your galaxy.ini")

# Only construct the Galaxy application object when the caller asked for it;
# otherwise hand back None in its place.
# NOTE: there must be no trailing comma after the constructor call -- a
# trailing comma would bind a 1-tuple to `app` rather than the application.
if need_app:
    app = galaxy.app.UniverseApplication(global_conf={'__file__': config_file, 'here': os.getcwd()})
else:
    app = None

return (
mapping.init(
config.file_path,
Expand All @@ -60,7 +66,7 @@ def _init(config):
object_store,
config.database_connection.split(':')[0],
config,
galaxy.app.UniverseApplication(global_conf={'__file__': config_file, 'here': os.getcwd()}),
app
)


Expand All @@ -73,8 +79,12 @@ def kw_metrics(job):

class Sanitization:

def __init__(self, sanitization_config):
def __init__(self, sanitization_config, model, sa_session):
self.sanitization_config = sanitization_config
# SA Stuff
self.model = model
self.sa_session = sa_session
self.filesize_cache = {}

if 'tool_params' not in self.sanitization_config:
self.sanitization_config['tool_params'] = {}
Expand All @@ -94,7 +104,37 @@ def sanitize_data(self, tool_id, data):
self.tool_id = tool_id
return self._sanitize_value(data)

def _file_dict(self, data):
    """Annotate a dataset reference dict with the size of the dataset.

    :param data: dict holding a ``src`` and an ``id`` key.
    :returns: the same dict with a ``size`` key added; the value is the
        dataset's ``total_size`` as an int, or None when no row was found
        or the stored size is empty. Repeated references to the same
        ``src``/``id`` pair return the first annotated dict from the cache.
    :raises Exception: when ``src`` is anything other than ``'hda'``.
    """
    key = '{src}-{id}'.format(**data)
    # Read and write the cache under the same composite key, otherwise a
    # lookup can never hit.
    if key in self.filesize_cache:
        return self.filesize_cache[key]

    if data['src'] != 'hda':
        # Expand the dict into keyword arguments so {src} can be resolved.
        raise Exception("Cannot handle {src} yet".format(**data))

    try:
        dataset = (
            self.sa_session
            .query(self.model.Dataset.id, self.model.Dataset.total_size)
            .filter_by(id=data['id'])
            .first()
        )
    except sa.orm.exc.NoResultFound:
        dataset = None

    # Column 1 of the result row is total_size.
    data['size'] = int(dataset[1]) if dataset and dataset[1] else None

    # Push to cache for later.
    self.filesize_cache[key] = data
    return data


def _sanitize_dict(self, unsanitized_dict, path=""):
# if it is a file dictionary, handle specially.
if len(unsanitized_dict.keys()) == 2 and \
'id' in unsanitized_dict and \
'src' in unsanitized_dict and \
unsanitized_dict['src'] in ('hda', 'ldda'):
return self._file_dict(unsanitized_dict)

return {
k: self._sanitize_value(v, path=path + '.' + k)
for (k, v)
Expand Down Expand Up @@ -132,7 +172,7 @@ def main(argv):
help="Set the logging level", default='warning')

args = parser.parse_args()
logging.basicConfig(level=getattr(logging, args.loglevel.upper()))
logging.getLogger().setLevel(getattr(logging, args.loglevel.upper()))

_times = []
_start_time = time.time()
Expand Down Expand Up @@ -160,9 +200,10 @@ def main(argv):
last_job_sent = -1

logging.info('Loading Galaxy...')
model, object_store, engine, gxconfig, app = _init(config['galaxy_config'])
model, object_store, engine, gxconfig, app = _init(config['galaxy_config'], need_app=config['grt']['metadata']['share_toolbox'])

sa_session = model.context.current
logging.info('Configuration Loaded')
_times.append(('gx_conf_loaded', time.time() - _start_time))

# Fetch jobs COMPLETED with status OK that have not yet been sent.
Expand All @@ -171,39 +212,64 @@ def main(argv):
active_users = defaultdict(int)
grt_jobs_data = []

san = Sanitization(config['blacklist'])

# For every job
for job in sa_session.query(model.Job)\
.filter(sa.and_(
model.Job.table.c.state == "ok",
model.Job.table.c.id > last_job_sent
))\
.order_by(model.Job.table.c.id.asc())\
.all():
if job.tool_id in config['blacklist'].get('tools', []):
continue

# If the user has run a job, they're active.
active_users[job.user_id] += 1

metrics = kw_metrics(job)

params = job.raw_param_dict()
for key in params:
params[key] = json.loads(params[key])

logging.debug("Sanitizing %s %s" % (job.tool_id, str(params)))
job_data = (
str(job.id),
job.tool_id,
job.tool_version,
job.update_time.strftime('%s'),
json.dumps(metrics, default=dumper),
json.dumps(san.sanitize_data(job.tool_id, params))
)
grt_jobs_data.append(job_data)
_times.append(('jobs_parsed', time.time() - _start_time))
logging.info('Building Sanitizer')
_times.append(('san_init', time.time() - _start_time))
san = Sanitization(config['blacklist'], model, sa_session)
_times.append(('san_fin', time.time() - _start_time))

logging.info('Loading Jobs')
_times.append(('job_init', time.time() - _start_time))

# Jobs are fetched in fixed-size windows of job ids rather than with one
# query over every unsent job.
batch_size = 1000
# The tool blacklist does not change while jobs are processed; look it up once.
blacklisted_tools = config['blacklist'].get('tools', [])

# Highest job id that has not been sent yet. .first() returns None when no
# such job exists, in which case there is nothing to iterate over.
newest_job = sa_session.query(model.Job.id) \
    .filter(model.Job.id > last_job_sent) \
    .order_by(model.Job.id.desc()) \
    .first()
max_job_id = newest_job[0] if newest_job else last_job_sent
if newest_job is None:
    logging.info('No new jobs to process')

# Each window covers the ids in (selection_start, selection_start + batch_size],
# so consecutive windows neither overlap nor leave out the boundary id.
for selection_start in range(last_job_sent, max_job_id, batch_size):
    selection_end = selection_start + batch_size
    logging.info("Processing %s - %s", selection_start, selection_end)
    _processing_times = []
    # For every job
    for job in sa_session.query(model.Job)\
            .filter(sa.and_(
                model.Job.table.c.state == "ok",
                model.Job.table.c.id > selection_start,
                model.Job.table.c.id <= selection_end
            ))\
            .order_by(model.Job.table.c.id.asc())\
            .all():
        if job.id % 100 == 0:
            logging.info(str(job.id))

        _job_start_time = time.time()
        if job.tool_id in blacklisted_tools:
            continue

        # If the user has run a job, they're active.
        active_users[job.user_id] += 1

        metrics = kw_metrics(job)

        # Each raw parameter value is a JSON document; decode it in place.
        params = job.raw_param_dict()
        for key in params:
            params[key] = json.loads(params[key])

        logging.debug("Sanitizing %s %s" % (job.tool_id, str(params)))
        job_data = (
            str(job.id),
            job.tool_id,
            job.tool_version,
            job.update_time.strftime('%s'),
            json.dumps(metrics, default=dumper),
            json.dumps(san.sanitize_data(job.tool_id, params))
        )
        grt_jobs_data.append(job_data)
        _processing_times.append(time.time() - _job_start_time)

    # A window may contain no eligible jobs; min()/max() of an empty list
    # would raise, so only report timings when there are some.
    if _processing_times:
        logging.info('Min %s', min(_processing_times))
        logging.info('Max %s', max(_processing_times))
        logging.info('Avg %s', sum(_processing_times) / len(_processing_times))
_times.append(('job_fin', time.time() - _start_time))

# Remember the last job sent.
if len(grt_jobs_data) > 0:
Expand All @@ -219,10 +285,11 @@ def main(argv):
if os.path.exists(REPORT_DIR) and not os.path.exists(ARCHIVE_DIR):
os.makedirs(ARCHIVE_DIR)

_times.append(('job_meta_start', time.time() - _start_time))
with open(METADATA_FILE, 'w') as handle:
json.dump(config['grt']['metadata'], handle, indent=2)
_times.append(('job_meta_end', time.time() - _start_time))

_times.append(('job_meta', time.time() - _start_time))
with gzip.open(REPORT_BASE + '.tsv.gz', 'w') as handle:
for job in grt_jobs_data:
handle.write('\t'.join(job))
Expand All @@ -235,6 +302,14 @@ def main(argv):

# Now serialize the individual report data.
with open(REPORT_BASE + '.json', 'w') as handle:
if config['grt']['metadata']['share_toolbox']:
toolbox = [
(tool.id, tool.name, tool.version, tool.tool_shed, tool.repository_id, tool.repository_name)
for tool_id, tool in app.toolbox._tools_by_id.items()
]
else:
toolbox = None

json.dump({
"version": 1,
"galaxy_version": gxconfig.version_major,
Expand All @@ -245,15 +320,12 @@ def main(argv):
},
"users": {
"active": len(set(active_users)),
"total": sa_session.query(model.User).count(),
"total": sa_session.query(model.User.id).count(),
},
"jobs": {
"ok": len(grt_jobs_data),
},
"tools": [
(tool.id, tool.name, tool.version, tool.tool_shed, tool.repository_id, tool.repository_name)
for tool_id, tool in app.toolbox._tools_by_id.items()
]
"tools": toolbox
}, handle)

# update our checkpoint
Expand Down
4 changes: 4 additions & 0 deletions scripts/grt.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ grt:
# Owners (these are the usernames of users registered with the galactic-radio-telescope system.)
owners:
- jane.doe
# Are you willing to share your toolbox, i.e. the list of tools installed on
# your instance? If your instance is public, this can help us direct users
# to it.
share_toolbox: False


blacklist:
Expand Down

0 comments on commit d453231

Please sign in to comment.