In [4]:
# %%capture
# !pip install kombu
# !pip install scikit-learn
# !pip install fastapi
# !pip install uvicorn

# Decorator

In [12]:
from functools import wraps

def output_decorator(func):
    print("output_decorator")
    def func_wrapper(*args, **kwargs):
        print(args, kwargs)
        output = func(*args, **kwargs)
        print("postprocess")
        return output
    return func_wrapper

In [13]:
@output_decorator
def test(*args):
    print("inside test")
    return 0
test(1,2)

output_decorator
(1, 2) {}
inside test
postprocess


0

In [5]:
from functools import wraps
def somedec(somearg=1, someopt=None):
    def somedec_outer(fn):
        @wraps(fn)
        def somedec_inner(*args, **kwargs):
            # do stuff with somearg, someopt, args and kwargs
            response = fn(*args, **kwargs)
            return response, somearg, someopt

        return somedec_inner

    return somedec_outer

# @somedec(1, someopt=2)
# def f1(x: int = 1) -> int:
#     return x + 1

@somedec()
def f1(x: int = 1) -> int:
    return x + 1

f1(1)


(2, 1, None)

# Kombu queues example

## Simple example

In [3]:
from kombu import Connection, Exchange, Queue, Producer

rabbit_url = "memory://localhost/"

conn = Connection(rabbit_url)

channel = conn.channel()

exchange = Exchange("example-exchange", type="direct")

producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")

queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")
queue.maybe_bind(conn)
queue.declare()

producer.publish("Hello there!")

In [4]:
from kombu import Connection, Exchange, Queue, Consumer

rabbit_url = "memory://localhost/"

conn = Connection(rabbit_url)

exchange = Exchange("example-exchange", type="direct")

queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")


def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()


with Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"]):
    conn.drain_events(timeout=2)

The body is Hello there!


## Complete example

In [1]:
from kombu import Connection, Exchange, Queue, Producer, Consumer, Message
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin, ConsumerProducerMixin
from kombu.utils.functional import reprcall

from threading import Thread

rabbit_url = "memory://localhost/"
conn = Connection(rabbit_url)
channel = conn.channel()

exchange = Exchange("exchange1", type="direct")
queue = Queue(name="queue1", exchange=exchange, routing_key="rk1")
queue.maybe_bind(conn)
queue.declare()

tgt_exchange = Exchange("exchange2", type="direct")
tgt_queue = Queue(name="queue2", exchange=tgt_exchange, routing_key="rk2")
tgt_queue.maybe_bind(conn)
tgt_queue.declare()



'queue2'

In [2]:

producer = Producer(exchange=exchange, channel=channel, routing_key="rk1")

producer.publish("Hello there!")

In [3]:
class Worker(ConsumerProducerMixin):

    def __init__(self, connection, queue, tgt_exchange):
        self.connection = connection
        self.queue = queue
        self.tgt_exchange = tgt_exchange

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queue,
                         on_message=self.handle_message,
                        #  accept="text/plain",
                         prefetch_count=10)]

    def handle_message(self, message):
        print(message)
        self.producer.publish(
            'hello to you',
            exchange=self.tgt_exchange,
            routing_key="rk2",
            # correlation_id=message.properties['correlation_id'],
            retry=True,
        )


def start_consumer_producer(queue, tgt_exchange):
    with Connection('memory://localhost/') as conn:
        queue.maybe_bind(conn)
        queue.declare()
        worker = Worker(connection=conn, queue=queue, tgt_exchange=tgt_exchange)
        worker.run()

t = Thread(
        target=start_consumer_producer,
        kwargs={"queue": queue, "tgt_exchange": tgt_exchange}
    )
t.start()
# with Connection('memory://localhost/') as conn:

# worker = Worker(connection=conn, queue=queue, tgt_exchange=tgt_exchange)
# worker.run()

<Message object at 0x7f1ae02985e0 with details {'state': 'RECEIVED', 'content_type': 'text/plain', 'delivery_tag': '76aeeb23-2990-4463-91ca-30ff9fe96f86', 'body_length': 12, 'properties': {}, 'delivery_info': {'exchange': 'exchange1', 'routing_key': 'rk1'}}>


In [4]:
def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()


with Consumer(conn, queues=tgt_queue, callbacks=[process_message], accept=["text/plain"]):
    conn.drain_events(timeout=2)

The body is hello to you


# Kombu decorator

In [1]:
from sklearn.neural_network import MLPClassifier
from model2queue.decorators import predict_from_queue, enqueue_task

X = [[0., 0.], [1., 1.]]
y = [0, 1]
clf = MLPClassifier(solver='lbfgs', alpha=1e-5,
                     hidden_layer_sizes=(5, 2), random_state=1)

clf.fit(X, y)


In [None]:
@enqueue_task
def hello():
    return "hello"

In [3]:
@predict_from_queue
def predict_input(input):
    print(input)
    # clf.predict(input)
    return input

In [1]:
from kombu import Connection, Exchange, Queue, Producer, Consumer, Message
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin, ConsumerProducerMixin
from kombu.utils.functional import reprcall

from threading import Thread

conn_url = "memory://localhost/"
connection = Connection(conn_url)
channel = connection.channel()

in_exchange = Exchange("in_excahnge", type="direct")
in_queue = Queue(name="queue1", exchange=in_exchange, routing_key="api_model_input")
in_queue.maybe_bind(connection)
in_queue.declare()

out_exchange = Exchange("out_exchange", type="direct")
out_queue = Queue(
    name="out_queue", exchange=out_exchange, routing_key="api_model_output"
)
out_queue.maybe_bind(connection)
out_queue.declare()


'out_queue'

In [4]:

producer = Producer(exchange=in_exchange, channel=channel, routing_key="api_model_input")

producer.publish("Hello there!")

<Message object at 0x7f999462d790 with details {'state': 'RECEIVED', 'content_type': 'text/plain', 'delivery_tag': 'b72bbe7e-b34b-413b-a4ea-157ed989d2f8', 'body_length': 12, 'properties': {}, 'delivery_info': {'exchange': 'in_excahnge', 'routing_key': 'api_model_input'}}>
publishing: hello to you


In [None]:
def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()


with Consumer(connection, queues=out_queue, callbacks=[process_message]):
    connection.drain_events(timeout=2)