Skip to content
2 changes: 1 addition & 1 deletion e3/anod/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def add_decision(self, decision_class, root, left, right):
|-> right

:param decision_class: Decision subclass to use
:type decision_class: T
:type decision_class: () -> Decision
:param root: parent node of the decision node
:type root: e3.anod.action.Action
:param left: left decision (child of Decision node)
Expand Down
14 changes: 2 additions & 12 deletions e3/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ class ArchiveError(e3.error.E3Error):
pass


def __check_type(filename, unpack=True,
force_extension=None):
def __check_type(filename, force_extension=None):
"""Internal function used by create_archive and unpack_archive.

:param filename: the name of the archive to extract the extension
:type filename: str
:param unpack: to know if we are called by unpack_archive or create_archive
:type unpack: bool
:param force_extension: specify the archive extension if not in the
filename
:type force_extension: str | None
Expand Down Expand Up @@ -73,7 +70,6 @@ def unpack_archive(filename,
dest,
selected_files=None,
remove_root_dir=False,
tar='tar',
unpack_cmd=None,
force_extension=None,
delete=False,
Expand All @@ -94,8 +90,6 @@ def unpack_archive(filename,
if it is possible. If not do not raise an exception in that case and
fallback on the other method.
:type remove_root_dir: bool
:param tar: path/to/tar binary (else use 'tar')
:type tar: str
:param unpack_cmd: command to run to unpack the archive, if None use
default methods or raise ArchiveError if archive format is not
supported. If unpack_cmd is not None, then remove_root_dir is ignored.
Expand Down Expand Up @@ -148,7 +142,6 @@ def unpack_archive(filename,

ext = __check_type(
filename,
unpack=True,
force_extension=force_extension,)

# If remove_root_dir is set then extract to a temp directory first.
Expand Down Expand Up @@ -268,7 +261,7 @@ def is_match(name, files):
e3.fs.rm(tmp_dest, True)


def create_archive(filename, from_dir, dest, tar='tar', force_extension=None,
def create_archive(filename, from_dir, dest, force_extension=None,
from_dir_rename=None, no_root_dir=False):
"""Create an archive file (.tgz, .tar.gz, .tar or .zip).

