This repository has been archived by the owner on Jul 21, 2022. It is now read-only.

Job download paging #230

Open

wants to merge 1 commit into base: master
11 changes: 8 additions & 3 deletions bin/conductor
@@ -320,13 +320,18 @@ def parse_args():
formatter_class=argparse.RawTextHelpFormatter)

downloader_parser.add_argument("--job_id",
type=int,
nargs="+",
help=("The job id(s) to download. When specified "
"will only download those jobs and terminate "
"afterwards"),
action='append')
"afterwards"))

downloader_parser.add_argument("--task_id",
help="Manually download output for this task")
nargs="*",
default=[],
type=int,
help=("Only valid with the --job_id flag. Instead of downloading *all* "
"of the jobs' tasks, will download only the tasks specified."))

downloader_parser.add_argument("--output",
help="Override for the output directory")
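For illustration, a minimal self-contained sketch of how the revised flags parse. The parser below simply mirrors the two arguments above; the `conductor downloader` invocation and the id values in the comments are hypothetical.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--job_id", type=int, nargs="+",
                    help="The job id(s) to download.")
parser.add_argument("--task_id", type=int, nargs="*", default=[],
                    help="Only valid with --job_id; restricts the download to these tasks.")

# e.g. `conductor downloader --job_id 12345 12346 --task_id 3 7` (hypothetical invocation)
args = parser.parse_args(["--job_id", "12345", "12346", "--task_id", "3", "7"])
print(args.job_id)   # [12345, 12346]
print(args.task_id)  # [3, 7]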
170 changes: 99 additions & 71 deletions conductor/lib/downloader2.py
@@ -11,6 +11,7 @@
import Queue
import logging
import logging.handlers
import multiprocessing
import re
import requests
import random
@@ -233,7 +234,7 @@ class Downloader(object):
# The amount of time to "sleep" before querying the app for more downloads
naptime = 15
endpoint_downloads_next = '/downloads/next'
endpoint_downloads_job = '/downloads/%s'
endpoint_downloads_job = '/jobs/%s/downloads'
endpoint_downloads_status = '/downloads/status'

download_progess_polling = 2
@@ -251,6 +252,12 @@ class Downloader(object):
# record last threads alive
_threads_alive = ()

# a messaging object to indicate when all jobs have been queued
_all_enqueued = multiprocessing.Array('c', 'false')

# The number of Downloads to fetch per request (only relevant when downloading by job_id)
download_page_size = CONFIG.get("download_page_size", 200)
Contributor:

The default of 200 is already defined as the default value for page_size on get_job_downloads.

Contributor Author:

I'm not sure what your point is. If there is a value in the CONFIG, we want to use that value (otherwise, use 200). The default page_size value in the get_job_downloads function is irrelevant, since we explicitly provide the page_size argument when making the call.

Contributor:

My point is that we hardcode a default value of 200 in two different places. This is redundant, and because one of the two places is a function's default argument value, I think we should remove the default value for the download_page_size variable and let the function use its default argument value if download_page_size is None there: https://github.com/AtomicConductor/conductor_client/pull/230/files#diff-2ce48d04d28c78c28bd96ccc55ae441cR458

Contributor Author:

We've discussed this before: #203 (comment)

I'm not sure what issue concerns you. Is it that it doesn't feel SSOT/DRY?

And are you suggesting that the signature of the get_job_downloads function look like this:

def get_job_downloads(endpoint, client, tids=None, page_size=None):
    if page_size == None:
        page_size = 200

and that we therefore omit a default value here:

download_page_size = CONFIG.get("download_page_size")

Contributor:

My concern is that we are hardcoding the same default value in two places when it doesn't have to be. The default value could be defined once, as a default argument value, and that would be the end of it. For the sake of avoiding future regressions, we shouldn't hardcode the same default value in more than one place.

I am suggesting this:

def get_job_downloads(endpoint, client, tids=None, page_size=200):

and that:

download_page_size = CONFIG.get("download_page_size")

Contributor Author:

This would result in a page_size of None when calling get_job_downloads, so I'm a little confused how this solution would work.

Contributor:

Good point. In that case, your suggestion of:

def get_job_downloads(endpoint, client, tids=None, page_size=None):
    if page_size == None:
        page_size = 200

is preferable IMO.
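For reference, a minimal runnable sketch of the approach the thread converges on: the 200 fallback is defined in exactly one place, and None acts as the "use the default" sentinel. The CONFIG dict and the stubbed return value below are illustrative, not code from this PR.

CONFIG = {}  # stand-in for conductor's real CONFIG; assume "download_page_size" is unset

def get_job_downloads(endpoint, client, tids=None, page_size=None):
    # The hard-coded default lives only here; callers may pass None.
    if page_size is None:
        page_size = 200
    # Stubbed return so the sketch runs standalone; the real function makes a request.
    return {"endpoint": endpoint, "tids": tids, "limit": page_size}

# The class attribute then carries no fallback of its own:
download_page_size = CONFIG.get("download_page_size")  # None unless configured

print(get_job_downloads("/jobs/00123/downloads", None, page_size=download_page_size)["limit"])  # prints 200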


def __init__(self, thread_count=None, location=None, output_dir=None):

# Turn on the SIGINT handler. This will catch
@@ -274,29 +281,30 @@ def start_daemon(cls, thread_count=None, location=None, output_dir=None, summary
downloader = cls(thread_count=thread_count, location=location, output_dir=output_dir)
thread_states = downloader.start(summary_interval=summary_interval)
while not common.SIGINT_EXIT:
pass
# sleep_time = 5
# logger.debug("sleeping1: %s", sleep_time)
# time.sleep(sleep_time)
time.sleep(0.1)
downloader._print_download_history()
downloader.print_uptime()

@classmethod
def download_jobs(cls, job_ids, task_id=None, thread_count=None, output_dir=None):
def download_jobs(cls, job_ids, task_ids=None, thread_count=None, output_dir=None):
'''
Run the downloader for explicit jobs, and terminate afterwards.
'''
downloader = cls(thread_count=thread_count, output_dir=output_dir)
thread_states = downloader.start(job_ids, task_id=task_id)
while not common.SIGINT_EXIT and (not downloader.pending_queue.empty() or not downloader.downloading_queue.empty()):
sleep_time = 2
# logger.debug("sleeping2: %s", sleep_time)
time.sleep(sleep_time)
thread_states = downloader.start(job_ids, task_ids=task_ids)

while not common.SIGINT_EXIT:
# Check if all jobs have been enqueued and if our work queues are now empty.
# This indicates that all work has finished and that we can exit.
if downloader._all_enqueued.value == "true":
if not downloader.pending_queue.qsize():
if not downloader.downloading_queue.qsize():
break
time.sleep(0.1)
downloader._print_download_history()
downloader.print_uptime()

def start(self, job_ids=None, task_id=None, summary_interval=10):
def start(self, job_ids=None, task_ids=None, summary_interval=10):
# Create new queues
self.start_time = time.time()
self.pending_queue = Queue.Queue()
@@ -305,12 +313,10 @@ def start(self, job_ids=None, task_id=None, summary_interval=10):

# If a job id has been specified then only load the queue up with that work
if job_ids:
self.history_queue_max = None
self.get_jobs_downloads(job_ids, task_id)
# Increase the default history max.
self.history_queue_max = 500

# otherwise create a queue thread the polls the app for wor
else:
self.start_queue_thread()
self.start_queue_thread(job_ids, task_ids)

task_download_states = self.start_download_threads(self.downloading_queue, self.pending_queue)
thread_states = {"task_downloads": task_download_states}
@@ -329,15 +335,22 @@ def print_uptime(self):
human_duration = common.get_human_duration(seconds)
logger.info("Uptime: %s", human_duration)

def start_queue_thread(self):
def start_queue_thread(self, job_ids=None, task_ids=None):
'''
Start and return a thread that is responsible for pinging the app for
Downloads to download (and populating the queue)
Start a thread that is responsible for filling the work queue (pending_queue)
'''

thread = threading.Thread(name="QueueThread",
target=self.queue_target,
args=(self.pending_queue, self.downloading_queue))
# If job ids have been provided then we use a different queuing mechanism (for finite/explicit work)
if job_ids:
thread = threading.Thread(name="QueueThread",
target=self.job_queue_target,
args=(self.pending_queue, self.downloading_queue, job_ids, task_ids))

# Otherwise, use a queuing mechanism that queries for work indefinitely (must be interrupted to exit)
else:
thread = threading.Thread(name="QueueThread",
target=self.queue_target,
args=(self.pending_queue, self.downloading_queue))
thread.setDaemon(True)
thread.start()
return thread
@@ -402,10 +415,6 @@ def queue_target(self, pending_queue, downloading_queue):
empty_queue_slots = (self.thread_count * 2) - pending_queue.qsize()
# If the queue is full, then sleep
if empty_queue_slots <= 0:
# logger.debug('Pending download queue is full (%s Downloads). Not adding any more Downloads' % self.thread_count)
# sleep_time = 0.5
# logger.debug("sleeping3: %s", sleep_time)
# time.sleep(sleep_time)
continue

logger.debug("empty_queue_slots: %s", empty_queue_slots)
@@ -434,6 +443,28 @@ def queue_target(self, pending_queue, downloading_queue):
logger.exception("Exception occurred in QueueThead:\n")
self.nap()

def job_queue_target(self, pending_queue, downloading_queue, jids, tids=None):
'''
Fill the work queue (pending_queue) by querying for the given jobs' downloads.
'''
for jid in jids:
try:
# Fetch and enqueue all of this job's downloads
self.enqueue_job_downloads(jid, tids=tids, page_size=self.download_page_size)

except:
logger.exception("Exception occurred in QueueThead:\n")
self.nap()

# when all of the jobs' downloads have been queried for and enqueued, set this value so
# that the parent process knows that there's no more work coming (and can safely exit once
# the work queues have been emptied/finished)
self._all_enqueued.value = "true"

# hang here indefinitely (hack to stay consistent with queue_target() behavior)
while not common.SIGINT_EXIT:
time.sleep(0.1)

def nap(self):
while not common.SIGINT_EXIT:
# print "Sleeping4 for %s" % self.naptime
@@ -448,15 +479,45 @@ def get_next_downloads(self, count):
except Exception as e:
logger.exception('Could not get next download')

def get_jobs_downloads(self, job_ids, task_id):
def enqueue_job_downloads(self, jid, tids=None, page_size=200):
'''
Fetch and enqueue all Downloads for the given job.
'''
endpoint = self.endpoint_downloads_job % jid

start_cursor = None
while not common.SIGINT_EXIT:
empty_queue_slots = (self.thread_count * 2) - self.pending_queue.qsize()

# If the queue is full, then sleep and repeat
if empty_queue_slots <= 0:
time.sleep(0.1)
continue

logger.info("Fetching job %s's downloads...", jid)
response = self._get_job_downloads(endpoint, self.api_client, tids, start_cursor, page_size)
downloads = response.get("downloads", [])
logger.debug("Got %s downloads", len(downloads))
for task_download in downloads:
logger.debug("Download %s contains %s files", task_download["download_id"], len(task_download.get("files", [])))
self.pending_queue.put_nowait(task_download)
start_cursor = response.get("next_cursor")
if not start_cursor:
return

@staticmethod
def _get_job_downloads(endpoint, client, tids, start_cursor, page_size):
params = {"limit": page_size,
'start_cursor': start_cursor}

for job_id in job_ids:
endpoint = self.endpoint_downloads_job % job_id
downloads = _get_job_download(endpoint, self.api_client, job_id, task_id)
if downloads:
for task_download in downloads.get("downloads", []):
print "putting in queue: %s" % task_download
self.pending_queue.put(task_download, block=True)
rbody, rcode = client.make_request(endpoint,
verb="POST",
params=params,
data=json.dumps({"tids": tids}),
use_api_key=True)
if rcode != 201:
return {}
return json.loads(rbody)

@common.dec_catch_exception(raise_=True)
def download_target(self, pending_queue, downloading_queue, task_download_state):
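An aside on the paging contract that enqueue_job_downloads and _get_job_downloads rely on above: each page is fetched by POSTing "limit" and "start_cursor" params plus a JSON body of task ids to /jobs/<job_id>/downloads; a 201 response carries "downloads" and, when more pages remain, a "next_cursor". A standalone sketch of a client-side loop over that contract (the requests usage, the iter_job_downloads name, and the base_url parameter are illustrative, and authentication is omitted; this is not the conductor api_client):

import json
import requests

def iter_job_downloads(base_url, jid, tids=None, page_size=200):
    # Yield every Download for the job, one page at a time, following next_cursor.
    url = "%s/jobs/%s/downloads" % (base_url, jid)
    start_cursor = None
    while True:
        response = requests.post(url,
                                 params={"limit": page_size,
                                         "start_cursor": start_cursor},
                                 data=json.dumps({"tids": tids}))
        if response.status_code != 201:
            return
        payload = response.json()
        for task_download in payload.get("downloads", []):
            yield task_download
        start_cursor = payload.get("next_cursor")
        if not start_cursor:
            return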
@@ -628,9 +689,6 @@ def reporter_target(self, task_download_state, downloader_thread):
while True:

try:
# logger.debug("threading.threading.currentThread(): %s", threading.currentThread())
# logger.debug('bytes_downloaded is %s' % bytes_downloaded)
# logger.debug('done is %s' % done)

if common.SIGINT_EXIT:
task_download_state.status = TaskDownloadState.STATE_ERROR
@@ -928,24 +986,6 @@ def _print_pending_queue(self):
else:
logger.info('##### PENDING QUEUE ###### [None]')


#
# logger.info('Downloading Queue (Active Downloads) contains %s items', self.downloading_queue.qsize())
# for item in list(self.downloading_queue.queue):
# logger.info('\tDownloading: %s', item["download_id"])
#


#
# logger.debug("TOTAL THREADS: %s", len(active_threads))
# logger.debug("DOWNLOAD THREADS: %s", len(active_threads))
#
# logger.debug("Pending Queue: ")
# logger.debug("Pending Queue: ")
# logger.debug("Last %s files downloaded ")
# logger.debug("Last %s Jobs downloaded ")
# logger.debug("Last %s Downloads downloaded ")

def construct_active_downloads_summary(self, task_download_states):
'''
#### ACTIVE DOWNLOADS #####
@@ -1083,20 +1123,6 @@ def _get_next_downloads(location, endpoint, client, count=1):
return json.loads(response_string).get("data", [])


def _get_job_download(endpoint, client, jid, tid):
params = None
if tid:
params = {'tid': tid}
# logger.debug('params: %s', params)
response_string, response_code = client.make_request(endpoint, params=params, use_api_key=True)
# logger.debug("response code is:\n%s" % response_code)
# logger.debug("response data is:\n%s" % response_string)
if response_code != 201:
return None
download_job = json.loads(response_string)
return download_job


# @dec_random_exception(percentage_chance=0.05)
@common.DecRetry(retry_exceptions=CONNECTION_EXCEPTIONS)
def download_file(download_url, filepath, poll_rate=2, state=None):
@@ -1180,11 +1206,13 @@ def run_downloader(args):
logger.debug('Downloader args: %s', args)

job_ids = args.get("job_id")
task_ids = args.get("task_id")

thread_count = args.get("thread_count")

if job_ids:
Downloader.download_jobs(job_ids,
task_id=args.get("task_id"),
task_ids=task_ids,
thread_count=thread_count,
output_dir=args.get("output"))
