In [1]:
import json

import asyncio
import aioredis

import nest_asyncio
nest_asyncio.apply()

import rx
from rx.subject import Subject
from rx.core import Observer, Observable, ConnectableObservable, typing
from rx.disposable import Disposable, CompositeDisposable
from rx import operators as ops

from dataclasses import dataclass, field
from typing import Type, Any, List, Tuple, Callable, Optional, Union, Awaitable, Coroutine, NamedTuple

import time
@dataclass
class Item:
    src: str = None
    dst: str = None
    hops: int = 0
    timestamp: float = field(default_factory=lambda: time.time())

@dataclass
class Message(Item):
    value: Any = None

    def __repr__(self):
        return f"{self.__class__.__name__}({self.value}, {self.src}, {self.dst}, {self.hops}, {self.timestamp})"

@dataclass
class CallMethod(Item):
    name: str = None
    args: list = None
    kwargs: dict = None

@dataclass
class PayloadTest:
    name: str ='ciao'
    number: int = 6
    numbers: List = field(default_factory=lambda: [1,2,3])


@dataclass
class Port:
    # TODO: add method to manipulate operators (append, prepend, etc...) and update pipe/input/subscribe?!
    input: Subject = None
    pipe: Observable = None
    operators: List =  field(default_factory=lambda: [])
    output: Subject = None
    disposable: Disposable = None

import pickle

def _serializer(item):
    return pickle.dumps(item, pickle.HIGHEST_PROTOCOL)
    # return json.dumps(item, default=lambda obj: obj.__dict__)

class RedisPublishObserver(Observer):

    def __init__(self, channel, redis = None) -> None:
        self._channel = channel
        self._queue = asyncio.Queue()
        self._redis = redis or aioredis.from_url("redis://localhost")

        self._p_in = Port(input=Subject(), operators=[ops.materialize()])
        self._p_in.pipe = self._p_in.input.pipe(*self._p_in.operators)
        self._p_in.disposable = self._p_in.pipe.subscribe(on_next=lambda item: self._publish(item))

        self._task = asyncio.create_task(self._publish_queue())

        super().__init__(self._p_in.input.on_next, self._p_in.input.on_error, self._p_in.input.on_completed)

    def _publish(self, item):
        self._queue.put_nowait(item)

    async def _publish_queue(self):
        while True:
            item = await self._queue.get()
            print("PUBLISH", self._channel, item)
            await self._redis.publish(self._channel, _serializer(item))

    def dispose(self) -> None:
        self._task.cancel()
        self._p_in.disposable.dispose()

    def __del__(self):
        self.dispose()

In [2]:
import asyncio
import aioredis
import pickle


class RedisSubscribeObservable(Observable):
    def __init__(self, channels_patterns, redis=None) -> None:
        self._redis = redis or aioredis.from_url("redis://localhost")
        self._pubsub = None
        self._channels_pattern = channels_patterns if isinstance(channels_patterns, List) else [channels_patterns]
        self._disposable = CompositeDisposable()

        def on_subscribe(observer, scheduler):
            print("subscribe")

            def _on_publish(message):
                item = pickle.loads(message['data'])
                observer.on_next(item)

            async def init_redis_sub():
                self._pubsub = self._redis.pubsub()

                patterns_callbacks = {pattern: _on_publish for pattern in self._channels_pattern}
                print(patterns_callbacks)
                await self._pubsub.psubscribe(**patterns_callbacks)
                await self._pubsub.run()

            task = asyncio.create_task(init_redis_sub())
            self._disposable.add(Disposable(lambda: task.cancel()))

            return Disposable(lambda: asyncio.create_task(self.unsubscribe()))

            # async def _aio_sub():
            #    observer.on_next(i)
            #     loop.call_soon(observer.on_completed)
            #     loop.call_soon(functools.partial(observer.on_error, e))
            #
            # task = asyncio.ensure_future(_aio_sub(), loop=loop)
            # return Disposable(lambda: task.cancel())

        self._auto_connect = rx.create(on_subscribe).pipe(ops.dematerialize(), ops.share())
        super().__init__()

    def _subscribe_core(self, observer, scheduler = None):
        return self._auto_connect.subscribe(observer, scheduler=scheduler)

    async def unsubscribe(self):
        await self._pubsub.unsubscribe(*self._channels_pattern)
        self._disposable.dispose()

    def __del__(self):
        self.unsubscribe()

from mape.utils import LogObserver

ciao = RedisSubscribeObservable(["loop_uid:element_uid","loop_uid:element_uid2"])
a = ciao.subscribe(LogObserver("a"))
ciao = RedisSubscribeObservable(["loop_uid:element_uid2"])
b = ciao.subscribe(LogObserver("b"))

pub_observer = RedisPublishObserver("loop_uid:element_uid")

pub_observer.on_next(Message(value=PayloadTest(), src='src0'))
pub_observer.on_next(2)
pub_observer.on_error(Message(value="error", src='src2'))
pub_observer.on_completed()


await asyncio.sleep(4)
pub_observer.dispose()

print("dajee")

pub_observer = RedisPublishObserver("loop_uid:element_uid2")

pub_observer.on_next(Message(value=PayloadTest(), src='src10'))
pub_observer.on_next(20)
# pub_observer.on_error(Message(value="error", src='src20'))
pub_observer.on_completed()

await asyncio.sleep(4)

a.dispose()
b.dispose()

subscribe
{'loop_uid:element_uid': <function RedisSubscribeObservable.__init__.<locals>.on_subscribe.<locals>._on_publish at 0x7feb38194790>, 'loop_uid:element_uid2': <function RedisSubscribeObservable.__init__.<locals>.on_subscribe.<locals>._on_publish at 0x7feb38194790>}
PUBLISH loop_uid:element_uid OnNext(Message(PayloadTest(name='ciao', number=6, numbers=[1, 2, 3]), src0, None, 0, 1644254365.8394518))
(3103) on next: Message(PayloadTest(name='ciao', number=6, numbers=[1, 2, 3]), src0, None, 0, 1644254365.8394518) | a
PUBLISH loop_uid:element_uid OnNext(2.0)
(3103) on next: 2 | a
PUBLISH loop_uid:element_uid OnError(Message(error, src2, None, 0, 1644254365.8395813))
on error: Message(error, src2, None, 0, 1644254365.8395813) | a
subscribe
dajee
{'loop_uid:element_uid': <function RedisSubscribeObservable.__init__.<locals>.on_subscribe.<locals>._on_publish at 0x7feb38128820>, 'loop_uid:element_uid2': <function RedisSubscribeObservable.__init__.<locals>.on_subscribe.<locals>._on_publis

  self.unsubscribe()
