Skip to content

Commit

Permalink
some terst plus uses events to await results instead of spinlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
BinarSkugga committed Oct 11, 2023
1 parent f6b7828 commit 9d46f29
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/tentacule/i_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Type, Callable

from i_worker_process import IWorkerProcess
from tentacule.i_worker_process import IWorkerProcess


class IProcessPool(ABC):
Expand Down
49 changes: 27 additions & 22 deletions src/tentacule/process_pool.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import time
from multiprocessing import Queue
from multiprocessing import Queue, Event
from queue import Empty
from threading import Thread
from time import sleep
from typing import Any, Type, List, Callable, Optional

import dill

from i_process_pool import IProcessPool
from i_worker_process import IWorkerProcess
from utils import terminate_process_with_timeout, generate_unique_id
from worker_process import SimpleWorkerProcess
from tentacule.i_process_pool import IProcessPool
from tentacule.i_worker_process import IWorkerProcess
from tentacule.utils import terminate_process_with_timeout, generate_unique_id
from tentacule.worker_process import SimpleWorkerProcess


class ProcessPool(IProcessPool):
Expand All @@ -24,8 +24,9 @@ def __init__(self, workers: int, worker_class: Type[IWorkerProcess] = SimpleWork
self._task_queue = Queue()
self._result_queue = Queue()

self.result_timeout = 30
self.result_timeout = 15
self._results = {}
self._result_events = {}
self._result_thread: Optional[Thread] = Thread(target=self._watch_for_result, daemon=True)

def start(self):
Expand All @@ -50,17 +51,16 @@ def close(self, force: bool = False):
def new_task(self, task: Callable, *args, **kwargs) -> str:
task_id = generate_unique_id()
self._task_queue.put((task_id, dill.dumps(task), args, kwargs))
self._result_events[task_id] = Event()

return task_id

def get_result(self, task_id: str, timeout: int = 30) -> Any:
t = time.monotonic()
while time.monotonic() - t < timeout:
try:
result, t = self._results[task_id]
return result
except KeyError:
sleep(.1)
try:
self._result_events[task_id].wait(timeout)
return self._results[task_id][0]
finally:
self._result_events.pop(task_id)

def _rebalance(self):
self._pool = [p for p in self._pool if p.native_process.is_alive()]
Expand All @@ -75,11 +75,16 @@ def _rebalance(self):

def _watch_for_result(self):
while not self._stop_pool:
task_id, result = self._result_queue.get(timeout=self.result_timeout)
self._results[task_id] = (result, time.monotonic())

self._rebalance()
self._results = {
k: v for k, v in self._results.items()
if time.monotonic() - v[1] < self.result_timeout
}
try:
task_id, result = self._result_queue.get(timeout=self.result_timeout)

self._result_events[task_id].set()
self._results[task_id] = (result, time.monotonic())

self._rebalance()
self._results = {
k: v for k, v in self._results.items()
if time.monotonic() - v[1] < self.result_timeout
}
except Empty:
pass # It's okay if the queue was empty, just retry to get
2 changes: 1 addition & 1 deletion src/tentacule/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import dill

from i_worker_process import IWorkerProcess
from tentacule.i_worker_process import IWorkerProcess


class SimpleWorkerProcess(IWorkerProcess):
Expand Down
13 changes: 13 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest

from tentacule.process_pool import ProcessPool


@pytest.fixture(scope="session")
def pool():
pool = ProcessPool(workers=3)
try:
pool.start()
yield pool
finally:
pool.close()
2 changes: 2 additions & 0 deletions tests/depency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def subtract(a: int, b: int):
return a - b
2 changes: 0 additions & 2 deletions tests/test_dummy.py

This file was deleted.

59 changes: 59 additions & 0 deletions tests/test_simple_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from tests.depency import subtract


def simple_function():
return 5


def test_simple_function(pool):
task_id = pool.new_task(simple_function)

assert pool.get_result(task_id) == 5


def add(a: int, b: int):
return a + b


def test_simple_function_with_args(pool):
task_id = pool.new_task(add, 6, 7)

assert pool.get_result(task_id) == 13


def test_simple_function_with_kwargs(pool):
task_id = pool.new_task(add, a=5, b=5)

assert pool.get_result(task_id) == 10


def default_a(a: int = 0):
return a


def test_simple_function_with_default(pool):
task_id = pool.new_task(default_a)
assert pool.get_result(task_id) == 0

task_id = pool.new_task(default_a, 80)
assert pool.get_result(task_id) == 80


def recurse(a: int = 0):
if a == 10:
return a
return recurse(a + 1)


def test_recursive(pool):
task_id = pool.new_task(recurse)
assert pool.get_result(task_id) == 10


def dependency_subtract(a: int, b: int):
return subtract(a, b)


def test_function_with_dependency(pool):
task_id = pool.new_task(dependency_subtract, 7, 2)
assert pool.get_result(task_id) == 5

0 comments on commit 9d46f29

Please sign in to comment.