![NVIDIA Logo](images/nvidia.png)

# Control Messages

In this notebook you'll learn about the Morpheus `ControlMessage`, their consituent parts, and how to interact with them.

---

## Objectives

By the time you complete this notebook you will be able to:

- Construct `ControlMessages`.
- Add and interact with a `ControlMessage`'s payload.
- Add, edit, and utilize a `ControlMessage`'s metadata.
- Add, edit, and utilize a `ControlMessage`'s tasks dictionary.
- Understand how `ControlMessages` facilitate SIMD processing in the context of a Morpheus pipeline.

---

## Imports

In [1]:
import time

import cudf

from morpheus.messages import ControlMessage, MessageMeta

## Read Data

In this notebook we will return to working with our simple user authentication data, which we load now into a cuDF dataframe.

In [2]:
df = cudf.read_json('data/simple_user_log.jsonlines', lines=True)

In [3]:
df

Unnamed: 0,timestamp,user,ip_address,request_time,status,error_message
0,2025-02-01T10:15:30Z,user123,192.168.1.10,200.45,success,
1,2025-02-01T10:17:00Z,user123,192.168.1.20,150.55,failure,Invalid credentials
2,2025-02-01T10:18:10Z,user456,10.0.0.5,180.6,success,
3,2025-02-01T10:19:25Z,user789,192.168.1.30,215.25,failure,Timeout
4,2025-02-01T10:20:00Z,user456,10.0.0.6,120.1,success,
5,2025-02-01T10:22:30Z,user123,192.168.1.40,175.35,failure,Access denied
6,2025-02-01T10:23:45Z,user321,192.168.1.50,205.5,success,
7,2025-02-01T10:25:05Z,user864,192.168.1.60,190.15,failure,Invalid session
8,2025-02-01T10:26:20Z,user123,192.168.1.70,210.8,success,
9,2025-02-01T10:27:40Z,user456,10.0.0.7,160.95,failure,Account locked


---

## Create a MessageMeta

As you recall, we can create a Morpheus `MessageMeta` out of a dataframe. Here we create a `MessageMeta` instance using our authentication log dataframe.

In [4]:
mm = MessageMeta(df)

---

## Control Messages

![control-messages](images/control-messages.png)

In Morpheus, the `ControlMessage` is a more robust and flexible alternative to `MessageMeta` allowing us to include together with a `MessageMeta` payload additional metadata, task definitions, and multidimensional tensors.

In this notebook we'll look at `ControlMessage` payloads, metadata and tasks. We'll refrain presently from discussing tensors, which we'll look at later in the workshop when we discuss inference in Morpheus pipelines.

---

## Control Message Payloads

In its most simple form, a `ControlMessage` instance can serve as a simple wrapper around a `MultiMessage` payload.

Here we instantiate a `ControlMessage` and set as its payload the `MessageMeta` we created above by way of its `payload` method.

In [5]:
cm = ControlMessage()

In [6]:
cm.payload(mm)

We can access a `ControlMessage`'s `MetaMessage` payload through its `payload()` method, invoked without any arguments.

In [7]:
payload = cm.payload()

In [8]:
type(payload)

morpheus._lib.messages.MessageMeta

As we've seen earlier in the workshop, the payload's underlying dataframe can be accessed through its `get_data()` method.

In [9]:
df = payload.get_data()

In [10]:
type(df)

cudf.core.dataframe.DataFrame

In [11]:
df

Unnamed: 0,timestamp,user,ip_address,request_time,status,error_message
0,2025-02-01T10:15:30Z,user123,192.168.1.10,200.45,success,
1,2025-02-01T10:17:00Z,user123,192.168.1.20,150.55,failure,Invalid credentials
2,2025-02-01T10:18:10Z,user456,10.0.0.5,180.6,success,
3,2025-02-01T10:19:25Z,user789,192.168.1.30,215.25,failure,Timeout
4,2025-02-01T10:20:00Z,user456,10.0.0.6,120.1,success,
5,2025-02-01T10:22:30Z,user123,192.168.1.40,175.35,failure,Access denied
6,2025-02-01T10:23:45Z,user321,192.168.1.50,205.5,success,
7,2025-02-01T10:25:05Z,user864,192.168.1.60,190.15,failure,Invalid session
8,2025-02-01T10:26:20Z,user123,192.168.1.70,210.8,success,
9,2025-02-01T10:27:40Z,user456,10.0.0.7,160.95,failure,Account locked


---

## Control Message Metadata

`ControlMessage` instances have an API for getting and setting metadata. Metadata are key value pairs and can be used in a number of arbitrary and helpful ways. You can think of them as dictionaries that apply to the entire message.

In [12]:
cm.set_metadata("priority", "high")

In [13]:
cm.has_metadata("priority")

True

In [14]:
cm.get_metadata()

{'priority': 'high'}

In [15]:
cm.get_metadata()['priority']

