From 5d14837e0082aff30693e5849e7722da59cef548 Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Tue, 10 Apr 2018 15:21:08 -0700
Subject: [PATCH 1/8] CT-187 Fix for thread termination issues

- Now properly terminates the following threads after uploading a job:
    ErrorThread
    MetricStore
    PrintStatusThread
    ReporterThread

- Disclaimer: this code really stinks. Needs an entire re-write.
---
 conductor/lib/uploader.py | 64 +++++++++++++++--------------------
 conductor/lib/worker.py   | 70 +++++++++++++++++++++++++++++++--------
 2 files changed, 82 insertions(+), 52 deletions(-)

diff --git a/conductor/lib/uploader.py b/conductor/lib/uploader.py
index cc9b1044..75cd56e2 100644
--- a/conductor/lib/uploader.py
+++ b/conductor/lib/uploader.py
@@ -33,6 +33,7 @@ def do_work(self, job, thread_int):
         logger.debug('job is %s', job)
         filename, submission_time_md5 = job
         assert isinstance(filename, (str, unicode)), "Filepath not of expected type. Got %s" % type(filename)
+        filename = str(filename)
         current_md5 = self.get_md5(filename)
 
         # if a submission time md5 was provided then check against it
@@ -74,7 +75,6 @@ def get_md5(self, filepath):
             logger.debug("Using md5 cache for file: %s", filepath)
             return file_cache["md5"]
 
-
     def cache_file_info(self, file_info):
         '''
         Store the given file_info into the database
@@ -84,12 +84,12 @@ def cache_file_info(self, file_info):
                                    thread_safe=True)
 
 
-
 class MD5OutputWorker(worker.ThreadWorker):
     '''
     This worker will batch the computed md5's into self.batch_size chunks.
     It will send a partial batch after waiting self.wait_time seconds
     '''
+
     def __init__(self, *args, **kwargs):
         worker.ThreadWorker.__init__(self, *args, **kwargs)
         self.batch_size = 20  # this controls the batch size for http get_signed_urls
@@ -149,6 +149,7 @@ class HttpBatchWorker(worker.ThreadWorker):
 
     Each item in the return list is added to the out_queue.
     '''
+
     def __init__(self, *args, **kwargs):
         worker.ThreadWorker.__init__(self, *args, **kwargs)
         self.api_client = api_client.ApiClient()
@@ -156,7 +157,7 @@ def __init__(self, *args, **kwargs):
 
     def make_request(self, job):
         uri_path = '/api/files/get_upload_urls'
-        headers = {'Content-Type':'application/json'}
+        headers = {'Content-Type': 'application/json'}
         data = {"upload_files": job,
                 "project": self.project}
 
@@ -191,7 +192,10 @@ def do_work(self, job, thread_int):
     to be uploaded.  Note: This is stored as an [int] in order to pass it by
     reference, as it needs to be accessed and reset by the caller.
     '''
+
+
 class FileStatWorker(worker.ThreadWorker):
+
     def __init__(self, *args, **kwargs):
         worker.ThreadWorker.__init__(self, *args, **kwargs)
 
@@ -224,6 +228,7 @@ class UploadWorker(worker.ThreadWorker):
     This worker receives a (filepath: signed_upload_url) pair and
     performs an upload of the specified file to the provided url.
     '''
+
     def __init__(self, *args, **kwargs):
         worker.ThreadWorker.__init__(self, *args, **kwargs)
         self.chunk_size = 1048576  # 1M
@@ -257,9 +262,6 @@ def do_work(self, job, thread_int):
             logger.error(error_message)
             raise
 
-
-
-
     @common.DecRetry(retry_exceptions=api_client.CONNECTION_EXCEPTIONS, tries=5)
     def do_upload(self, upload_url, filename, md5):
         '''
@@ -327,7 +329,7 @@ def create_manager(self, project, md5_only=False):
     def report_status(self):
         logger.debug('started report_status thread')
         update_interval = 5
-        while True:
+        while self.working:
 
             # don't report status if we are doing a local_upload
             if not self.upload_id:
@@ -401,13 +403,12 @@ def convert_byte_count_to_string(byte_count, transfer_rate=False):
     @staticmethod
     def convert_time_to_string(time_remaining):
         if time_remaining > 3600:
-            return str(round(time_remaining / float(3600) , 1)) + ' hours'
+            return str(round(time_remaining / float(3600), 1)) + ' hours'
         elif time_remaining > 60:
-            return str(round(time_remaining / float(60) , 1)) + ' minutes'
+            return str(round(time_remaining / float(60), 1)) + ' minutes'
         else:
             return str(round(time_remaining, 1)) + ' seconds'
 
-
     def upload_status_text(self):
         num_files_to_upload = self.manager.metric_store.get('num_files_to_upload')
         files_to_upload = str(num_files_to_upload)
@@ -430,7 +431,6 @@ def upload_status_text(self):
         else:
             transfer_rate = 0
 
-
         unformatted_text = '''
################################################################################
                      files to process: {files_to_analyze}
@@ -467,7 +467,6 @@ def upload_status_text(self):
 
         return formatted_text
 
-
     def print_status(self):
         logger.debug('starting print_status thread')
         update_interval = 3
@@ -475,15 +474,13 @@ def print_status(self):
         def sleep():
             time.sleep(update_interval)
 
-        while True:
-            if self.working:
-                try:
-                    logger.info(self.manager.worker_queue_status_text())
-                    logger.info(self.upload_status_text())
-                except Exception, e:
-                    print e
-                    print traceback.format_exc()
-                    # pass
+        while self.working:
+            try:
+                logger.info(self.manager.worker_queue_status_text())
+                logger.info(self.upload_status_text())
+            except Exception, e:
+                print e
+                print traceback.format_exc()
             sleep()
 
     def create_print_status_thread(self):
@@ -496,16 +493,15 @@ def create_print_status_thread(self):
         # start thread
         thd.start()
 
-
     def mark_upload_finished(self, upload_id, upload_files):
-        data = {'upload_id':upload_id,
+        data = {'upload_id': upload_id,
                 'status': 'server_pending',
                 'upload_files': upload_files}
 
         self.api_client.make_request('/uploads/%s/finish' % upload_id,
-                                      data=json.dumps(data),
-                                      verb='POST', use_api_key=True)
+                                     data=json.dumps(data),
+                                     verb='POST', use_api_key=True)
         return True
 
     def mark_upload_failed(self, error_message, upload_id):
@@ -513,8 +509,8 @@ def mark_upload_failed(self, error_message, upload_id):
 
         # report error_message to the app
         self.api_client.make_request('/uploads/%s/fail' % upload_id,
-                                      data=error_message,
-                                      verb='POST', use_api_key=True)
+                                     data=error_message,
+                                     verb='POST', use_api_key=True)
 
         return True
 
@@ -536,6 +532,7 @@ def handle_upload_response(self, project, upload_files, upload_id=None, md5_only
         logger.info('upload_files %s:(truncated)\n\t%s',
                     len(upload_files), "\n\t".join(upload_files.keys()[:5]))
 
+
         # reset counters
         self.num_files_to_process = len(upload_files)
         self.job_start_time = int(time.time())
@@ -583,7 +580,6 @@ def handle_upload_response(self, project, upload_files, upload_id=None, md5_only
         except:
             return traceback.format_exc()
 
-
     def main(self, run_one_loop=False):
         logger.info('Uploader Started. Checking for uploads...')
@@ -637,7 +633,6 @@ def main(self, run_one_loop=False):
 
         logger.info('exiting uploader')
 
-
    def return_md5s(self):
        '''
        Return a dictionary of the filepaths and their md5s that were generated
@@ -655,6 +650,7 @@ def set_logging(level=None, log_dirpath=None):
                         file_formatter=LOG_FORMATTER,
                         log_filepath=log_filepath)
 
+
 def run_uploader(args):
     '''
     Start the uploader process. This process will run indefinitely, polling
@@ -691,6 +687,7 @@ def get_file_info(filepath):
             "modtime": modtime,
             "size": stat.st_size}
 
+
 def resolve_args(args):
     '''
     Resolve all arguments, reconciling differences between command line args
@@ -703,7 +700,6 @@ def resolve_args(args):
 
     return args
 
-
 def resolve_arg(arg_name, args, config):
     '''
     Helper function to resolve the value of an argument.
@@ -797,11 +793,3 @@ def resolve_arg(arg_name, args, config):
 #
 #     logger.debug("Complete")
 #     return md5s
-
-
-
-
-
-
-
-
diff --git a/conductor/lib/worker.py b/conductor/lib/worker.py
index d9db7f14..6053f9c0 100644
--- a/conductor/lib/worker.py
+++ b/conductor/lib/worker.py
@@ -14,7 +14,9 @@
 '''
 WORKING = True
 
+
 class Reporter():
+
     def __init__(self, metric_store=None):
         self.metric_store = metric_store
         self.api_helper = api_client.ApiClient()
@@ -48,8 +50,6 @@ def start(self):
 
         return self.thread
 
-
-
 class ThreadWorker(object):
     '''
     Abstract worker class.
@@ -79,9 +79,6 @@ def __init__(self, **kwargs):
         # create a list to hold the threads that we create
         self.threads = []
 
-
-
-
     def do_work(self, job):
         '''
         This needs to be implemented for each worker type. The work task from
@@ -214,6 +211,7 @@ def put_job(self, job):
         self.out_queue.put(job)
         return True
 
+
 class MetricStore():
     '''
     This provides a thread-safe integer store that can be used by workers to
@@ -226,8 +224,10 @@ def __init__(self):
         self.metric_store = {}
         self.update_queue = Queue.Queue()
         self.started = False
+        self.terminate = False
 
     def join(self):
+        empty_queue(self.update_queue)
         self.update_queue.join()
         return True
 
@@ -239,11 +239,11 @@ def start(self):
             logger.debug('metric_store already started')
             return None
         logger.debug('starting metric_store')
-        thd = threading.Thread(target=self.target, name=self.__class__.__name__)
-        thd.daemon = True
-        thd.start()
+        self.thread = threading.Thread(target=self.target, name=self.__class__.__name__)
+        self.thread.daemon = True
+        self.thread.start()
         self.started = True
-        return thd
+        return self.thread
 
     def set(self, key, value):
         self.metric_store[key] = value
@@ -312,9 +312,11 @@ def get_list(self, list_name):
 
     def target(self):
         logger.debug('created metric_store target thread')
-        while True:
-            # block until update given
-            update_tuple = self.update_queue.get(True)
+        while not self.terminate:
+
+            update_tuple = safe_get(self.update_queue)
+            if not update_tuple:
+                continue
 
             method = update_tuple[0]
             method_args = update_tuple[1:]
@@ -331,6 +333,9 @@ def target(self):
             # mark task done
             self.update_queue.task_done()
 
+    def kill(self):
+        self.terminate = True
+        self.thread.join()
 
 class JobManager():
 
@@ -379,18 +384,27 @@ def kill_reporters(self):
             logger.debug('killing reporter %s', reporter)
             reporter.kill()
 
+    def kill_metric_store(self):
+        logger.debug('killing metric store %s', self.metric_store)
+        self.metric_store.kill()
+
+
     def stop_work(self):
         global WORKING
         WORKING = False  # stop any new jobs from being created
         self.drain_queues()  # clear out any jobs in queue
         self.kill_workers()  # kill all threads
+        self.kill_metric_store()
         self.kill_reporters()
         self.mark_all_tasks_complete()  # reset task counts
 
     def error_handler_target(self):
+        global WORKING
 
-        while True:
-            error = self.error_queue.get(True)
+        while WORKING:
+            error = safe_get(self.error_queue)
+            if not error:
+                continue
             logger.error('got something from the error queue')
             self.error.append(error)
             self.stop_work()
@@ -482,6 +496,7 @@ def join(self):
         if self.error:
             return self.error
         self.kill_workers()
+        self.kill_metric_store()
         self.kill_reporters()
         return None
 
@@ -496,3 +511,30 @@ def worker_queue_status_text(self):
             msg += '\t\t%s threads' % num_active_threads
             msg += '\n'
         return msg
+
+
+def empty_queue(queue):
+    '''
+    Remove and return all items from the given Queue object
+    '''
+    items = []
+
+    while True:
+        item = safe_get(queue)
+        if not item:
+            break
+        items.append(item)
+    return items
+
+
+def safe_get(queue):
+    '''
+    Get and return an item from the given queue.
+    If the queue is empty, return None (suppressing the exception).
+    '''
+    try:
+        return (queue.get_nowait())
+    except Queue.Empty:
+        return
+    except Exception:
+        logger.exception("recovered from exception:")

From adec4419a6a0268e4a3500db3197984cdce76c34 Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Thu, 12 Apr 2018 19:19:23 -0700
Subject: [PATCH 2/8] chore: converted list to iterator for performance
 considerations

---
 conductor/submitter_maya.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/conductor/submitter_maya.py b/conductor/submitter_maya.py
index 5e1888b2..6a20af7f 100644
--- a/conductor/submitter_maya.py
+++ b/conductor/submitter_maya.py
@@ -289,7 +289,7 @@ def getEnvironment(self):
             environment.update({"OCIO": ocio_config})
 
         # If the user has selected renderman for maya, make sure to disable pathhelper
-        if "renderman-maya" in [p["product"] for p in self.getJobPackages()]:
+        if "renderman-maya" in (p["product"] for p in self.getJobPackages()):
             logger.debug("Renderman detected. Setting CONDUCTOR_PATHHELPER to 0")
             environment.update({"CONDUCTOR_PATHHELPER": "0"})

From e46501ef55daa8f78cf5b7eff348d3d3bb4630fb Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Wed, 4 Apr 2018 14:03:21 -0700
Subject: [PATCH 3/8] uploader args fix

---
 bin/conductor             | 39 +++++++++++++++++++-----
 conductor/lib/uploader.py | 62 ++++++++++++++++++++++-----------------
 2 files changed, 67 insertions(+), 34 deletions(-)

diff --git a/bin/conductor b/bin/conductor
index ac2fb7d5..4781e3a5 100755
--- a/bin/conductor
+++ b/bin/conductor
@@ -255,6 +255,11 @@ def parse_args():
                                             description=uploader_parser_desciption,
                                             formatter_class=argparse.RawTextHelpFormatter)
 
+    uploader_parser.add_argument("--database_filepath",
+                                 help=("The filepath to the local md5 caching database. If no filepath "
+                                       "is specified, the database will be created in a temp directory. "
+                                       "Note that this flag is only active when --local_upload is True."))
+
     uploader_parser.add_argument("--location",
                                  help=('An optional string to indicate which location this uploader '
                                        'executable should register as. This option is only relevant '
@@ -280,10 +285,25 @@ def parse_args():
                                        "everyday, while storing the last 7 days "
                                        "of logs"))
 
+    uploader_parser.add_argument("--md5_caching",
+                                 help=("Use cached md5s. This can dramatically improve the uploading "
+                                       "times, as md5 checking can be very time consuming. Caching md5s "
+                                       "allows subsequent uploads (of the same files) to skip the "
+                                       "md5 generation process (if the files appear to not have been "
+                                       "modified since the last time they were submitted). The cache is "
+                                       "stored locally and uses a file's modification time and file size "
+                                       "to intelligently guess whether the file has changed. Set this "
+                                       "flag to False if there is concern that files may not be getting "
+                                       "re-uploaded properly. "
+                                       "Note that this flag is only active when --local_upload is True."),
+                                 choices=[False, True],
+                                 type=cast_to_bool,
+                                 default=None)
+
     uploader_parser.add_argument("--thread_count",
                                  type=int,
                                  default=conductor.CONFIG.get("thread_count"),
-                                 help=('The number of threads that should download simultaneously'))
+                                 help=('The number of threads that should upload simultaneously'))
 
     uploader_parser.add_argument("--alt",
                                  help=('Run an alternative version of the downloader'),
@@ -427,13 +447,18 @@ def run_submit(args):
 
 def run_uploader(args):
+    '''
+    Run the Uploader
+    If the user has indicated to use the alternative uploader (and the system is not on Windows)
+    then run the alternative uploader. Otherwise run the standard uploader.
+    '''
     args_dict = vars(args)
-    if sys.platform == "win32":
-        uploader.run_uploader(args)
-    if args_dict.get("alt"):
-        uploader_v2.run_uploader(args)
-    else:
-        uploader.run_uploader(args)
+    use_alt = bool(args_dict.pop("alt", False))
+
+    if use_alt and sys.platform != "win32":
+        return uploader_v2.run_uploader(args)
+
+    return uploader.run_uploader(args)
 
 def run_downloader(args):

diff --git a/conductor/lib/uploader.py b/conductor/lib/uploader.py
index 75cd56e2..926527e1 100644
--- a/conductor/lib/uploader.py
+++ b/conductor/lib/uploader.py
@@ -285,15 +285,18 @@ class Uploader(object):
 
     sleep_time = 10
 
-    def __init__(self, args=None):
-        logger.debug("Uploader.__init__")
-        self.api_client = api_client.ApiClient()
-        self.args = args or {}
-        self.args['thread_count'] = CONFIG['thread_count']
-        logger.debug("args: %s", self.args)
+    def __init__(self, location=None, thread_count=4, md5_caching=True, database_filepath=None):
+        logger.debug("location: %s", location)
+        logger.debug("thread_count: %s", thread_count)
+        logger.debug("md5_caching: %s", md5_caching)
+        logger.debug("database_filepath: %s", database_filepath)
+
+        self.thread_count = thread_count
+        self.location = location
+        self.md5_caching = md5_caching
+        self.database_filepath = database_filepath
 
-        self.location = self.args.get("location")
-        self.project = self.args.get("project")
+        self.api_client = api_client.ApiClient()
 
     def prepare_workers(self):
         logger.debug('preparing workers...')
@@ -305,21 +308,21 @@ def create_manager(self, project, md5_only=False):
         if md5_only:
             job_description = [
-                (MD5Worker, [], {'thread_count': self.args['thread_count'],
-                                 "database_filepath": self.args['database_filepath'],
-                                 "md5_caching": self.args['md5_caching']})
+                (MD5Worker, [], {'thread_count': self.thread_count,
+                                 "database_filepath": self.database_filepath,
+                                 "md5_caching": self.md5_caching})
             ]
         else:
             job_description = [
-                (MD5Worker, [], {'thread_count': self.args['thread_count'],
-                                 "database_filepath": self.args['database_filepath'],
-                                 "md5_caching": self.args['md5_caching']}),
+                (MD5Worker, [], {'thread_count': self.thread_count,
+                                 "database_filepath": self.database_filepath,
+                                 "md5_caching": self.md5_caching}),
                 (MD5OutputWorker, [], {'thread_count': 1}),
-                (HttpBatchWorker, [], {'thread_count': self.args['thread_count'],
+                (HttpBatchWorker, [], {'thread_count': self.thread_count,
                                        "project": project}),
                 (FileStatWorker, [], {'thread_count': 1}),
-                (UploadWorker, [], {'thread_count': self.args['thread_count']}),
+                (UploadWorker, [], {'thread_count': self.thread_count}),
             ]
 
         manager = worker.JobManager(job_description)
@@ -532,7 +535,6 @@ def handle_upload_response(self, project, upload_files, upload_id=None, md5_only
         logger.info('upload_files %s:(truncated)\n\t%s',
                     len(upload_files), "\n\t".join(upload_files.keys()[:5]))
 
-
         # reset counters
         self.num_files_to_process = len(upload_files)
         self.job_start_time = int(time.time())
@@ -658,16 +660,24 @@ def run_uploader(args):
     '''
     # convert the Namespace object to a dictionary
     args_dict = vars(args)
+    logger.debug("Parsed args: %s", args_dict)
 
     # Set up logging
-    log_level_name = args_dict.get("log_level") or CONFIG.get("log_level")
+    log_level_name = args_dict.pop("log_level", None) or CONFIG.get("log_level")
     log_level = loggeria.LEVEL_MAP.get(log_level_name)
-    log_dirpath = args_dict.get("log_dir") or CONFIG.get("log_dir")
+    log_dirpath = args_dict.pop("log_dir", None) or CONFIG.get("log_dir")
     set_logging(log_level, log_dirpath)
 
-    logger.debug('Uploader parsed_args is %s', args_dict)
-    resolved_args = resolve_args(args_dict)
-    uploader = Uploader(resolved_args)
+    location = resolve_arg("location", args_dict, CONFIG)
+    thread_count = resolve_arg("thread_count", args_dict, CONFIG)
+    md5_caching = resolve_arg("md5_caching", args_dict, CONFIG)
+    database_filepath = resolve_arg("database_filepath", args_dict, CONFIG) or client_db.get_default_db_filepath()
+    uploader = Uploader(
+        location=location,
+        thread_count=thread_count,
+        md5_caching=md5_caching,
+        database_filepath=database_filepath,
+    )
     uploader.main()
 
@@ -696,11 +706,11 @@ def resolve_args(args):
 
     args["md5_caching"] = resolve_arg("md5_caching", args, CONFIG)
     args["database_filepath"] = resolve_arg("database_filepath", args, CONFIG)
     args["location"] = resolve_arg("location", args, CONFIG)
-
+    args["thread_count"] = resolve_arg("thread_count", args, CONFIG)
     return args
 
-def resolve_arg(arg_name, args, config):
+def resolve_arg(arg_name, args, config, default=None):
     '''
     Helper function to resolve the value of an argument.
     The order of resolution is:
@@ -719,9 +729,7 @@ def resolve_arg(arg_name, args, config):
 
     if value != None:
         return value
     # Otherwise use the value in the config if it's there, otherwise default to None
-    return config.get(arg_name)
-
-
+    return config.get(arg_name, default)
 
 # @common.dec_timer_exit(log_level=logging.DEBUG)

From e4caea230777ba5f5daa04319fb1180219c1ba2a Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Thu, 12 Apr 2018 16:07:11 -0700
Subject: [PATCH 4/8] fix for hanging thread when error occurs

---
 conductor/lib/worker.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/conductor/lib/worker.py b/conductor/lib/worker.py
index 6053f9c0..cf457b3b 100644
--- a/conductor/lib/worker.py
+++ b/conductor/lib/worker.py
@@ -388,7 +388,6 @@ def kill_metric_store(self):
         logger.debug('killing metric store %s', self.metric_store)
         self.metric_store.kill()
 
-
     def stop_work(self):
         global WORKING
         WORKING = False  # stop any new jobs from being created
@@ -401,7 +400,7 @@ def error_handler_target(self):
         global WORKING
 
-        while WORKING: 
+        while WORKING:
             error = safe_get(self.error_queue)
             if not error:
                 continue
@@ -491,10 +490,10 @@ def join(self):
             logger.debug('waiting for %s workers to finish', worker_class_name)
             worker.join()
         logger.debug('all workers finished')
-        self.metric_store.join()
-        logger.debug('metric store in sync')
         if self.error:
             return self.error
+        self.metric_store.join()
+        logger.debug('metric store in sync')
         self.kill_workers()
         self.kill_metric_store()
         self.kill_reporters()

From 1191f351cfe50d5cafed214194d96f0ca7ac2534 Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Thu, 12 Apr 2018 16:08:09 -0700
Subject: [PATCH 5/8] misc uploader improvements

---
 conductor/lib/common.py   | 20 ++++++++++++++++++++
 conductor/lib/uploader.py | 30 +++++++++++++++++-------------
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/conductor/lib/common.py b/conductor/lib/common.py
index 5447ab7f..cbe6fff6 100644
--- a/conductor/lib/common.py
+++ b/conductor/lib/common.py
@@ -7,6 +7,7 @@ import multiprocessing
 import os
 import platform
+from pprint import pformat
 import random
 import signal
 import subprocess
@@ -671,3 +672,22 @@ class TmpLoader(loader):
 
     with open(filepath) as f:
         return yaml.load(f, loader)  # nosec (ignore bandit static analysis warning for not using safe_load [B506:yaml_load] )
+
+
+def sstr(object_, char_count=1000, pretty=True):
+    '''
+    Return a string representation of the given object, shortened to the given
+    char_count. This can be useful when printing/logging out data for debugging
+    purposes, but you don't want an overwhelming wall of text to scroll through.
+
+    pretty: bool. If true, will pretty print the object
+    '''
+
+    try:
+        s_str = pformat(object_) if pretty else str(object_)
+    except Exception:
+        s_str = "<%s>" % type(object_)
+
+    if len(s_str) > char_count:
+        s_str = s_str[:char_count] + "..."
+    return s_str

diff --git a/conductor/lib/uploader.py b/conductor/lib/uploader.py
index 926527e1..7511573f 100644
--- a/conductor/lib/uploader.py
+++ b/conductor/lib/uploader.py
@@ -1,5 +1,4 @@
 import datetime
-import time
 import json
 import logging
 import os
@@ -7,6 +6,7 @@ import sys
 import thread
 from threading import Thread
+import time
 import traceback
 
 from conductor import CONFIG
@@ -474,17 +474,13 @@ def print_status(self):
         logger.debug('starting print_status thread')
         update_interval = 3
 
-        def sleep():
-            time.sleep(update_interval)
-
         while self.working:
             try:
                 logger.info(self.manager.worker_queue_status_text())
                 logger.info(self.upload_status_text())
-            except Exception, e:
-                print e
-                print traceback.format_exc()
-            sleep()
+            except Exception:
+                logger.exception("#### Print Status Thread exception ####\n")
+            self.sleep(update_interval)
 
     def create_print_status_thread(self):
@@ -579,9 +575,17 @@ def handle_upload_response(self, project, upload_files, upload_id=None, md5_only
 
             self.mark_upload_finished(self.upload_id, finished_upload_files)
 
-        except:
+        except Exception:
+            logger.exception("######## ENCOUNTERED EXCEPTION #########\n")
             return traceback.format_exc()
 
+    @classmethod
+    def sleep(cls, seconds):
+        for _ in xrange(seconds):
+            if common.SIGINT_EXIT:
+                return
+            time.sleep(1)
+
     def main(self, run_one_loop=False):
         logger.info('Uploader Started. Checking for uploads...')
@@ -598,12 +602,12 @@ def main(self, run_one_loop=False):
                     logger.debug('no files to upload')
                     sys.stdout.write('.')
                     sys.stdout.flush()
-                    time.sleep(self.sleep_time)
+                    self.sleep(self.sleep_time)
                     continue
                 elif resp_code != 201:
                     logger.error('received invalid response code from app %s', resp_code)
                     logger.error('response is %s', resp_str)
-                    time.sleep(self.sleep_time)
+                    self.sleep(self.sleep_time)
                     continue
 
                 print ''  # to make a newline after the 204 loop
@@ -614,7 +618,7 @@ def main(self, run_one_loop=False):
                     upload = json_data.get("data", {})
                 except ValueError:
                     logger.error('response was not valid json: %s', resp_str)
-                    time.sleep(self.sleep_time)
+                    self.sleep(self.sleep_time)
                     continue
 
                 upload_files = upload['upload_files']
@@ -630,7 +634,7 @@ def main(self, run_one_loop=False):
                 break
             except:
                 logger.exception('Caught exception:\n')
-                time.sleep(self.sleep_time)
+                self.sleep(self.sleep_time)
                 continue
 
         logger.info('exiting uploader')

From 6bae665f328750998309f0d7294f72e027727ccd Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Thu, 12 Apr 2018 19:09:58 -0700
Subject: [PATCH 6/8] CT-194 added memcache to uploader

---
 conductor/lib/uploader.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/conductor/lib/uploader.py b/conductor/lib/uploader.py
index 7511573f..08d269b6 100644
--- a/conductor/lib/uploader.py
+++ b/conductor/lib/uploader.py
@@ -157,6 +157,7 @@ def __init__(self, *args, **kwargs):
 
     def make_request(self, job):
         uri_path = '/api/files/get_upload_urls'
+        params = {"memcached": True}
         headers = {'Content-Type': 'application/json'}
         data = {"upload_files": job,
                 "project": self.project}
@@ -164,6 +165,7 @@ def make_request(self, job):
         response_str, response_code = self.api_client.make_request(uri_path=uri_path,
                                                                    verb='POST',
                                                                    headers=headers,
+                                                                   params=params,
                                                                    data=json.dumps(data),
                                                                    raise_on_error=True,
                                                                    use_api_key=True)

From 85871594200047c8e453087a807cb77c92c7afcb Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Thu, 15 Mar 2018 21:18:26 -0700
Subject: [PATCH 7/8] CT-171 Added compression option for api make_request

- Activated for POST /jobs/
---
 conductor/lib/api_client.py       | 13 ++++++++++++-
 conductor/lib/conductor_submit.py |  2 +-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/conductor/lib/api_client.py b/conductor/lib/api_client.py
index e7fe5342..bccd4d48 100644
--- a/conductor/lib/api_client.py
+++ b/conductor/lib/api_client.py
@@ -1,7 +1,9 @@
+import gzip
 import json
 import logging
 import os
 import requests
+import StringIO
 import time
 import urlparse
 import jwt
@@ -57,7 +59,7 @@ def _make_request(self, verb, conductor_url, headers, params, data, raise_on_err
 
     def make_request(self, uri_path="/", headers=None, params=None, data=None,
                      verb=None, conductor_url=None, raise_on_error=True, tries=5,
-                     use_api_key=False):
+                     compress=False, use_api_key=False):
         '''
         verb: PUT, POST, GET, DELETE, HEAD, PATCH
         '''
@@ -91,6 +93,15 @@ def make_request(self, uri_path="/", headers=None, params=None, data=None,
 
         assert verb in self.http_verbs, "Invalid http verb: %s" % verb
 
+        # GZip Compress the content of the request
+        if compress:
+            headers["Content-Encoding"] = "gzip"
+            logger.debug("gzipping content...")
+            out_file = StringIO.StringIO()
+            with gzip.GzipFile(fileobj=out_file, mode="wb") as gzipper:
+                gzipper.write(data)
+            data = out_file.getvalue()
+
         # Create a retry wrapper function
         retry_wrapper = common.DecRetry(retry_exceptions=CONNECTION_EXCEPTIONS,
                                         tries=tries)

diff --git a/conductor/lib/conductor_submit.py b/conductor/lib/conductor_submit.py
index 7642ae5b..3242da33 100755
--- a/conductor/lib/conductor_submit.py
+++ b/conductor/lib/conductor_submit.py
@@ -303,7 +303,6 @@ def validate_args(self):
             if self.gpu_config.get("type") not in supported_gpu_types:
                 raise BadArgumentError("GPU type %s is not one of %s" % (self.gpu_config.get("type"), supported_gpu_types))
 
-
     def send_job(self, upload_files, upload_size):
         '''
         Construct args for two different cases:
@@ -378,6 +377,7 @@ def send_job(self, upload_files, upload_size):
         logger.info("Sending Job...")
         response, response_code = self.api_client.make_request(uri_path="jobs/",
                                                                data=json.dumps(submit_dict),
+                                                               compress=True,
                                                                raise_on_error=False,
                                                                use_api_key=True)
        if response_code not in [201, 204]:

From 03a1f1da1c95d2b6d5f41b39eadfe5af71cf7e3f Mon Sep 17 00:00:00 2001
From: Lawrence Schlosser
Date: Wed, 25 Apr 2018 13:05:15 -0700
Subject: [PATCH 8/8] Increased uploader MD5OutputWorker batch size from 20 to
 100

---
 conductor/lib/uploader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/conductor/lib/uploader.py b/conductor/lib/uploader.py
index 08d269b6..1ea185d4 100644
--- a/conductor/lib/uploader.py
+++ b/conductor/lib/uploader.py
@@ -92,7 +92,7 @@ class MD5OutputWorker(worker.ThreadWorker):
 
     def __init__(self, *args, **kwargs):
         worker.ThreadWorker.__init__(self, *args, **kwargs)
-        self.batch_size = 20  # this controls the batch size for http get_signed_urls
+        self.batch_size = 100  # this controls the batch size for http get_signed_urls
         self.wait_time = 1
         self.batch = {}
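
Note (not part of the patches above): the recurring fix in patches 1 and 4 is to
replace blocking Queue.get(True) calls with a non-blocking get plus a terminate
flag, so that kill() can always unblock a worker thread. Below is a minimal,
self-contained sketch of that shutdown pattern, written in Python 2 to match
this codebase; the names (PollingWorker, etc.) are illustrative only and are
not part of the conductor library:

    import Queue
    import threading
    import time

    class PollingWorker(object):

        def __init__(self):
            self.queue = Queue.Queue()
            self.terminate = False
            self.thread = threading.Thread(target=self.target)
            self.thread.daemon = True

        def safe_get(self):
            # Non-blocking get; returning None instead of blocking forever
            # is what lets the loop below notice self.terminate.
            try:
                return self.queue.get_nowait()
            except Queue.Empty:
                return None

        def target(self):
            while not self.terminate:
                item = self.safe_get()
                if item is None:
                    time.sleep(0.1)  # avoid busy-waiting on an idle queue
                    continue
                # ... process the item here ...
                self.queue.task_done()

        def kill(self):
            self.terminate = True
            self.thread.join()

    worker = PollingWorker()
    worker.thread.start()
    worker.queue.put("job")
    worker.kill()  # returns promptly instead of hanging on a blocking get()

Likewise, patch 7's gzip-to-an-in-memory-buffer idiom can be exercised in
isolation. A small sketch, again in Python 2; the helper name gzip_body is
hypothetical:

    import gzip
    import StringIO

    def gzip_body(data):
        # Compress a request body in memory, as make_request(compress=True)
        # does; the server must honor the Content-Encoding: gzip header.
        buf = StringIO.StringIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as gzipper:
            gzipper.write(data)
        return buf.getvalue()

    print len(gzip_body('{"jobs": []}' * 100))  # far smaller than the input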