# RabbitMQ worker - Worker - Data Processor

Our app uses the queues in three ways:
1. The API triggers the creation of a new job. This will likely mean opening a new connection to RabbitMQ, sending a message to the queue, and then closing the connection.
2. The first worker listens for a message, does a job, and then posts to another queue.
3. The second worker only listens for a message.

Both 2 and 3 can use blocking connections, since they can stay open and keep listening for messages.
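
Because a blocking consumer runs until something interrupts it, it helps to decide up front how it shuts down. The following is a minimal sketch, assuming a pika `BlockingChannel` with a consumer already registered is passed in; the helper name `consume_until_interrupted` is hypothetical and not part of pika.

```python
def consume_until_interrupted(channel):
    """Run the blocking consume loop and close the connection afterwards.

    `channel` is assumed to be a pika BlockingChannel with at least one
    consumer already registered via basic_consume.
    """
    try:
        # Blocks here, dispatching incoming messages to the registered callbacks
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl+C (or interrupting the notebook kernel) lands here
        channel.stop_consuming()
    finally:
        # Close the underlying connection if it is still open
        if channel.connection.is_open:
            channel.connection.close()
```

Both workers could end with a call like this instead of calling `start_consuming()` directly, so an interrupted worker does not leave its connection open.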

So for the refactoring we can split this into the following building blocks:
* A base that creates a connection to RabbitMQ, creates a channel, and ensures the right queues are available.
* A component for the API that sends a new message.
* A component for the first worker that listens for a message, does a job, and posts to another queue.
* A component for the second worker that listens for a message and then does a job.
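
The API component is the only block that opens and closes a connection per message, so it is worth sketching on its own. This is a rough sketch rather than the final component: it assumes the caller passes in a channel from the base, that payloads are JSON-serializable dicts, and that the default exchange is used. The function name `send_start_fetch` and the payload shape are hypothetical.

```python
import json

START_FETCH_QUEUE = "start_fetch"


def send_start_fetch(channel, payload):
    """Publish one job message to the start_fetch queue, then close the connection."""
    # Assumption: jobs are described by a JSON-serializable dict
    body = json.dumps(payload).encode("utf-8")
    try:
        # The default exchange ("") routes the message by queue name
        channel.basic_publish(exchange="", routing_key=START_FETCH_QUEUE, body=body)
    finally:
        # Use case 1: the API does not keep the connection open
        channel.connection.close()
    return body
```

With the base from this notebook, a call might look like `send_start_fetch(open_channel(), {"job_id": 1})`. Since the queues are declared durable, the messages themselves can also be marked persistent by passing `properties=pika.BasicProperties(delivery_mode=2)` to `basic_publish`.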

We will have two queues:
* `start_fetch` - to trigger a new fetch job
* `data_processing` - to trigger data processing on the fetched data
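
The first worker is the link between these two queues: it consumes from `start_fetch` and publishes to `data_processing`. One possible shape for its callback is sketched below, assuming the default exchange and a pika-style callback signature. `make_fetch_callback` and `do_fetch` are hypothetical names; `do_fetch` stands in for whatever the real fetch job turns out to be.

```python
NEXT_QUEUE = "data_processing"


def make_fetch_callback(do_fetch):
    """Build a pika-style callback that fetches, forwards the result, then acks.

    `do_fetch` is a hypothetical function that takes the message body (bytes)
    and returns the bytes to hand over to the processing worker.
    """

    def callback(ch, method, properties, body):
        result = do_fetch(body)
        # Forward the result to the second queue via the default exchange
        ch.basic_publish(exchange="", routing_key=NEXT_QUEUE, body=result)
        # Ack only after forwarding, so a crash before this point leads to redelivery
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

It would be registered the same way as any other consumer, for example `channel.basic_consume(queue="start_fetch", on_message_callback=make_fetch_callback(my_fetch))`, where `my_fetch` is a placeholder for the real job.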

In [1]:
# import pika

## Base - Our connection building block

In [2]:
import pika
from rich import pretty, print
pretty.install()

def open_channel():
    """Open a connection and a channel, declare both queues, and return the channel."""
    credentials = pika.PlainCredentials("DEV_USER", "CHANGE_ME")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost", credentials=credentials)
    )
    channel = connection.channel()
    channel.queue_declare(queue="start_fetch", durable=True)
    channel.queue_declare(queue="data_processing", durable=True)
    return channel

## Component - worker 2 job Processor

In [None]:
LISTEN_QUEUE = "data_processing"


def run_processing_worker():
    """Consume messages from the data_processing queue and process them (worker 2)."""

    # Get a new channel from the base
    channel = open_channel()

    # Callback that would be run when a message is received
    def callback(ch, method, properties, body):
        print(f"Received message from queue: {LISTEN_QUEUE}")
        print(f"Message payload: {body}")

        # Do the data processing
        print("Doing the data processing")
        result = body

        # Do something with the data
        print("Finished my processing job")

        # Acknowledge the incoming message to remove it from the queue
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Register the callback
    channel.basic_consume(queue=LISTEN_QUEUE, on_message_callback=callback)

    # Blocks here and dispatches incoming messages to the callback until interrupted
    channel.start_consuming()


run_processing_worker()

These all seem to work.

Now I can move them into the bases and components.