Skip to content

Commit

Permalink
feat: suport for waiting for callable task
Browse files Browse the repository at this point in the history
This makes use of special condirion objects
  • Loading branch information
joamag committed Dec 20, 2022
1 parent 470e6f0 commit 8fbc7fc
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 20 deletions.
97 changes: 87 additions & 10 deletions src/colony/libs/scheduling_util.py
Expand Up @@ -76,14 +76,30 @@ class Scheduler(threading.Thread):
timestamp_map = {}
""" The map associating the timestamp with a list of callables """

tasks = set()
""" The complete set of tasks that are currently under pending
execution, this should be a set of unique identifiers """

waits = set()
""" The sequence that contains the complete set of callables that
are waiting to be notified """

condition = None
""" The condition that will control the access to the data structures
and trigger events on the production of new items """

waits_condition = None
""" Condition that is going to be used in the waits operation and control
the access to the waits set """

exception_handler = None
""" If set defined an handler (callable) that is going to be called
whenever an exception is raised in the execution of the callable units """

_counter = 1
""" The unique identifier counter that is going to be incremented
per each callable to be added """

def __init__(self, sleep_step = DEFAULT_SLEEP_STEP):
"""
Constructor of the class.
Expand All @@ -102,20 +118,34 @@ def __init__(self, sleep_step = DEFAULT_SLEEP_STEP):
self.daemon = True
self.timestamp_queue = []
self.timestamp_map = {}
self.tasks = set()
self.waits = set()
self.condition = threading.Condition()
self.waits_condition = threading.Condition()

def run(self):
self.running_flag = True

# sets the initial value of the timeout, which is an unset
# one, this value is only going to be set in case of deferred
# first item in queue exists
timeout = None

# sets the running flag, effectively indicating that the scheduler
# is running its main loop
self.running_flag = True

try:
# iterates while the continue flag is set, this means
# that this is a continuous loop operation
while self.continue_flag:

# acquires the condition so that we can safely wait for
# new "events" and access the underlying data structures
# for proper and safe consuming of them
with self.condition:
while not self.timestamp_queue or timeout:
# waits for the condition while the system is running and
# either the queue is empty or there's a timeout defined
# (meaning a pending final value has been reached)
while self.continue_flag and (not self.timestamp_queue or timeout):
self.condition.wait(timeout = timeout)
timeout = None

Expand Down Expand Up @@ -144,6 +174,11 @@ def run(self):
# to be processed for now)
continue

# pops (removes first element) the timestamp
# from the timestamp queue, proper timestamp
# consuming operation
self.timestamp_queue.pop(0)

# retrieves the callable (elements) list
# for the timestamp
callable_list = self.timestamp_map[timestamp]
Expand All @@ -152,11 +187,6 @@ def run(self):
# (done before the calling to avoid race condition)
del self.timestamp_map[timestamp]

# pops (removes first element) the timestamp
# from the timestamp queue (done before the
# calling to avoid race condition)
self.timestamp_queue.pop(0)

# runs the callable calling operation (consuming)
# which should properly handle exceptions avoiding
# internal execution problems
Expand Down Expand Up @@ -217,7 +247,10 @@ def reset_scheduler(self):
self.busy_flag = False
self.timestamp_queue = []
self.timestamp_map = {}
self.tasks = set()
self.waits = set()
self.condition = threading.Condition()
self.wait_execution = threading.Condition()
exception_handler = None

def add_callable(self, callable, timestamp = None, verify = True):
Expand Down Expand Up @@ -250,6 +283,11 @@ def add_callable(self, callable, timestamp = None, verify = True):
# acquires the condition to be able to safely
# manipulate the structure and produce item
with self.condition:
# obtains the identifier for the current
# callable operation schedule
identifier = self._counter
self._counter += 1

# starts the index value
index = 0

Expand Down Expand Up @@ -279,13 +317,35 @@ def add_callable(self, callable, timestamp = None, verify = True):
# retrieves the list of callable for the given timestamp
# and then updates it with the given callable object
callable_list = self.timestamp_map.get(timestamp, [])
callable_list.append(callable)
callable_list.append((callable, identifier))
self.timestamp_map[timestamp] = callable_list

# adds the identifier to the sequence that controls the
# tasks that are considered active
self.tasks.add(identifier)

# notifies the condition effectively indicating that a
# new item or set of items is available for consumption
self.condition.notify()

# returns the final identifier for the callable task
# that has just been scheduled
return identifier

def wait_callable(self, identifier):
verify_util.verify(isinstance(identifier, int))

with self.waits_condition:
self.waits.add(identifier)

with self.condition:
if not identifier in self.tasks:
return

with self.waits_condition:
while self.continue_flag and identifier in self.waits:
self.waits_condition.wait()

def set_exception_handler(self, exception_handler):
"""
Sets the handler that will be called in case there's an
Expand Down Expand Up @@ -333,7 +393,7 @@ def _handle_callables(self, callable_list):
try:
# iterates over all the callables to call
# them (calls the proper function)
for callable in callable_list:
for callable, _identifier in callable_list:
try:
# calls the callable (element)
# this can be of long duration
Expand All @@ -345,3 +405,20 @@ def _handle_callables(self, callable_list):
print(exception)
finally:
self.busy_flag = False

