In [None]:
from confluent_kafka import Producer
import json
import random
import numpy as np
import time
import os, binascii
from datetime import datetime

In [None]:
# Queue signal to Kafka

def queue_signals(signal_data):
    p = Producer({'bootstrap.servers': '10.6.0.155:9092'})
    p.produce('electron', key='12345', value=json.dumps(signal_data))
    p.flush(3)

In [None]:
# Create lists for pseudorandom data generation

units = ["ED", "MICU", "SICU"]
beds = {
    "ED":["A1", "A2", "A3"],
    "MICU":["8-170", "8-171", "8-172", "8-173"],
    "SICU":["2-200", "2-201", "2-202"]
}
sources = {
    "ED":"SERVER1",
    "MICU":"SERVER2",
    "SICU":"SERVER3"
}
channels = {
    "1":"HR",
    "2":"SpO2",
    "3":"Resp"
}

means = {
    "1":72,
    "2":98,
    "3":15
}

sds = {
    "1":10,
    "2":2,
    "3":2
}

starting_values = {}

# Create initial list of starting values for each bed/channel combination
for unit in units:
    for bed in beds[unit]:
        for channel in channels:
            starting_values[bed+channel] = np.random.normal(means[channel],
                                                            sds[channel],
                                                            1)[0]    

In [None]:
# Genereate a random string
def generate_id():
    return binascii.b2a_hex(os.urandom(8)).decode('utf-8')

def toiso(ts):
    return datetime.fromtimestamp(ts).isoformat() + 'Z'

# Generate a set of signals for a randomly selected bed
def generate_signals(units, beds, sources, channels, starting_values, sds):
    # Select a random unit and bed from that unit
    unit = random.choice(units)
    bed = random.choice(beds[unit])
    
    # Get current timestamp
    ts = int(time.time())
    
    # Create JSON message with 1 and 2 ms offsets for msh_ts and rcv_ts
    data = {
        "msh_ts":toiso(ts+1),
        "start_ts":toiso(ts),
        "unit":unit,
        "bed":bed,
        "src":sources[unit],
        "msg_id":generate_id(),
        "helix_rcv_ts":toiso(ts+2),
        "tz_offset":0,
        "obx_objects":[],
        "hl7":"HL7MESSAGE..."
    }
    
    # Create set of OBX messages, 1 for each channel
    z = 0
    for channel in channels:
        z += 1
        
        value = np.random.normal(starting_values[bed+channel], sds[channel], 1)[0]
        
        cur_signal = {
            "line": z,
            "channel": channel,
            "text": int(value)
        }
        data["obx_objects"].append(cur_signal)
    
    return data

In [None]:
while True:
    for i in range(0,5):
        cur_signal = generate_signals(units, beds, sources, channels, starting_values, sds)
        queue_signals(cur_signal)
    time.sleep(1)