In [3]:
# imports :)
import json
import os
import sys
from contextlib import closing
from itertools import islice

from kafka import KafkaConsumer

In [4]:
# Kafka connection settings (adapted from the Kafka tutorial).
# Every setting can be overridden through the environment variable of the same
# name; an unset or empty variable falls back to the default shown here.

# Location of the Kafka bootstrap server, e.g.
# 'my-kafka-bootstrap.namespace.svc.cluster.local:9092'.
# The default below has no ':port'; kafka-python then applies its default port.
KAFKA_BOOTSTRAP_SERVER = (
    os.environ.get("KAFKA_BOOTSTRAP_SERVER")
    or "kafka-cluster-kafka-bootstrap.kafka-anomaly.svc.cluster.local"
)

# Security protocol. Defaults to PLAINTEXT (no authentication); use e.g.
# SASL_SSL together with the SASL mechanism/credentials below for a secured cluster.
KAFKA_SECURITY_PROTOCOL = os.environ.get("KAFKA_SECURITY_PROTOCOL") or "PLAINTEXT"

# SASL mechanism, only relevant for the SASL_* security protocols.
KAFKA_SASL_MECHANISM = os.environ.get("KAFKA_SASL_MECHANISM") or "PLAIN"

# SASL username / client ID and password / client secret. Never hardcode these
# here, and never print them: notebook outputs get saved and shared.
KAFKA_USERNAME = os.environ.get("KAFKA_USERNAME")
KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD")

# Topic the producer sends sensor messages to; this consumer listens on it.
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC") or "sensor-data"

# Kafka consumer group this consumer belongs to.
KAFKA_CONSUMER_GROUP = os.environ.get("KAFKA_CONSUMER_GROUP") or "notebook-consumer"

In [11]:
# Each message carries N_SENSORS readings, labelled sensor_1 .. sensor_<N_SENSORS>.
N_SENSORS = 48
sensor_names_list = [f"sensor_{i}" for i in range(1, N_SENSORS + 1)]
sensor_names_list

['sensor_1',
 'sensor_2',
 'sensor_3',
 'sensor_4',
 'sensor_5',
 'sensor_6',
 'sensor_7',
 'sensor_8',
 'sensor_9',
 'sensor_10',
 'sensor_11',
 'sensor_12',
 'sensor_13',
 'sensor_14',
 'sensor_15',
 'sensor_16',
 'sensor_17',
 'sensor_18',
 'sensor_19',
 'sensor_20',
 'sensor_21',
 'sensor_22',
 'sensor_23',
 'sensor_24',
 'sensor_25',
 'sensor_26',
 'sensor_27',
 'sensor_28',
 'sensor_29',
 'sensor_30',
 'sensor_31',
 'sensor_32',
 'sensor_33',
 'sensor_34',
 'sensor_35',
 'sensor_36',
 'sensor_37',
 'sensor_38',
 'sensor_39',
 'sensor_40',
 'sensor_41',
 'sensor_42',
 'sensor_43',
 'sensor_44',
 'sensor_45',
 'sensor_46',
 'sensor_47',
 'sensor_48']

In [19]:
def message_to_dict(msg, sensor_names_list):
    """Flatten a decoded ``[timestamp, sensors, machine_status]`` message into one dict.

    Args:
        msg: Indexable message where ``msg[0]`` is the timestamp, ``msg[1]`` the
            indexable sequence of sensor readings and ``msg[2]`` the machine status.
        sensor_names_list: Names assigned, in order, to the first
            ``len(sensor_names_list)`` readings of ``msg[1]``.

    Returns:
        dict whose keys are ``"timestamp"``, ``"machine_status"`` and then one
        key per sensor name. Readings beyond the given names are ignored; fewer
        readings than names raise ``IndexError``.
    """
    timestamp, readings, status = msg[0], msg[1], msg[2]
    flat = {"timestamp": timestamp, "machine_status": status}
    # Pair each sensor name with the reading at the same position.
    flat.update(
        (sensor_name, readings[position])
        for position, sensor_name in enumerate(sensor_names_list)
    )
    return flat

