In [1]:
!pip install mdml_client





In [3]:
import time
import random
import numpy as np
import mdml_client as mdml

## Example 1 
Produce simple dictionary messages and read them back with a consumer.

In [10]:
# Schema describing the records sent by the producer: five required fields,
# each declared with the JSON type "number".
data_schema = {
    "$schema": "http://merf.egs.anl.gov/mdml-example-integer-data-schema#",
    "title": "ExampleInteger",
    "description": "Schema for MDML with Kafka example integer data",
    "type": "object",
    # Every property has the same shape, so build them from (name, description) pairs.
    "properties": {
        field: {"description": summary, "type": "number"}
        for field, summary in (
            ("id", "ID of the data point"),
            ("time", "Unix time the data point occurred"),
            ("int1", "Random integer 1"),
            ("int2", "Random integer 2"),
            ("int3", "Random integer 3"),
        )
    },
    "required": ["id", "time", "int1", "int2", "int3"],
}

In [11]:
# Create a data producer and produce 5 messages with random integers
producer = mdml.kafka_mdml_producer(
    topic="mdml-example-integer",
    schema=data_schema,
)

# One message per id 0..4. random.randint is inclusive on both ends, so the
# three values fall in [0, 25], [25, 50] and [50, 100] respectively.
for i in range(5):
    dat = {
        'id': i,
        'time': time.time(),  # Unix timestamp taken when the record is built
        'int1': random.randint(0, 25),
        'int2': random.randint(25, 50),
        'int3': random.randint(50, 100),
    }
    producer.produce(dat)

In [12]:
# Create a data consumer and consume the 5 messages.
# The topic name matches the one the producer cell writes to.
consumer = mdml.kafka_mdml_consumer(
    topic = "mdml-example-integer",
    group = "mdml-example-integer")

# consumer.consume() is a generator and will run until this cell is stopped manually,
# so interrupt the kernel (Ctrl+C) once the expected messages have been printed.
for msg in consumer.consume():
    print(msg)

Ctrl+C to break consumer loop
{'id': 0, 'time': 1620938425.0137014, 'int1': 4, 'int2': 42, 'int3': 87}
{'id': 1, 'time': 1620938427.0567675, 'int1': 24, 'int2': 37, 'int3': 81}
{'id': 2, 'time': 1620938427.0754302, 'int1': 23, 'int2': 44, 'int3': 94}
{'id': 3, 'time': 1620938427.0938582, 'int1': 1, 'int2': 48, 'int3': 63}
{'id': 4, 'time': 1620938427.1134882, 'int1': 5, 'int2': 31, 'int3': 60}


## Example 2
Produce and consume messages whose fields contain arrays.

In [13]:
# Schema for the array example: two "number" fields (id, time) and two
# "array" fields (wavelength, intensity), all of them required.
array_schema = {
    "$schema": "http://merf.egs.anl.gov/mdml-example-array-data-schema#",
    "title": "ExampleArray",
    "description": "Schema for MDML with Kafka example array data",
    "type": "object",
    # Build each property from a (name, description, JSON type) triple.
    "properties": {
        field: {"description": summary, "type": json_type}
        for field, summary, json_type in (
            ("id", "ID of the data point", "number"),
            ("time", "Unix time the data point occurred", "number"),
            ("wavelength", "Array of wavelength values", "array"),
            ("intensity", "Array of intensity values", "array"),
        )
    },
    "required": ["id", "time", "wavelength", "intensity"],
}

In [43]:
# Create a producer for the array topic and send 5 messages, each carrying a
# fixed wavelength list and a freshly generated list of random intensities.
producer = mdml.kafka_mdml_producer(
    topic="mdml-example-array",
    schema=array_schema,
)

for i in range(5):
    # Five random floats between 0.0 and 10000.0, rounded to 3 decimal places
    intensities = [round(random.uniform(0.0, 10000.0), 3) for _ in range(5)]
    dat = {
        'id': i,
        'time': time.time(),  # Unix timestamp taken when the record is built
        'wavelength': [550, 600, 650, 700, 750],
        'intensity': intensities,
    }
    producer.produce(dat)

In [44]:
# Create a consumer for the array example; the topic name matches the one the
# array producer cell writes to.
consumer = mdml.kafka_mdml_consumer(
    topic = "mdml-example-array",
    group = "mdml-example-array")

# Print each message as it arrives. The loop has no exit condition of its own:
# interrupt the kernel (Ctrl+C) once the expected messages have been printed.
for msg in consumer.consume():
    print(msg)

Ctrl+C to break consumer loop
{'id': 0, 'time': 1620939302.0333238, 'wavelength': [550, 600, 650, 700, 750], 'intensity': [8885.931, 1357.29, 5757.032, 9028.189, 1130.085]}
{'id': 1, 'time': 1620939303.0683024, 'wavelength': [550, 600, 650, 700, 750], 'intensity': [5508.834, 4779.071, 363.938, 5024.914, 9867.73]}
{'id': 2, 'time': 1620939303.0899205, 'wavelength': [550, 600, 650, 700, 750], 'intensity': [7371.471, 4869.943, 3310.335, 3394.481, 4449.198]}
{'id': 3, 'time': 1620939303.1243489, 'wavelength': [550, 600, 650, 700, 750], 'intensity': [2426.794, 7120.587, 4036.134, 6997.439, 1530.848]}
{'id': 4, 'time': 1620939303.1493125, 'wavelength': [550, 600, 650, 700, 750], 'intensity': [2025.027, 7674.068, 3138.048, 9452.086, 9005.383]}
