This repository has been archived by the owner on Jul 21, 2022. It is now read-only.

Feature/ct 194/memcached uploader urls #192

Open · wants to merge 8 commits into base: master
39 changes: 32 additions & 7 deletions bin/conductor
@@ -255,6 +255,11 @@ def parse_args():
description=uploader_parser_desciption,
formatter_class=argparse.RawTextHelpFormatter)

uploader_parser.add_argument("--database_filepath",
help=("The filepath to the local md5 caching database. If no filepath "
"is specified, the database will be created in a temp directory. "
"Note that this flag is only active when --local_upload is True."))

uploader_parser.add_argument("--location",
help=('An optional string to indicate which location this uploader '
'executable should register as. This option is only relevant '
@@ -280,10 +285,25 @@ def parse_args():
"everyday, while storing the last 7 days "
"of logs"))

uploader_parser.add_argument("--md5_caching",
help=("Use cached md5s. This can dramatically improve the uploading "
"times, as md5 checking can be very time consuming. Caching md5s "
"allows subsequent uploads (of the same files) to skip the "
"md5 generation process (if the files appear to not have been "
"modified since the last time they were submitted). The cache is "
"stored locally and uses a file's modification time and file size "
"to intelligently guess whether the file has changed. Set this "
"flag to False if there is concern that files may not be getting "
"re-uploaded properly. "
"Note that this flag is only active when --local_upload is True."),
choices=[False, True],
type=cast_to_bool,
default=None)

uploader_parser.add_argument("--thread_count",
type=int,
default=conductor.CONFIG.get("thread_count"),
help=('The number of threads that should download simultaneously'))
help=('The number of threads that should upload simultaneously'))

uploader_parser.add_argument("--alt",
help=('Run an alternative version of the downloader'),
@@ -427,13 +447,18 @@ def run_submit(args):


def run_uploader(args):
'''
Run the Uploader
If the user has opted to use the alternative uploader (and the system is not on Windows),
then run the alternative uploader. Otherwise, run the standard uploader.
'''
args_dict = vars(args)
if sys.platform == "win32":
uploader.run_uploader(args)
if args_dict.get("alt"):
uploader_v2.run_uploader(args)
else:
uploader.run_uploader(args)
use_alt = bool(args_dict.pop("alt", False))

if use_alt and sys.platform != "win32":
return uploader_v2.run_uploader(args)

return uploader.run_uploader(args)


def run_downloader(args):
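The --md5_caching help text above describes reusing a cached md5 when a file's modification time and size are unchanged. For illustration, a minimal sketch of that heuristic follows; the cache layout and the cached_md5 helper are hypothetical, not the uploader's actual implementation:

import hashlib
import os

def cached_md5(filepath, cache):
    '''
    cache: dict mapping filepath -> {"md5": str, "mtime": float, "size": int}

    Reuse the stored md5 only if the file's mtime and size are unchanged;
    otherwise re-hash the file and refresh the cache entry.
    '''
    stat = os.stat(filepath)
    entry = cache.get(filepath)
    if entry and entry["mtime"] == stat.st_mtime and entry["size"] == stat.st_size:
        return entry["md5"]
    md5 = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            md5.update(chunk)
    digest = md5.hexdigest()
    cache[filepath] = {"md5": digest, "mtime": stat.st_mtime, "size": stat.st_size}
    return digest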
13 changes: 12 additions & 1 deletion conductor/lib/api_client.py
@@ -1,7 +1,9 @@
import gzip
import json
import logging
import os
import requests
import StringIO
import time
import urlparse
import jwt
@@ -57,7 +59,7 @@ def _make_request(self, verb, conductor_url, headers, params, data, raise_on_err

def make_request(self, uri_path="/", headers=None, params=None, data=None,
verb=None, conductor_url=None, raise_on_error=True, tries=5,
use_api_key=False):
compress=False, use_api_key=False):
'''
verb: PUT, POST, GET, DELETE, HEAD, PATCH
'''
@@ -91,6 +93,15 @@ def make_request(self, uri_path="/", headers=None, params=None, data=None,

assert verb in self.http_verbs, "Invalid http verb: %s" % verb

# GZip Compress the content of the request
if compress:
    headers["Content-Encoding"] = "gzip"
    logger.debug("gzipping content...")
    out_file = StringIO.StringIO()
    with gzip.GzipFile(fileobj=out_file, mode="wb") as gzipper:
        gzipper.write(data)
    data = out_file.getvalue()

# Create a retry wrapper function
retry_wrapper = common.DecRetry(retry_exceptions=CONNECTION_EXCEPTIONS,
tries=tries)
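The compression step above uses Python 2's StringIO module. For reference only (not part of this diff), a rough Python 3 equivalent of the same gzip step, using io.BytesIO, might look like this:

import gzip
import io
import json

def gzip_body(data):
    # Compress an already-serialized request body, mirroring the
    # Content-Encoding: gzip path in make_request.
    if isinstance(data, str):
        data = data.encode("utf-8")
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as gzipper:
        gzipper.write(data)
    return buf.getvalue()

# Round-trip check: the receiving side transparently gunzips the body.
payload = gzip_body(json.dumps({"jobs": []}))
assert gzip.decompress(payload) == b'{"jobs": []}'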
20 changes: 20 additions & 0 deletions conductor/lib/common.py
@@ -7,6 +7,7 @@
import multiprocessing
import os
import platform
from pprint import pformat
Contributor:
from x import y imports should go after plain import x statements.

E.g., this is what isort gives me:

import base64
import datetime
import functools
import hashlib
import json
import logging
import multiprocessing
import os
import platform
import random
import signal
import subprocess
import sys
import time
import traceback
from pprint import pformat

import yaml

Contributor (Author):
We should probably decide as a team on how we want to sort imports. I haven't seen much guidance in the Google style guide or PEP 8 (aside from grouping imports by builtin, third-party, and internal).
Personally, I sort imports alphabetically, considering the uppermost namespace (parent packages) when evaluating order, and I don't distinguish between from-imports and plain imports. I find it easier to look something up when there is one continuous alphabetical ordering, rather than multiple sections each with their own ordering. But we're all weird humans, thinking in weird ways.

Also, I noticed that isort has several options that make dramatic differences (--no-sections, --order-by-type, --project, --thirdparty, etc.). It might be worth playing around with these to find a workflow that we all like (dislike ;) ).

Contributor:
Good call, let's make this a team decision.

import random
import signal
import subprocess
@@ -671,3 +672,22 @@ class TmpLoader(loader):

with open(filepath) as f:
return yaml.load(f, loader) # nosec (ignore bandit static analysis warning for not using safe_load [B506:yaml_load] )


def sstr(object_, char_count=1000, pretty=True):
    '''
    Return a string representation of the given object, shortened to the given
    char_count. This can be useful when printing/logging data for debugging
    purposes without producing an overwhelming wall of text to scroll through.

    pretty: bool. If True, pretty-print the object.
    '''

    try:
        s_str = pformat(object_) if pretty else str(object_)
    except Exception:
        s_str = "<object cannot be cast to string (%s)>" % type(object_)

    if len(s_str) > char_count:
        s_str = s_str[:char_count] + "...<TRUNCATED>"
    return s_str
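A possible use of sstr when logging large submit payloads (illustrative only; this call site is not part of the diff, and the example payload is made up):

import logging

from conductor.lib.common import sstr

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# A payload with thousands of entries would normally flood the log;
# sstr truncates it to a readable preview.
submit_dict = {"upload_files": ["/tmp/frame_%04d.exr" % i for i in range(5000)]}
logger.debug("submit_dict:\n%s", sstr(submit_dict, char_count=2000))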
2 changes: 1 addition & 1 deletion conductor/lib/conductor_submit.py
@@ -303,7 +303,6 @@ def validate_args(self):
if self.gpu_config.get("type") not in supported_gpu_types:
raise BadArgumentError("GPU type %s is not one of %s" % (self.gpu_config.get("type"), supported_gpu_types))


def send_job(self, upload_files, upload_size):
'''
Construct args for two different cases:
@@ -378,6 +377,7 @@ def send_job(self, upload_files, upload_size):
logger.info("Sending Job...")
response, response_code = self.api_client.make_request(uri_path="jobs/",
data=json.dumps(submit_dict),
compress=True,
raise_on_error=False,
use_api_key=True)
if response_code not in [201, 204]: