In [1]:
import sys
sys.path.append('../')
import conproc

In [2]:
import os

class EchoProcess(conproc.BaseWorkerProcess):
    '''Process that sends back the same data it receives.'''
    
    def __call__(self):
        '''Main process loop.'''
        print(f'Starting process {os.getpid()}')
        while True:
            data = self.messenger.receive_data(blocking=True)
            print(f'process {os.getpid()} received: {data}')
            self.messenger.send_reply(data)

resource = conproc.WorkerResource(EchoProcess)
resource

WorkerResource(worker_process_type=<class '__main__.EchoProcess'>, method=None, _proc=None, _messenger=None)

In [3]:
with conproc.WorkerResource(EchoProcess) as w:
    w.messenger.send_data_noreply('Hello, world!')
    
    print('received:', w.messenger.receive_data())

Starting process 776895


process 776895 received: Hello, world!
received: Hello, world!


In [4]:
with conproc.WorkerResource(EchoProcess) as w:
    w.messenger.send_request('Hello, world!')
    
    print(f'{w.messenger.remaining()=}')
    
    print('received:', w.messenger.receive_data())
    
    print(f'{w.messenger.remaining()=}')

Starting process 776904
process 776904 received: Hello, world!
w.messenger.remaining()=1
received: Hello, world!
w.messenger.remaining()=0


In [5]:
with conproc.WorkerResource(EchoProcess) as w:
    w.messenger.send_request('Hello, world!')
    w.messenger.send_request('Hello, world!')
    w.messenger.send_request('Hello, world!')
    
    print(f'{w.messenger.remaining()=}')
    
    while w.messenger.remaining() > 0:
        print('received:', w.messenger.receive_data())

Starting process 776913
process 776913 received: Hello, world!w.messenger.remaining()=3

process 776913 received: Hello, world!
process 776913 received: Hello, world!
received: Hello, world!
received: Hello, world!
received: Hello, world!


In [6]:
import dataclasses
            
@dataclasses.dataclass
class HighPriorityMessage:
    value: int
    priority: int = -1 # high priority
    
@dataclasses.dataclass
class LowPriorityMessage:
    value: int
    priority: int = 1 # low priority

In [7]:
class MyProcess:
    '''Process that sends back the same data it receives.'''
    def __init__(self, messenger: conproc.PriorityMessenger, verbose: bool = False):
        self.messenger = messenger
        self.verbose = verbose
    
    def __call__(self):
        if self.verbose: print(f'Starting process {os.getpid()}')
        
        while True:
            x = self.messenger.receive_data(blocking=True)
            if x > 10:
                self.messenger.send_reply(HighPriorityMessage(x))
            else:
                self.messenger.send_reply(LowPriorityMessage(x))

import time

with conproc.WorkerResource(MyProcess) as w:
    w.messenger.send_request(1)
    w.messenger.send_request(100)
    w.messenger.send_request(2)
    
    time.sleep(0.1)
    
    while w.messenger.remaining() > 0:
        print('received:', w.messenger.receive_data())


received: HighPriorityMessage(value=100, priority=-1)
received: LowPriorityMessage(value=1, priority=1)
received: LowPriorityMessage(value=2, priority=1)


## Custom Worker Interfaces

In [8]:
@dataclasses.dataclass
class StartPrinting:
    pass

@dataclasses.dataclass
class StopPrinting:
    pass

@dataclasses.dataclass
class ChangePrintBehavior:
    print_frequency: int
    print_char: str
    
@dataclasses.dataclass
class Receipt:
    pass

In [9]:
class AlreadyPrintingError(Exception):
    '''Raised when user requests start but process is already printing.'''

class AlreadyNotPrintingError(Exception):
    '''Raised when user requests stop but process is already not printing.'''

In [10]:
import typing

@dataclasses.dataclass
class PrintingProcess:
    messenger: conproc.PriorityMessenger # every process must accept this
    print_frequency: int # these must be set at worker start
    print_char: str
    keep_printing: bool # this can optionally be set
    
    def __call__(self):
        print(f'{self.keep_printing=}')
        while True:
            try:
                # if we're not printing, keep waiting for a new message
                msg = self.messenger.receive_data(blocking = not self.keep_printing)
            except IndexError:
                msg = None
                
            if msg is not None:
                self.handle_message(msg)
                
            if self.keep_printing:
                print(self.print_char, end='', flush=True)
                time.sleep(self.print_frequency)
    
    def handle_message(self, msg: typing.Union[StartPrinting, StopPrinting, ChangePrintBehavior]):
        if isinstance(msg, StartPrinting):
            if self.keep_printing:
                self.messenger.send_error(AlreadyPrintingError())
            self.keep_printing = True
            
        elif isinstance(msg, StopPrinting):
            if not self.keep_printing:
                self.messenger.send_error(AlreadyNotPrintingError())
            self.keep_printing = False
            
        elif isinstance(msg, ChangePrintBehavior):
            self.print_frequency = msg.print_frequency
            self.print_char = msg.print_char
            
        else:
            # process should die in this case
            self.messenger.send_error(NotImplementedError(f'Unknown message type: {type(msg)}'))
        
        self.messenger.send_reply(Receipt())


In [11]:
@dataclasses.dataclass
class PrintProcessController:
    messenger: conproc.PriorityMessenger
    def start_printing(self):
        self.messenger.send_request(StartPrinting())
        return self.messenger.receive_data() # wait for receipt
    
    def stop_printing(self):
        self.messenger.send_request(StopPrinting())
        return self.messenger.receive_data() # wait for receipt
    
    def change_behavior(self, print_frequency: int, print_char: str):
        self.messenger.send_request(ChangePrintBehavior(print_frequency, print_char))
        return self.messenger.receive_data() # wait for receipt

class Printer:
    
    def __init__(self, print_frequency: int, print_char: str, start_printing: bool = False):
        # passed to PrintingProcess when starting
        self.process_kwargs = dict(
            print_frequency = print_frequency, 
            print_char = print_char, 
            keep_printing = start_printing
        )
        self.resource = conproc.WorkerResource(PrintingProcess)
        
    def __enter__(self) -> PrintProcessController:
        self.resource.start(**self.process_kwargs)
        return PrintProcessController(self.resource.messenger)
    
    def __exit__(self, *args):
        self.resource.terminate()

In [13]:
with Printer(print_frequency=0.05, print_char='x', start_printing=False) as p:
    print('\n----')
    time.sleep(0.2) # shouldnot be printing
    print('\n----')
    p.start_printing()
    print('\n----')
    time.sleep(0.2) # should be printing
    print('\n----')
    p.change_behavior(0.01, 'y')
    print('\n----')
    time.sleep(0.2) # should be printing
    p.stop_printing()
    time.sleep(0.2) # should not be printing
    
    try: # this error is being sent by the printer process
        p.stop_printing()
    except AlreadyNotPrintingError:
        print('Already not printing! oh well.')

self.keep_printing=False



----

----
xxxx
----

----
yyyyyyyyyy
----
y

NoneType: None


Already not printing! oh well.
