diff --git a/e3/anod/context.py b/e3/anod/context.py index b662612d..aa9e6708 100644 --- a/e3/anod/context.py +++ b/e3/anod/context.py @@ -127,7 +127,7 @@ def add_decision(self, decision_class, root, left, right): |-> right :param decision_class: Decision subclass to use - :type decision_class: T + :type decision_class: () -> Decision :param root: parent node of the decision node :type root: e3.anod.action.Action :param left: left decision (child of Decision node) diff --git a/e3/archive.py b/e3/archive.py index 7bddbae9..7f965229 100644 --- a/e3/archive.py +++ b/e3/archive.py @@ -22,14 +22,11 @@ class ArchiveError(e3.error.E3Error): pass -def __check_type(filename, unpack=True, - force_extension=None): +def __check_type(filename, force_extension=None): """Internal function used by create_archive and unpack_archive. :param filename: the name of the archive to extract the extension :type filename: str - :param unpack: to know if we are called by unpack_archive or create_archive - :type unpack: bool :param force_extension: specify the archive extension if not in the filename :type force_extension: str | None @@ -73,7 +70,6 @@ def unpack_archive(filename, dest, selected_files=None, remove_root_dir=False, - tar='tar', unpack_cmd=None, force_extension=None, delete=False, @@ -94,8 +90,6 @@ def unpack_archive(filename, if it is possible. If not do not raise an exception in that case and fallback on the other method. :type remove_root_dir: bool - :param tar: path/to/tar binary (else use 'tar') - :type tar: str :param unpack_cmd: command to run to unpack the archive, if None use default methods or raise ArchiveError if archive format is not supported. If unpack_cmd is not None, then remove_root_dir is ignored. @@ -148,7 +142,6 @@ def unpack_archive(filename, ext = __check_type( filename, - unpack=True, force_extension=force_extension,) # If remove_root_dir is set then extract to a temp directory first. @@ -268,7 +261,7 @@ def is_match(name, files): e3.fs.rm(tmp_dest, True) -def create_archive(filename, from_dir, dest, tar='tar', force_extension=None, +def create_archive(filename, from_dir, dest, force_extension=None, from_dir_rename=None, no_root_dir=False): """Create an archive file (.tgz, .tar.gz, .tar or .zip). @@ -284,8 +277,6 @@ def create_archive(filename, from_dir, dest, tar='tar', force_extension=None, :type from_dir: str :param dest: destination directory (should exist) :type dest: str - :param tar: path/to/tar binary (else use 'tar') - :type tar: str :param force_extension: specify the archive extension if not in the filename. If filename has no extension and force_extension is None create_archive will fail. @@ -303,7 +294,6 @@ def create_archive(filename, from_dir, dest, tar='tar', force_extension=None, ext = __check_type( filename, - unpack=False, force_extension=force_extension) if from_dir_rename is None: diff --git a/e3/binarydata/__init__.py b/e3/binarydata/__init__.py index 51ade704..8d18fbd8 100644 --- a/e3/binarydata/__init__.py +++ b/e3/binarydata/__init__.py @@ -197,8 +197,8 @@ def image(self, indent=0): """ return self.FORMATTER % self.value - @abc.abstractproperty @staticmethod + @abc.abstractproperty def FORMATTER(): """image() returns the result of FORMATTER % value. diff --git a/e3/electrolyt/plans.py b/e3/electrolyt/plans.py index 714e21dc..b8f8445b 100644 --- a/e3/electrolyt/plans.py +++ b/e3/electrolyt/plans.py @@ -34,7 +34,7 @@ def load(self, filename): source_code = fd.read() self.load_chunk(source_code, filename) - def check(self, ast): + def check(self, code_ast): """Check plan coding style.""" pass diff --git a/e3/env.py b/e3/env.py index 162595ec..ed8786b7 100644 --- a/e3/env.py +++ b/e3/env.py @@ -251,6 +251,8 @@ def get_platform(value, machine=None): :param value: a string representing a platform or None :type value: str | None + :param machine: machine name + :type machine: str | None :rtype: a Platform instance or None """ if value is None: diff --git a/e3/fs.py b/e3/fs.py index 45d1c113..71be31a5 100644 --- a/e3/fs.py +++ b/e3/fs.py @@ -325,55 +325,55 @@ def rm(path, recursive=False, glob=True): else: file_list = set(path) - def onerror(func, path, exc_info): + def onerror(func, error_path, exc_info): """When shutil.rmtree fail, try again to delete the file. :param func: function to call on error :type func: () -> None - :param path: file or directory to remove - :type path: str + :param error_path: file or directory to remove + :type error_path: str :param exc_info: exception raised when the first delete attempt was made :type exc_info: tuple """ del exc_info - e3.log.debug('error when running %s on %s', func, path) + e3.log.debug('error when running %s on %s', func, error_path) # First check whether the file we are trying to delete exist. If not # the work is already done, no need to continue trying removing it. - if not os.path.exists(path): + if not os.path.exists(error_path): return if func in (os.remove, os.unlink): - # Cannot remove path, call chmod and redo an attempt + # Cannot remove error_path, call chmod and redo an attempt # This function is only called when deleting a file inside a # directory to remove, it is safe to change the parent directory # permission since the parent directory will also be removed. - os.chmod(os.path.dirname(path), 0o700) + os.chmod(os.path.dirname(error_path), 0o700) # ??? It seems that this might be needed on windows - os.chmod(path, 0o700) - e3.os.fs.safe_remove(path) + os.chmod(error_path, 0o700) + e3.os.fs.safe_remove(error_path) elif func == os.rmdir: - # Cannot remove path, call chmod and redo an attempt - os.chmod(path, 0o700) + # Cannot remove error_path, call chmod and redo an attempt + os.chmod(error_path, 0o700) # Also change the parent directory permission if it will also # be removed. - if recursive and path not in file_list: - # If path not in the list of directories to remove it means - # that we are already in a subdirectory. - os.chmod(os.path.dirname(path), 0o700) - e3.os.fs.safe_rmdir(path) + if recursive and error_path not in file_list: + # If error_path not in the list of directories to remove it + # means that we are already in a subdirectory. + os.chmod(os.path.dirname(error_path), 0o700) + e3.os.fs.safe_rmdir(error_path) elif func in (os.listdir, os.open): # Cannot read the directory content, probably a permission issue - os.chmod(path, 0o700) + os.chmod(error_path, 0o700) # And continue to delete the subdir - shutil.rmtree(path, onerror=onerror) + shutil.rmtree(error_path, onerror=onerror) for f in file_list: try: @@ -482,11 +482,11 @@ def sync_tree(source, target, ignore=None, # normalize ignore patterns if ignore is not None: - norm_ignore_list = [p.replace('\\', '/') for p in ignore] - abs_ignore_patterns = [p for p in norm_ignore_list - if p.startswith('/')] - rel_ignore_patterns = [p for p in norm_ignore_list - if not p.startswith('/')] + norm_ignore_list = [fn.replace('\\', '/') for fn in ignore] + abs_ignore_patterns = [fn for fn in norm_ignore_list + if fn.startswith('/')] + rel_ignore_patterns = [fn for fn in norm_ignore_list + if not fn.startswith('/')] def is_in_ignore_list(p): """Check if a file should be ignored. diff --git a/e3/job/__init__.py b/e3/job/__init__.py index 19356e08..069a2472 100644 --- a/e3/job/__init__.py +++ b/e3/job/__init__.py @@ -53,6 +53,7 @@ def __init__(self, uid, data, notify_end): self.data = data self.notify_end = notify_end self.slot = None + self.handle = None self.thread = None self.start_time = None self.stop_time = None @@ -95,6 +96,10 @@ class ProcessJob(Job): __metaclass__ = abc.ABCMeta + def __init__(self, uid, data, notify_end): + super(ProcessJob, self).__init__(uid, data, notify_end) + self.proc_handle = None + def run(self): """Internal function.""" cmd_options = self.cmd_options diff --git a/e3/job/scheduler.py b/e3/job/scheduler.py index c490007d..d9e01b42 100644 --- a/e3/job/scheduler.py +++ b/e3/job/scheduler.py @@ -51,6 +51,15 @@ def __init__(self, if self.collect is None: self.collect = lambda x: False + self.active_jobs = [] + self.queued_jobs = 0 + self.all_jobs_queued = False + self.message_queue = None + self.dag_iterator = None + self.start_time = None + self.stop_time = None + self.max_active_jobs = 0 + # Initialize named queues self.queues = {} self.tokens = {} @@ -77,9 +86,10 @@ def simple_provider(cls, job_class): """Return a simple provider based on a given Job class. :param job_class: a subclass of Job - :type job_class: T + :type job_class: () -> Job """ def provider(uid, data, predecessors, notify_end): + del predecessors return job_class(uid, data, notify_end) return provider @@ -198,38 +208,39 @@ def wait(self): # Wait for message from one the active jobs while True: - try: - # The first job in active jobs is the oldest one - # compute the get timeout based on its startup information - deadline = datetime.now() - self.active_jobs[0].start_time - deadline = self.job_timeout - deadline.total_seconds() + # The first job in active jobs is the oldest one + # compute the get timeout based on its startup information + deadline = datetime.now() - self.active_jobs[0].start_time + deadline = self.job_timeout - deadline.total_seconds() - # Ensure waiting time is a positive number - deadline = max(0.0, deadline) + # Ensure waiting time is a positive number + deadline = max(0.0, deadline) + try: uid = self.message_queue.get(True, deadline) - break + logger.info('job %s finished', uid) + job_index, job = next( + ((index, job) + for index, job in enumerate(self.active_jobs) + if job.uid == uid)) + self.slots.append(job.slot) + + # Liberate the resources taken by the job + self.tokens[job.queue_name] += job.tokens + + if self.collect(job): + # Requeue when needed + self.queues[job.queue_name].append(job) + self.queued_jobs += 1 + else: + # Mark the job as completed + self.dag_iterator.leave(job.uid) + + del self.active_jobs[job_index] + return + except Empty: # If after timeout we get an empty result, it means that # the oldest job has reached the timeout. Interrupt it # and wait for the queue to receive the end notification self.active_jobs[0].interrupt() - - logger.info('job %s finished', uid) - job_index, job = next(((index, job) - for index, job in enumerate(self.active_jobs) - if job.uid == uid)) - self.slots.append(job.slot) - - # Liberate the resources taken by the job - self.tokens[job.queue_name] += job.tokens - - if self.collect(job): - # Requeue when needed - self.queues[job.queue_name].append(job) - self.queued_jobs += 1 - else: - # Mark the job as completed - self.dag_iterator.leave(job.uid) - - del self.active_jobs[job_index] diff --git a/e3/main.py b/e3/main.py index c5aed6bd..c74da711 100644 --- a/e3/main.py +++ b/e3/main.py @@ -126,15 +126,15 @@ def __init__(self, name=None, self.argument_parser = argument_parser self.__log_handlers_set = False - def sigterm_handler(signal, frame): + def sigterm_handler(sig, frame): """Automatically convert SIGTERM to SystemExit exception. This is done to give enough time to an application killed by rlimit to perform the needed cleanup steps - :param signal: signal action + :param sig: signal action :param frame: the interrupted stack frame """ - del signal, frame + del sig, frame logging.critical('SIGTERM received') raise SystemExit('SIGTERM received') diff --git a/e3/net/smtp.py b/e3/net/smtp.py index af869896..0f476a7d 100644 --- a/e3/net/smtp.py +++ b/e3/net/smtp.py @@ -65,8 +65,6 @@ def system_sendmail(): # No system sendmail, return False return False - result = False - for smtp_server in smtp_servers: try: s = smtplib.SMTP(smtp_server) @@ -79,7 +77,6 @@ def system_sendmail(): if not s.sendmail(from_email, to_emails, mail_as_string): # sendmail returns an empty dictionary if the message # was accepted for delivery to all addresses - result = True break continue except (socket.error, smtplib.SMTPException) as e: @@ -97,9 +94,10 @@ def system_sendmail(): else: logger.debug('no valid smtp server found') - result = system_sendmail() + if not system_sendmail(): + return False - if result and message_id is not None: + if message_id is not None: logger.debug('Message-ID: %s sent successfully', message_id) - return result + return True diff --git a/e3/os/process.py b/e3/os/process.py index e53ac288..55ff6c5f 100644 --- a/e3/os/process.py +++ b/e3/os/process.py @@ -640,6 +640,7 @@ def wait_for_processes(process_list, timeout): fd_r, fd_w = os.pipe() def handler(signum, frame): + del signum, frame os.write(fd_w, b'a') signal.signal(signal.SIGCHLD, handler) @@ -661,11 +662,11 @@ def handler(signum, frame): while True: try: l_r, _, _ = select.select(*select_args) + if l_r: + os.read(fd_r, 1) break except select.error: pass - if l_r: - os.read(fd_r, 1) remain = timeout - time.time() + start finally: @@ -749,6 +750,8 @@ def kill_process_tree(pid, timeout=3): :param pid: pid of the toplevel process :type pid: int | psutil.Process + :param timeout: wait timeout after sending the kill signal + :type timeout: int :return: True if all processes either don't exist or have been killed, False if there are some processes still alive. :rtype: bool diff --git a/e3/vcs/git.py b/e3/vcs/git.py index ad9dace2..c4dd46f8 100644 --- a/e3/vcs/git.py +++ b/e3/vcs/git.py @@ -205,6 +205,11 @@ def write_log(self, stream, max_count=50, rev_range=None, :type stream: file :param max_count: max number of commit to display :type max_count: int + :param rev_range: git revision range, see ``git log -h`` for details + :type rev_range: str + :param with_gerrit_notes: if True also fetch Gerrit notes containing + review data such as Submitted-at, Submitted-by. + :type with_gerrit_notes: bool :raise: GitError """ # Format: