diff --git a/.travis.yml b/.travis.yml index bee53e45..3c96d622 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,7 @@ env: - TOX_ENV=lint-docs - TOX_ENV=py26 SETUP=true - TOX_ENV=py27 SETUP=true + - TOX_ENV=py34 SETUP=true install: - pip install tox diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 6d9f0664..47e0e5f6 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -117,7 +117,7 @@ Before you submit a pull request, check that it meets these guidelines: 1. If the pull request adds functionality, the docs should ideally be updated. Put your new functionality into a function with a docstring. (Until the @jmchilton learns to do this consistently this is only a suggestion though.) -2. The pull request should work for Python 2.6 and 2.7. Check +2. The pull request should work for Python 2.6, 2.7, and 3.4. Check https://travis-ci.org/galaxyproject/planemo/pull_requests and make sure that the tests pass for all supported Python versions. The tests are imperfect and Travis sometimes fails in a transient fashion so diff --git a/HISTORY.rst b/HISTORY.rst index b0da9494..1f9faf7b 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -7,6 +7,7 @@ History 0.4.0.dev0 ------------------------ +* Python 3 support. ------------------------ 0.3.0 (2015-04-12) diff --git a/galaxy/objectstore/__init__.py b/galaxy/objectstore/__init__.py index 5cd54c18..7b8971a5 100644 --- a/galaxy/objectstore/__init__.py +++ b/galaxy/objectstore/__init__.py @@ -184,7 +184,7 @@ class DiskObjectStore(ObjectStore): >>> import tempfile >>> file_path=tempfile.mkdtemp() >>> obj = Bunch(id=1) - >>> s = DiskObjectStore(Bunch(umask=077, job_working_directory=file_path, new_file_path=file_path, object_store_check_old_style=False), file_path=file_path) + >>> s = DiskObjectStore(Bunch(umask=0o077, job_working_directory=file_path, new_file_path=file_path, object_store_check_old_style=False), file_path=file_path) >>> s.create(obj) >>> s.exists(obj) True @@ -287,7 +287,7 @@ def create(self, obj, **kwargs): # Create the file if it does not exist if not dir_only: open(path, 'w').close() # Should be rb? - umask_fix_perms(path, self.config.umask, 0666) + umask_fix_perms(path, self.config.umask, 0o666) def empty(self, obj, **kwargs): return os.path.getsize(self.get_filename(obj, **kwargs)) == 0 @@ -311,7 +311,7 @@ def delete(self, obj, entire_dir=False, **kwargs): if self.exists(obj, **kwargs): os.remove(path) return True - except OSError, ex: + except OSError as ex: log.critical('%s delete error %s' % (self._get_filename(obj, **kwargs), ex)) return False @@ -344,7 +344,7 @@ def update_from_file(self, obj, file_name=None, create=False, **kwargs): force_symlink( os.readlink( file_name ), self.get_filename( obj, **kwargs ) ) else: shutil.copy( file_name, self.get_filename( obj, **kwargs ) ) - except IOError, ex: + except IOError as ex: log.critical('Error copying %s to %s: %s' % (file_name, self._get_filename(obj, **kwargs), ex)) raise ex diff --git a/galaxy/objectstore/rods.py b/galaxy/objectstore/rods.py index 4754156a..c03d6ea3 100644 --- a/galaxy/objectstore/rods.py +++ b/galaxy/objectstore/rods.py @@ -152,7 +152,7 @@ def create(self, obj, **kwargs): # that we can prevent overwriting doi = irods.dataObjInp_t() doi.objPath = rods_path - doi.createMode = 0640 + doi.createMode = 0o640 doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable irods.addKeyVal( doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource ) status = irods.rcDataObjCreate( self.rods_conn, doi ) @@ -195,7 +195,7 @@ def delete( self, obj, entire_dir=False, **kwargs ): return True except AttributeError: log.warning( 'delete(): operation failed: object does not exist: %s', rods_path ) - except AssertionError, e: + except AssertionError as e: # delete() does not raise on deletion failure log.error( 'delete(): operation failed: %s', e ) finally: @@ -281,7 +281,7 @@ def update_from_file(self, obj, file_name=None, create=False, **kwargs): # put will create if necessary doi = irods.dataObjInp_t() doi.objPath = self.__get_rods_path( obj, **kwargs ) - doi.createMode = 0640 + doi.createMode = 0o640 doi.dataSize = os.stat( file_name ).st_size doi.numThreads = 0 irods.addKeyVal( doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource ) diff --git a/galaxy/objectstore/s3.py b/galaxy/objectstore/s3.py index 361c40a9..3dc2c5a0 100644 --- a/galaxy/objectstore/s3.py +++ b/galaxy/objectstore/s3.py @@ -192,13 +192,13 @@ def _get_bucket(self, bucket_name): def _fix_permissions(self, rel_path): """ Set permissions on rel_path""" for basedir, _, files in os.walk(rel_path): - umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) + umask_fix_perms(basedir, self.config.umask, 0o777, self.config.gid) for filename in files: path = os.path.join(basedir, filename) # Ignore symlinks if os.path.islink(path): continue - umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) + umask_fix_perms( path, self.config.umask, 0o666, self.config.gid ) def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): rel_path = os.path.join(*directory_hash_id(obj.id)) @@ -439,7 +439,7 @@ def size(self, obj, **kwargs): if self._in_cache(rel_path): try: return os.path.getsize(self._get_cache_path(rel_path)) - except OSError, ex: + except OSError as ex: log.info("Could not get size of file '%s' in local cache, will try S3. Error: %s", rel_path, ex) elif self.exists(obj, **kwargs): return self._get_size_in_s3(rel_path) diff --git a/galaxy/tools/deps/resolvers/modules.py b/galaxy/tools/deps/resolvers/modules.py index a07edcdb..84b013f0 100644 --- a/galaxy/tools/deps/resolvers/modules.py +++ b/galaxy/tools/deps/resolvers/modules.py @@ -8,7 +8,7 @@ """ from os import environ, pathsep from os.path import exists, isdir, join -from StringIO import StringIO +from six import StringIO from subprocess import Popen, PIPE from ..resolvers import DependencyResolver, INDETERMINATE_DEPENDENCY, Dependency @@ -107,13 +107,13 @@ def has_module(self, module, version): for module_name, module_version in module_generator: names_match = module == module_name - module_match = names_match and (version == None or module_version == version) + module_match = names_match and (version is None or module_version == version) if module_match: return True return False def __modules(self): - raw_output = self.__module_avail_output() + raw_output = self.__module_avail_output().decode("utf-8") for line in StringIO(raw_output): line = line and line.strip() if not line or line.startswith("-"): diff --git a/galaxy/util/odict.py b/galaxy/util/odict.py index 93679ae8..37b76c48 100644 --- a/galaxy/util/odict.py +++ b/galaxy/util/odict.py @@ -2,7 +2,14 @@ Ordered dictionary implementation. """ -from UserDict import UserDict +try: + from galaxy import eggs + eggs.require("six") +except ImportError: + # Allow code to operate outside Galaxy. + pass + +from six.moves import UserDict class odict(UserDict): diff --git a/galaxy/util/pastescript/serve.py b/galaxy/util/pastescript/serve.py index 4bd3a0c9..2e6b23e4 100644 --- a/galaxy/util/pastescript/serve.py +++ b/galaxy/util/pastescript/serve.py @@ -1,3 +1,4 @@ +from __future__ import print_function # Most of this code is: # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) @@ -16,7 +17,7 @@ # code here, stripping out uneeded functionality. # All top level imports from each package moved here and organized -import ConfigParser +from six.moves.configparser import ConfigParser import atexit import errno import getpass @@ -31,7 +32,7 @@ import time from logging.config import fileConfig -from loadwsgi import loadapp, loadserver +from .loadwsgi import loadapp, loadserver difflib = None @@ -367,15 +368,15 @@ def run(self, args): commands = get_commands().items() commands.sort() if not commands: - print 'No commands registered.' - print 'Have you installed Paste Script?' - print '(try running python setup.py develop)' + print('No commands registered.') + print('Have you installed Paste Script?') + print('(try running python setup.py develop)') return 2 - print 'Known commands:' + print('Known commands:') longest = max([len(n) for n, c in commands]) for name, command in commands: - print ' %s %s' % (self.pad(name, length=longest), - command.load().summary) + print(' %s %s' % (self.pad(name, length=longest), + command.load().summary)) return 2 @@ -542,7 +543,7 @@ def command(self): if os.environ.get(self._reloader_environ_key): from paste import reloader if self.verbose > 1: - print 'Running reloading file monitor' + print('Running reloading file monitor') reloader.install(int(self.options.reload_interval)) if self.requires_config_file: reloader.watch_file(self.args[0]) @@ -559,7 +560,7 @@ def command(self): if cmd == 'restart' or cmd == 'stop': result = self.stop_daemon() if result: - print "Could not stop daemon" + print("Could not stop daemon") # It's ok to continue trying to restart if stop_daemon returns # a 1, otherwise shortcut and return. if cmd == 'restart' and result != 1: @@ -594,7 +595,7 @@ def command(self): if self.options.log_file: try: writeable_log_file = open(self.options.log_file, 'a') - except IOError, ioe: + except IOError as ioe: msg = 'Error: Unable to write to log file: %s' % ioe raise BadCommand(msg) writeable_log_file.close() @@ -603,7 +604,7 @@ def command(self): if self.options.pid_file: try: writeable_pid_file = open(self.options.pid_file, 'a') - except IOError, ioe: + except IOError as ioe: msg = 'Error: Unable to write to pid file: %s' % ioe raise BadCommand(msg) writeable_pid_file.close() @@ -611,9 +612,9 @@ def command(self): if getattr(self.options, 'daemon', False): try: self.daemonize() - except DaemonizeException, ex: + except DaemonizeException as ex: if self.verbose > 0: - print str(ex) + print(str(ex)) return if (self.options.monitor_restart @@ -647,24 +648,24 @@ def command(self): msg = 'Starting server in PID %i.' % os.getpid() else: msg = 'Starting server.' - print msg + print(msg) def serve(): try: server(app) - except (SystemExit, KeyboardInterrupt), e: + except (SystemExit, KeyboardInterrupt) as e: if self.verbose > 1: raise if str(e): msg = ' '+str(e) else: msg = '' - print 'Exiting%s (-v to see traceback)' % msg + print('Exiting%s (-v to see traceback)' % msg) if jython_monitor: # JythonMonitor has to be ran from the main thread threading.Thread(target=serve).start() - print 'Starting Jython file monitor' + print('Starting Jython file monitor') jython_monitor.periodic_reload() else: serve() @@ -677,7 +678,7 @@ def daemonize(self): % (pid, self.options.pid_file)) if self.verbose > 0: - print 'Entering daemon mode' + print('Entering daemon mode') pid = os.fork() if pid: # The forked process also has a handle on resources, so we @@ -716,7 +717,7 @@ def daemonize(self): def record_pid(self, pid_file): pid = os.getpid() if self.verbose > 1: - print 'Writing PID %s to %s' % (pid, pid_file) + print('Writing PID %s to %s' % (pid, pid_file)) f = open(pid_file, 'w') f.write(str(pid)) f.close() @@ -725,19 +726,19 @@ def record_pid(self, pid_file): def stop_daemon(self): pid_file = self.options.pid_file or 'paster.pid' if not os.path.exists(pid_file): - print 'No PID file exists in %s' % pid_file + print('No PID file exists in %s' % pid_file) return 1 pid = read_pidfile(pid_file) if not pid: - print "Not a valid PID file in %s" % pid_file + print("Not a valid PID file in %s" % pid_file) return 1 pid = live_pidfile(pid_file) if not pid: - print "PID in %s is not valid (deleting)" % pid_file + print("PID in %s is not valid (deleting)" % pid_file) try: os.unlink(pid_file) - except (OSError, IOError), e: - print "Could not delete: %s" % e + except (OSError, IOError) as e: + print("Could not delete: %s" % e) return 2 return 1 for j in range(10): @@ -747,7 +748,7 @@ def stop_daemon(self): os.kill(pid, signal.SIGTERM) time.sleep(1) else: - print "failed to kill web process %s" % pid + print("failed to kill web process %s" % pid) return 3 if os.path.exists(pid_file): os.unlink(pid_file) @@ -756,17 +757,17 @@ def stop_daemon(self): def show_status(self): pid_file = self.options.pid_file or 'paster.pid' if not os.path.exists(pid_file): - print 'No PID file %s' % pid_file + print('No PID file %s' % pid_file) return 1 pid = read_pidfile(pid_file) if not pid: - print 'No PID in file %s' % pid_file + print('No PID in file %s' % pid_file) return 1 pid = live_pidfile(pid_file) if not pid: - print 'PID %s in %s is not running' % (pid, pid_file) + print('PID %s in %s is not running' % (pid, pid_file)) return 1 - print 'Server running in PID %s' % pid + print('Server running in PID %s' % pid) return 0 def restart_with_reloader(self): @@ -775,9 +776,9 @@ def restart_with_reloader(self): def restart_with_monitor(self, reloader=False): if self.verbose > 0: if reloader: - print 'Starting subprocess with file monitor' + print('Starting subprocess with file monitor') else: - print 'Starting subprocess with monitor parent' + print('Starting subprocess with monitor parent') while 1: args = [self.quote_first_command_arg(sys.executable)] + sys.argv new_environ = os.environ.copy() @@ -793,7 +794,7 @@ def restart_with_monitor(self, reloader=False): exit_code = proc.wait() proc = None except KeyboardInterrupt: - print '^C caught in monitor process' + print('^C caught in monitor process') if self.verbose > 1: raise return 1 @@ -812,7 +813,7 @@ def restart_with_monitor(self, reloader=False): if exit_code != 3: return exit_code if self.verbose > 0: - print '-'*20, 'Restarting', '-'*20 + print('-'*20, 'Restarting', '-'*20) def change_user_group(self, user, group): if not user and not group: @@ -844,8 +845,8 @@ def change_user_group(self, user, group): gid = entry.pw_gid uid = entry.pw_uid if self.verbose > 0: - print 'Changing user to %s:%s (%s:%s)' % ( - user, group or '(unknown)', uid, gid) + print('Changing user to %s:%s (%s:%s)' % ( + user, group or '(unknown)', uid, gid)) if hasattr(os, 'initgroups'): os.initgroups(user, gid) else: @@ -903,7 +904,7 @@ def live_pidfile(pidfile): try: os.kill(int(pid), 0) return pid - except OSError, e: + except OSError as e: if e.errno == errno.EPERM: return pid return None @@ -937,26 +938,26 @@ def _remove_pid_file(written_pid, filename, verbosity): pass else: if pid_in_file != current_pid: - print "PID file %s contains %s, not expected PID %s" % ( - filename, pid_in_file, current_pid) + print("PID file %s contains %s, not expected PID %s" % ( + filename, pid_in_file, current_pid)) return if verbosity > 0: - print "Removing PID file %s" % filename + print("Removing PID file %s" % filename) try: os.unlink(filename) return - except OSError, e: + except OSError as e: # Record, but don't give traceback - print "Cannot remove PID file: %s" % e + print("Cannot remove PID file: %s" % e) # well, at least lets not leave the invalid PID around... try: f = open(filename, 'w') f.write('') f.close() - except OSError, e: - print 'Stale PID left in file: %s (%e)' % (filename, e) + except OSError as e: + print('Stale PID left in file: %s (%e)' % (filename, e)) else: - print 'Stale PID removed' + print('Stale PID removed') def ensure_port_cleanup(bound_addresses, maxtries=30, sleeptime=2): @@ -981,7 +982,7 @@ def _cleanup_ports(bound_addresses, maxtries=30, sleeptime=2): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect(bound_address) - except socket.error, e: + except socket.error as e: if e.args[0] != errno.ECONNREFUSED: raise break @@ -1039,7 +1040,7 @@ def run(args=None): if options.do_help: args = ['help'] + args if not args: - print 'Usage: %s COMMAND' % sys.argv[0] + print('Usage: %s COMMAND' % sys.argv[0]) args = ['help'] command_name = args[0] if command_name not in commands: @@ -1053,7 +1054,7 @@ def invoke(command, command_name, options, args): try: runner = command(command_name) exit_code = runner.run(args) - except BadCommand, e: - print e.message + except BadCommand as e: + print(e.message) exit_code = e.exit_code sys.exit(exit_code) diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index e81b4cce..44b633a7 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -83,7 +83,9 @@ class FileActionMapper(object): ... f.close() ... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None) ... mapper = FileActionMapper(mock_client) - ... mapper = FileActionMapper(config=mapper.to_dict()) # Serialize and deserialize it to make sure still works + ... as_dict = config=mapper.to_dict() + ... # print(as_dict["paths"]) + ... mapper = FileActionMapper(config=as_dict) # Serialize and deserialize it to make sure still works ... unlink(f.name) ... return mapper >>> mapper = mapper_for(default_action='none', config_contents=json_string) @@ -171,7 +173,7 @@ def to_dict(self): ssh_user=self.ssh_user, ssh_port=self.ssh_port, ssh_host=self.ssh_host, - paths=map(lambda m: m.to_dict(), self.mappers) + paths=list(map(lambda m: m.to_dict(), self.mappers)) ) def __client_to_config(self, client): @@ -261,7 +263,7 @@ def unstructured_map(self, path_helper): if self.staging_needed: # To ensure uniqueness, prepend unique prefix to each name prefix = unique_path_prefix(self.path) - for path, name in unstructured_map.iteritems(): + for path, name in unstructured_map.items(): unstructured_map[path] = join(prefix, name) else: path_rewrites = {} @@ -446,7 +448,7 @@ def _serialized_key(self): def __serialize_ssh_key(self): f = tempfile.NamedTemporaryFile(delete=False) if self.ssh_key is not None: - f.write(self.ssh_key) + f.write(self.ssh_key.encode("utf-8")) else: raise Exception("SSH_KEY not available") return f.name @@ -644,7 +646,7 @@ def to_dict(self): def mappers_from_dicts(mapper_def_list): - return map(lambda m: _mappper_from_dict(m), mapper_def_list) + return list(map(lambda m: _mappper_from_dict(m), mapper_def_list)) def _mappper_from_dict(mapper_dict): diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 27bb22ab..d7f3439b 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -1,12 +1,14 @@ import os -from json import dumps -from json import loads + +from six import string_types from .destination import submit_params from .setup_handler import build as build_setup_handler from .job_directory import RemoteJobDirectory from .decorators import parseJson from .decorators import retry +from .util import json_dumps +from .util import json_loads from .util import copy from .util import ensure_directory from .util import to_base64_json @@ -109,19 +111,19 @@ def launch(self, command_line, dependencies_description=None, env=[], remote_sta launch_params = dict(command_line=command_line, job_id=self.job_id) submit_params_dict = submit_params(self.destination_params) if submit_params_dict: - launch_params['params'] = dumps(submit_params_dict) + launch_params['params'] = json_dumps(submit_params_dict) if dependencies_description: - launch_params['dependencies_description'] = dumps(dependencies_description.to_dict()) + launch_params['dependencies_description'] = json_dumps(dependencies_description.to_dict()) if env: - launch_params['env'] = dumps(env) + launch_params['env'] = json_dumps(env) if remote_staging: - launch_params['remote_staging'] = dumps(remote_staging) + launch_params['remote_staging'] = json_dumps(remote_staging) if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from # destination arguments. Hence, must have Pulsar setup job # before queueing. setup_params = _setup_params_from_job_config(job_config) - launch_params["setup_params"] = dumps(setup_params) + launch_params["setup_params"] = json_dumps(setup_params) return self._raw_execute("submit", launch_params) def full_status(self): @@ -174,10 +176,12 @@ def put_file(self, path, input_type, name=None, contents=None, action_type='tran # action type == 'message' should either copy or transfer # depending on default not just fallback to transfer. if action_type in ['transfer', 'message']: + if isinstance(contents, string_types): + contents = contents.encode("utf-8") return self._upload_file(args, contents, input_path) elif action_type == 'copy': path_response = self._raw_execute('path', args) - pulsar_path = loads(path_response)['path'] + pulsar_path = json_loads(path_response)['path'] copy(path, pulsar_path) return {'path': pulsar_path} diff --git a/pulsar/client/config_util.py b/pulsar/client/config_util.py index ab5453d0..21103bff 100644 --- a/pulsar/client/config_util.py +++ b/pulsar/client/config_util.py @@ -1,12 +1,14 @@ """ Generic interface for reading YAML/INI/JSON config files into nested dictionaries. """ +import codecs try: from galaxy import eggs eggs.require('PyYAML') except Exception: # If not in Galaxy, ignore this. pass + try: import yaml except ImportError: @@ -70,8 +72,9 @@ def __read_ini(path): def __read_json(path): + reader = codecs.getreader("utf-8") with open(path, "rb") as f: - return json.load(f) + return json.load(reader(f)) EXT_READERS = { CONFIG_TYPE_JSON: __read_json, diff --git a/pulsar/client/decorators.py b/pulsar/client/decorators.py index 94035f5b..659b5c63 100644 --- a/pulsar/client/decorators.py +++ b/pulsar/client/decorators.py @@ -1,5 +1,6 @@ import time -import json + +from .util import json_loads MAX_RETRY_COUNT = 5 RETRY_SLEEP_TIME = 0.1 @@ -10,7 +11,7 @@ class parseJson(object): def __call__(self, func): def replacement(*args, **kwargs): response = func(*args, **kwargs) - return json.loads(response) + return json_loads(response) return replacement diff --git a/pulsar/client/interface.py b/pulsar/client/interface.py index af13a02e..9d625f72 100644 --- a/pulsar/client/interface.py +++ b/pulsar/client/interface.py @@ -2,10 +2,7 @@ from abc import abstractmethod from string import Template -try: - from StringIO import StringIO as BytesIO -except ImportError: - from io import BytesIO +from six import BytesIO try: from six import text_type except ImportError: @@ -145,7 +142,7 @@ def execute(self, command, args={}, data=None, input_path=None, output_path=None def __build_body(self, data, input_path): if data is not None: - return BytesIO(data.encode('utf-8')) + return BytesIO(data) elif input_path is not None: return open(input_path, 'rb') else: diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index f403fd17..0e06c909 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -144,6 +144,8 @@ def shutdown(self, ensure_cleanup=False): def __nonzero__(self): return self.active + __bool__ = __nonzero__ # Both needed Py2 v 3 + def get_client(self, destination_params, job_id, **kwargs): if job_id is None: raise Exception("Cannot generate Pulsar client for empty job_id.") diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index cfb901d5..00191009 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -94,7 +94,7 @@ def __collect_outputs(self): if output_generated: self._attempt_collect_output('output', output_file) - for galaxy_path, pulsar in self.pulsar_outputs.output_extras(output_file).iteritems(): + for galaxy_path, pulsar in self.pulsar_outputs.output_extras(output_file).items(): self._attempt_collect_output('output', path=galaxy_path, name=pulsar) # else not output generated, do not attempt download. diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 6fd01136..b88bfea2 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -142,7 +142,7 @@ def __initialize_referenced_arbitrary_files(self): for path in paths: if path not in referenced_arbitrary_path_mappers: referenced_arbitrary_path_mappers[path] = mapper - for path, mapper in referenced_arbitrary_path_mappers.iteritems(): + for path, mapper in referenced_arbitrary_path_mappers.items(): action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper) unstructured_map = action.unstructured_map(self.path_helper) self.arbitrary_files.update(unstructured_map) @@ -152,7 +152,7 @@ def __upload_tool_files(self): self.transfer_tracker.handle_transfer(referenced_tool_file, path_type.TOOL) def __upload_arbitrary_files(self): - for path, name in self.arbitrary_files.iteritems(): + for path, name in self.arbitrary_files.items(): self.transfer_tracker.handle_transfer(path, path_type.UNSTRUCTURED, name=name) def __upload_input_files(self): diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index 8c50926f..5dc739ef 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -14,6 +14,8 @@ import traceback from io import open +from six import binary_type + from pulsar.client import submit_job from pulsar.client import finish_job from pulsar.client import PulsarOutputs @@ -203,10 +205,7 @@ def run(options): __assert_contents(temp_output3_path, "moo_override", result_status) else: __assert_contents(temp_output3_path, "moo_default", result_status) - if client.default_file_action != "none": - rewritten_index_path = open(temp_output4_path, 'r', encoding='utf-8').read() - # Path written to this file will differ between Windows and Linux. - assert re.search(r"123456[/\\]unstructured[/\\]\w+[/\\]bwa[/\\]human.fa", rewritten_index_path) is not None + __assert_has_rewritten_bwa_path(client, temp_output4_path) __exercise_errors(options, client, temp_output_path, temp_directory) client_manager.shutdown() except BaseException: @@ -267,7 +266,10 @@ def shutdown(self): def __assert_contents(path, expected_contents, pulsar_state): if not os.path.exists(path): raise AssertionError("File %s not created. Final Pulsar response state [%s]" % (path, pulsar_state)) - file = open(path, 'r', encoding="utf-8") + if isinstance(expected_contents, binary_type): + file = open(path, 'rb') + else: + file = open(path, 'r', encoding="utf-8") try: contents = file.read() if contents != expected_contents: @@ -278,6 +280,14 @@ def __assert_contents(path, expected_contents, pulsar_state): file.close() +def __assert_has_rewritten_bwa_path(client, temp_output4_path): + if client.default_file_action != "none": + rewritten_index_path = open(temp_output4_path, 'r', encoding='utf-8').read() + # Path written to this file will differ between Windows and Linux. + if re.search(r"123456[/\\]unstructured[/\\]\w+[/\\]bwa[/\\]human.fa", rewritten_index_path) is None: + raise AssertionError("[%s] does not container rewritten path." % rewritten_index_path) + + def __exercise_errors(options, client, temp_output_path, temp_directory): """ Exercise error conditions. @@ -321,7 +331,7 @@ def __client(temp_directory, options): if default_file_action in ["remote_scp_transfer", "remote_rsync_transfer"]: test_key = os.environ["PULSAR_TEST_KEY"] if not test_key.startswith("----"): - test_key = open(test_key, "rb").read() + test_key = open(test_key, "r").read() client_options["ssh_key"] = test_key client_options["ssh_user"] = os.environ.get("USER") client_options["ssh_port"] = 22 diff --git a/pulsar/client/transport/curl.py b/pulsar/client/transport/curl.py index 983b9786..90c5aa98 100644 --- a/pulsar/client/transport/curl.py +++ b/pulsar/client/transport/curl.py @@ -1,7 +1,11 @@ try: - from cStringIO import StringIO + from galaxy import eggs + eggs.require("six") except ImportError: - from io import StringIO + pass + +from six import string_types +from six import BytesIO try: from pycurl import Curl, HTTP_CODE @@ -36,7 +40,7 @@ def execute(self, url, method=None, data=None, input_path=None, output_path=None c.setopt(c.INFILESIZE, filesize) if data: c.setopt(c.POST, 1) - if type(data).__name__ == 'unicode': + if isinstance(data, string_types): data = data.encode('UTF-8') c.setopt(c.POSTFIELDS, data) c.perform() @@ -76,7 +80,7 @@ def get_file(url, path): def _open_output(output_path): - return open(output_path, 'wb') if output_path else StringIO() + return open(output_path, 'wb') if output_path else BytesIO() def _new_curl_object_for_url(url): diff --git a/pulsar/client/util.py b/pulsar/client/util.py index 04908db6..560fb677 100644 --- a/pulsar/client/util.py +++ b/pulsar/client/util.py @@ -1,3 +1,4 @@ +from functools import wraps from threading import Lock, Event from weakref import WeakValueDictionary from os import walk @@ -8,12 +9,32 @@ import hashlib import shutil import json -import base64 +import sys + +from six import binary_type + +# Variant of base64 compat layer inspired by BSD code from Bcfg2 +# https://github.com/Bcfg2/bcfg2/blob/maint/src/lib/Bcfg2/Compat.py +if sys.version_info >= (3, 0): + from base64 import b64encode as _b64encode, b64decode as _b64decode + + @wraps(_b64encode) + def b64encode(val, **kwargs): + try: + return _b64encode(val, **kwargs) + except TypeError: + return _b64encode(val.encode('UTF-8'), **kwargs).decode('UTF-8') + + @wraps(_b64decode) + def b64decode(val, **kwargs): + return _b64decode(val.encode('UTF-8'), **kwargs).decode('UTF-8') +else: + from base64 import b64encode, b64decode def unique_path_prefix(path): m = hashlib.md5() - m.update(path) + m.update(path.encode('utf-8')) return m.hexdigest() @@ -75,15 +96,17 @@ def filter_destination_params(destination_params, prefix): def to_base64_json(data): """ - >>> x = from_base64_json(to_base64_json(dict(a=5))) - >>> x["a"] + >>> enc = to_base64_json(dict(a=5)) + >>> dec = from_base64_json(enc) + >>> dec["a"] 5 """ - return base64.b64encode(json.dumps(data)) + dumped = json_dumps(data) + return b64encode(dumped) def from_base64_json(data): - return json.loads(base64.b64decode(data)) + return json.loads(b64decode(data)) class PathHelper(object): @@ -175,3 +198,23 @@ def release(self): def fail(self): self.failed = True + + +def json_loads(obj): + if isinstance(obj, binary_type): + obj = obj.decode("utf-8") + return json.loads(obj) + + +def json_dumps(obj): + if isinstance(obj, binary_type): + obj = obj.decode("utf-8") + return json.dumps(obj, cls=ClientJsonEncoder) + + +class ClientJsonEncoder(json.JSONEncoder): + + def default(self, obj): + if isinstance(obj, binary_type): + return obj.decode("utf-8") + return json.JSONEncoder.default(self, obj) diff --git a/pulsar/core.py b/pulsar/core.py index efe180a0..35bda94c 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -134,4 +134,4 @@ def only_manager(self): # convience method for tests, etc... where when we know there # is only one manager. assert len(self.managers) == 1 - return self.managers.values()[0] + return list(self.managers.values())[0] diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index 2ed6dc50..707da732 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -28,10 +28,10 @@ def __job_complete_dict(complete_status, manager, job_id): return_code = manager.return_code(job_id) if return_code == PULSAR_UNKNOWN_RETURN_CODE: return_code = None - stdout_contents = manager.stdout_contents(job_id) - stderr_contents = manager.stderr_contents(job_id) + stdout_contents = manager.stdout_contents(job_id).decode("utf-8") + stderr_contents = manager.stderr_contents(job_id).decode("utf-8") job_directory = manager.job_directory(job_id) - return dict( + as_dict = dict( job_id=job_id, complete="true", # Is this still used or is it legacy. status=complete_status, @@ -43,6 +43,7 @@ def __job_complete_dict(complete_status, manager, job_id): outputs_directory_contents=job_directory.outputs_directory_contents(), system_properties=manager.system_properties(), ) + return as_dict def submit_job(manager, job_config): diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 8b4633c4..9f3cd682 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -302,7 +302,7 @@ def load_metadata(self, metadata_name, default=None): if contents is DEFAULT_RAW: return default else: - return json.loads(contents) + return json.loads(contents.decode()) def has_metadata(self, metadata_name): return self.contains_file(metadata_name) diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py index 445bfcf3..3b57b97d 100644 --- a/pulsar/managers/base/directory.py +++ b/pulsar/managers/base/directory.py @@ -30,10 +30,10 @@ def return_code(self, job_id): return int(return_code_str) if return_code_str and return_code_str != PULSAR_UNKNOWN_RETURN_CODE else return_code_str def stdout_contents(self, job_id): - return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, default="") + return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, default=b"") def stderr_contents(self, job_id): - return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, default="") + return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, default=b"") def _stdout_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_OUTPUT) diff --git a/pulsar/managers/base/external.py b/pulsar/managers/base/external.py index b753641e..14c62f94 100644 --- a/pulsar/managers/base/external.py +++ b/pulsar/managers/base/external.py @@ -4,6 +4,7 @@ from pulsar.managers import status from .directory import DirectoryBaseManager +from six import binary_type DEFAULT_JOB_NAME_TEMPLATE = "pulsar_$job_id" JOB_FILE_EXTERNAL_ID = "external_id" @@ -50,6 +51,8 @@ def _get_job_id(self, input_job_id): return str(self.id_assigner(input_job_id)) def _register_external_id(self, job_id, external_id): + if isinstance(external_id, binary_type): + external_id = external_id.decode("utf-8") self._job_directory(job_id).store_metadata(JOB_FILE_EXTERNAL_ID, external_id) self._external_ids[job_id] = external_id return external_id diff --git a/pulsar/messaging/__init__.py b/pulsar/messaging/__init__.py index a0b53a91..720486a5 100644 --- a/pulsar/messaging/__init__.py +++ b/pulsar/messaging/__init__.py @@ -33,6 +33,8 @@ def deactivate(self): def __nonzero__(self): return self.active + __bool__ = __nonzero__ # Both needed Py2 v 3 + def join(self, timeout=None): for t in self.threads: t.join(timeout) diff --git a/pulsar/messaging/bind_amqp.py b/pulsar/messaging/bind_amqp.py index 3f489ebe..b0051ab9 100644 --- a/pulsar/messaging/bind_amqp.py +++ b/pulsar/messaging/bind_amqp.py @@ -21,7 +21,7 @@ def get_exchange(connection_string, manager_name, conf): # HACK: Fixup non-string parameters - utlimately this should reuse spec # stuff from Galaxy. - for param, to_type in TYPED_PARAMS.iteritems(): + for param, to_type in TYPED_PARAMS.items(): if param in conf: val = conf[param] conf[param] = to_type(val) diff --git a/pulsar/web/framework.py b/pulsar/web/framework.py index c913552c..d94f6942 100644 --- a/pulsar/web/framework.py +++ b/pulsar/web/framework.py @@ -10,7 +10,7 @@ from os.path import exists import re -from json import dumps +from pulsar.client.util import json_dumps from six import Iterator @@ -158,7 +158,7 @@ def controller_replacement(environ, start_response, **args): def body(self, result): body = 'OK' if self.response_type == 'json': - body = dumps(result) + body = json_dumps(result) return body def _prepare_controller_args(self, req, args): @@ -184,6 +184,6 @@ def __iter__(self): def __next__(self): buffer = self.input.read(1024) - if(buffer == ""): + if(buffer == b""): raise StopIteration return buffer diff --git a/test/action_mapper_test.py b/test/action_mapper_test.py index 8d598a0a..696f87f7 100644 --- a/test/action_mapper_test.py +++ b/test/action_mapper_test.py @@ -12,7 +12,7 @@ def test_endpoint_validation(): mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') except Exception as e: exception_found = True - assert "files_endpoint" in e.message + assert "files_endpoint" in str(e) assert exception_found @@ -24,7 +24,7 @@ def test_ssh_key_validation(): mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') except Exception as e: exception_found = True - assert "ssh_key" in e.message + assert "ssh_key" in str(e) assert exception_found diff --git a/test/amqp_test.py b/test/amqp_test.py index 04e60359..d2206076 100644 --- a/test/amqp_test.py +++ b/test/amqp_test.py @@ -1,3 +1,4 @@ +import time import threading from .test_utils import skip_unless_module @@ -17,15 +18,13 @@ def test_amqp(): thread1.start() thread2.start() thread3.start() - manager1_exchange.publish("manager_test", "cow1") - manager2_exchange.publish("manager2_test", "cow2") - manager3_exchange.publish("manager3_test", "cow3") - thread1.join(1) - thread2.join(1) - thread3.join(1) - assert thread1.message == "cow1", thread1.message - assert thread2.message == "cow2", thread2.message - assert thread3.message == "cow3", thread3.message + manager1_exchange.publish("manager_test", u"cow1") + manager2_exchange.publish("manager2_test", u"cow2") + manager3_exchange.publish("manager3_test", u"cow3") + time.sleep(0.1) + thread1.wait_for_message(u"cow1") + thread2.wait_for_message(u"cow2") + thread3.wait_for_message(u"cow3") class TestThread(threading.Thread): @@ -40,9 +39,20 @@ def __init__(self, queue_name, exchange): def __nonzero__(self): return self.message is None + __bool__ = __nonzero__ # Both needed Py2 v 3 + def run(self): def callback(body, message): self.message = body message.ack() self.exchange.consume(self.queue_name, callback=callback, check=self) + + def wait_for_message(self, expected_message): + while self: + time.sleep(.05) + if self.message != expected_message: + msg = "Expected [%s], got [%s]." % (expected_message, self.message) + raise AssertionError(msg) + + self.join(2) diff --git a/test/client_staging_test.py b/test/client_staging_test.py index 4488cb09..d68372cd 100644 --- a/test/client_staging_test.py +++ b/test/client_staging_test.py @@ -45,9 +45,9 @@ def __setup_inputs(self): os.makedirs(files_directory) self.input1 = os.path.join(files_directory, "dataset_1.dat") self.input1_files_path = os.path.join(files_directory, "dataset_1_files") - open(self.input1, "wb").write(u"012345") + open(self.input1, "wb").write(b"012345") self.input2 = os.path.join(files_directory, "dataset_2.dat") - open(self.input2, "wb").write(u"6789") + open(self.input2, "wb").write(b"6789") return [self.input1, self.input2] def test_tool_file_rewrite(self): diff --git a/test/client_test.py b/test/client_test.py index 0795c36d..bf72339d 100644 --- a/test/client_test.py +++ b/test/client_test.py @@ -120,7 +120,7 @@ def test_launch(): """ Test the launch method of client. """ client = TestClient() request_checker = RequestChecker("jobs/543/submit", {"command_line": "python"}) - client.expect_open(request_checker, b'OK') + client.expect_open(request_checker, 'OK') client.launch("python") request_checker.assert_called() @@ -161,7 +161,7 @@ def test_upload_config(): temp_file.write("Hello World!") finally: temp_file.close() - modified_contents = "Hello World! " + modified_contents = b"Hello World! " request_checker = RequestChecker("jobs/543/files", {"name": os.path.basename(temp_file_path), "type": "config"}, modified_contents) client.expect_open(request_checker, b'{"path" : "C:\\\\tools\\\\foo"}') upload_result = client.put_file(temp_file_path, 'config', contents=modified_contents) @@ -194,7 +194,7 @@ def test_get_status_queued(): def test_kill(): client = TestClient() request_checker = RequestChecker("jobs/543/cancel") - client.expect_open(request_checker, b'OK') + client.expect_open(request_checker, 'OK') client.kill() request_checker.assert_called() @@ -202,6 +202,6 @@ def test_kill(): def test_clean(): client = TestClient() request_checker = RequestChecker("jobs/543") - client.expect_open(request_checker, b'OK') + client.expect_open(request_checker, 'OK') client.clean() request_checker.assert_called() diff --git a/test/client_transport_test.py b/test/client_transport_test.py index fe2339f0..c3a2cb6d 100644 --- a/test/client_transport_test.py +++ b/test/client_transport_test.py @@ -27,7 +27,7 @@ def _test_transport(transport): # Testing simple get response = transport.execute(request_url, data=None) - assert response.find("Test123") >= 0 + assert response.find(b"Test123") >= 0 # Testing writing to output file temp_file = NamedTemporaryFile(delete=True) @@ -45,11 +45,11 @@ def test_curl_put_get(): input = os.path.join(directory, "input") output = os.path.join(directory, "output") - open(input, "wb").write(u"helloworld") + open(input, "w").write(u"helloworld") post_file(request_url, input) get_file(request_url, output) - assert open(output, "rb").read() == u"helloworld" + assert open(output, "r").read() == u"helloworld" def test_curl_status_code(): diff --git a/test/integration_test.py b/test/integration_test.py index d9c04278..8082f86d 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -48,7 +48,11 @@ def _run_in_app(self, app_conf, **kwds): def _run_in_test_server(self, app_conf, **kwds): with test_pulsar_server(app_conf=app_conf) as server: options = Bunch(url=server.application_url, **kwds) - self._update_options_for_app(options, server.test_app.application, **kwds) + # TODO: sync Py 2 v 3 approach so following hack is unneeded. + app = server.test_app + if hasattr(app, "application"): + app = app.application + self._update_options_for_app(options, app, **kwds) run(options) def _run_direct(self, app_conf, **kwds): @@ -68,9 +72,9 @@ def __setup_job_properties(self, app_conf, job_conf_props): config = configparser.ConfigParser() section_name = "manager:_default_" config.add_section(section_name) - for key, value in job_conf_props.iteritems(): + for key, value in job_conf_props.items(): config.set(section_name, key, value) - with open(job_conf, "wb") as configf: + with open(job_conf, "w") as configf: config.write(configf) app_conf["job_managers_config"] = job_conf diff --git a/test/integration_test_restart.py b/test/integration_test_restart.py index 36185376..ba32c0cd 100644 --- a/test/integration_test_restart.py +++ b/test/integration_test_restart.py @@ -118,8 +118,12 @@ def join(self): self.thread.join(10) def wait_for_messages(self, n=1): + accumulate_time = 0.0 while len(self.messages) < n: - time.sleep(.05) + time.sleep(.1) + accumulate_time += 0.05 + if accumulate_time > 3.0: + raise Exception("Waited too long for messages.") def _run(self): self.exchange.consume("status_update", self._callback, check=self) @@ -130,3 +134,5 @@ def _callback(self, body, message): def __nonzero__(self): return self.active + + __bool__ = __nonzero__ # Both needed Py2 v 3 diff --git a/test/main_util_test.py b/test/main_util_test.py index 8ff8df3d..5c6c71db 100644 --- a/test/main_util_test.py +++ b/test/main_util_test.py @@ -1,6 +1,6 @@ """ Test utilities in pulsar.main """ from os.path import join -from test_utils import temp_directory +from .test_utils import temp_directory from pulsar import main diff --git a/test/pulsar_objectstore_test.py b/test/pulsar_objectstore_test.py index c801de0d..ec111815 100644 --- a/test/pulsar_objectstore_test.py +++ b/test/pulsar_objectstore_test.py @@ -15,7 +15,7 @@ def __write(self, contents, name): directory = dirname(path) if not exists(directory): makedirs(directory) - open(path, "w").write(contents) + open(path, "wb").write(contents) return path def test_pulsar_objectstore(self): @@ -73,10 +73,10 @@ def test_pulsar_objectstore(self): # Test get_data data = object_store.get_data(hello_world_dataset) - assert data == b"Hello World!" + assert data == "Hello World!" data = object_store.get_data(hello_world_dataset, start=1, count=6) - assert data == b"ello W" + assert data == "ello W" # Test Size diff --git a/test/scripts_config_test.py b/test/scripts_config_test.py index cc6a3030..4ba017bc 100644 --- a/test/scripts_config_test.py +++ b/test/scripts_config_test.py @@ -11,7 +11,7 @@ from pulsar.scripts import config -from test_utils import temp_directory +from .test_utils import temp_directory def test_default_web_config(): diff --git a/test/test_objectstore.py b/test/test_objectstore.py index da3602e9..b8a9804c 100644 --- a/test/test_objectstore.py +++ b/test/test_objectstore.py @@ -24,22 +24,22 @@ def test_disk_store(): # Write empty dataset 2 in second backend, ensure it is empty and # exists. empty_dataset = MockDataset(2) - directory.write(b"", "files1/000/dataset_2.dat") + directory.write("", "files1/000/dataset_2.dat") assert object_store.exists(empty_dataset) assert object_store.empty(empty_dataset) # Write non-empty dataset in backend 1, test it is not emtpy & exists. hello_world_dataset = MockDataset(3) - directory.write(b"Hello World!", "files1/000/dataset_3.dat") + directory.write("Hello World!", "files1/000/dataset_3.dat") assert object_store.exists(hello_world_dataset) assert not object_store.empty(hello_world_dataset) # Test get_data data = object_store.get_data(hello_world_dataset) - assert data == b"Hello World!" + assert data == "Hello World!" data = object_store.get_data(hello_world_dataset, start=1, count=6) - assert data == b"ello W" + assert data == "ello W" # Test Size @@ -58,13 +58,13 @@ def test_disk_store(): output_dataset = MockDataset(4) output_real_path = os.path.join(directory.temp_directory, "files1", "000", "dataset_4.dat") assert not os.path.exists(output_real_path) - output_working_path = directory.write(b"NEW CONTENTS", "job_working_directory1/example_output") + output_working_path = directory.write("NEW CONTENTS", "job_working_directory1/example_output") object_store.update_from_file(output_dataset, file_name=output_working_path, create=True) assert os.path.exists(output_real_path) # Test delete to_delete_dataset = MockDataset(5) - to_delete_real_path = directory.write(b"content to be deleted!", "files1/000/dataset_5.dat") + to_delete_real_path = directory.write("content to be deleted!", "files1/000/dataset_5.dat") assert object_store.exists(to_delete_dataset) assert object_store.delete(to_delete_dataset) assert not object_store.exists(to_delete_dataset) @@ -165,7 +165,8 @@ def write(self, contents, name): directory = os.path.dirname(path) if not os.path.exists(directory): os.makedirs(directory) - expanded_contents = Template(contents).safe_substitute(temp_directory=self.temp_directory) + contents_template = Template(contents) + expanded_contents = contents_template.safe_substitute(temp_directory=self.temp_directory) open(path, "w").write(expanded_contents) return path diff --git a/test/test_tool_deps.py b/test/test_tool_deps.py index 7c206489..cc85bdcd 100644 --- a/test/test_tool_deps.py +++ b/test/test_tool_deps.py @@ -193,7 +193,7 @@ def __assert_foo_exported( commands ): command = ["bash", "-c", "%s; echo \"$FOO\"" % "".join(commands)] process = Popen(command, stdout=PIPE) output = process.communicate()[0].strip() - assert output == 'bar', "Command %s exports FOO as %s, not bar" % (command, output) + assert output == b'bar', "Command %s exports FOO as %s, not bar" % (command, output) def __setup_galaxy_package_dep(base_path, name, version, contents=""): @@ -372,7 +372,7 @@ def __parse_resolvers(xml_content): def __dependency_manager(xml_content): with __test_base_path() as base_path: f = tempfile.NamedTemporaryFile() - f.write(xml_content) + f.write(xml_content.encode("utf-8")) f.flush() dm = DependencyManager( default_base_path=base_path, conf_file=f.name ) yield dm diff --git a/test/test_utils.py b/test/test_utils.py index e8e16898..bd32d005 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -146,8 +146,8 @@ def _test_simple_execution(self, manager): manager.launch(job_id, command) while manager.get_status(job_id) not in ['complete', 'cancelled']: pass - self.assertEquals(manager.stderr_contents(job_id), 'moo') - self.assertEquals(manager.stdout_contents(job_id), 'Hello World!') + self.assertEquals(manager.stderr_contents(job_id), b'moo') + self.assertEquals(manager.stdout_contents(job_id), b'Hello World!') self.assertEquals(manager.return_code(job_id), 0) manager.clean(job_id) self.assertEquals(len(listdir(self.staging_directory)), 0) @@ -204,10 +204,11 @@ def server_for_test_app(app): try: from paste.exceptions.errormiddleware import ErrorMiddleware error_app = ErrorMiddleware(app.app, debug=True, error_log="errors.log") + server = StopableWSGIServer.create(error_app) except ImportError: # paste.exceptions not available for Python 3. - error_app = app - server = StopableWSGIServer.create(error_app) + error_app = app.app + server = StopableWSGIServer.create(error_app) try: server.wait() yield server diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py index 514bdfa6..4155167e 100644 --- a/test/transfer_action_test.py +++ b/test/transfer_action_test.py @@ -7,23 +7,23 @@ def test_write_to_file(): with files_server() as (server, directory): from_path = os.path.join(directory, "remote_get") - open(from_path, "wb").write(u"123456") + open(from_path, "wb").write(b"123456") to_path = os.path.join(directory, "local_get") url = server.application_url + "?path=%s" % from_path RemoteTransferAction(to_path, url=url).write_to_path(to_path) - assert open(to_path, "rb").read() == u"123456" + assert open(to_path, "rb").read() == b"123456" def test_write_from_file(): with files_server() as (server, directory): from_path = os.path.join(directory, "local_post") - open(from_path, "wb").write(u"123456") + open(from_path, "wb").write(b"123456") to_path = os.path.join(directory, "remote_post") url = server.application_url + "?path=%s" % to_path RemoteTransferAction(to_path, url=url).write_from_path(from_path) posted_contents = open(to_path, "rb").read() - assert posted_contents == u"123456", posted_contents + assert posted_contents == b"123456", posted_contents diff --git a/test/wsgi_app_test.py b/test/wsgi_app_test.py index fa133c74..a486bcb2 100644 --- a/test/wsgi_app_test.py +++ b/test/wsgi_app_test.py @@ -1,8 +1,9 @@ import os import json -import urllib import time +from six.moves.urllib.parse import quote + def test_standard_requests(): """ Tests app controller methods. These tests should be @@ -13,7 +14,7 @@ def test_standard_requests(): with test_pulsar_app(test_conf={"extra_environ": {"REMOTE_ADDR": "127.101.101.98"}}) as app: staging_directory = app.app.staging_directory setup_response = app.post("/jobs?job_id=12345") - setup_config = json.loads(setup_response.body) + setup_config = json.loads(setup_response.body.decode("utf-8")) assert setup_config["working_directory"].startswith(staging_directory) outputs_directory = setup_config["outputs_directory"] assert outputs_directory.startswith(staging_directory) @@ -23,7 +24,7 @@ def test_standard_requests(): def test_upload(upload_type): url = "/jobs/%s/files?name=input1&type=%s" % (job_id, upload_type) upload_input_response = app.post(url, "Test Contents") - upload_input_config = json.loads(upload_input_response.body) + upload_input_config = json.loads(upload_input_response.body.decode("utf-8")) staged_input_path = upload_input_config["path"] staged_input = open(staged_input_path, "r") try: @@ -39,7 +40,7 @@ def test_upload(upload_type): finally: test_output.close() download_response = app.get("/jobs/%s/files?name=test_output&type=output" % job_id) - assert download_response.body == "Hello World!" + assert download_response.body.decode("utf-8") == "Hello World!" try: app.get("/jobs/%s/files?name=test_output2&type=output" % job_id) @@ -47,24 +48,24 @@ def test_upload(upload_type): except: pass - command_line = urllib.quote("""python -c "import sys; sys.stdout.write('test_out')" """) + command_line = quote("""python -c "import sys; sys.stdout.write('test_out')" """) launch_response = app.post("/jobs/%s/submit?command_line=%s" % (job_id, command_line)) - assert launch_response.body == 'OK' + assert launch_response.body.decode("utf-8") == 'OK' # Hack: Call twice to ensure postprocessing occurs and has time to # complete. Monitor thread should get this. - time.sleep(.2) + time.sleep(.3) check_response = app.get("/jobs/%s/status" % job_id) - time.sleep(.2) + time.sleep(.3) check_response = app.get("/jobs/%s/status" % job_id) - check_config = json.loads(check_response.body) + check_config = json.loads(check_response.body.decode("utf-8")) assert check_config['returncode'] == 0 assert check_config['stdout'] == "test_out" assert check_config['stderr'] == "" kill_response = app.put("/jobs/%s/cancel" % job_id) - assert kill_response.body == 'OK' + assert kill_response.body.decode("utf-8") == 'OK' clean_response = app.delete("/jobs/%s" % job_id) - assert clean_response.body == 'OK' + assert clean_response.body.decode("utf-8") == 'OK' assert os.listdir(staging_directory) == []