# Messaging with Tracking

*Characteristics:*
- Collaboration Style: **Events = Choreography**
- System Style: **Reactive**
- Communication Style: **Asynchronous (Non-Blocking)**
- Flow: **Tracking**

## Process / Scenario

The following "Order Fulfilment" process model depicts the tracked choreography.

![](/images/Track-Flow.png)

### Tasks
Instructions for running the exercise:
1. Optionally, register at [CloudAMQP](https://www.cloudamqp.com) for a free RabbitMQ instance.
2. Enter the AMQP URL of your own RabbitMQ instance, or of a provided one, in the following text box:

In [2]:
amqp_url = ''

3. Make sure that you use the right tenant:

In [1]:
tenant = 'showcase'

4. Download the process model ([download the BPMN model here](https://cdn.jsdelivr.net/gh/DigiBP/digibpnotes@main/modelling/Distributed%20Systems%20and%20Errors/Track-Flow.bpmn)) and open it in the Camunda Modeler.
5. Deploy the process model with the Camunda Modeler to the classroom instance with your **tenant**.
6. Make sure that you have the correct Camunda engine URL:

In [None]:
camunda_eninge_rest = 'https://digibp.herokuapp.com/engine-rest'

7. Run the notebook in Deepnote.
8. Inspect the instantiated process in the Camunda Cockpit and review the Deepnote output.

In [None]:
if not amqp_url:
    import os
    amqp_url = os.environ.get('AMQP_URL')

## Message Model

The message model below is used for the message exchange. The instance created at the end of the cell gets a `businessKey` generated from a UUID and the configured `tenant`:

In [None]:
from typing import Optional
from pydantic import BaseModel
import uuid


class Message(BaseModel):
    businessKey: str
    tenant: str
    command: Optional[str] = None
    event: Optional[str] = None
    payload: Optional[str] = None


# Create the message with a fresh business key for this order
message = Message(businessKey=str(uuid.uuid1()), tenant=tenant)

## Message Queues and Routing

The following cell uses the [Pika](https://pika.readthedocs.io) library, a Python implementation of the AMQP protocol, to connect to RabbitMQ and declare four queues: `order`, `payment`, `inventory`, and the **flow tracking** queue `order_fulfillment`. To track the message exchange, and thus the flow, bindings are defined between the `exchange` and the queues using routing keys (`routing_key`). The tracking queue is bound to every routing key, so it receives a copy of each event.

In [None]:
import pika

params = pika.URLParameters(amqp_url)
# Connect to RabbitMQ
connection = pika.BlockingConnection(params)
channel = connection.channel()
# Declare order queue
channel.queue_declare(queue="order")
# Declare payment queue
channel.queue_declare(queue="payment")
# Declare inventory queue
channel.queue_declare(queue="inventory")
# Declare order fulfillment queue
channel.queue_declare(queue="order_fulfillment")

# Declare message routings
channel.exchange_declare(exchange=tenant)
channel.queue_bind(exchange=tenant, queue="order", routing_key="order")
channel.queue_bind(exchange=tenant, queue="payment", routing_key="payment")
channel.queue_bind(exchange=tenant, queue="inventory", routing_key="inventory")
channel.queue_bind(exchange=tenant, queue="order_fulfillment", routing_key="order")
channel.queue_bind(exchange=tenant, queue="order_fulfillment", routing_key="payment")
channel.queue_bind(exchange=tenant, queue="order_fulfillment", routing_key="inventory")
channel.queue_bind(exchange=tenant, queue="order_fulfillment", routing_key="shipment")

connection.close()
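
The effect of these bindings can be reproduced without a broker. The following is a minimal sketch, assuming the exchange is a direct exchange (Pika's default when `exchange_declare` is called without an exchange type), where a message is delivered to every queue whose binding key equals the routing key exactly. The names `BINDINGS`, `build_routing_table`, and `route` are hypothetical helpers for illustration and are not part of Pika.

```python
from collections import defaultdict

# Bindings as (queue, routing_key), mirroring the queue_bind calls in the notebook
BINDINGS = [
    ("order", "order"),
    ("payment", "payment"),
    ("inventory", "inventory"),
    ("order_fulfillment", "order"),
    ("order_fulfillment", "payment"),
    ("order_fulfillment", "inventory"),
    ("order_fulfillment", "shipment"),
]


def build_routing_table(bindings):
    """Map each routing key to the list of queues bound to it."""
    table = defaultdict(list)
    for queue, routing_key in bindings:
        table[routing_key].append(queue)
    return dict(table)


def route(table, routing_key):
    """Return the queues an exact-match (direct) exchange would deliver to."""
    return table.get(routing_key, [])


routing_table = build_routing_table(BINDINGS)
for key in ("order", "payment", "inventory", "shipment"):
    print(key, "->", route(routing_table, key))
```

Every routing key resolves to the `order_fulfillment` queue in addition to at most one service queue, which is what lets the tracker observe the whole flow without the services knowing about it.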


## Initial Event

In the following, an initial event `Order_Placed` is published to mimic a checkout and initialize the **tracked** choreography:

In [None]:
import pika, pickle

params = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Define and publish (send) the Order_Placed event
message.event = "Order_Placed"
channel.basic_publish(exchange=tenant, routing_key="order", body=pickle.dumps(message))
print("Order_Placed message sent.")

connection.close()
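
The notebook serializes messages with `pickle`, which is convenient for a classroom exercise, but `pickle.loads` can execute arbitrary code when given untrusted input. As a hedged alternative, the sketch below shows how the same message fields could travel as JSON bytes. `PlainMessage`, `encode`, and `decode` are hypothetical names, and the dataclass is only a standard-library stand-in for the notebook's pydantic `Message` model; the consumers would have to decode JSON as well for this to work end to end.

```python
import json
import uuid
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class PlainMessage:
    """Hypothetical stand-in with the same fields as the notebook's Message."""
    businessKey: str
    tenant: str
    command: Optional[str] = None
    event: Optional[str] = None
    payload: Optional[str] = None


def encode(message: PlainMessage) -> bytes:
    """Serialize the message to UTF-8 encoded JSON bytes for basic_publish."""
    return json.dumps(asdict(message)).encode("utf-8")


def decode(body: bytes) -> PlainMessage:
    """Rebuild the message from JSON bytes received in a consumer callback."""
    return PlainMessage(**json.loads(body.decode("utf-8")))


original = PlainMessage(
    businessKey=str(uuid.uuid1()), tenant="showcase", event="Order_Placed"
)
body = encode(original)
restored = decode(body)
print(restored.event)
```

The resulting `body` could be passed to `basic_publish` in place of `pickle.dumps(message)`.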


## Message Subscription of Microservices and Tracker

In the following, three microservices (payment, inventory, and shipment) are simulated, each subscribed to one queue. After receiving a message, each service publishes a follow-up event. In addition, an `order_fulfillment` tracking microservice receives every event of the choreography and forwards it to the BPMN process as a message via the Camunda REST API.

In [None]:
import pika, pickle, requests, signal

params = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

# Order fulfillment tracking microservice callback function
def order_fulfillment(channel, method, properties, body):
    message = pickle.loads(body)
    print("Received " + str(message))
    request_data = {
        "messageName": message.event,
        "businessKey": message.businessKey,
        "tenantId": message.tenant,
    }
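    # Correlate the event with the BPMN process via the Camunda REST API
    response = requests.post(camunda_eninge_rest + "/message", json=request_data)
    print("Camunda response status: " + str(response.status_code))
    return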


# Payment microservice callback function
def payment(channel, method, properties, body):
    message = pickle.loads(body)
    print("Received " + str(message))
    message.event = "Payment_Done"
    channel.basic_publish(
        exchange=tenant, routing_key="payment", body=pickle.dumps(message)
    )
    return


# Inventory microservice callback function
def inventory(channel, method, properties, body):
    message = pickle.loads(body)
    print("Received " + str(message))
    message.event = "Goods_Fetched"
    channel.basic_publish(
        exchange=tenant, routing_key="inventory", body=pickle.dumps(message)
    )
    return


# Shipment microservice callback function
def shipment(channel, method, properties, body):
    message = pickle.loads(body)
    print("Received " + str(message))
    message.event = "Goods_Shipped"
    channel.basic_publish(
        exchange=tenant, routing_key="shipment", body=pickle.dumps(message)
    )
    return


# Subscriptions on queues
channel.basic_consume("order_fulfillment", order_fulfillment, auto_ack=True)
channel.basic_consume("order", payment, auto_ack=True)
channel.basic_consume("payment", inventory, auto_ack=True)
channel.basic_consume("inventory", shipment, auto_ack=True)


def close_handler(signum, frame):
    # Close the connection on SIGINT (Ctrl+C) so that consuming stops
    connection.close()


# Run consuming server
try:
    signal.signal(signal.SIGINT, close_handler)
    channel.start_consuming()
except Exception as e:
    print(e)
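
To see which events the tracker is expected to forward, the chain of subscriptions above can be replayed in memory. This is only a sketch under simplifying assumptions: no broker, no Camunda call, and strictly sequential delivery. `SERVICES`, `ROUTES`, and `simulate` are hypothetical names that mirror the callbacks and bindings of this notebook.

```python
from collections import deque

# Queue each simulated service consumes from -> (event, routing_key) it publishes
SERVICES = {
    "order": ("Payment_Done", "payment"),         # payment service
    "payment": ("Goods_Fetched", "inventory"),    # inventory service
    "inventory": ("Goods_Shipped", "shipment"),   # shipment service
}

# Queues bound to each routing key, including the tracking queue
ROUTES = {
    "order": ["order", "order_fulfillment"],
    "payment": ["payment", "order_fulfillment"],
    "inventory": ["inventory", "order_fulfillment"],
    "shipment": ["order_fulfillment"],
}


def simulate(initial_event="Order_Placed", initial_key="order"):
    """Replay the choreography and return the events seen by the tracker."""
    tracked = []
    pending = deque([(initial_event, initial_key)])
    while pending:
        event, routing_key = pending.popleft()
        for queue in ROUTES[routing_key]:
            if queue == "order_fulfillment":
                # The tracker would forward this event to Camunda
                tracked.append(event)
            else:
                # The subscribed service reacts by publishing its own event
                pending.append(SERVICES[queue])
    return tracked


print(simulate())
```

Each entry in the returned list corresponds to one `messageName` that the `order_fulfillment` callback sends to the process engine for the given `businessKey`.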