In [20]:
def data_consumption_sensor(sensor_names_list, GROUP_ID=None):
    """Consume sensor messages from Kafka and yield those for one record key.

    Adapted from the Kafka tutorial consumer example. Every record is decoded,
    flattened with ``message_to_dict`` and printed; only records whose integer
    key equals ``GROUP_ID`` are yielded.

    This is a generator: calling it does not contact Kafka until it is
    iterated. The consumer is closed when iteration ends or the generator is
    closed (e.g. via ``contextlib.closing``).

    Args:
        sensor_names_list: Sensor names used to label the readings of each message.
        GROUP_ID: Record key to filter on. Despite the name, it is compared with
            the Kafka record *key*, not the consumer group (that is
            ``KAFKA_CONSUMER_GROUP``). ``None`` (the default) yields every record.

    Yields:
        dict: the flattened message returned by ``message_to_dict``.
    """
    consumer = KafkaConsumer(
        KAFKA_TOPIC,
        group_id=KAFKA_CONSUMER_GROUP,
        bootstrap_servers=[KAFKA_BOOTSTRAP_SERVER],
        security_protocol=KAFKA_SECURITY_PROTOCOL,
        sasl_mechanism=KAFKA_SASL_MECHANISM,
        sasl_plain_username=KAFKA_USERNAME,
        sasl_plain_password=KAFKA_PASSWORD,
        api_version_auto_timeout_ms=30000,
        request_timeout_ms=450000,
    )
    print(
        f'Subscribed to "{KAFKA_BOOTSTRAP_SERVER}" consuming topic "{KAFKA_TOPIC}"...'
    )
    try:
        for record in consumer:
            msg = record.value.decode("utf-8")
            # NOTE(review): relies on the producer emitting the JSON keys in the
            # order timestamp, sensors, machine_status — confirm with the producer.
            msg_list = list(json.loads(msg).values())
            # A keyless record cannot match a key filter; keep consuming instead
            # of crashing the loop on int(None).
            record_id = int(record.key) if record.key is not None else None
            msg_dict = message_to_dict(msg_list, sensor_names_list)
            print(record_id, msg_dict)
            # do something with message (display in web UI, send to database)
            if GROUP_ID is None or record_id == GROUP_ID:
                yield msg_dict

    finally:
        print("Closing consumer...")
        consumer.close()
    print("Kafka consumer stopped.")

In [None]:
# `data_consumption_sensor` is a generator: calling it on its own never reaches
# Kafka, so it has to be iterated. Take a bounded number of matching readings so
# "Restart & Run All" can finish. `closing` makes sure the generator's `finally`
# runs and the Kafka consumer is closed. This blocks until MAX_READINGS matching
# records have arrived.
TARGET_RECORD_KEY = 13  # NOTE(review): taken from the sample output; confirm the intended machine/record key
MAX_READINGS = 10

with closing(data_consumption_sensor(sensor_names_list, TARGET_RECORD_KEY)) as readings:
    latest_readings = list(islice(readings, MAX_READINGS))

latest_readings[-1] if latest_readings else None

Subscribed to "kafka-cluster-kafka-bootstrap.kafka-anomaly.svc.cluster.local" consuming topic "sensor-data"...
13 {'timestamp': '2022-09-01T00:03', 'machine_status': 0.0, 'sensor_1': 46.2493782043457, 'sensor_2': 52.28346633911133, 'sensor_3': 43.705318450927734, 'sensor_4': 614.9024658203125, 'sensor_5': 74.51283264160156, 'sensor_6': 13.644190788269043, 'sensor_7': 16.01307487487793, 'sensor_8': 14.97795295715332, 'sensor_9': 14.75201416015625, 'sensor_10': 46.114036560058594, 'sensor_11': 46.224952697753906, 'sensor_12': 35.252716064453125, 'sensor_13': 10.548749923706055, 'sensor_14': 417.3882751464844, 'sensor_15': 460.00830078125, 'sensor_16': 457.0516357421875, 'sensor_17': 2.607666492462158, 'sensor_18': 661.9113159179688, 'sensor_19': 396.96337890625, 'sensor_20': 871.177490234375, 'sensor_21': 516.2406616210938, 'sensor_22': 1043.499755859375, 'sensor_23': 623.904052734375, 'sensor_24': 716.7260131835938, 'sensor_25': 892.2738647460938, 'sensor_26': 505.37567138671875, 'senso