Expand All @@ -284,8 +277,6 @@ def create_archive(filename, from_dir, dest, tar='tar', force_extension=None,
:type from_dir: str
:param dest: destination directory (should exist)
:type dest: str
:param tar: path/to/tar binary (else use 'tar')
:type tar: str
:param force_extension: specify the archive extension if not in the
filename. If filename has no extension and force_extension is None
create_archive will fail.
Expand All @@ -303,7 +294,6 @@ def create_archive(filename, from_dir, dest, tar='tar', force_extension=None,

ext = __check_type(
filename,
unpack=False,
force_extension=force_extension)

if from_dir_rename is None:
Expand Down
2 changes: 1 addition & 1 deletion e3/binarydata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ def image(self, indent=0):
"""
return self.FORMATTER % self.value

@abc.abstractproperty
@staticmethod
@abc.abstractproperty
def FORMATTER():
"""image() returns the result of FORMATTER % value.

Expand Down
2 changes: 1 addition & 1 deletion e3/electrolyt/plans.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def load(self, filename):
source_code = fd.read()
self.load_chunk(source_code, filename)

def check(self, ast):
def check(self, code_ast):
"""Check plan coding style."""
pass

Expand Down
2 changes: 2 additions & 0 deletions e3/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ def get_platform(value, machine=None):

:param value: a string representing a platform or None
:type value: str | None
:param machine: machine name
:type machine: str | None
:rtype: a Platform instance or None
"""
if value is None:
Expand Down
46 changes: 23 additions & 23 deletions e3/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,55 +325,55 @@ def rm(path, recursive=False, glob=True):
else:
file_list = set(path)

def onerror(func, path, exc_info):
def onerror(func, error_path, exc_info):
"""When shutil.rmtree fail, try again to delete the file.

:param func: function to call on error
:type func: () -> None
:param path: file or directory to remove
:type path: str
:param error_path: file or directory to remove
:type error_path: str
:param exc_info: exception raised when the first delete attempt was
made
:type exc_info: tuple
"""
del exc_info
e3.log.debug('error when running %s on %s', func, path)
e3.log.debug('error when running %s on %s', func, error_path)

# First check whether the file we are trying to delete exist. If not
# the work is already done, no need to continue trying removing it.
if not os.path.exists(path):
if not os.path.exists(error_path):
return

if func in (os.remove, os.unlink):
# Cannot remove path, call chmod and redo an attempt
# Cannot remove error_path, call chmod and redo an attempt

# This function is only called when deleting a file inside a
# directory to remove, it is safe to change the parent directory
# permission since the parent directory will also be removed.
os.chmod(os.path.dirname(path), 0o700)
os.chmod(os.path.dirname(error_path), 0o700)

# ??? It seems that this might be needed on windows
os.chmod(path, 0o700)
e3.os.fs.safe_remove(path)
os.chmod(error_path, 0o700)
e3.os.fs.safe_remove(error_path)

elif func == os.rmdir:
# Cannot remove path, call chmod and redo an attempt
os.chmod(path, 0o700)
# Cannot remove error_path, call chmod and redo an attempt
os.chmod(error_path, 0o700)

# Also change the parent directory permission if it will also
# be removed.
if recursive and path not in file_list:
# If path not in the list of directories to remove it means
# that we are already in a subdirectory.
os.chmod(os.path.dirname(path), 0o700)
e3.os.fs.safe_rmdir(path)
if recursive and error_path not in file_list:
# If error_path not in the list of directories to remove it
# means that we are already in a subdirectory.
os.chmod(os.path.dirname(error_path), 0o700)
e3.os.fs.safe_rmdir(error_path)

elif func in (os.listdir, os.open):
# Cannot read the directory content, probably a permission issue
os.chmod(path, 0o700)
os.chmod(error_path, 0o700)

# And continue to delete the subdir
shutil.rmtree(path, onerror=onerror)
shutil.rmtree(error_path, onerror=onerror)

for f in file_list:
try:
Expand Down Expand Up @@ -482,11 +482,11 @@ def sync_tree(source, target, ignore=None,

# normalize ignore patterns
if ignore is not None:
norm_ignore_list = [p.replace('\\', '/') for p in ignore]
abs_ignore_patterns = [p for p in norm_ignore_list
if p.startswith('/')]
rel_ignore_patterns = [p for p in norm_ignore_list
if not p.startswith('/')]
norm_ignore_list = [fn.replace('\\', '/') for fn in ignore]
abs_ignore_patterns = [fn for fn in norm_ignore_list
if fn.startswith('/')]
rel_ignore_patterns = [fn for fn in norm_ignore_list
if not fn.startswith('/')]

def is_in_ignore_list(p):
"""Check if a file should be ignored.
Expand Down
5 changes: 5 additions & 0 deletions e3/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, uid, data, notify_end):
self.data = data
self.notify_end = notify_end
self.slot = None
self.handle = None
self.thread = None
self.start_time = None
self.stop_time = None
Expand Down Expand Up @@ -95,6 +96,10 @@ class ProcessJob(Job):

__metaclass__ = abc.ABCMeta

def __init__(self, uid, data, notify_end):
super(ProcessJob, self).__init__(uid, data, notify_end)
self.proc_handle = None

def run(self):
"""Internal function."""
cmd_options = self.cmd_options
Expand Down
67 changes: 39 additions & 28 deletions e3/job/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ def __init__(self,
if self.collect is None:
self.collect = lambda x: False

self.active_jobs = []
self.queued_jobs = 0
self.all_jobs_queued = False
self.message_queue = None
self.dag_iterator = None
self.start_time = None
self.stop_time = None
self.max_active_jobs = 0

# Initialize named queues
self.queues = {}
self.tokens = {}
Expand All @@ -77,9 +86,10 @@ def simple_provider(cls, job_class):
"""Return a simple provider based on a given Job class.

:param job_class: a subclass of Job
:type job_class: T
:type job_class: () -> Job
"""
def provider(uid, data, predecessors, notify_end):
del predecessors
return job_class(uid, data, notify_end)
return provider

Expand Down Expand Up @@ -198,38 +208,39 @@ def wait(self):

# Wait for message from one the active jobs
while True:
try:
# The first job in active jobs is the oldest one
# compute the get timeout based on its startup information
deadline = datetime.now() - self.active_jobs[0].start_time
deadline = self.job_timeout - deadline.total_seconds()
# The first job in active jobs is the oldest one
# compute the get timeout based on its startup information
deadline = datetime.now() - self.active_jobs[0].start_time
deadline = self.job_timeout - deadline.total_seconds()

# Ensure waiting time is a positive number
deadline = max(0.0, deadline)
# Ensure waiting time is a positive number
deadline = max(0.0, deadline)

try:
uid = self.message_queue.get(True, deadline)
break
logger.info('job %s finished', uid)
job_index, job = next(
((index, job)
for index, job in enumerate(self.active_jobs)
if job.uid == uid))
self.slots.append(job.slot)

# Liberate the resources taken by the job
self.tokens[job.queue_name] += job.tokens

if self.collect(job):
# Requeue when needed
self.queues[job.queue_name].append(job)
self.queued_jobs += 1
else:
# Mark the job as completed
self.dag_iterator.leave(job.uid)

del self.active_jobs[job_index]
return

except Empty:
# If after timeout we get an empty result, it means that
# the oldest job has reached the timeout. Interrupt it
# and wait for the queue to receive the end notification
self.active_jobs[0].interrupt()

logger.info('job %s finished', uid)
job_index, job = next(((index, job)
for index, job in enumerate(self.active_jobs)
if job.uid == uid))
self.slots.append(job.slot)

# Liberate the resources taken by the job
self.tokens[job.queue_name] += job.tokens

if self.collect(job):
# Requeue when needed
self.queues[job.queue_name].append(job)
self.queued_jobs += 1
else:
# Mark the job as completed
self.dag_iterator.leave(job.uid)

del self.active_jobs[job_index]
6 changes: 3 additions & 3 deletions e3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ def __init__(self, name=None,
self.argument_parser = argument_parser
self.__log_handlers_set = False

def sigterm_handler(signal, frame):
def sigterm_handler(sig, frame):
"""Automatically convert SIGTERM to SystemExit exception.

This is done to give enough time to an application killed by
rlimit to perform the needed cleanup steps
:param signal: signal action
:param sig: signal action
:param frame: the interrupted stack frame
"""
del signal, frame
del sig, frame
logging.critical('SIGTERM received')
raise SystemExit('SIGTERM received')

Expand Down
10 changes: 4 additions & 6 deletions e3/net/smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ def system_sendmail():
# No system sendmail, return False
return False

result = False

for smtp_server in smtp_servers:
try:
s = smtplib.SMTP(smtp_server)
Expand All @@ -79,7 +77,6 @@ def system_sendmail():
if not s.sendmail(from_email, to_emails, mail_as_string):
# sendmail returns an empty dictionary if the message
# was accepted for delivery to all addresses
result = True
break
continue
except (socket.error, smtplib.SMTPException) as e:
Expand All @@ -97,9 +94,10 @@ def system_sendmail():

else:
logger.debug('no valid smtp server found')
result = system_sendmail()
if not system_sendmail():
return False

if result and message_id is not None:
if message_id is not None:
logger.debug('Message-ID: %s sent successfully', message_id)

return result
return True
7 changes: 5 additions & 2 deletions e3/os/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ def wait_for_processes(process_list, timeout):
fd_r, fd_w = os.pipe()

def handler(signum, frame):
del signum, frame
os.write(fd_w, b'a')

signal.signal(signal.SIGCHLD, handler)
Expand All @@ -661,11 +662,11 @@ def handler(signum, frame):
while True:
try:
l_r, _, _ = select.select(*select_args)
if l_r:
os.read(fd_r, 1)
break
except select.error:
pass
if l_r:
os.read(fd_r, 1)

remain = timeout - time.time() + start
finally:
Expand Down Expand Up @@ -749,6 +750,8 @@ def kill_process_tree(pid, timeout=3):

:param pid: pid of the toplevel process
:type pid: int | psutil.Process
:param timeout: wait timeout after sending the kill signal
:type timeout: int
:return: True if all processes either don't exist or have been killed,
False if there are some processes still alive.
:rtype: bool
Expand Down
Loading