# runs a final waits condition operation that will
# make sure that the pending waits values are notified
# in case they are in a waiting state, it will also
# remove the multiple callable tasks from the sequence
# that controls the pending tasks
with self.waits_condition:
notify = False

for callable, identifier in callable_list:
if identifier in self.waits:
self.waits.remove(identifier)
notify = True
self.tasks.remove(identifier)

if notify:
self.waits_condition.notify_all()
48 changes: 38 additions & 10 deletions src/colony/test/libs/scheduling_util.py
Expand Up @@ -62,10 +62,38 @@ def test_basic(self):
def update_values():
values["a"] = 1

self.assertNotEqual(values, dict(a = 1))
self.assertEqual(values, dict())

scheduler.add_callable(update_values)
identifier = scheduler.add_callable(update_values)
scheduler.wait_callable(identifier)
self.assertEqual(identifier, 1)
self.assertEqual(values, dict(a = 1))

def test_delayed(self):
"""
Tests that a delayed work can be scheduled and executed
in the proper time.
"""

scheduler = colony.Scheduler()
scheduler.start_scheduler()
self.assertEqual(scheduler.is_running(), True)

values = dict()

def update_values():
values["a"] = 1

self.assertEqual(values, dict())

initial = time.time()
identifier = scheduler.add_callable(update_values, time.time() + 0.5)
self.assertEqual(identifier, 1)
time.sleep(0.25)
self.assertEqual(values, dict())

scheduler.wait_callable(identifier)
self.assertEqual(time.time() - initial >= 0.5, True)
self.assertEqual(values, dict(a = 1))

def test_stopped(self):
Expand All @@ -88,11 +116,11 @@ def test_stopped(self):
def update_values():
values["a"] = 1

self.assertNotEqual(values, dict(a = 1))
self.assertEqual(values, dict())

scheduler.add_callable(update_values, verify = False)
time.sleep(0.25)
self.assertNotEqual(values, dict(a = 1))
identifier = scheduler.add_callable(update_values, verify = False)
scheduler.wait_callable(identifier)
self.assertEqual(values, dict())

self.assert_raises(RuntimeError, scheduler.start_scheduler)

Expand All @@ -113,8 +141,8 @@ def update_values_raise():
values["a"] = 1
raise Exception()

scheduler.add_callable(update_values_raise)
time.sleep(0.25)
identifier = scheduler.add_callable(update_values_raise)
scheduler.wait_callable(identifier)
self.assertEqual(values, dict(a = 1))
self.assertEqual(scheduler.is_running(pedantic = True), True)

Expand All @@ -124,8 +152,8 @@ def exception_handler(callable, exception):

scheduler.set_exception_handler(exception_handler)

scheduler.add_callable(update_values_raise)
time.sleep(0.25)
identifier = scheduler.add_callable(update_values_raise)
scheduler.wait_callable(identifier)
self.assertEqual(
values,
dict(
Expand Down

0 comments on commit 8fbc7fc

Please sign in to comment.