In [1]:
from confluent_kafka import Consumer, KafkaException
import json
import pandas as pd
import time

In [2]:
# Kafka consumer configuration: local broker, consumer group 'iot_consumer'.
# NOTE(review): 'auto.offset.reset': 'latest' presumably means a group with no
# committed offset starts at the end of the topic — confirm against the
# librdkafka configuration reference.
kafka_settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'iot_consumer',
    'auto.offset.reset': 'latest',
}

# Create the consumer and subscribe it to the IoT readings topic.
consumer = Consumer(kafka_settings)
consumer.subscribe(['iot_topic'])

# Empty frame with the columns the sensor readings are collected under.
sensor_columns = ['time', 'sensor_type', 'sensor_name', 'value']
df = pd.DataFrame(columns=sensor_columns)

In [3]:
# Consume sensor readings from Kafka and, once per window, print the mean
# 'value' grouped by 'sensor_type' and by 'sensor_name' for the readings
# received during that window. Runs until the kernel is interrupted or a
# Kafka error is raised; the consumer is closed in either case.

# Length of one aggregation window, in seconds.
WINDOW_SECONDS = 20

# Readings (decoded JSON payloads) received during the current window.
# They are buffered in a plain list and turned into a DataFrame once per
# window; appending to a DataFrame with pd.concat on every message copies
# the whole frame each time.
window_records = []

# Monotonic clock: only elapsed time matters here, and unlike time.time()
# it is not affected by system clock adjustments.
window_start = time.monotonic()

try:
    while True:
        # Wait up to 1 second for the next message; None means the wait
        # timed out without a message.
        message = consumer.poll(1.0)
        if message is not None:
            if message.error():
                raise KafkaException(message.error())
            window_records.append(json.loads(message.value().decode('utf-8')))

        # The window check runs on every iteration, including the ones where
        # no message arrived, and uses an open-ended ">=" comparison. A quiet
        # topic therefore delays a report by at most one poll timeout instead
        # of skipping past the flush condition forever.
        now = time.monotonic()
        if now - window_start >= WINDOW_SECONDS:
            if window_records:
                window_df = pd.DataFrame(window_records)
                mean_sensor_type = window_df.groupby('sensor_type')['value'].mean().to_dict()
                mean_sensor_name = window_df.groupby('sensor_name')['value'].mean().to_dict()
            else:
                # Nothing was received in this window: report empty averages.
                mean_sensor_type = {}
                mean_sensor_name = {}

            print(f"Average of Sensor By Type: {mean_sensor_type}")
            print(f"Average of Sensor By Name: {mean_sensor_name}")

            # Start the next window with an empty buffer.
            window_records.clear()
            window_start = now

except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop this endless loop,
    # so finish the cell without a traceback.
    pass
finally:
    consumer.close()

Average of Sensor By Type: {'light': 0.4476894711238868, 'pressure': 5.821059092057766, 'temperature': 20.712036230025006}
Average of Sensor By Name: {'light_sensor_1': 0.5012686412982569, 'light_sensor_2': 0.428206136515025, 'pressure_sensor_1': 5.341840525741799, 'pressure_sensor_2': 5.4240918466956325, 'pressure_sensor_3': 7.299591534661986, 'temperature_sensor_1': 21.040431925334456, 'temperature_sensor_2': 19.8127643174336, 'temperature_sensor_3': 21.36119355521776, 'temperature_sensor_4': 20.530941124776625}
Average of Sensor By Type: {'light': 0.4585431167584298, 'pressure': 6.559510044437487, 'temperature': 22.2730965882415}
Average of Sensor By Name: {'light_sensor_1': 0.2732118302348394, 'light_sensor_2': 0.5114949129080271, 'pressure_sensor_1': 6.910380127613888, 'pressure_sensor_2': 6.005057428927417, 'pressure_sensor_3': 6.6158050259284185, 'temperature_sensor_1': 24.35192951071455, 'temperature_sensor_2': 25.654448013307395, 'temperature_sensor_3': 18.40564293672228, 'tem