'high'

In [16]:
cm.list_metadata()

['priority']

---

## Control Message Tasks

`ControlMessage` tasks are a dictionary intended to store information about tasks that can or should be applied to the message's payload. `ControlMessage` tasks facilitate Single Instruction Multiple Data (SIMD) programming.

In [17]:
task = {
    "IP": "127.0.0.1",
    "port": "443"
}

In [18]:
cm.add_task("filter", task)

In [19]:
cm.get_tasks()

{'filter': [{'IP': '127.0.0.1', 'port': '443'}]}

In [20]:
cm.has_task("filter")

True

In [21]:
task_details = cm.remove_task("filter")
task_details

{'IP': '127.0.0.1', 'port': '443'}

In [22]:
cm.has_task("filter")

False

---

## Control Message Intuition

The following is a very simple example of how we might utilize a `ControlMessage`'s metadata and tasks to drive SIMD processing of its payload.

`process` is the function to actually perform work on the data. Here for our simple example, it just uppercases a specified column.

In [23]:
def process(df, column, sleep_secs=2):
    time.sleep(sleep_secs)
    df[column] = df[column].str.upper()

Here we instantiate a new `ControlMessage` instance, and set its payload as the `MessageMeta` instance created earlier out of the user authentication logs data.

In [24]:
cm = ControlMessage()

In [25]:
cm.payload(mm)

Here we set some metadata on the control message, in this case specifying that it is in need of performance tracing.

In [26]:
cm.set_metadata("tracing", True)

Here we articulate the details that will be required to run our `process` task, in this super simple case, the name of the column to be processed. We then add the task details to `cm`.

In [27]:
process_task_details = {
    "column": "status"
}

In [28]:
cm.add_task("process", process_task_details)

Let's have a look at our configured `ControlMessage` metadata and task list.

In [29]:
cm.get_metadata()

{'tracing': True}

In [30]:
cm.get_tasks()

{'process': [{'column': 'status'}]}

Now we see how we might put everything together for actual use.

First we check if the message's metadata indicates the need for performance tracing, if so, we tag the processing entry time. Then, we check to see if a processing tasks exists, if so, we pop the task from the `ControlMessage` and perform the necessary processing. When completed, if performance tracing was specified in the metadata, the completion time is tagged. Upon completion, the data has been transformed and we can inspect runtime performance details.

In [31]:
# Tag beginning time of message processing
if cm.get_metadata("tracing"):
    cm.set_metadata("start_time", time.time())

# Use the message meta context manager to prevent race conditions.
with cm.payload().mutable_dataframe() as df:

    # Perform necessary processing
    if cm.has_task("process"):
        process_task = cm.remove_task("process")
        column = process_task["column"]

        process(df, column, sleep_secs=2)   

# Tag end time of message processing
if cm.get_metadata("tracing"):
    cm.set_metadata("end_time", time.time())

# Calculate runtime performance details
if cm.get_metadata("tracing"):
    runtime = cm.get_metadata("end_time") - cm.get_metadata("start_time")
    print(f"Message processed in {runtime:.2f}s")

Message processed in 2.00s


Here we note that the process task is no longer in the task list, meaning the task was removed and the data was processed.

In [32]:
cm.get_tasks()

{'process': []}

Here we note that the metadata has been updated with performance traces, allowing us to inspect runtime performance details on a per message basis.

In [33]:
cm.get_metadata()

{'end_time': 1758603987.9728162,
 'start_time': 1758603985.9678571,
 'tracing': True}

The results of our processing are in the `status` column, were all values are now capitalized.

In [34]:
cm.payload().get_data()

Unnamed: 0,timestamp,user,ip_address,request_time,status,error_message
0,2025-02-01T10:15:30Z,user123,192.168.1.10,200.45,SUCCESS,
1,2025-02-01T10:17:00Z,user123,192.168.1.20,150.55,FAILURE,Invalid credentials
2,2025-02-01T10:18:10Z,user456,10.0.0.5,180.6,SUCCESS,
3,2025-02-01T10:19:25Z,user789,192.168.1.30,215.25,FAILURE,Timeout
4,2025-02-01T10:20:00Z,user456,10.0.0.6,120.1,SUCCESS,
5,2025-02-01T10:22:30Z,user123,192.168.1.40,175.35,FAILURE,Access denied
6,2025-02-01T10:23:45Z,user321,192.168.1.50,205.5,SUCCESS,
7,2025-02-01T10:25:05Z,user864,192.168.1.60,190.15,FAILURE,Invalid session
8,2025-02-01T10:26:20Z,user123,192.168.1.70,210.8,SUCCESS,
9,2025-02-01T10:27:40Z,user456,10.0.0.7,160.95,FAILURE,Account locked


In [None]:
#ControlMessage.Payload = MessageMeta
#ControlMessage = payload + metadata + task + tensor 