Skip to content

Commit

Permalink
More updates to GRT
Browse files Browse the repository at this point in the history
  • Loading branch information
hexylena committed Jul 21, 2017
1 parent 69d0e21 commit 9803dc6
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 144 deletions.
293 changes: 155 additions & 138 deletions scripts/grt.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
#!/usr/bin/env python
"""Script for uploading Galaxy statistics to the Galactic radio telescope.
"""Script for parsing Galaxy job information in preparation for submission to the Galactic radio telescope.
See doc/source/admin/grt.rst for more detailed usage information.
"""
from __future__ import print_function

import os
import sys
import json
import urllib2
import argparse
import gzip
import json
import os
import sqlalchemy as sa
import sys
import time
import yaml
import re
import logging

from collections import defaultdict

sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, 'lib')))

Expand All @@ -25,6 +28,14 @@
default_config = os.path.abspath(os.path.join(os.path.dirname(__file__), 'grt.yml'))


def dumper(obj):
    """Fallback serializer for objects that ``json.dumps`` cannot encode.

    Passed as ``default=dumper`` when the job metrics are serialized.

    :param obj: the object the JSON encoder could not handle.
    :returns: ``obj.toJSON()`` when that call succeeds; otherwise ``str(obj)``
        if the object's class is named ``Decimal``; otherwise ``None``
        (which the encoder writes as ``null``).
    """
    try:
        serialized = obj.toJSON()
    except AttributeError:
        # No usable toJSON(): only Decimal-like values are stringified,
        # everything else deliberately falls through to None.
        is_decimal = obj.__class__.__name__ == 'Decimal'
        return str(obj) if is_decimal else None
    return serialized


def _init(config):
if config.startswith('/'):
config = os.path.abspath(config)
Expand All @@ -34,6 +45,8 @@ def _init(config):
properties = load_app_properties(ini_file=config)
config = galaxy.config.Configuration(**properties)
object_store = build_object_store_from_config(config)
if not config.database_connection:
logging.warning("The database connection is empty. If you are using the default value, please uncomment that in your galaxy.ini")

return (
mapping.init(
Expand All @@ -43,184 +56,188 @@ def _init(config):
object_store=object_store
),
object_store,
config.database_connection.split(':')[0]
config.database_connection.split(':')[0],
config
)


def _sanitize_dict(unsanitized_dict):
sanitized_dict = dict()

for key in unsanitized_dict:
if key == 'values' and type(unsanitized_dict[key]) is list:
sanitized_dict[key] = None
else:
sanitized_dict[key] = _sanitize_value(unsanitized_dict[key])

if sanitized_dict[key] is None:
del sanitized_dict[key]
def kw_metrics(job):
    """Flatten a job's metrics into a single dictionary.

    :param job: object exposing an iterable ``metrics`` attribute whose items
        have ``plugin``, ``metric_name`` and ``metric_value`` attributes.
    :returns: dict mapping ``"<plugin>_<metric_name>"`` to the metric value.
        If two metrics produce the same key, the later one wins.
    """
    collected = {}
    for entry in job.metrics:
        label = '%s_%s' % (entry.plugin, entry.metric_name)
        collected[label] = entry.metric_value
    return collected

if len(sanitized_dict) == 0:
return None
else:
return sanitized_dict

class Sanitization:

def _sanitize_list(unsanitized_list):
sanitized_list = list()
def __init__(self, sanitization_config):
self.sanitization_config = sanitization_config

for key in range(len(unsanitized_list)):
sanitized_value = _sanitize_value(unsanitized_list[key])
if not None:
sanitized_list.append(sanitized_value)
if 'tool_params' not in self.sanitization_config:
self.sanitization_config['tool_params'] = {}

if len(sanitized_list) == 0:
return None
else:
return sanitized_list
if '__any__' not in self.sanitization_config['tool_params']:
self.sanitization_config['tool_params']['__any__'] = []

def blacklisted_tree(self, path):
    """Tell whether the parameter at ``path`` must be redacted.

    The path is compared, with its leading dots stripped, against the
    ``'__any__'`` list and against the list registered for ``self.tool_id``
    (if any) under ``self.sanitization_config['tool_params']``.

    :param path: dotted path of the parameter being inspected.
    :returns: ``True`` if the path is listed for every tool or for the
        current tool, ``False`` otherwise.
    """
    tool_params = self.sanitization_config['tool_params']
    key = path.lstrip('.')

    # Entries under '__any__' apply regardless of which tool ran.
    if key in tool_params['__any__']:
        return True

    # Otherwise only a list registered for this specific tool can match.
    return self.tool_id in tool_params and key in tool_params[self.tool_id]

def _sanitize_value(unsanitized_value):
sanitized_value = None
def sanitize_data(self, tool_id, data):
    """Sanitize ``data`` on behalf of the tool identified by ``tool_id``.

    :param tool_id: stored on the instance as ``self.tool_id`` so that
        ``blacklisted_tree`` can look up the tool-specific entries.
    :param data: the value handed to ``self._sanitize_value``.
    :returns: whatever ``self._sanitize_value(data)`` returns.
    """
    # Must be set before sanitizing: blacklisted_tree() reads self.tool_id.
    self.tool_id = tool_id
    cleaned = self._sanitize_value(data)
    return cleaned

fp_regex = re.compile('^(\/[^\/]+)+$')
def _sanitize_dict(self, unsanitized_dict, path=""):
    """Sanitize every value of a dictionary.

    :param unsanitized_dict: mapping whose values are to be sanitized.
    :param path: dotted path of this dictionary; each key is appended to it
        as ``path + '.' + key`` before the value is passed on.
    :returns: a new dict with the same keys, each value replaced by the
        result of ``self._sanitize_value`` for that value.
    """
    cleaned = {}
    for name, value in unsanitized_dict.items():
        child_path = path + '.' + name
        cleaned[name] = self._sanitize_value(value, path=child_path)
    return cleaned

if type(unsanitized_value) is dict:
sanitized_value = _sanitize_dict(unsanitized_value)
elif type(unsanitized_value) is list:
sanitized_value = _sanitize_list(unsanitized_value)
else:
if fp_regex.match(str(unsanitized_value)):
sanitized_value = None
def _sanitize_list(self, unsanitized_list, path=""):
    """Sanitize every element of a list.

    :param unsanitized_list: iterable of values to sanitize.
    :param path: dotted path of this list; every element is sanitized under
        ``path + '.*'`` (all elements share the same wildcard path).
    :returns: a new list holding the result of ``self._sanitize_value`` for
        each element, in the original order.
    """
    cleaned = []
    for element in unsanitized_list:
        cleaned.append(self._sanitize_value(element, path=path + '.*'))
    return cleaned

def _sanitize_value(self, unsanitized_value, path=""):
logging.debug("%sSAN %s" % (' ' * path.count('.'), unsanitized_value))
if self.blacklisted_tree(path):
logging.debug("%sSAN ***REDACTED***" % (' ' * path.count('.')))
return None

if type(unsanitized_value) is dict:
return self._sanitize_dict(unsanitized_value, path=path)
elif type(unsanitized_value) is list:
return self._sanitize_list(unsanitized_value, path=path)
else:
sanitized_value = unsanitized_value

return sanitized_value
logging.debug("%s> Sanitizing %s = %s" % (' ' * path.count('.'), path, unsanitized_value))
return unsanitized_value


def main(argv):
"""Entry point for GRT statistics collection."""
parser = argparse.ArgumentParser()
parser.add_argument('instance_id', help='Galactic Radio Telescope Instance ID')
parser.add_argument('api_key', help='Galactic Radio Telescope API Key')

parser.add_argument('-c', '--config', dest='config', help='Path to GRT config file (scripts/grt.ini)', default=default_config)
parser.add_argument('--dry-run', dest='dryrun', help='Dry run (show data to be sent, but do not send)', action='store_true', default=False)
parser.add_argument('--grt-url', dest='grt_url', help='GRT Server (You can run your own!)')
args = parser.parse_args(argv[1:])

print('Loading GRT ini...')
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-r', '--report-directory', help='Directory to store reports in',
default=os.path.abspath(os.path.join('.', 'reports')))
parser.add_argument('-c', '--config', help='Path to GRT config file',
default=default_config)
parser.add_argument("-l", "--loglevel", choices=['debug', 'info', 'warning', 'error', 'critical'],
help="Set the logging level", default='warning')

args = parser.parse_args()
logging.basicConfig(level=getattr(logging, args.loglevel.upper()))

logging.info('Loading GRT ini...')
try:
with open(args.config) as f:
config_dict = yaml.load(f)
with open(args.config) as handle:
config = yaml.load(handle)
except Exception:
with open(sample_config) as f:
config_dict = yaml.load(f)

# set to 0 by default
if 'last_job_id_sent' not in config_dict:
config_dict['last_job_id_sent'] = 0

if args.instance_id:
config_dict['instance_id'] = args.instance_id
if args.api_key:
config_dict['api_key'] = args.api_key
if args.grt_url:
config_dict['grt_url'] = args.grt_url

print('Loading Galaxy...')
model, object_store, engine = _init(config_dict['galaxy_config'])
with open(sample_config) as handle:
config = yaml.load(handle)

REPORT_DIR = args.report_directory
CHECK_POINT_FILE = os.path.join(REPORT_DIR, '.checkpoint')
ARCHIVE_DIR = os.path.join(REPORT_DIR, 'archives')
METADATA_FILE = os.path.join(REPORT_DIR, 'meta.json')
REPORT_IDENTIFIER = str(time.time())
REPORT_BASE = os.path.join(ARCHIVE_DIR, REPORT_IDENTIFIER)

if os.path.exists(CHECK_POINT_FILE):
with open(CHECK_POINT_FILE, 'r') as handle:
last_job_sent = int(handle.read())
else:
last_job_sent = -1

logging.info('Loading Galaxy...')
model, object_store, engine, gxconfig = _init(config['galaxy_config'])
sa_session = model.context.current

# Fetch jobs COMPLETED with status OK that have not yet been sent.
jobs = sa_session.query(model.Job)\
.filter(sa.and_(
model.Job.table.c.state == "ok",
model.Job.table.c.id > config_dict['last_job_id_sent']
model.Job.table.c.id > last_job_sent
))\
.all()

# Set up our arrays
active_users = []
grt_tool_data = []
active_users = defaultdict(int)
grt_jobs_data = []

def kw_metrics(job):
return {
'%s_%s' % (metric.plugin, metric.metric_name): metric.metric_value
for metric in job.metrics
}
san = Sanitization(config['blacklist'])

# For every job
for job in jobs:
if job.tool_id in config_dict['tool_blacklist']:
if job.tool_id in config['blacklist'].get('tools', []):
continue

# Append an active user, we'll reduce at the end
active_users.append(job.user_id)

# Find the tool in our normalized tool table.
if (job.tool_id, job.tool_version) not in grt_tool_data:
grt_tool_idx = len(grt_tool_data)
grt_tool_data.append((job.tool_id, job.tool_version))
else:
grt_tool_idx = grt_tool_data.index((job.tool_id, job.tool_version))
# If the user has run a job, they're active.
active_users[job.user_id] += 1

metrics = kw_metrics(job)

wanted_metrics = ('core_galaxy_slots', 'core_runtime_seconds')

grt_metrics = {
k: int(metrics.get(k, 0))
for k in wanted_metrics
}

params = job.raw_param_dict()
for key in params:
params[key] = json.loads(params[key])

job_data = {
'tool': grt_tool_idx,
'date': job.update_time.strftime('%s'),
'metrics': grt_metrics,
'params': _sanitize_dict(params)
}
logging.debug("Sanitizing %s %s" % (job.tool_id, str(params)))
job_data = (
str(job.id),
job.tool_id,
job.tool_version,
job.update_time.strftime('%s'),
json.dumps(metrics, default=dumper),
json.dumps(san.sanitize_data(job.tool_id, params))
)
grt_jobs_data.append(job_data)

# Remember the last job sent.
if len(jobs) > 0:
config_dict['last_job_id_sent'] = jobs[-1].id

grt_report_data = {
'meta': {
'version': 1,
'instance_uuid': config_dict['instance_id'],
'instance_api_key': config_dict['api_key'],
# We do not record ANYTHING about your users other than count.
'active_users': len(set(active_users)),
'total_users': sa_session.query(model.User).count(),
'recent_jobs': len(jobs),
},
'tools': [
{
'tool_id': a,
'tool_version': b,
}
for (a, b) in grt_tool_data
],
'jobs': grt_jobs_data,
}

if args.dryrun:
print(json.dumps(grt_report_data, indent=2))
last_job_sent = jobs[-1].id
else:
try:
urllib2.urlopen(config_dict['grt_url'], data=json.dumps(grt_report_data))
except urllib2.HTTPError as htpe:
print(htpe.read())
exit(1)

# Update grt.ini with last id of job (prevent duplicates from being sent)
with open(args.config, 'w') as f:
yaml.dump(config_dict, f, default_flow_style=False)
logging.info("No new jobs to report")

# Now on to outputs.
if not os.path.exists(REPORT_DIR):
os.makedirs(REPORT_DIR)
os.makedirs(ARCHIVE_DIR)

if os.path.exists(REPORT_DIR) and not os.path.exists(ARCHIVE_DIR):
os.makedirs(ARCHIVE_DIR)

with open(METADATA_FILE, 'w') as handle:
json.dump(config['grt']['metadata'], handle, indent=2)

# Now serialize the individual report data.
with open(REPORT_BASE + '.json', 'w') as handle:
json.dump({
"version": 1,
"generated": REPORT_IDENTIFIER,
"metrics": {
},
"users": {
"active": len(set(active_users)),
"total": sa_session.query(model.User).count(),
},
"jobs": {
"ok": len(jobs),
},
"tools": [
]
}, handle)

with gzip.open(REPORT_BASE + '.tsv.gz', 'w') as handle:
for job in grt_jobs_data:
handle.write('\t'.join(job))
handle.write('\n')

# update our checkpoint
with open(CHECK_POINT_FILE, 'w') as handle:
handle.write(str(last_job_sent))


if __name__ == '__main__':
Expand Down

0 comments on commit 9803dc6

Please sign in to comment.