In [1]:
!pip install mdml_client



In [2]:
import time
import random
import mdml_client as mdml

## MDML Producing Data

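When a client produces data with the MDML, the data are streamed to an underlying Kafka topic. Each producer writes to a single topic and is created with a schema describing the message fields; `mdml.create_schema` builds that schema from an example dictionary.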
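Because `mdml.create_schema` derives a schema from an example dictionary, messages on the topic should presumably follow that example's structure. The sketch below illustrates the idea in plain Python, assuming a JSON-Schema-style output; `infer_schema` is a hypothetical helper, not part of `mdml_client`, and the MDML's actual schema format may differ.

```python
import json
import time

# Hypothetical helper for illustration; it is NOT mdml.create_schema, whose
# exact output format may differ. Maps Python types to JSON Schema type names.
_JSON_TYPES = {bool: "boolean", int: "integer", float: "number", str: "string"}


def infer_schema(example: dict, title: str, descr: str) -> dict:
    """Build a JSON-Schema-style object description from one example message."""
    properties = {}
    for field, value in example.items():
        json_type = _JSON_TYPES.get(type(value))
        if json_type is None:
            raise TypeError(f"unsupported type for {field!r}: {type(value).__name__}")
        properties[field] = {"type": json_type}
    return {
        "title": title,
        "description": descr,
        "type": "object",
        "properties": properties,
        "required": list(example),
    }


example_data = {"time": time.time(), "int1": 3, "int2": 4, "int3": 5}
schema = infer_schema(example_data, "Example schema", "schema for example notebook")
print(json.dumps(schema["properties"]))
```

Here `time` maps to `"number"` and the three integer fields map to `"integer"`.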

In [3]:
example_data = {
    'time': time.time(), 
    'int1': 3,
    'int2': 4,
    'int3': 5
}
schema = mdml.create_schema(example_data, "Example schema", "schema for example notebook")
producer = mdml.kafka_mdml_producer(
    topic = "mdml-example-dict",
    schema = schema,
    kafka_host = '100.26.16.4',
    schema_host = '100.26.16.4'
)
producer.produce(example_data)
producer.flush()

## MDML Consuming Data

When consuming data from the MDML platform, data are read from Kafka topics. A consumer's `.consume()` method returns a generator that yields a result for every data message produced on the subscribed topics. The `group` parameter is used by Kafka to coordinate groups of consumers so that each message streamed to a topic is received only once per consumer group.
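To make the group semantics concrete, here is a toy single-partition model in plain Python (`ToyTopic` is hypothetical, not part of `mdml_client`). It assumes that a group with no committed offset starts reading from the oldest message, which corresponds to Kafka's `auto.offset.reset=earliest`; whether the MDML client uses that setting is an assumption here.

```python
class ToyTopic:
    """A single-partition message log with per-group committed offsets.

    Simplified model for illustration: real Kafka topics can have many
    partitions, and offsets are committed per group *and* partition.
    """

    def __init__(self):
        self.log = []
        self.committed = {}  # group id -> offset of the next unread message

    def produce(self, value):
        self.log.append(value)

    def consume(self, group):
        """Return messages this group has not seen yet, then commit them.

        Assumes a new group starts at offset 0 (like auto.offset.reset=earliest).
        """
        start = self.committed.get(group, 0)
        new_messages = self.log[start:]
        self.committed[group] = len(self.log)
        return new_messages


topic = ToyTopic()
for n in (3, 4, 5):
    topic.produce({"int1": n})

first = topic.consume("abc")    # group "abc" sees all three messages
second = topic.consume("abc")   # ...and nothing on a second read
other = topic.consume("xyz")    # a different group reads from the beginning
print(len(first), len(second), len(other))  # 3 0 3
```

This is why the notebook suggests a unique group id: reusing a group that has already consumed a topic skips the messages it has committed.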

In [4]:
consumer = mdml.kafka_mdml_consumer(
    topics = ["mdml-example-dict"],
    group = "abc", # create a unique group id here
    kafka_host = '100.26.16.4',
    schema_host = '100.26.16.4'
)
for msg in consumer.consume():
    print(msg)

Consumer loop will exit after 300.0 seconds without receiving a message or with Ctrl+C
{'topic': 'mdml-example-dict', 'value': {'time': 1645128376.6569948, 'int1': 3, 'int2': 4, 'int3': 5, 'mdml_time': 1645128376.6648843}}


## Streaming Files via MDML

The MDML supports two approaches to streaming large files: chunking and what we call "coat-checking". In chunking, a large file is broken up into smaller chunks that are sent directly through the MDML. In coat-checking, the file is uploaded to an S3 bucket while a message describing the file's location, along with some metadata, is sent to the MDML; a consumer can then download the file from the S3 location given in the message. Only the chunking method is demonstrated here.
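The chunking idea can be sketched without Kafka: split a file into fixed-size pieces tagged with a part index, then rebuild it on the receiving side even if the pieces arrive out of order. `chunk_bytes` and `reassemble` are hypothetical helpers, and the chunk dictionary layout is an assumption; MDML's `chunk_file` and `multipart_schema` may use different fields.

```python
import os
import tempfile

# Hypothetical chunk format for illustration; MDML's multipart_schema and
# chunk_file may structure chunks differently.


def chunk_bytes(path, chunk_size):
    """Yield consecutive fixed-size pieces of a file with their part index."""
    total_size = os.path.getsize(path)
    total_parts = max(1, -(-total_size // chunk_size))  # ceiling division
    with open(path, "rb") as f:
        for part in range(total_parts):
            yield {
                "filename": os.path.basename(path),
                "part": part,
                "total_parts": total_parts,
                "data": f.read(chunk_size),
            }


def reassemble(chunks, out_path):
    """Write chunks back in part order, refusing to build an incomplete file."""
    ordered = sorted(chunks, key=lambda c: c["part"])
    if [c["part"] for c in ordered] != list(range(ordered[0]["total_parts"])):
        raise ValueError("missing or duplicate chunks")
    with open(out_path, "wb") as f:
        for c in ordered:
            f.write(c["data"])
    return out_path


with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "large_file.bin")
    with open(src, "wb") as f:
        f.write(os.urandom(2_000_000))
    chunks = list(chunk_bytes(src, 750_000))
    chunks.reverse()  # simulate chunks arriving out of order
    rebuilt = reassemble(chunks, os.path.join(tmp, "rebuilt.bin"))
    with open(src, "rb") as a, open(rebuilt, "rb") as b:
        identical = a.read() == b.read()
print(len(chunks), identical)  # 3 True
```

A 2,000,000-byte file split at 750,000 bytes gives two full chunks and one 500,000-byte remainder.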


### Chunking

In [None]:
large_file = "large_file.txt" # ~20MB
producer = mdml.kafka_mdml_producer(
    topic = "mdml-example-file",
    schema = mdml.multipart_schema, # using MDML's pre-defined schema for chunking
    kafka_host = '100.26.16.4',
    schema_host = '100.26.16.4'
)
chunks_sent = 0
for chunk in mdml.chunk_file(large_file, 750000): # chunk size of 750,000 bytes
    producer.produce(chunk)
    chunks_sent += 1
    if chunks_sent % 10 == 0:
        print("flush")
        producer.flush() # flush every 10 chunks
print("final flush")
producer.flush()

In [None]:
consumer = mdml.kafka_mdml_consumer(
    topics = ["mdml-example-file"],
    group = "abc", # create a unique group id here
    kafka_host = '100.26.16.4',
    schema_host = '100.26.16.4'
)
for msg in consumer.consume_chunks(): # the message returned is the filepath that the chunked file was written to
    print(msg)

## MDML Experiments

The MDML service supports user-defined experiments. An experiment aggregates data from multiple topics: every message produced on the given topics between the start and stop of the experiment is recorded in a separate experiment topic, as well as in a JSON file for upload to the Argonne Data Cloud.
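A minimal sketch of the recording behavior described above, assuming a simple in-memory model: `ExperimentRecorder` is hypothetical and only mimics what an experiment captures (messages on the chosen topics between start and stop) and the JSON export, not how the MDML service implements it.

```python
import json


class ExperimentRecorder:
    """Toy model of an experiment: record messages on chosen topics while active.

    Illustration only; this is not how the MDML service is implemented.
    """

    def __init__(self, experiment_id, topics):
        self.experiment_id = experiment_id
        self.topics = set(topics)
        self.active = False
        self.records = []

    def start(self):
        self.active = True

    def on_message(self, topic, value):
        # Ignore traffic before start/after stop and on unrelated topics.
        if self.active and topic in self.topics:
            self.records.append({"topic": topic, "value": value})

    def stop(self):
        """Stop recording and return the experiment as a JSON document."""
        self.active = False
        return json.dumps({"experiment_id": self.experiment_id, "messages": self.records})


rec = ExperimentRecorder("replay_tutorial", ["mdml-test-experiment-sensor1",
                                             "mdml-test-experiment-sensor2"])
rec.on_message("mdml-test-experiment-sensor1", {"data": 1})   # before start: ignored
rec.start()
rec.on_message("mdml-test-experiment-sensor1", {"data": 2})   # recorded
rec.on_message("some-other-topic", {"data": 3})                # not an experiment topic
rec.on_message("mdml-test-experiment-sensor2", {"data": 4})   # recorded
exported = json.loads(rec.stop())
rec.on_message("mdml-test-experiment-sensor2", {"data": 5})   # after stop: ignored
print([m["value"]["data"] for m in exported["messages"]])      # [2, 4]
```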

In [6]:
# Define experiment topics
experiment_topics = [
    "mdml-test-experiment-sensor1",
    "mdml-test-experiment-sensor2",
    "mdml-test-experiment-sensor3",
]
# MDML connection configuration
producer_config = {
    "kafka_host": "100.26.16.4",
    "schema_host": "100.26.16.4"
}
# Start experiment
experiment_id = "replay_tutorial" 
exp = mdml.start_experiment(
    id = experiment_id, 
    topics = experiment_topics,
    producer_kwargs = producer_config
)

Experiment started


In [7]:
# First, create a function to produce random data
def random_data(i):
    dat = {
        "time": time.time(),
        "data": random.randrange(0,100),
        "msg_id": i
    }
    return dat

In [8]:
# Generate data schema
data_schema = mdml.create_schema(random_data(1), title="example schema", descr="schema for the example notebook")
# Create data producers
producer1 = mdml.kafka_mdml_producer("mdml-test-experiment-sensor1", schema=data_schema, **producer_config)
producer2 = mdml.kafka_mdml_producer("mdml-test-experiment-sensor2", schema=data_schema, **producer_config)
producer3 = mdml.kafka_mdml_producer("mdml-test-experiment-sensor3", schema=data_schema, **producer_config)
# Perform the experiment
for i in range(5):    
    producer1.produce(random_data(i))
    producer2.produce(random_data(i))
    producer3.produce(random_data(i))
    time.sleep(3)
# Flush producers
producer1.flush()
producer2.flush()
producer3.flush()

%4|1645206590.345|MAXPOLL|rdkafka#consumer-6| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 410ms (adjust max.poll.interval.ms for long-running message processing): leaving group


In [9]:
# Stop the experiment
mdml.stop_experiment(
    id = experiment_id,
    producer_kwargs = producer_config
)

Experiment stopped


## Replaying an experiment

After an experiment has been started and stopped, it can be replayed. During a replay, all messages recorded on the experiment's topics are re-streamed to those same topics. This is useful for debugging ML code or for training and testing new models on a digital twin. One important note on the consumer in this example: if the specified consumer group has not already consumed the original experiment messages, those messages will be printed __in addition__ to the replayed ones.
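The replay step can be pictured as re-sending each recorded message to its original topic. The sketch below assumes replay follows the order of the original `time` field; `replay` is a hypothetical helper, and the MDML's Replay service may order or pace messages differently. The sample records use values from this notebook's replay output.

```python
# Hypothetical replay helper: MDML's Replay service may order or pace
# messages differently. Here recorded messages are re-sent to their
# original topics in the order of their original "time" field.


def replay(records, produce):
    """Re-send each recorded {"topic", "value"} pair via produce(topic, value)."""
    for record in sorted(records, key=lambda r: r["value"]["time"]):
        produce(record["topic"], dict(record["value"]))  # copy, leave record intact
    return len(records)


recorded = [
    {"topic": "mdml-test-experiment-sensor2",
     "value": {"time": 1645206569.237449, "data": 49, "msg_id": 0}},
    {"topic": "mdml-test-experiment-sensor1",
     "value": {"time": 1645206569.1522157, "data": 89, "msg_id": 0}},
    {"topic": "mdml-test-experiment-sensor3",
     "value": {"time": 1645206569.328619, "data": 52, "msg_id": 0}},
]

sent = []
count = replay(recorded, lambda topic, value: sent.append((topic, value["data"])))
print(sent)
```

In a real replay, `produce` would be a producer bound to each topic; any consumer group that has not yet read the originals sees them as well as the replayed copies.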

In [13]:
# First we will make a list of the topics used in the experiment
topics = [
    "mdml-test-experiment-sensor1",
    "mdml-test-experiment-sensor2",
    "mdml-test-experiment-sensor3"
]
# Next create a consumer to listen for the replayed messages
replay_consumer = mdml.kafka_mdml_consumer(
    topics = topics,
    group = "abc",
    kafka_host = "100.26.16.4",
    schema_host = "100.26.16.4"
)
# Start the replay with the MDML's Replay service
mdml.replay_experiment(experiment_id, producer_kwargs=producer_config)
# Starting the consumer
for msg in replay_consumer.consume(overall_timeout=-1):
    print(msg)

Consumer loop will run indefinitely until a Ctrl+C
{'topic': 'mdml-test-experiment-sensor2', 'value': {'time': 1645206569.237449, 'data': 49, 'msg_id': 0, 'mdml_time': 1645207561.9636014}}
{'topic': 'mdml-test-experiment-sensor1', 'value': {'time': 1645206569.1522157, 'data': 89, 'msg_id': 0, 'mdml_time': 1645207560.972872}}
{'topic': 'mdml-test-experiment-sensor3', 'value': {'time': 1645206569.328619, 'data': 52, 'msg_id': 0, 'mdml_time': 1645207562.9689975}}
{'topic': 'mdml-test-experiment-sensor1', 'value': {'time': 1645206572.4201186, 'data': 61, 'msg_id': 1, 'mdml_time': 1645207564.2407699}}
{'topic': 'mdml-test-experiment-sensor3', 'value': {'time': 1645206572.4241636, 'data': 56, 'msg_id': 1, 'mdml_time': 1645207564.2486012}}
{'topic': 'mdml-test-experiment-sensor2', 'value': {'time': 1645206572.4219654, 'data': 1, 'msg_id': 1, 'mdml_time': 1645207564.2450778}}
{'topic': 'mdml-test-experiment-sensor1', 'value': {'time': 1645206575.4267747, 'data': 91, 'msg_id': 2, 'mdml_time': 1

## FuncX and MDML

The MDML pairs nicely with FuncX, a function-as-a-service (FaaS) platform, to enable on-demand analyses. A FuncX endpoint has been created (endpoint ID below) that can run arbitrary Python code. Using the MDML client, you can create deployable functions that act on streaming data. These functions can then produce analysis results, or even new operating conditions for an experiment, back to the MDML.

### Registering a function
First, we create a simple function that sums integers from each incoming message and produces a new message with the result. The function is then registered with the FuncX service, which returns a function ID used to deploy it. Since this function has already been registered, the following code is shown for reference only and does not need to be run.

```
from funcx.sdk.client import FuncXClient
fxc = FuncXClient()

def addition_func(params):
    # FuncX functions require modules be imported (and installed on the endpoint)
    import time
    import mdml_client as mdml
    values = params['addition_values']
    # Create a consumer to listen for messages on a data topic and a control topic
    consumer = mdml.kafka_mdml_consumer(
        topics = [params['data_topic'], params['control_topic']],
        group = "mdml-testing-funcx-tutorial")
    # Create a schema for the returned results with an example dictionary 
    example_result = {
        'time': time.time(),
        'int1': 1,
        'int2': 2,
        'int3': 3,
        'sum': 6,
        'worker_id': 0
    }
    schema = mdml.create_schema(example_result, 
                                title='mdml-testing-funcx-tutorial-sum', 
                                descr='Tutorial for deploying FuncX function with MDML')
    # Create a producer to stream the results with the schema
    result_producer = mdml.kafka_mdml_producer(
        topic = "mdml-testing-funcx-tutorial-sum",
        schema = schema
    )
    # Start the consumer loop
    for msg in consumer.consume(overall_timeout=600):
        # If the message is on the data topic, sum values and produce a result
        if msg['topic'] == params['data_topic']:
            result = msg['value']
            result['worker_id'] = params['worker_id']
            # Sum the requested fields without shadowing the built-in sum()
            result['sum'] = sum(msg['value'][val] for val in values)
            result_producer.produce(result)
            result_producer.flush()
        # Else the message is from the control topic, stop the consumer loop and exit
        else:
            break
    consumer.close()

# Function parameters
params = {
    'data_topic': 'mdml-testing-funcx-tutorial-data',
    'control_topic': 'mdml-testing-funcx-tutorial-stop',
    'addition_values': ['int1','int2','int3'],
    'worker_id': 1
}

func_id = fxc.register_function(addition_func, description="Tutorial function for FuncX and MDML")
print(func_id)
```
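The summing logic inside `addition_func` can be checked locally without Kafka or FuncX by feeding the same loop a list of messages shaped like those `consume()` yields. `process_messages` is a hypothetical helper extracted for illustration; the sample values come from this notebook's result output.

```python
# Offline version of the consumer loop in addition_func, for testing the
# summing logic without Kafka or FuncX. process_messages is a hypothetical
# helper; in the real function the messages come from consumer.consume().


def process_messages(messages, params):
    """Sum the configured fields of data-topic messages until a control message."""
    results = []
    for msg in messages:
        if msg["topic"] == params["data_topic"]:
            result = dict(msg["value"])
            result["worker_id"] = params["worker_id"]
            result["sum"] = sum(msg["value"][key] for key in params["addition_values"])
            results.append(result)
        else:
            break  # a message on the control topic ends the loop
    return results


params = {
    "data_topic": "mdml-testing-funcx-tutorial-data",
    "control_topic": "mdml-testing-funcx-tutorial-stop",
    "addition_values": ["int1", "int2", "int3"],
    "worker_id": 1,
}
messages = [
    {"topic": params["data_topic"], "value": {"int1": 26, "int2": 8, "int3": 61}},
    {"topic": params["data_topic"], "value": {"int1": 97, "int2": 23, "int3": 71}},
    {"topic": params["control_topic"], "value": {"stop": True}},
    {"topic": params["data_topic"], "value": {"int1": 8, "int2": 2, "int3": 21}},
]
results = process_messages(messages, params)
print([r["sum"] for r in results])  # [95, 191]
```

The last data message is never summed because the stop message on the control topic ends the loop first.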

In [32]:
# This cell stores the most current version of the FuncX function ID

# Most recent UUID - Feb 1st, 2022 9:22AM
func_id = '5f4461f7-a4e8-4c4d-addc-f20cf447b409'

### Starting a FuncX function

All that is left to do now is to start the function and produce data. Notice the `worker_id` parameter: any number of instances of the same FuncX function can run concurrently. Combined with Kafka's ability to group consumers by a shared group ID, this lets us spin up multiple instances of an analysis task that share the data messages. The `worker_id` parameter lets us see which instance produced each result message. How messages are divided among consumers is determined by Kafka's partitioning of topics and is beyond the scope of this tutorial.
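As a rough intuition for how workers in one consumer group share data: within a group, each partition is read by at most one consumer, so workers beyond the number of partitions sit idle. The sketch assumes keyed messages, round-robin partition assignment (Kafka offers several assignment strategies), and `zlib.crc32` as a stand-in key hash; `assign_partitions` and `messages_per_worker` are hypothetical helpers.

```python
import zlib

# Simplified model of how one consumer group shares a topic's partitions.
# Assumptions: round-robin partition assignment (Kafka offers several
# assignment strategies) and zlib.crc32 as a stand-in key hash.


def assign_partitions(num_partitions, workers):
    """Give each partition to exactly one worker, round-robin."""
    assignment = {w: [] for w in workers}
    for p in range(num_partitions):
        assignment[workers[p % len(workers)]].append(p)
    return assignment


def messages_per_worker(keys, num_partitions, workers):
    """Count how many keyed messages each worker would receive."""
    owner = {}
    for worker, parts in assign_partitions(num_partitions, workers).items():
        for p in parts:
            owner[p] = worker
    counts = {w: 0 for w in workers}
    for key in keys:
        counts[owner[zlib.crc32(key.encode()) % num_partitions]] += 1
    return counts


keys = [f"msg-{i}" for i in range(9)]
workers = [0, 1, 2]  # same worker_id values as the FuncX workers
one_partition = messages_per_worker(keys, 1, workers)
three_partitions = messages_per_worker(keys, 3, workers)
print(one_partition)  # {0: 9, 1: 0, 2: 0}
print(three_partitions)
```

With a single partition, all nine messages go to worker 0; under this model, the number of partitions, not the number of workers, caps how many instances can share the load.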

In [34]:
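# Create the FuncX client (the registration code above is not run in this notebook)
from funcx.sdk.client import FuncXClient
fxc = FuncXClient()

num_workers = 3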
tasks = []

for i in range(num_workers):
    params = {
        'data_topic': 'mdml-testing-funcx-tutorial-data',
        'control_topic': 'mdml-testing-funcx-tutorial-stop',
        'addition_values': ['int1','int2','int3'],
        'worker_id': i
    }
    endp_id = 'fa1a5d62-86f1-4761-87d5-0a2976a3e1c5' # public mdml endpoint on GPU server
    tasks.append(fxc.run(params, function_id=func_id, endpoint_id=endp_id))

while True:
    for task in tasks:
        try:
            result = fxc.get_result(task)
            print(result)
        except Exception as e:
            print(e)
    input(f"press enter to check status again")

Task is pending due to waiting-for-ep
Task is pending due to waiting-for-ep
Task is pending due to waiting-for-ep
press enter to check status again
Task is pending due to waiting-for-ep
Task is pending due to waiting-for-ep
Task is pending due to waiting-for-ep
press enter to check status again
Task is pending due to waiting-for-launch
Task is pending due to waiting-for-launch
Task is pending due to waiting-for-launch
press enter to check status again
Task is pending due to waiting-for-launch
Task is pending due to waiting-for-launch
Task is pending due to waiting-for-launch
press enter to check status again
Task is pending due to running
Task is pending due to running
Task is pending due to running
press enter to check status again
None
None
None


KeyboardInterrupt: Interrupted by user

In [19]:
# Consume function results
consumer = mdml.kafka_mdml_consumer(
    topics = ['mdml-testing-funcx-tutorial-sum'],
    group = "mdml-testing-funcx-tutorial")
for msg in consumer.consume(overall_timeout=-1):
    print(msg)

Consumer loop will run indefinitely until a Ctrl+C
{'topic': 'mdml-testing-funcx-tutorial-sum', 'value': {'time': 1643651363.290894, 'int1': 26, 'int2': 8, 'int3': 61, 'mdml_time': 1643651364.3562765, 'sum': 95}}
{'topic': 'mdml-testing-funcx-tutorial-sum', 'value': {'time': 1643651369.3361378, 'int1': 97, 'int2': 23, 'int3': 71, 'mdml_time': 1643651369.4128666, 'sum': 191}}
{'topic': 'mdml-testing-funcx-tutorial-sum', 'value': {'time': 1643651394.1517346, 'int1': 8, 'int2': 2, 'int3': 21, 'mdml_time': 1643651395.2494736, 'sum': 31}}


In [24]:
# Stop FuncX worker
producer = mdml.kafka_mdml_producer(topic="mdml-testing-funcx-tutorial-stop",
                                    schema=mdml.stop_funcx_schema)
producer.produce({'stop':True})
producer.flush()