In [41]:
from __future__ import annotations
import json, threading, traceback
from typing import Callable, Literal, Optional
import time

In [None]:
class CancellableTask:
    """Run a function in its own thread that can be started and stopped via pipe messages."""

    def __init__(self, func: Callable[[threading.Event], None]):
        self._func = func
        self._thread: Optional[threading.Thread] = None
        self._stop_evt = threading.Event()

    def __call__(self, message: Literal["start", "stop"]):
        match message:
            case "start":
                return self.start()
            case "stop":
                return self.stop()
            case _:
                raise ValueError(f"Unknown message: {message}")

    def is_running(self) -> bool:
        """Check if the task is currently running."""
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        """Start the task if it is not already running."""
        if not self.is_running():
            self._stop_evt.clear()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()
            return {"status": "started"}
        return {"status": "already_running"}

    def stop(self):
        """Stop the task if it is running."""
        if self.is_running():
            self._stop_evt.set()
            self._thread.join()
            return {"status": "stopped"}
        return {"status": "not_running"}

    # ––– internal –––
    def _run(self):
        try:
            self._func(self._stop_evt)
        except Exception as ex:  # make sure a task crash doesn't kill the server thread
            print("CancellableTask crashed:", ex)
            traceback.print_exc()

In [43]:
class FakePipe:
    """Fake named pipe for testing purposes."""
    
    def __init__(self, name: str = r"\\.\pipe\MatPy"):
        self.name = name
        self.messages = []
        self.closed = False

    def write(self, data: bytes):
        if self.closed:
            raise BrokenPipeError("Pipe is closed")
        self.messages.append(data)

    def read(self, size: int) -> bytes:
        if not self.messages:
            return b""
        return self.messages.pop(0)

    def close(self):
        self.closed = True

In [44]:
class NamedPipeServer:
    """Single‑client, message‑framed named‑pipe server that survives callback errors."""

    def __init__(self, *, fake_pipe: FakePipe, callback=None, bufsize: int = 65536):
        self.pipe = fake_pipe  # FakePipe instance for testing
        self.callback = callback  # Python callable (dict → dict | None)
        self.bufsize = bufsize
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._listen, daemon=True)

    # ––– public API –––
    def start(self):
        """Begin listening in a background thread (returns immediately)."""
        self._thread.start()

    def stop(self):
        """Request shutdown and join the thread."""
        self._stop_event.set()
        self._thread.join()

    # ––– internal –––
    def _listen(self):
        while not self._stop_event.is_set():
            # Use the fake pipe for testing instead of creating a Windows named pipe
            pipe = self.pipe
            try:
                # Simulate client connection for fake pipe
                # client loop
                while not self._stop_event.is_set():
                    try:
                        raw = pipe.read(self.bufsize)
                    except Exception:
                        break  # pipe closed or error
                    if not raw:
                        continue  # no message available
                    try:
                        message = json.loads(raw.decode())
                    except json.JSONDecodeError as ex:
                        continue

                    if self.callback:
                        try:
                            self.callback(message["message"])
                        except Exception as ex:
                            print("Callback exception:", ex)
                            traceback.print_exc()
            finally:
                time.sleep(0.01)  # avoid busy-waiting

In [None]:
pipe = FakePipe()

def task_function(event: threading.Event):
    print("Task has started")
    for i in range(15):
        if event.is_set():
            print("Task has been cancelled")
            return
        time.sleep(1)
        print("Task is running, iteration ", i + 1)
    raise RuntimeError("Task completed unexpectedly")

task = CancellableTask(task_function)

server = NamedPipeServer(fake_pipe=pipe, callback=task)

In [None]:
server.start()

Task has started
Task is running
Task is running
Task is running
Task has started
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running
Task is running


In [54]:
pipe.write(json.dumps({"message": "start"}).encode())

In [26]:
raw=pipe.read(0)
if raw:
    print(json.loads(raw.decode()))

In [56]:
pipe.write(json.dumps({"message": "stop"}).encode())

In [55]:
server.stop()

In [14]:
json.loads(json.dumps({"message": "start"}).encode())

{'message': 'start'}

In [62]:
from types import SimpleNamespace

MESSAGES = SimpleNamespace(
    COMMAND_KEY="cmd",
    START_CMD="start",
    STOP_CMD="stop"
)

In [64]:
MESSAGES.START_CMD

'start'