# Guidelines for event messages

## Introduction

Interoperability is the scourge of all information systems: knowing what data means, how it is formatted, what the semantics of the various fields are, etc., can be a daunting task. Micro-services are generally designed to process messages in potentially large quantities and, in order to do that, need to know what the messages mean. They are also generally designed to be de-coupled from each other so they can easily be replaced or updated. That means that to construct a system with a micro-service-oriented architecture, you need to have them communicate with each other and you need messages to do that. Those messages have to be well-defined and follow some ground rules.

Certain meta-data is needed in any message: some way to trace messages through a multi-micro-service system and tie them back to the original event or command that caused them and the transaction they are a part of, and information to tie it back to the user or tenant (service) that authorized the event or command. This is important for various reasons:

- meta-data can be used to trace a single request or event through the entire system: a purchase transaction, for example, will start in the UI and be assigned a random *correlation ID*, after which that correlation ID is used for every command and event that results from that transaction. If anything goes wrong for a particular transaction, the correlation ID is what you can use to know what happened, and where.
- meta-data can be used to observe the behavior of any micro-service, or any device if it's produced by a device: a *producer ID* can be added to the message that can then be used to order events if they are produced by a device, or to debug a micro-service if it's showing intermittent bad behavior, for example.
- meta-data can be used to provide authentication and authorization in a way that is consistent with zero-trust principles of cybersecurity, using an authorization token to allow an action only if that token is authentic and authorizes that action, and consistently generating an audit trail for observability.

Any message will also have a type and its own id. The former, along with the topic the message is published to, allows the message to be routed to its intended recipients and/or the recipient to filter messages it needs to handle. The latter can be used to make sure a single message is only processed once, especially if the bus being used by the system does not provide a deliver-only-once guarantee.
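As a minimal sketch of that last point, a consumer could remember the ids it has recently handled and drop repeats. The names `make_deduplicating_handler` and `max_remembered` are hypothetical, and the in-memory store is an assumption that only works within a single process; a real deployment would likely need a shared or persistent store.

```python
from collections import OrderedDict

def make_deduplicating_handler(handle, max_remembered=10000):
    """Wrap `handle` so that a given event id is only processed once (per process)."""
    seen = OrderedDict()  # insertion-ordered, so the oldest id can be evicted first

    def handle_once(event):
        event_id = event['id']
        if event_id in seen:
            return False  # duplicate delivery: skip it
        seen[event_id] = True
        if len(seen) > max_remembered:
            seen.popitem(last=False)  # forget the oldest id to bound memory use
        handle(event)
        return True

    return handle_once

processed = []
handle_once = make_deduplicating_handler(processed.append)
handle_once({'id': 'a', 'type': 'Example'})
handle_once({'id': 'a', 'type': 'Example'})  # redelivery of the same message
handle_once({'id': 'b', 'type': 'Example'})
print([event['id'] for event in processed])
```

Only the first delivery of each id reaches the wrapped handler, so the redelivered message is dropped.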

Micro-services also tend to be re-used, and should be designed for re-use and to be composable. They may be re-used on different bus implementations, such as an AMQP bus that has built-in routing, or a Kafka bus that is optimized for high volume and resiliency but leaves the routing to the user. To abstract this away from the user, and to allow messages to bridge buses through data pumps if needed, a few additional recommendations come to mind, which are discussed in this post.

The recommendations discussed in this post, to address the concerns outlined above, are:

1. Use a common super-schema for all messages
2. Generate events for anything that happens in the system and post them to an appropriate topic on the bus (they can always be ignored if not needed).
3. Generate commands *only* if the micro-service needs something to be done by a *generic* micro-service on the bus and that service should be de-coupled from the events generated by the other services (e.g. sending E-mail, running garbage collection tasks, etc.)
4. If available, include the authorizing JWT token with the message.



## Using a super-schema for all events

Different event bus implementations have different ways to handle meta-data. For micro-services to be both interoperable and agnostic of the bus they're used on, the schema should include any meta-data necessary to parse, track, and trace the event. Individual connectors can always copy the meta-data into header fields for the particular bus if needed.

This leads to a schema such as this one:

```json
{
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "type": { "type": "string" },
        "metadata": {
            "type": "object",
            "properties": {
                "cid": { "type": "string", "format": "uuid" },
                "tid": { "type": "string", "format": "uuid" },
                "pid": { "type": "string", "format": "uuid" },
                "uid": { "type": "string", "format": "uuid" },
                "token": { "type": "string" }
            },
            "required": [ "cid" ]
        },
        "data": { "type": "object" }
    },
    "required": [ "id", "type", "metadata" ]
}
```

The value of the `id` field is a UUID generated by the micro-service for a specific message. Because the `id` is what lets a recipient recognize a message it has already processed, it should be unique to each message. If more than one event is generated for the same transaction, the `metadata.tid` field (described below) will have the same value for all of them, but the `id` field will be different between messages.

The value of the `type` field is the event type. This could be something like "PurchaseOrderReceived". The event type dictates the schema of the `data` field to be used when parsing (see below).

The `metadata` field contains five fields:
- `metadata.cid`, a "*correlation ID*", tracks all events and transactions that result from an initial (user) action;
- `metadata.tid`, a "*transaction ID*" (optional), tracks all events that result from the same transaction within the same micro-service or device;
- `metadata.pid`, a "*producer ID*" (optional), is the unique ID of the producer of the event;
- `metadata.uid`, a "*user ID*" (optional), is the user's unique ID, which may be required for some events;
- `metadata.token`, an "*authorization token*" (optional), is the JWT authorizing the event and any actions that may result from it.
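
Depending on the validator and how it is configured, the `format` keyword in a JSON schema may be treated as an annotation only and not actually enforced, so it can be useful to check the UUID fields explicitly. The sketch below does so with only the standard library; the function name `find_malformed_ids` is hypothetical.

```python
from uuid import UUID

ID_FIELDS = ('cid', 'tid', 'pid', 'uid')

def _is_uuid(value):
    if not isinstance(value, str):
        return False
    try:
        UUID(value)  # note: also accepts a few alternative spellings, e.g. without hyphens
    except ValueError:
        return False
    return True

def find_malformed_ids(event):
    """Return the names of the envelope's ID fields that do not hold a UUID string.

    `id` is always checked; the metadata fields are only checked when present.
    """
    malformed = []
    if not _is_uuid(event.get('id')):
        malformed.append('id')
    metadata = event.get('metadata', {})
    for name in ID_FIELDS:
        if name in metadata and not _is_uuid(metadata[name]):
            malformed.append(f'metadata.{name}')
    return malformed

event = {
    'id': '123e4567-e89b-12d3-a456-426614174000',
    'type': 'PurchaseOrderReceived',
    'metadata': {'cid': 'not-a-uuid'},
}
print(find_malformed_ids(event))
```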

The schema of the `data` element is owned by, and documented with, the micro-service that generates the event. This schema is used to validate and parse the event when it is received, and contains the names and types of the data fields. Note that it does not contain the units and semantics of those data fields: those still need to be captured in application logic.

As indicated above, the producer ID can be used, at least by some brokers, to ensure events from the same producer are delivered in-order.
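
With Kafka, for instance, ordering is only guaranteed within a partition, and the default partitioner sends messages with the same key to the same partition. A connector could therefore use the producer ID as the message key and copy the remaining meta-data into headers. The sketch below only builds those pieces and leaves out the call to a client library; `to_kafka_record` and the `x-` header names are hypothetical.

```python
import json

def to_kafka_record(event):
    """Split an event into the key, value and headers a Kafka connector might send."""
    metadata = event['metadata']
    pid = metadata.get('pid')
    return {
        # Same key -> same partition -> ordering per producer
        'key': pid.encode('utf-8') if pid else None,
        # The full event, meta-data included, stays in the message body
        'value': json.dumps(event).encode('utf-8'),
        # Assumption: the token is left in the body only and not copied to headers
        'headers': [
            (f'x-{name}', metadata[name].encode('utf-8'))
            for name in ('cid', 'tid', 'pid', 'uid')
            if metadata.get(name)
        ],
    }

record = to_kafka_record({
    'id': 'e1',
    'type': 'Example',
    'metadata': {'cid': 'c1', 'pid': 'p1', 'token': 'secret'},
})
print(record['key'], record['headers'])
```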

### Sending and receiving events

There are generally two things you can do with a message: you can send one, or you can receive one. Following a single super-schema for all messages certainly makes receiving messages easier: you can validate the received message against the schema and dispatch it to the appropriate handler by parsing only the common fields in the message. Hence, it becomes possible to provide a single, shared implementation for all your micro-services to receive messages from a bus, encapsulating the details of dispatching events to handlers, parsing messages, and even connecting to the bus itself. In this post, I will show this by implementing just such a module in Python, and making it open source to boot.

But receiving messages is not the only thing that is facilitated by a common schema: the semantics of most of the metadata fields are well-defined, so some default behaviors can be set up for sending messages as well. Again, with this post, I will provide a Python implementation of such default behavior.

#### Receiving events in Python

Any message received from a bus will eventually need to be *handled* by some event handler that semantically knows what to do with the data. Dispatching events to those handlers is a fairly generic job, so let's look at that first. This is the first place where the super-schema shines: in a few lines of code, it allows you to find the right event handler for the event, and to handle errors gracefully.

In [None]:
from jsonschema import validators
Validator = validators.Draft202012Validator

_super_schema = {
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "type": { "type": "string" },
        "metadata": {
            "type": "object",
            "properties": {
                "cid": { "type": "string", "format": "uuid" },
                "tid": { "type": "string", "format": "uuid" },
                "pid": { "type": "string", "format": "uuid" },
                "user": { "type": "string", "format": "uuid" },
                "token": { "type": "string" }
            },
            "required": [ "cid" ]
        },
        "data": { "type": "object" }
    },
    "required": [ "id", "type", "metadata" ]
}
_super_schema_validator = Validator(_super_schema)

def get_event_dispatcher(err, handlers):
    def dispatch(event):
        is_valid = _super_schema_validator.is_valid(event)
        if not is_valid:
            err({ 'error': 'SchemaMismatchError', 'message': 'Event does not match event schema' })
            return
        base_event_name = event['type'].rsplit(':', 1)[0]
        if event['type'] in handlers:
            handlers[event['type']](err, event)
        elif base_event_name in handlers:
            handlers[base_event_name](err, event)
        elif '__default__' in handlers and handlers['__default__']:
            handlers['__default__'](err, event)
    return dispatch

Note that this also allows you to handle versioning of your schema: if you have two versions of the same event, but with incompatible schema changes between the two, you can append the version number of the schema you're using like this: `MyEvent:1`, `MyEvent:2` and you can choose which one you want to use as the default handler. The `dispatch` function returned by `get_event_dispatcher` will take the best match first, but will fall back on the more generic handler otherwise. Events for which no handlers are found are sent to the default handler if one is provided, or ignored otherwise.
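
The lookup order can be illustrated in isolation. The sketch below mirrors the dispatcher's fallback logic in a hypothetical helper, `resolve_handler`, with stand-in handlers that simply return a label.

```python
def resolve_handler(handlers, event_type):
    """Mirror the dispatcher's lookup order: exact type, then base name, then default."""
    base_event_name = event_type.rsplit(':', 1)[0]
    for key in (event_type, base_event_name, '__default__'):
        handler = handlers.get(key)
        if handler:
            return handler
    return None  # no handler at all: the event would be ignored

handlers = {
    'MyEvent': lambda err, event: 'v2 (default version)',
    'MyEvent:1': lambda err, event: 'v1',
    '__default__': lambda err, event: 'fallback',
}
for event_type in ('MyEvent:1', 'MyEvent:2', 'MyEvent', 'OtherEvent'):
    print(event_type, '->', resolve_handler(handlers, event_type)(None, {}))
```

Here `MyEvent:1` has its own handler, `MyEvent:2` and the unversioned `MyEvent` share the handler registered under the base name, and anything else ends up with the default handler.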

##### An example event handler


![Diagram of Crassula](diagram.png)

Let's look at an example of how this approach might be used: imagine a flower shop's on-line store, the "Crassula" application I wrote about earlier ([here](https://rlc.vlinder.ca/blog/2023/05/13/dfmea) and [here](https://applied-paranoia.com/2023/05/13/should-you-encrypt-everything.html)). Let's set the context a bit: the overall application looks like the image above. The micro-service we will be discussing here is the *order* service in that figure, which receives a "PurchaseOrderRequest" event. We will gloss over quite a few details here, but suffice it to say for now that a PurchaseOrderRequest obviously requires data, and therefore a data schema. The schema in question is listed below.

```json
{
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "requestDate": { "type": "string", "format": "date" },
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": { "type": "string" },
                    "code": { "type": "string" },
                    "qty": { "type": "integer", "minimum": 0 },
                    "price": { "type": "number", "minimum": 0 },
                    "discount": { "type": "number", "minimum": 0, "maximum": 100 }
                }
            }
        }
    },
    "required": [ "requestDate", "items" ]
}
```

As described in [a previous post](https://rlc.vlinder.ca/blog/2023/05/13/dfmea), when a purchase order request is received, the order is entered into the database, some business rules may be applied to determine whether it is automatically approved and invoiced later, or whether payment needs to be processed first (this would typically depend on the user's profile), inventory may need to be reserved, etc. For now, we'll just concern ourselves with dispatching and handling the message, which means the first thing we do is define the PO schema in code, set up a schema validator, and set up the minimal bit of boilerplate an event handler needs.

In [None]:
from jsonschema import validators
Validator = validators.Draft202012Validator

_purchase_order_request_schema = {
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "requestDate": { "type": "string", "format": "date" },
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": { "type": "string" },
                    "code": { "type": "string" },
                    "qty": { "type": "integer", "minimum": 0 },
                    "price": { "type": "number", "minimum": 0 },
                    "discount": { "type": "number", "minimum": 0, "maximum": 100 }
                }
            }
        }
    },
    "required": [ "requestDate", "items" ]
}
purchase_order_request_schema_validator = Validator(_purchase_order_request_schema)

def on_purchase_order_requested_event(err, event):
    metadata = event['metadata']
    # This event needs to be tied to a user: either a uid or a token has to be present.
    if not metadata.get('uid') and not metadata.get('token'):
        err({ 'error': 'MissingUserError', 'message': 'Event does not have a uid or a token (one of them is required for this event)' })
        return
    # Check for authorization here:
    #   If the token is there, authenticate it and check whether the claims allow for this purchase order to be processed. The claims should at least contain the uid.
    #     If they don't, business rules for PO authorization may require us to set it aside for now.
    #   If the token is not there, the PO is not pre-authorized. Again, business rules for PO authorization may require us to set it aside for now.
    # If we have a token but it's not authentic, this is where we bail out.
    # If we have an authentic token, or a valid uid, but no authorization, we will carry on but the business logic below may set the PO aside for human intervention.
    if not event.get('data'):
        err({ 'error': 'MissingDataError', 'message': 'Event does not have data' })
        return
    is_valid = purchase_order_request_schema_validator.is_valid(event['data'])
    if not is_valid:
        err({ 'error': 'SchemaMismatchError', 'message': 'Event data does not match event schema' })
        return
    # Do something useful with the event. What that is depends on business logic, and on the outcome of authorization logic above.

Note that, although this boilerplate follows the same pattern in every event handler, parameterizing it and making it generic is not as straightforward as it was for the dispatcher, so I won't bother for now. The comments in the code describe some of what needs to be done.
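
If the pattern does become worth factoring out, one option is a wrapper that performs the common checks before calling the business logic. This is only a sketch: `with_common_checks`, its parameters, and `handle_purchase_order` are hypothetical names, and `is_valid_data` stands in for something like the `is_valid` method of the data schema validator. Authorization is deliberately left out, as it depends on business rules.

```python
def with_common_checks(is_valid_data, require_user=True):
    """Build a decorator that runs the shared checks before the business logic."""
    def decorate(handle):
        def handler(err, event):
            metadata = event['metadata']
            if require_user and not (metadata.get('uid') or metadata.get('token')):
                err({'error': 'MissingUserError', 'message': 'Event does not have a uid or a token'})
                return
            if not event.get('data'):
                err({'error': 'MissingDataError', 'message': 'Event does not have data'})
                return
            if not is_valid_data(event['data']):
                err({'error': 'SchemaMismatchError', 'message': 'Event data does not match event schema'})
                return
            handle(err, event)
        return handler
    return decorate

# Stand-in validator for illustration only
@with_common_checks(is_valid_data=lambda data: 'items' in data)
def handle_purchase_order(err, event):
    print('processing', len(event['data']['items']), 'item(s)')

errors = []
handle_purchase_order(errors.append, {'metadata': {'cid': 'c1'}})  # reported as MissingUserError
handle_purchase_order(errors.append, {'metadata': {'uid': 'u1'}, 'data': {'items': []}})  # reaches the business logic
print([error['error'] for error in errors])
```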

##### Dispatching, and error handling

To get to the event handler, the event has to be dispatched. Let's walk through a few examples of what that might look like.

To know how to dispatch an event, we need a `dispatch` function. That function will need to know two things:

1. how to handle errors, which we will do by calling a call-back function whenever one occurs, and
2. where to dispatch the events to: which function to call for a specific type of events.

The snippet of code below calls the module's `get_event_dispatcher` function, which returns the dispatch function, and gives it both of those pieces of information. Whenever the dispatch function is called with an event, it will do two things: it will validate the event against the super-schema and, if valid, it will call the appropriate event handler function. If something goes wrong, the error handler is called. The error handler is also passed to the event handler as its first argument.

Of course, you already knew all of this because you read the code above. The code below just puts all of that into action:

In [None]:
import logging

dispatch = get_event_dispatcher(
    err=lambda e : logging.error(f"Error {e['error']}: {e['message']}"),
    handlers={ 'PurchaseOrderRequested': on_purchase_order_requested_event }
    )

Let's give this a try: events received that don't have all the information needed are reported as errors and otherwise ignored.

In [None]:
from uuid import uuid4 as uuid

# this one fails in the handler: it has neither a uid nor a token
dispatch({ 'id': str(uuid()), 'type': 'PurchaseOrderRequested', 'metadata': { 'cid': str(uuid()) } })

# this one gets a bit further, but fails because it has no data
dispatch({ 'id': str(uuid()), 'type': 'PurchaseOrderRequested', 'metadata': { 'cid': str(uuid()), 'uid': str(uuid()) } })

An event that has all the necessary data reaches the event handler's business logic:

In [None]:
dispatch({ 'id': str(uuid()), 'type': 'PurchaseOrderRequested', 'metadata': { 'cid': str(uuid()), 'uid': str(uuid()) } , 'data': {
    'requestDate': '2024-05-29',
    'items': [
        { 'name': 'Rose bouquet with 12 red roses', 'code': 'ROSE12BQ1', 'qty': 1, 'price': 132.99, 'discount': 0 }
    ]
}})

This means we can create a client using any popular event bus and just call `dispatch` with the received events!

We'll show how to do this using different bus implementations shortly. Let's first look at what sending events would look like.

#### Sending events in Python

Ideally, all we ever want to do to send a message is

```py
send(event_type, event_data)
```

but to get there, a few things need to happen. Namely, we need to at least identify the required parameters for the message, and we need to provide enough information to put it on the bus to allow the consumers to consume it in some meaningful way. So, the information we need is:

- the event's type
- the CID (if this is part of a correlated set of events)
- additional meta-data as needed (PID, TID, UID, token)
- the event's payload, if any.

If we were to write a generic `send` function, it would look something like this:


In [None]:
def send_event(event_type, event_data=None, cid=None, tid=None, pid=None, uid=None, token=None):
    # do something meaningful here
    pass

The producer ID won't change from one call to another, so a slightly better version might look like this:

In [None]:
def get_send_event_function(pid=None):
    def send_event(event_type, event_data=None, cid=None, tid=None, uid=None, token=None):
        # do something meaningful here
        pass
    return send_event

This doesn't tell us how to format the event type and data into actual events we can put on a bus, so let's write a function to do that:

In [None]:
def _get_format_event_function(data_preprocessors=None):
    if not data_preprocessors:
        data_preprocessors = { '__default__': lambda a : a }
    def format_event(type, cid=None, id=None, pid=None, tid=None, uid=None, token=None, data=None):
        if data:
            if (not type in data_preprocessors or not data_preprocessors[type]) and (not '__default__' in data_preprocessors or not data_preprocessors['__default__']):
                raise Exception(f'No data formatter for type {type}, and data was provided')
            elif not type in data_preprocessors or not data_preprocessors[type]:
                formatted_data = data_preprocessors['__default__'](data)
            else:
                formatted_data = data_preprocessors[type](data)
        else:
            formatted_data = None
        event_id = id if id else str(uuid())
        metadata = {}
        metadata['cid'] = cid if cid else event_id
        if uid:
            metadata['uid'] = uid
        if token:
            metadata['token'] = token
        if pid:
            metadata['pid'] = pid
        metadata['tid'] = tid if tid else event_id
        
        formatted_event = {
            'id': event_id,
            'type': type,
            'metadata': metadata
        }
        if formatted_data:
            formatted_event['data'] = formatted_data
        
        return formatted_event
    return format_event

You might wonder why we have a data pre-processor here. Sometimes, data isn't just presented as a `dict` we can serialize into JSON, so it may need some pre-processing. Examples of this are any time the data is encapsulated into a `class` of some sort.

Take our purchase order for example: it could be encapsulated in a class:

In [None]:
from datetime import datetime

class PurchaseOrder:
    def __init__(self):
        self._id = str(uuid())
        self._request_date = datetime.today().strftime('%Y-%m-%d')
        self._items = []

    def add_item(self, code, qty, name=None, price=None, discount=None):
        item = { 'code': code, 'qty': qty }
        if name:
            item['name'] = name
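        if price is not None:  # a price of 0 is valid and should not be dropped
            item['price'] = price
        if discount is not None:  # likewise for a discount of 0
            item['discount'] = discount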
        self._items.append(item)

    def serialize(self):
        serialized = {
            'id': self._id,
            'requestDate': self._request_date,
            'items': self._items
        }
        return serialized

in which case our data preprocessor could be:


In [None]:
data_preprocessors={
    'PurchaseRequested': lambda a : a.serialize()
}
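
As an alternative to a hand-written `serialize` method, the payload could be modeled as a dataclass and converted with `dataclasses.asdict`. The sketch below assumes that approach; `PurchaseOrderData` is a hypothetical class whose field names follow the purchase order schema shown earlier.

```python
from dataclasses import dataclass, field, asdict
from datetime import date
from uuid import uuid4

@dataclass
class PurchaseOrderData:
    # Field names match the property names in the data schema
    requestDate: str = field(default_factory=lambda: date.today().isoformat())
    id: str = field(default_factory=lambda: str(uuid4()))
    items: list = field(default_factory=list)

data_preprocessors = {
    'PurchaseRequested': asdict,   # dataclass instance -> plain dict
    '__default__': lambda a: a,    # anything else is assumed to be a dict already
}

po = PurchaseOrderData(items=[{'code': 'MAGNOLIA', 'qty': 2}])
payload = data_preprocessors['PurchaseRequested'](po)
print(sorted(payload))
```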

This brings our generic `get_send_event_function` to:

In [None]:
def get_send_event_function(data_preprocessors=None, pid=None):
    format_event = _get_format_event_function(data_preprocessors)
    if not pid:
        pid = str(uuid())
    def send_event(event_type, event_data=None, cid=None, tid=None, uid=None, token=None):
        formatted_event = format_event(type=event_type, cid=cid, pid=pid, tid=tid, uid=uid, token=token, data=event_data)
        # send the formatted_event
    return send_event

This function still doesn't actually send anything. To have it send something to a bus (any bus), we need to give it a way to actually send.

In [None]:
def get_send_event_function(send, data_preprocessors=None, pid=None):
    if not data_preprocessors:
        data_preprocessors = { '__default__': lambda a : a }
    format_event = _get_format_event_function(data_preprocessors)
    if not pid:
        pid = str(uuid())
    def send_event(event_type, event_data=None, cid=None, tid=None, uid=None, token=None):
        formatted_event = format_event(type=event_type, cid=cid, pid=pid, tid=tid, uid=uid, token=token, data=event_data)
        send(formatted_event)
    return send_event


Before we go much further, though, we need to decide how to organize messages on the bus. In this example, we will use topics as the name suggests: they group what the messages are *about*. That means that the purchase order requests and the approved purchase orders go on the same topic (`purchase-orders`), but are consumed by different micro-services. Each micro-service may in turn have more than one instance running in parallel. We'll take the "order" micro-service as an example, using Kafka.

```py
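import json
import logging
from kafka import KafkaConsumer

consumer = KafkaConsumer('purchase-orders', group_id='services-order')
handlers = {'PurchaseRequested': on_purchase_order_requested_event}
dispatch = get_event_dispatcher(lambda err : logging.error(f'Error {err["error"]}: {err["message"]}'), handlers)
for event in consumer:
    try:
        # Kafka delivers the payload as bytes: decode and parse it before dispatching
        event_dict = json.loads(str(event.value, encoding='utf-8'))
        dispatch(event_dict)
    except Exception:
        # Keep consuming, but log the failure so it does not pass silently
        logging.exception('Failed to process a message from the purchase-orders topic')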
```


### Sending on Kafka

Kafka organizes the bus into "topics", but does not route between topics and queues by itself (topics are essentially queues). However, you can use an ETL to do that, essentially having it function as a data pump.
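
Such a data pump can be very small. The sketch below is bus-agnostic and makes a few assumptions: `pump` is a hypothetical helper, `source` is any iterable of already-parsed events (a wrapped consumer, for example), and `send` is any callable that publishes an event to the destination topic or bus. Plain lists stand in for both here.

```python
def pump(source, send, should_forward=lambda event: True):
    """Forward events from `source` (any iterable) to `send` (any callable)."""
    forwarded = 0
    for event in source:
        if should_forward(event):
            send(event)  # passed on unchanged, so ids and meta-data survive the hop
            forwarded += 1
    return forwarded

incoming = [
    {'id': '1', 'type': 'PurchaseRequested', 'metadata': {'cid': 'c1'}},
    {'id': '2', 'type': 'InventoryChecked', 'metadata': {'cid': 'c1'}},
]
outgoing = []
count = pump(incoming, outgoing.append, lambda event: event['type'] == 'PurchaseRequested')
print(count, [event['id'] for event in outgoing])
```

Because the super-schema carries its meta-data inside the message, the pump does not need to know anything about the buses on either side.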

To send anything to a Kafka bus, you need a `KafkaProducer`.

Let's say we want to order two magnolias and five red roses:

```python
import json
from kafka import KafkaProducer
from uuid import uuid4 as uuid

def get_send_to_kafka_function(producer, topic):
    def send_to_kafka(event):
        producer.send(topic, json.dumps(event).encode())
    return send_to_kafka

producer = KafkaProducer()

send_event = get_send_event_function(get_send_to_kafka_function(producer, 'purchase-orders'), data_preprocessors={
    'PurchaseRequested': lambda a : a.serialize()
})

po = PurchaseOrder()
po.add_item('MAGNOLIA', 2)
po.add_item('RED_ROSE', 5)

send_event('PurchaseRequested', po, uid=str(uuid()))

producer.flush()
```