
Commit 6e6cc49

Added support for paging results when getting a job's downloads
- Also added support for specifying multiple tids (task ids) when downloading.
- The --job_id and --task_id flags now take multiple space-separated args.
lawschlosser committed Nov 13, 2018
1 parent 6d22213 commit 6e6cc49
Showing 2 changed files with 54 additions and 28 deletions.
11 changes: 8 additions & 3 deletions bin/conductor
@@ -318,13 +318,18 @@ def parse_args():
                                    formatter_class=argparse.RawTextHelpFormatter)
 
     downloader_parser.add_argument("--job_id",
                                    type=int,
+                                   nargs="+",
                                    help=("The job id(s) to download. When specified "
                                          "will only download those jobs and terminate "
-                                         "afterwards"),
-                                   action='append')
+                                         "afterwards"))
 
     downloader_parser.add_argument("--task_id",
-                                   help="Manually download output for this task")
+                                   nargs="*",
+                                   default=[],
+                                   type=int,
+                                   help=("Only valid with the --job_id flag. Instead of downloading *all* "
+                                         "of the jobs' tasks, will download only the tasks specified."))
 
     downloader_parser.add_argument("--output",
                                    help="Override for the output directory")
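The practical effect of the bin/conductor change: action='append' required repeating the flag once per value (--job_id 123 --job_id 456), whereas nargs="+" / nargs="*" accept several space-separated values after a single flag. A minimal sketch of the new parsing behavior, with the flag definitions copied from the hunk above and a hypothetical command line:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--job_id", type=int, nargs="+")
parser.add_argument("--task_id", type=int, nargs="*", default=[])

# Hypothetical invocation: two jobs and two tasks, one flag occurrence each.
args = parser.parse_args(["--job_id", "1234", "5678", "--task_id", "7", "8"])
print(args.job_id)   # [1234, 5678]
print(args.task_id)  # [7, 8]

Either way the parsed value arrives as a list, which is why run_downloader (further down) can pass args.get("job_id") straight through as job_ids.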
71 changes: 46 additions & 25 deletions conductor/lib/downloader2.py
@@ -233,7 +233,7 @@ class Downloader(object):
     # The amount of time to "sleep" before querying the app for more downloads
     naptime = 15
     endpoint_downloads_next = '/downloads/next'
-    endpoint_downloads_job = '/downloads/%s'
+    endpoint_downloads_job = '/jobs/%s/downloads'
     endpoint_downloads_status = '/downloads/status'
 
     download_progess_polling = 2
@@ -251,6 +251,9 @@ class Downloader(object):
     # record last threads alive
     _threads_alive = ()
 
+    # The number of Downloads to fetch per request (only relevant when downloading by job_id)
+    download_page_size = CONFIG.get("download_page_size", 200)
+
     def __init__(self, thread_count=None, location=None, output_dir=None):
 
         # Turn on the SIGINT handler. This will catch
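The page size comes from the shared CONFIG mapping rather than a hard-coded constant, so it can be tuned without editing code. A small illustration of the CONFIG.get fallback semantics, assuming CONFIG is dict-like (how it is populated is outside this diff):

# Hypothetical values; only the lookup-with-default behavior is the point.
CONFIG = {"download_page_size": 500}
assert CONFIG.get("download_page_size", 200) == 500  # user override wins
assert {}.get("download_page_size", 200) == 200      # absent key falls back to 200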
@@ -282,12 +285,12 @@ def start_daemon(cls, thread_count=None, location=None, output_dir=None, summary
         downloader.print_uptime()
 
     @classmethod
-    def download_jobs(cls, job_ids, task_id=None, thread_count=None, output_dir=None):
+    def download_jobs(cls, job_ids, task_ids=None, thread_count=None, output_dir=None):
         '''
         Run the downloader for explicit jobs, and terminate afterwards.
         '''
         downloader = cls(thread_count=thread_count, output_dir=output_dir)
-        thread_states = downloader.start(job_ids, task_id=task_id)
+        thread_states = downloader.start(job_ids, task_ids=task_ids)
         while not common.SIGINT_EXIT and (not downloader.pending_queue.empty() or not downloader.downloading_queue.empty()):
             sleep_time = 2
             # logger.debug("sleeping2: %s", sleep_time)
@@ -296,7 +299,7 @@ def download_jobs(cls, job_ids, task_id=None, thread_count=None, output_dir=None
         downloader._print_download_history()
         downloader.print_uptime()
 
-    def start(self, job_ids=None, task_id=None, summary_interval=10):
+    def start(self, job_ids=None, task_ids=None, summary_interval=10):
         # Create new queues
         self.start_time = time.time()
         self.pending_queue = Queue.Queue()
@@ -306,7 +309,7 @@ def start(self, job_ids=None, task_ids=None, summary_interval=10):
         # If a job id has been specified then only load the queue up with that work
         if job_ids:
             self.history_queue_max = None
-            self.get_jobs_downloads(job_ids, task_id)
+            self.get_jobs_downloads(job_ids, task_ids)
 
         # otherwise create a queue thread the polls the app for wor
         else:
@@ -448,15 +451,13 @@ def get_next_downloads(self, count):
         except Exception as e:
             logger.exception('Could not get next download')
 
-    def get_jobs_downloads(self, job_ids, task_id):
+    def get_jobs_downloads(self, job_ids, task_ids):
 
         for job_id in job_ids:
             endpoint = self.endpoint_downloads_job % job_id
-            downloads = _get_job_download(endpoint, self.api_client, job_id, task_id)
-            if downloads:
-                for task_download in downloads.get("downloads", []):
-                    print "putting in queue: %s" % task_download
-                    self.pending_queue.put(task_download, block=True)
+            downloads = get_job_downloads(endpoint, self.api_client, task_ids, page_size=self.download_page_size)
+            for task_download in downloads:
+                self.pending_queue.put(task_download, block=True)
 
     @common.dec_catch_exception(raise_=True)
     def download_target(self, pending_queue, downloading_queue, task_download_state):
@@ -728,6 +729,7 @@ def _update_file_state_callback(self, file_state, filepath, file_size, bytes_pro
 
     # @dec_random_exception(percentage_chance=0.05)
 
+
     def add_to_history(self, file_download_state):
 
         self.history_queue.put(file_download_state, block=False)
@@ -740,6 +742,7 @@ def add_to_history(self, file_download_state):
 
     # @dec_random_exception(percentage_chance=0.05)
 
+
     def report_download_status(self, task_download_state):
         download_id = task_download_state.task_download.get("download_id")
         if not download_id:
@@ -940,7 +943,6 @@ def _print_pending_queue(self):
         # logger.debug("Last %s Jobs downloaded ")
         # logger.debug("Last %s Downloads downloaded ")
 
-
     def construct_active_downloads_summary(self, task_download_states):
         '''
         #### ACTIVE DOWNLOADS #####
@@ -1078,21 +1080,38 @@ def _get_next_downloads(location, endpoint, client, count=1):
     return json.loads(response_string).get("data", [])
 
 
-def _get_job_download(endpoint, client, jid, tid):
-    params = None
-    if tid:
-        params = {'tid': tid}
-    # logger.debug('params: %s', params)
-    response_string, response_code = client.make_request(endpoint, params=params, use_api_key=True)
-    # logger.debug("response code is:\n%s" % response_code)
-    # logger.debug("response data is:\n%s" % response_string)
-    if response_code != 201:
-        return None
-    download_job = json.loads(response_string)
-    return download_job
+def get_job_downloads(endpoint, client, tids=None, page_size=200):
+    '''
+    Fetch all Downloads for the given job.
+    '''
+
+    def _get_job_downloads(endpoint, client, tids, start_cursor, page_size):
+        params = {"limit": page_size,
+                  'start_cursor': start_cursor}
+
+        rbody, rcode = client.make_request(endpoint,
+                                           verb="POST",
+                                           params=params,
+                                           data=json.dumps({"tids": tids}),
+                                           use_api_key=True)
+
+        if rcode != 201:
+            return {}
+        return json.loads(rbody)
+
+    downloads = []
+    start_cursor = None
+    while True:
+        logger.info("Fetching downloads...")
+        response = _get_job_downloads(endpoint, client, tids, start_cursor, page_size)
+        downloads.extend(response.get("downloads", []))
+        start_cursor = response.get("next_cursor")
+        if not start_cursor:
+            return downloads
 
 
 # @dec_random_exception(percentage_chance=0.05)
 
 
 @common.DecRetry(retry_exceptions=CONNECTION_EXCEPTIONS)
 def download_file(download_url, filepath, poll_rate=2, state=None):
     '''
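The core of the commit is the cursor loop in get_job_downloads above: each POST asks for up to page_size downloads, and the response's next_cursor is fed back as start_cursor until the server stops supplying one. The sketch below condenses that loop and exercises it against a fake client; the two-field response shape ({"downloads": [...], "next_cursor": ...}) is inferred from the code, and FakeClient is purely illustrative, not conductor's real API client:

import json

class FakeClient(object):
    """Stand-in for the API client; serves two canned pages."""

    def __init__(self):
        self._pages = [
            {"downloads": [{"download_id": 1}, {"download_id": 2}],
             "next_cursor": "abc123"},
            {"downloads": [{"download_id": 3}],
             "next_cursor": None},
        ]
        self._calls = 0

    def make_request(self, endpoint, verb="GET", params=None, data=None,
                     use_api_key=False):
        body = self._pages[self._calls]
        self._calls += 1
        return json.dumps(body), 201


def fetch_all(endpoint, client, tids=None, page_size=200):
    """Same cursor loop as get_job_downloads: follow next_cursor until absent."""
    downloads = []
    start_cursor = None
    while True:
        rbody, rcode = client.make_request(
            endpoint,
            verb="POST",
            params={"limit": page_size, "start_cursor": start_cursor},
            data=json.dumps({"tids": tids}),
            use_api_key=True)
        if rcode != 201:
            return downloads
        response = json.loads(rbody)
        downloads.extend(response.get("downloads", []))
        start_cursor = response.get("next_cursor")
        if not start_cursor:
            return downloads


print(fetch_all("/jobs/1234/downloads", FakeClient()))
# -> [{'download_id': 1}, {'download_id': 2}, {'download_id': 3}]

Note the failure mode this design chooses: a non-201 response ends paging quietly, so a mid-run error yields the pages fetched so far rather than an exception.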
@@ -1175,11 +1194,13 @@ def run_downloader(args):
     logger.debug('Downloader args: %s', args)
 
     job_ids = args.get("job_id")
+    task_ids = args.get("task_id")
+
     thread_count = args.get("thread_count")
 
     if job_ids:
         Downloader.download_jobs(job_ids,
-                                 task_id=args.get("task_id"),
+                                 task_ids=task_ids,
                                  thread_count=thread_count,
                                  output_dir=args.get("output"))
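Putting the two files together, the job-scoped path now flows list-to-list from argparse into Downloader.download_jobs. A hedged sketch of that hand-off with hypothetical ids, shaped like the args mapping run_downloader receives:

# Hypothetical args mapping, mirroring the argparse output shown earlier.
args = {"job_id": [1234, 5678],   # nargs="+" yields a list
        "task_id": [1, 2],        # nargs="*" yields a list (default [])
        "thread_count": 4,
        "output": None}

job_ids = args.get("job_id")
task_ids = args.get("task_id")

# Mirrors run_downloader's branch: with job ids present, the one-shot
# download path runs instead of the polling daemon.
if job_ids:
    print("download_jobs(%r, task_ids=%r)" % (job_ids, task_ids))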
