Batched sql queries, sanitization
hexylena committed Aug 1, 2017
1 parent df33d4f commit 07faeaa
Showing 1 changed file with 85 additions and 69 deletions: scripts/grt.py
@@ -6,7 +6,6 @@
from __future__ import print_function

import argparse
import gzip
import tarfile
import json
import os
@@ -171,13 +170,15 @@ def main(argv):
                        default=default_config)
    parser.add_argument("-l", "--loglevel", choices=['debug', 'info', 'warning', 'error', 'critical'],
                        help="Set the logging level", default='warning')
    # parser.add_argument("-m", "--max-batch", type=int, min=1, help="The maximum number of records to be exported in a single invocation of GRT.")
    parser.add_argument("-b", "--batch-size", type=int, default=1000,
                        help="Batch size for sql queries")

    args = parser.parse_args()
    logging.getLogger().setLevel(getattr(logging, args.loglevel.upper()))

    _times = []
    _start_time = time.time()

    def annotate(label, human_label=None):
        if human_label:
            logging.info(human_label)
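
For context, a minimal sketch of what the new --batch-size flag controls: the job id interval is walked in fixed-size steps, so each SQL query covers a bounded id range. The interval bounds below are made-up values, not from the commit.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch-size", type=int, default=1000,
                    help="Batch size for sql queries")
args = parser.parse_args(["--batch-size", "250"])

last_job_sent, end_job_id = 0, 620  # illustrative checkpoint and endpoint
for offset_start in range(last_job_sent, end_job_id, args.batch_size):
    # Each batch covers the half-open interval
    # (offset_start, min(end_job_id, offset_start + batch_size)].
    print(offset_start, min(end_job_id, offset_start + args.batch_size))
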
@@ -213,7 +214,7 @@ def annotate(label, human_label=None):

    # Set up our arrays
    active_users = defaultdict(int)
    grt_jobs_data = []
    job_state_data = defaultdict(int)

    annotate('san_init', 'Building Sanitizer')
    san = Sanitization(config['blacklist'], model, sa_session)
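
As a rough illustration of the new counters (the sample rows are invented): job_state_data tallies jobs per state and active_users tallies jobs per user, replacing the old grt_jobs_data list.

from collections import defaultdict

job_state_data = defaultdict(int)
active_users = defaultdict(int)

for job_id, tool_id, state, user_id in [(1, 'upload1', 'ok', 7), (2, 'cut1', 'error', 7)]:
    job_state_data[state] += 1   # e.g. {'ok': 1, 'error': 1}
    active_users[user_id] += 1   # distinct keys = active users

print(dict(job_state_data), len(active_users.keys()))
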
@@ -222,97 +223,114 @@ def annotate(label, human_label=None):
    if not os.path.exists(REPORT_DIR):
        os.makedirs(REPORT_DIR)


    # Pick an end point so our queries can return uniform data.
    annotate('endpoint_start', 'Identifying a safe endpoint for SQL queries')
    end_job_id = sa_session.query(model.Job.id) \
        .order_by(model.Job.id.desc()) \
        .first()[0]
    annotate('endpoint_end')
    annotate('endpoint_end', 'Processing jobs (%s, %s]' % (last_job_sent, end_job_id))

    # Remember the last job sent.
    if end_job_id == last_job_sent:
        logging.info("No new jobs to report")
        # So we can just quit now.
        sys.exit(0)
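
A standalone sketch of this checkpoint-and-endpoint pattern, assuming the checkpoint file simply stores the last exported job id as plain text (the file name and ids are illustrative):

import os
import sys

CHECK_POINT_FILE = 'grt-checkpoint.txt'  # illustrative path

last_job_sent = 0
if os.path.exists(CHECK_POINT_FILE):
    with open(CHECK_POINT_FILE) as f:
        last_job_sent = int(f.read().strip())

end_job_id = 42  # stand-in for the max(Job.id) query above

if end_job_id == last_job_sent:
    print("No new jobs to report")
    sys.exit(0)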

    # Unfortunately we have to keep this mapping for the sanitizer to work properly.
    job_tool_map = {}

    annotate('export_jobs_start', 'Exporting Jobs')
    handle_job = open(REPORT_BASE + '.jobs.tsv', 'w')
    for job in sa_session.query(model.Job.id, model.Job.tool_id, model.Job.state) \
            .filter(model.Job.id > last_job_sent) \
            .filter(model.Job.id <= end_job_id) \
            .all():

        handle_job.write(str(job[0]))
        handle_job.write('\t')
        handle_job.write(job[1])
        handle_job.write('\n')
    for offset_start in range(last_job_sent, end_job_id, args.batch_size):
        logging.debug("Processing %s:%s", offset_start, min(end_job_id, offset_start + args.batch_size))
        for job in sa_session.query(model.Job.id, model.Job.tool_id, model.Job.state, model.Job.user_id) \
                .filter(model.Job.id > offset_start) \
                .filter(model.Job.id <= min(end_job_id, offset_start + args.batch_size)) \
                .all():

            handle_job.write(str(job[0]))
            handle_job.write('\t')
            handle_job.write(job[1])
            handle_job.write('\t')
            handle_job.write(job[2])
            handle_job.write('\n')
            # meta counts
            job_state_data[job[2]] += 1
            active_users[job[3]] += 1
            job_tool_map[job[0]] = job[1]

    handle_job.close()
    annotate('export_jobs_end')
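
The batching idea, generalized: a helper (not part of the commit) that yields rows over (last_job_sent, end_job_id] in bounded slices, assuming a SQLAlchemy session and a Job model with an integer primary key.

def batched_jobs(sa_session, Job, last_job_sent, end_job_id, batch_size):
    # Walk the id space in fixed-size steps; each query touches at most
    # batch_size rows, so memory stays bounded however large the interval is.
    for offset_start in range(last_job_sent, end_job_id, batch_size):
        upper = min(end_job_id, offset_start + batch_size)
        for row in sa_session.query(Job.id, Job.tool_id, Job.state, Job.user_id) \
                .filter(Job.id > offset_start) \
                .filter(Job.id <= upper) \
                .all():
            yield row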


    annotate('export_metric_num_start', 'Exporting Metrics (Numeric)')
    handle_metric_num = open(REPORT_BASE + '.metric_num.tsv', 'w')
    for metric in sa_session.query(model.JobMetricNumeric.job_id, model.JobMetricNumeric.plugin, model.JobMetricNumeric.metric_name, model.JobMetricNumeric.metric_value) \
            .filter(model.JobMetricNumeric.job_id > last_job_sent) \
            .filter(model.JobMetricNumeric.job_id <= end_job_id) \
            .all():

        handle_metric_num.write(str(metric[0]))
        handle_metric_num.write('\t')
        handle_metric_num.write(metric[1])
        handle_metric_num.write('\t')
        handle_metric_num.write(metric[2])
        handle_metric_num.write('\t')
        handle_metric_num.write(str(metric[3]))
        handle_metric_num.write('\n')
    for offset_start in range(last_job_sent, end_job_id, args.batch_size):
        logging.debug("Processing %s:%s", offset_start, min(end_job_id, offset_start + args.batch_size))
        for metric in sa_session.query(model.JobMetricNumeric.job_id, model.JobMetricNumeric.plugin, model.JobMetricNumeric.metric_name, model.JobMetricNumeric.metric_value) \
                .filter(model.JobMetricNumeric.job_id > offset_start) \
                .filter(model.JobMetricNumeric.job_id <= min(end_job_id, offset_start + args.batch_size)) \
                .all():

            handle_metric_num.write(str(metric[0]))
            handle_metric_num.write('\t')
            handle_metric_num.write(metric[1])
            handle_metric_num.write('\t')
            handle_metric_num.write(metric[2])
            handle_metric_num.write('\t')
            handle_metric_num.write(str(metric[3]))
            handle_metric_num.write('\n')
    handle_metric_num.close()
    annotate('export_metric_num_end')


    annotate('export_metric_txt_start', 'Exporting Metrics (Text)')
    handle_metric_txt = open(REPORT_BASE + '.metric_txt.tsv', 'w')
    for metric in sa_session.query(model.JobMetricText.job_id, model.JobMetricText.plugin, model.JobMetricText.metric_name, model.JobMetricText.metric_value) \
            .filter(model.JobMetricText.job_id > last_job_sent) \
            .filter(model.JobMetricText.job_id <= end_job_id) \
            .all():

        handle_metric_txt.write(str(metric[0]))
        handle_metric_txt.write('\t')
        handle_metric_txt.write(metric[1])
        handle_metric_txt.write('\t')
        handle_metric_txt.write(metric[2])
        handle_metric_txt.write('\t')
        handle_metric_txt.write(metric[3])
        handle_metric_txt.write('\n')
    for offset_start in range(last_job_sent, end_job_id, args.batch_size):
        logging.debug("Processing %s:%s", offset_start, min(end_job_id, offset_start + args.batch_size))
        for metric in sa_session.query(model.JobMetricText.job_id, model.JobMetricText.plugin, model.JobMetricText.metric_name, model.JobMetricText.metric_value) \
                .filter(model.JobMetricText.job_id > offset_start) \
                .filter(model.JobMetricText.job_id <= min(end_job_id, offset_start + args.batch_size)) \
                .all():

            handle_metric_txt.write(str(metric[0]))
            handle_metric_txt.write('\t')
            handle_metric_txt.write(metric[1])
            handle_metric_txt.write('\t')
            handle_metric_txt.write(metric[2])
            handle_metric_txt.write('\t')
            handle_metric_txt.write(metric[3])
            handle_metric_txt.write('\n')
    handle_metric_txt.close()
    annotate('export_metric_txt_end')


    # job, metric_text, metric_num, params

    annotate('export_params_start', 'Export Job Parameters')
    handle_params = open(REPORT_BASE + '.params.tsv', 'w')
    for param in sa_session.query(model.JobParameter.job_id, model.JobParameter.name, model.JobParameter.value) \
            .filter(model.JobParameter.job_id > last_job_sent) \
            .filter(model.JobParameter.job_id <= end_job_id) \
            .all():

        handle_params.write(str(param[0]))
        handle_params.write('\t')
        handle_params.write(param[1])
        handle_params.write('\t')
        handle_params.write(param[2])
        handle_params.write('\n')
    for offset_start in range(last_job_sent, end_job_id, args.batch_size):
        logging.debug("Processing %s:%s", offset_start, min(end_job_id, offset_start + args.batch_size))
        for param in sa_session.query(model.JobParameter.job_id, model.JobParameter.name, model.JobParameter.value) \
                .filter(model.JobParameter.job_id > offset_start) \
                .filter(model.JobParameter.job_id <= min(end_job_id, offset_start + args.batch_size)) \
                .all():

            unsanitized = {param[1]: json.loads(param[2])}
            sanitized = san.sanitize_data(job_tool_map[param[0]], unsanitized)

            handle_params.write(str(param[0]))
            handle_params.write('\t')
            handle_params.write(param[1])
            handle_params.write('\t')
            handle_params.write(json.dumps(sanitized))
            handle_params.write('\n')
    handle_params.close()
    annotate('export_params_end')
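
The sanitization step in isolation: parameter values arrive as JSON strings, are decoded, passed through the sanitizer keyed by the job's tool id (which is why job_tool_map is kept), and re-serialized. The Sanitization class below is a stand-in stub, not the real implementation.

import json

class Sanitization(object):  # illustrative stub, not the real class
    def sanitize_data(self, tool_id, data):
        return {k: '__redacted__' if k == 'password' else v for k, v in data.items()}

san = Sanitization()
job_tool_map = {17: 'my_tool/1.0'}  # job id -> tool id, invented values

job_id, name, value = 17, 'password', json.dumps('hunter2')
unsanitized = {name: json.loads(value)}
sanitized = san.sanitize_data(job_tool_map[job_id], unsanitized)
print(json.dumps(sanitized))  # {"password": "__redacted__"}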

    # Remember the last job sent.
    if len(grt_jobs_data) > 0:
        last_job_sent = job.id
    else:
        logging.info("No new jobs to report")

    # Now on to outputs.

    with tarfile.open(REPORT_BASE + '.tar.gz', 'w:gz') as handle:
        for name in ('jobs', 'metric_num', 'metric_txt', 'params'):
            handle.add(REPORT_BASE + '.' + name + '.tsv')

    for name in ('jobs', 'metric_num', 'metric_txt', 'params'):
        os.unlink(REPORT_BASE + '.' + name + '.tsv')
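
A runnable sketch of the packaging step: the four per-table TSVs are bundled into one gzip-compressed tarball and the loose files removed. REPORT_BASE is an illustrative path and the empty files stand in for the real exports.

import os
import tarfile

REPORT_BASE = '/tmp/grt-report'  # illustrative
for name in ('jobs', 'metric_num', 'metric_txt', 'params'):
    open(REPORT_BASE + '.' + name + '.tsv', 'w').close()  # placeholder files

with tarfile.open(REPORT_BASE + '.tar.gz', 'w:gz') as handle:
    for name in ('jobs', 'metric_num', 'metric_txt', 'params'):
        handle.add(REPORT_BASE + '.' + name + '.tsv')

for name in ('jobs', 'metric_num', 'metric_txt', 'params'):
    os.unlink(REPORT_BASE + '.' + name + '.tsv')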

    _times.append(('job_finish', time.time() - _start_time))
    sha = subprocess.check_output(['sha256sum', REPORT_BASE + '.tar.gz'])
    _times.append(('hash_finish', time.time() - _start_time))
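
Shelling out to sha256sum ties the script to coreutils; a pure-Python equivalent using hashlib would look roughly like this (a possible alternative, not what the commit does):

import hashlib

def sha256sum(path, chunk_size=1 << 20):
    # Hash the file in 1 MiB chunks so large tarballs never sit in memory whole.
    digest = hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
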
@@ -338,18 +356,16 @@ def annotate(label, human_label=None):
"_times": _times,
},
"users": {
"active": len(set(active_users)),
"active": len(active_users.keys()),
"total": sa_session.query(model.User.id).count(),
},
"jobs": {
"ok": len(grt_jobs_data),
},
"jobs": job_state_data,
"tools": toolbox
}, handle)
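
Schematically, the summary payload now reports per-state job counts instead of a single "ok" count; since defaultdict is a dict subclass, job_state_data serializes directly. All values below are invented.

import json
from collections import defaultdict

job_state_data = defaultdict(int, {'ok': 12, 'error': 3})
active_users = {7: 10, 9: 5}  # user_id -> job count

report = {
    "users": {
        "active": len(active_users.keys()),  # users with at least one job
        "total": 100,                        # stand-in for the User count query
    },
    "jobs": job_state_data,
}
print(json.dumps(report))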

    # update our checkpoint
    # Write our checkpoint file so we know where to start next time.
    with open(CHECK_POINT_FILE, 'w') as handle:
        handle.write(str(last_job_sent))
        handle.write(str(end_job_id))


if __name__ == '__main__':
