Skip to content

Commit

Permalink
feat: initial tentative support for condition
Browse files Browse the repository at this point in the history
Should have much better real life performance for small tasks.
  • Loading branch information
joamag committed Dec 20, 2022
1 parent dff5165 commit 470e6f0
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 88 deletions.
178 changes: 91 additions & 87 deletions src/colony/libs/scheduling_util.py
Expand Up @@ -60,7 +60,7 @@ class Scheduler(threading.Thread):

running_flag = False
""" Flag that controls if the scheduler event loop is currently
running """
running, set at the start of the loop and unset by the end of it """

continue_flag = False
""" Flag controlling the execution of the scheduler, if unset
Expand All @@ -76,11 +76,9 @@ class Scheduler(threading.Thread):
timestamp_map = {}
""" The map associating the timestamp with a list of callables """

timestamp_lock = None
""" The lock that controls the access to the timestamp structures """

action_lock = None
""" The lock that controls the access to the start and stop actions """
condition = None
""" The condition that will control the access to the data structures
and trigger events on the production of new items """

exception_handler = None
""" If set defined an handler (callable) that is going to be called
Expand All @@ -104,88 +102,65 @@ def __init__(self, sleep_step = DEFAULT_SLEEP_STEP):
self.daemon = True
self.timestamp_queue = []
self.timestamp_map = {}
self.timestamp_lock = threading.RLock()
self.action_lock = threading.RLock()
self.condition = threading.Condition()

def run(self):
self.running_flag = True

timeout = None

try:
# iterates while the continue flag is set, this means
# that this is a continuous loop operation
while self.continue_flag:
# acquires the timestamp lock
self.timestamp_lock.acquire()

try:
# retrieves the current timestamp
with self.condition:
while not self.timestamp_queue or timeout:
self.condition.wait(timeout = timeout)
timeout = None

# in case the continue flag has been unset
# (by triggering condition), then breaks loop
if not self.continue_flag: break

# retrieves the current timestamp, to be
# used in comparison operations
current_timestamp = time.time()

# iterates over the timestamp queue
while True:
# in case the timestamp queue is invalid
# (possibly empty)
if not self.timestamp_queue:
# breaks the loop (no more work
# to be processed for now)
break

# retrieves the timestamp from the
# timestamp queue
timestamp = self.timestamp_queue[0]

# in case the final timestamp has been
# reached, meaning that the timestamp
# of the task is greater than the current
# timestamp
if timestamp > current_timestamp:
# breaks the loop (no more work
# to be processed for now)
break

# retrieves the callable (elements) list
# for the timestamp
callable_list = self.timestamp_map[timestamp]

# removes the callable list for the timestamp
# (done before the calling to avoid race condition)
del self.timestamp_map[timestamp]

# pops (removes first element) the timestamp
# from the timestamp queue (done before the
# calling to avoid race condition)
self.timestamp_queue.pop(0)

# sets the busy flag and releases the timestamp
# lock (avoids waiting for callables)
self.busy_flag = True
self.timestamp_lock.release()

try:
# iterates over all the callables to call
# them (calls the proper function)
for callable in callable_list:
try:
# calls the callable (element)
# this can be of long duration
callable()
except Exception as exception:
if self.exception_handler:
self.exception_handler(callable, exception)
else:
print(exception)
finally:
# acquires the timestamp lock (back)
# and unsets the busy flag
self.timestamp_lock.acquire()
self.busy_flag = False
finally:
# releases the timestamp lock
self.timestamp_lock.release()

# sleeps for the amount of time defined
# in the sleep step
time.sleep(self.sleep_step)
# retrieves the timestamp from the
# timestamp queue
timestamp = self.timestamp_queue[0]

# in case the final timestamp has been
# reached, meaning that the timestamp
# of the task is greater than the current
# timestamp
if timestamp > current_timestamp:
# sleeps for the amount of time defined
# in the sleep step
timeout = timestamp - current_timestamp

# breaks the loop (no more work
# to be processed for now)
continue

# retrieves the callable (elements) list
# for the timestamp
callable_list = self.timestamp_map[timestamp]

# removes the callable list for the timestamp
# (done before the calling to avoid race condition)
del self.timestamp_map[timestamp]

# pops (removes first element) the timestamp
# from the timestamp queue (done before the
# calling to avoid race condition)
self.timestamp_queue.pop(0)

# runs the callable calling operation (consuming)
# which should properly handle exceptions avoiding
# internal execution problems
self._handle_callables(callable_list)
finally:
self.running_flag = False
self.continue_flag = False
Expand Down Expand Up @@ -222,6 +197,11 @@ def stop_scheduler(self):
# the unload of the scheduler as soon as possible
self.continue_flag = False

# triggers the condition so that the listener is
# able to stop the waiting process
with self.condition:
self.condition.notify()

def reset_scheduler(self):
"""
Resets the scheduler to the original state.
Expand All @@ -232,10 +212,13 @@ def reset_scheduler(self):
stopping of the scheduler.
"""

self.running_flag = False
self.continue_flag = False
self.busy_flag = False
self.timestamp_queue = []
self.timestamp_map = {}
self.timestamp_lock = threading.RLock()
self.condition = threading.Condition()
exception_handler = None

def add_callable(self, callable, timestamp = None, verify = True):
"""
Expand Down Expand Up @@ -264,10 +247,9 @@ def add_callable(self, callable, timestamp = None, verify = True):
# the current time - immediate task scheduling
if timestamp == None: timestamp = time.time()

