Skip to content

Commit

Permalink
Prepare execution helper
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasst committed Feb 14, 2024
1 parent 6cf89d3 commit e7bbc23
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
17 changes: 2 additions & 15 deletions tasktiger/executor.py
Expand Up @@ -93,23 +93,10 @@ def execute(
locks: Collection[Lock],
queue_lock: Optional[Semaphore],
) -> bool:
all_task_ids = {task.id for task in tasks}

# The tasks must use the same function.
assert len(tasks)
serialized_task_func = tasks[0].serialized_func
task_func = tasks[0].func
assert all(
[
serialized_task_func == task.serialized_func
for task in tasks[1:]
]
)

# Before executing periodic tasks, queue them for the next period.
if serialized_task_func in self.tiger.periodic_task_funcs:
tasks[0]._queue_for_next_period()
serialized_task_func = tasks[0].serialized_func

all_task_ids = {task.id for task in tasks}
with g_fork_lock:
child_pid = os.fork()

Expand Down
17 changes: 17 additions & 0 deletions tasktiger/worker.py
Expand Up @@ -618,6 +618,21 @@ def _process_from_queue(self, queue: str) -> Tuple[List[str], int]:

return task_ids, processed_count

def _prepare_execution(self, tasks: List[Task]):
# The tasks must use the same function.
assert len(tasks)
serialized_task_func = tasks[0].serialized_func
assert all(
[
serialized_task_func == task.serialized_func
for task in tasks[1:]
]
)

# Before executing periodic tasks, queue them for the next period.
if serialized_task_func in self.tiger.periodic_task_funcs:
tasks[0]._queue_for_next_period()

def _execute_task_group(
self,
queue: str,
Expand Down Expand Up @@ -683,6 +698,8 @@ def _execute_task_group(
if self.stats_thread:
self.stats_thread.report_task_start()

self._prepare_execution(ready_tasks)

success = self.executor.execute(
queue, ready_tasks, log, locks, queue_lock
)
Expand Down

0 comments on commit e7bbc23

Please sign in to comment.