In [None]:
!python3.9 -m pip install kafka-python

In [None]:
import json
from datetime import datetime, timezone

from kafka import KafkaProducer, KafkaConsumer

In [None]:
# Kafka bootstrap server, given as 'host:port'.
# TODO: replace this placeholder with the address of the real broker.
KAFKA_HOST = 'localhost:9092'

# Readings strictly below LOW or strictly above HIGH raise an alarm (Celsius).
LOW_TEMPERATURE_THRESHOLD = 15.0
HIGH_TEMPERATURE_THRESHOLD = 25.0
# Backward-compatible alias for the earlier, misspelled constant name.
HIGH_TEMPERTURE_THRESHOLD = HIGH_TEMPERATURE_THRESHOLD

# Minimum number of seconds that must pass between two alarms.
ALARM_TIME_DIFFERENCE = 3600  # 1h = 60 min = 3600 sec

In [None]:
def _encode_json(payload):
    """Serialize an outgoing message value to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode('utf-8')


def _decode_json(raw_bytes):
    """Parse an incoming UTF-8 encoded JSON message value."""
    return json.loads(raw_bytes.decode('utf-8'))


# Producer whose message values are serialized as JSON.
alert_producer = KafkaProducer(
    bootstrap_servers=[KAFKA_HOST],
    value_serializer=_encode_json,
)

# Consumer whose message values are parsed from JSON; it reads the
# 'temperature' topic.
temperature_consumer = KafkaConsumer(
    bootstrap_servers=[KAFKA_HOST],
    value_deserializer=_decode_json,
)
temperature_consumer.subscribe(['temperature'])

In [None]:
# Mutable module-level state read by evaluate_event() and updated by
# handle_event(). Both start at -1, i.e. before any alarm has been raised
# and before any offset has been recorded.
last_alarm_timestamp, last_seen_offset = -1, -1

def create_temperature_alarm_event(temperature_value, alarm_type):
    """Build the payload of a temperature alarm.

    Args:
        temperature_value: The reading that triggered the alarm.
        alarm_type: Identifier describing the kind of alarm.

    Returns:
        A dict with the keys 'temperature', 'unit' (always 'Celsius')
        and 'alarm_type'.
    """
    return dict(
        temperature=temperature_value,
        unit='Celsius',
        alarm_type=alarm_type,
    )

def create_temperature_too_low_alarm_event(temperature_value):
    """Build a 'TEMPERATURE_TOO_LOW' alarm event for the given reading."""
    alarm_type = 'TEMPERATURE_TOO_LOW'
    return create_temperature_alarm_event(temperature_value, alarm_type)


def create_temperature_too_high_alarm_event(temperature_value):
    """Build a 'TEMPERATURE_TOO_HIGH' alarm event for the given reading."""
    alarm_type = 'TEMPERATURE_TOO_HIGH'
    return create_temperature_alarm_event(temperature_value, alarm_type)

def evaluate_event(temperature_event, current_timestamp):
    """Decide whether a consumed temperature event should raise an alarm.

    Args:
        temperature_event: Consumed record; its ``value['value']`` is read as
            the temperature and its ``offset`` is compared with the last
            seen offset.
        current_timestamp: Time of evaluation, in the same unit (seconds) as
            ``last_alarm_timestamp`` and ``ALARM_TIME_DIFFERENCE``.

    Returns:
        A ``(alarm, alarm_event)`` tuple: ``(True, <event dict>)`` when an
        alarm must be raised, ``(False, None)`` otherwise.
    """
    reading = temperature_event.value['value']

    # Suppress alarms while the previous one is still within the quiet
    # period, and ignore records at or before the last seen offset.
    next_allowed_alarm = last_alarm_timestamp + ALARM_TIME_DIFFERENCE
    if (current_timestamp < next_allowed_alarm
            or temperature_event.offset <= last_seen_offset):
        return False, None

    if reading < LOW_TEMPERATURE_THRESHOLD:
        return True, create_temperature_too_low_alarm_event(reading)
    elif reading > HIGH_TEMPERATURE_THRESHOLD:
        return True, create_temperature_too_high_alarm_event(reading)

    # Reading lies within the accepted range.
    return False, None

def handle_event(temperature_event):
    """Process one consumed temperature event and publish an alarm if needed.

    Evaluates the event against the current time, records its offset as the
    last seen one and, when an alarm is due, remembers the alarm time, prints
    the alarm and sends it to the 'alert' topic.

    Args:
        temperature_event: Consumed record exposing ``value`` and ``offset``.
    """
    global last_alarm_timestamp, last_seen_offset

    # Use an aware UTC datetime: calling .timestamp() on the naive result of
    # datetime.utcnow() interprets it as local time, which skews the epoch
    # value by the local UTC offset (and utcnow() is deprecated).
    timestamp = datetime.now(timezone.utc).timestamp()

    alarm, alarm_event = evaluate_event(temperature_event, timestamp)

    # NOTE(review): a single offset counter presumably assumes the topic has
    # one partition, since Kafka offsets are per-partition — confirm.
    last_seen_offset = temperature_event.offset

    if alarm:
        last_alarm_timestamp = timestamp
        print('Alert', alarm_event)
        alert_producer.send('alert', alarm_event)

In [None]:
# Iterate over the consumer and handle each temperature event as it arrives.
# NOTE(review): presumably this blocks until the kernel is interrupted, since
# no consumer timeout is configured in the visible setup — confirm.
for temperature_event in temperature_consumer:
    print(temperature_event)  # echo the raw consumed record
    handle_event(temperature_event)