# acquires the timestamp lock
self.timestamp_lock.acquire()

try:
# acquires the condition to be able to safely
# manipulate the structure and produce item
with self.condition:
# starts the index value
index = 0

Expand All @@ -278,8 +260,8 @@ def add_callable(self, callable, timestamp = None, verify = True):
# timestamp contains a value smaller than
# the timestamp to be inserted
if timestamp < _timestamp:
# breaks the loop (position for
# insertion reached)
# breaks the loop (position as
# insertion has been reached)
break

# increments the index
Expand All @@ -299,9 +281,10 @@ def add_callable(self, callable, timestamp = None, verify = True):
callable_list = self.timestamp_map.get(timestamp, [])
callable_list.append(callable)
self.timestamp_map[timestamp] = callable_list
finally:
# releases the timestamp lock
self.timestamp_lock.release()

# notifies the condition effectively indicating that a
# new item or set of items is available for consumption
self.condition.notify()

def set_exception_handler(self, exception_handler):
"""
Expand Down Expand Up @@ -341,3 +324,24 @@ def is_running(self, pedantic = False):

if pedantic: return self.running_flag and self.continue_flag
return self.continue_flag

def _handle_callables(self, callable_list):
# sets the busy flag indicating that there's execution
# of callable object happening
self.busy_flag = True

try:
# iterates over all the callables to call
# them (calls the proper function)
for callable in callable_list:
try:
# calls the callable (element)
# this can be of long duration
callable()
except Exception as exception:
if self.exception_handler:
self.exception_handler(callable, exception)
else:
print(exception)
finally:
self.busy_flag = False
10 changes: 9 additions & 1 deletion src/colony/test/libs/scheduling_util.py
Expand Up @@ -119,11 +119,19 @@ def update_values_raise():
self.assertEqual(scheduler.is_running(pedantic = True), True)

def exception_handler(callable, exception):
values["callable"] = callable
values["exception"] = exception.__class__

scheduler.set_exception_handler(exception_handler)

scheduler.add_callable(update_values_raise)
time.sleep(0.25)
self.assertEqual(values, dict(a = 1, exception = Exception))
self.assertEqual(
values,
dict(
a = 1,
callable = update_values_raise,
exception = Exception
)
)
self.assertEqual(scheduler.is_running(pedantic = True), True)

0 comments on commit 470e6f0

Please sign in to comment.