# Step 1: Imports, Configuration and Setup

In [1]:
import json
import logging
import os
import random
import threading
import time
from datetime import datetime, timezone

import colorlog
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

In [2]:
# Configure color logging: one stream handler whose formatter colors each
# record according to its level.
handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    "%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
))

logger = logging.getLogger('KafkaLogger')
# logging.getLogger hands back the same logger object on every call, so
# re-running this cell would otherwise stack one more handler per run and
# print every record several times. Drop handlers from earlier runs first.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Step 2: Kafka Configuration

In [3]:
# Kafka configuration
def load_kafka_config(environ=None):
    """Build the Kafka connection settings, letting environment variables override the defaults.

    Recognised variables: KAFKA_BOOTSTRAP_SERVERS (comma-separated host:port list),
    KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SECURITY_PROTOCOL, KAFKA_SASL_MECHANISM.
    An unset or empty variable falls back to the value previously hardcoded here.

    Args:
        environ: mapping to read the overrides from; defaults to ``os.environ``.

    Returns:
        dict with the keys ``bootstrap_servers`` (list of str), ``username``,
        ``password``, ``security_protocol`` and ``sasl_mechanism``.
    """
    env = os.environ if environ is None else environ
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS") or '77.81.230.104:9092'
    return {
        "bootstrap_servers": [entry.strip() for entry in servers.split(",") if entry.strip()],
        "username": env.get("KAFKA_USERNAME") or 'admin',
        # SECURITY: this fallback is a credential committed to the notebook. It is kept
        # only so existing runs keep working; rotate it and supply KAFKA_PASSWORD instead.
        "password": env.get("KAFKA_PASSWORD") or 'VawEzo1ikLtrA8Ug8THa',
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL") or 'SASL_PLAINTEXT',
        "sasl_mechanism": env.get("KAFKA_SASL_MECHANISM") or 'PLAIN',
    }


kafka_config = load_kafka_config()

# Connection/authentication settings shared by every producer and consumer,
# defined once so the two configs below cannot drift apart.
common_connection_config = {
    'bootstrap_servers': kafka_config['bootstrap_servers'],
    'security_protocol': kafka_config['security_protocol'],
    'sasl_mechanism': kafka_config['sasl_mechanism'],
    'sasl_plain_username': kafka_config['username'],
    'sasl_plain_password': kafka_config['password'],
}

# Consumers decode each message value from UTF-8 JSON.
common_consumer_config = {
    **common_connection_config,
    'value_deserializer': lambda m: json.loads(m.decode('utf-8')),
    'auto_offset_reset': 'earliest',
    'enable_auto_commit': True,
}

# Producers encode each message value as UTF-8 JSON.
common_producer_config = {
    **common_connection_config,
    'value_serializer': lambda v: json.dumps(v).encode('utf-8'),
}

In [4]:
# Suffix appended to every topic name so this notebook's topics are
# distinguishable from other topics on the same cluster.
unique_id = 'goit_goit_de_hw_05'

# One topic for raw sensor readings and one per alert type.
topic_building_sensors, topic_temperature_alerts, topic_humidity_alerts = (
    f'{prefix}_{unique_id}'
    for prefix in ('building_sensors', 'temperature_alerts', 'humidity_alerts')
)

# Step 3: Create Topics in Kafka

In [5]:
# Topics this notebook needs (the names already carry the unique identifier)
topics = [topic_building_sensors, topic_temperature_alerts, topic_humidity_alerts]

# Create KafkaAdminClient
try:
    logger.info("Creating KafkaAdminClient...")
    admin_client = KafkaAdminClient(
        bootstrap_servers=kafka_config['bootstrap_servers'],
        security_protocol=kafka_config['security_protocol'],
        sasl_mechanism=kafka_config['sasl_mechanism'],
        sasl_plain_username=kafka_config['username'],
        sasl_plain_password=kafka_config['password'],
        client_id='admin_client'
    )
    logger.info("KafkaAdminClient successfully created.")
except Exception as e:
    logger.critical(f"Error creating KafkaAdminClient: {e}")
    raise

# Get a list of existing topics
try:
    existing_topics = admin_client.list_topics()
    logger.info("Successfully fetched the list of existing topics.")
except Exception as e:
    logger.error(f"Error fetching the list of topics: {e}")
    raise

# Create only those topics that do not already exist
topic_list = []
for topic_name in topics:
    if topic_name not in existing_topics:
        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        topic_list.append(topic)
    else:
        logger.info(f"Topic '{topic_name}' already exists, skipping creation.")

if topic_list:
    logger.info("Attempting to create new topics...")
    created_topic_names = []
    # Create the topics one request at a time: a single batched request stops at
    # the first failing topic, which hides whether the remaining ones were created.
    for topic in topic_list:
        try:
            admin_client.create_topics(new_topics=[topic], validate_only=False)
            created_topic_names.append(topic.name)
        except TopicAlreadyExistsError:
            # Another client created the topic between list_topics() and now;
            # the topic exists, which is all the following cells require.
            logger.info(f"Topic '{topic.name}' already exists, skipping creation.")
        except Exception as e:
            # Same best-effort policy as before: report the failure and carry on.
            logger.error(f"Error creating topics: {e}")
    if created_topic_names:
        logger.info("Topics successfully created:")
        for created_topic_name in created_topic_names:
            logger.info(f"- {created_topic_name}")
else:
    logger.info("All topics already exist, no new topics need to be created.")

[32m2024-11-21 18:46:41 - KafkaLogger - INFO - Creating KafkaAdminClient...[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - KafkaAdminClient successfully created.[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Successfully fetched the list of existing topics.[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Topic 'building_sensors_goit_goit_de_hw_05' already exists, skipping creation.[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Topic 'temperature_alerts_goit_goit_de_hw_05' already exists, skipping creation.[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Topic 'humidity_alerts_goit_goit_de_hw_05' already exists, skipping creation.[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - All topics already exist, no new topics need to be created.[0m


# Step 4: Sensor Simulation

In [6]:
# Sensor simulation
def sensor_simulation(sensor_id, topic_name, interval_seconds=10, stop_event=None):
    """Publish a random temperature/humidity reading to a Kafka topic at a fixed interval.

    Each reading is a dict with ``sensor_id``, an ISO-8601 UTC ``timestamp``,
    ``temperature`` drawn from [25, 45] and ``humidity`` drawn from [15, 85],
    both rounded to two decimals.

    Args:
        sensor_id: identifier copied into every reading.
        topic_name: Kafka topic the readings are sent to.
        interval_seconds: pause between two readings (default 10, as before).
        stop_event: optional ``threading.Event``; setting it ends the loop after
            the current wait. When omitted the simulation runs until interrupted,
            exactly like the original ``while True`` loop.
    """
    # A private event that nobody sets reproduces the old run-forever behaviour.
    if stop_event is None:
        stop_event = threading.Event()

    producer = KafkaProducer(**common_producer_config)

    try:
        while not stop_event.is_set():
            # Generate random temperature and humidity values
            temperature = random.uniform(25, 45)
            humidity = random.uniform(15, 85)
            timestamp = datetime.now(timezone.utc).isoformat()

            # Create a message
            data = {
                "sensor_id": sensor_id,
                "timestamp": timestamp,
                "temperature": round(temperature, 2),
                "humidity": round(humidity, 2),
            }

            # Send the message to the topic
            producer.send(topic_name, value=data)
            logger.info(f"Message sent from sensor {sensor_id}: {data}")

            # Event.wait is an interruptible sleep: it returns early as soon as
            # the caller sets stop_event, so shutdown does not lag by a full interval.
            stop_event.wait(interval_seconds)
        logger.warning(f"Simulation for sensor {sensor_id} stopped.")
    except KeyboardInterrupt:
        # Normally only reached when this runs on the main thread: Python raises
        # KeyboardInterrupt there, so worker threads must be stopped via stop_event.
        logger.warning(f"Simulation for sensor {sensor_id} stopped.")
    finally:
        producer.close()

# Step 5: Start Sensor Simulations

In [None]:
# Launch one simulator thread per sensor
num_sensors = 3
sensor_ids = random.sample(range(1000, 9999), num_sensors)

logger.info(f"Starting {num_sensors} sensor simulations with IDs: {sensor_ids}")

# Every thread publishes to the shared sensor topic under its own randomly drawn ID;
# the thread objects are kept in sensor_threads.
sensor_threads = []
for sensor_id in sensor_ids:
    sensor_thread = threading.Thread(
        target=sensor_simulation,
        kwargs={"sensor_id": sensor_id, "topic_name": topic_building_sensors},
    )
    sensor_threads.append(sensor_thread)
    sensor_thread.start()

[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Starting 3 sensor simulations with IDs: [2886, 1228, 4479][0m


[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Message sent from sensor 2886: {'sensor_id': 2886, 'timestamp': '2024-11-21T17:46:42.716764+00:00', 'temperature': 33.18, 'humidity': 20.13}[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Message sent from sensor 1228: {'sensor_id': 1228, 'timestamp': '2024-11-21T17:46:42.722758+00:00', 'temperature': 37.71, 'humidity': 19.87}[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - Message sent from sensor 4479: {'sensor_id': 4479, 'timestamp': '2024-11-21T17:46:42.727259+00:00', 'temperature': 27.92, 'humidity': 44.15}[0m
[32m2024-11-21 18:46:52 - KafkaLogger - INFO - Message sent from sensor 2886: {'sensor_id': 2886, 'timestamp': '2024-11-21T17:46:52.762841+00:00', 'temperature': 37.25, 'humidity': 47.17}[0m
[32m2024-11-21 18:46:52 - KafkaLogger - INFO - Message sent from sensor 1228: {'sensor_id': 1228, 'timestamp': '2024-11-21T17:46:52.779486+00:00', 'temperature': 38.27, 'humidity': 42.39}[0m
[32m2024-11-21 18:46:52 - KafkaLog

# Step 6: Data Processing

In [8]:
# Process data from the building_sensors topic
def data_processor(input_topic, temp_alert_topic, humidity_alert_topic):
    """Consume sensor readings and publish an alert for every out-of-range value.

    A reading with ``temperature`` above 40 produces a message on
    ``temp_alert_topic``; a reading with ``humidity`` above 80 or below 20
    produces a message on ``humidity_alert_topic``. A single reading can
    trigger both. Readings that lack a required field, or whose values cannot
    be compared with the thresholds, are logged and skipped instead of
    terminating the processor.

    Args:
        input_topic: topic carrying the sensor readings.
        temp_alert_topic: topic that receives temperature alerts.
        humidity_alert_topic: topic that receives humidity alerts.
    """
    consumer = KafkaConsumer(
        input_topic, group_id=f"data_processor_{unique_id}", **common_consumer_config
    )
    try:
        producer = KafkaProducer(**common_producer_config)
    except Exception:
        # Do not leak the already-open consumer when the producer cannot be created.
        consumer.close()
        raise

    try:
        for message in consumer:
            data = message.value
            try:
                sensor_id = data["sensor_id"]
                temperature = data["temperature"]
                humidity = data["humidity"]
                timestamp = data["timestamp"]
                # Evaluate the thresholds here so that non-numeric values are
                # rejected together with missing fields.
                temperature_too_high = temperature > 40
                humidity_out_of_range = humidity > 80 or humidity < 20
            except (KeyError, TypeError) as exc:
                logger.error(
                    f"Skipping malformed message from topic '{input_topic}': {data!r} ({exc!r})"
                )
                continue

            # Log incoming data
            logger.info(f"Received data from topic '{input_topic}': {data}")

            # Check temperature
            if temperature_too_high:
                alert = {
                    "sensor_id": sensor_id,
                    "timestamp": timestamp,
                    "temperature": temperature,
                    "message": "Temperature exceeds 40°C!",
                }
                # Warning log for temperature threshold
                logger.warning(f"Temperature threshold exceeded for sensor {sensor_id}: {temperature}°C")
                producer.send(temp_alert_topic, value=alert)
                logger.info(f"Temperature alert sent to topic '{temp_alert_topic}': {alert}")

            # Check humidity
            if humidity_out_of_range:
                alert = {
                    "sensor_id": sensor_id,
                    "timestamp": timestamp,
                    "humidity": humidity,
                    "message": "Humidity out of range (20%-80%)!",
                }
                # Warning log for humidity threshold
                logger.warning(f"Humidity threshold exceeded for sensor {sensor_id}: {humidity}%")
                producer.send(humidity_alert_topic, value=alert)
                logger.info(f"Humidity alert sent to topic '{humidity_alert_topic}': {alert}")

    except KeyboardInterrupt:
        logger.warning("Data processing stopped.")
    finally:
        # Nested so the producer is still closed if closing the consumer raises.
        try:
            consumer.close()
        finally:
            producer.close()

# Step 7: Start Data Processor

In [None]:
# Bind the topics the processor reads from and writes to
input_topic, temp_alert_topic, humidity_alert_topic = (
    topic_building_sensors,
    topic_temperature_alerts,
    topic_humidity_alerts,
)

# Run the processor on its own thread so this cell returns right after start()
processor_thread = threading.Thread(
    target=data_processor,
    args=(input_topic, temp_alert_topic, humidity_alert_topic),
)
processor_thread.start()

[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Received data from topic 'building_sensors_goit_goit_de_hw_05': {'sensor_id': 9144, 'timestamp': '2024-11-21T17:46:37.332042+00:00', 'temperature': 43.6, 'humidity': 17.87}[0m
[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Temperature alert sent to topic 'temperature_alerts_goit_goit_de_hw_05': {'sensor_id': 9144, 'timestamp': '2024-11-21T17:46:37.332042+00:00', 'temperature': 43.6, 'message': 'Temperature exceeds 40°C!'}[0m
[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Humidity alert sent to topic 'humidity_alerts_goit_goit_de_hw_05': {'sensor_id': 9144, 'timestamp': '2024-11-21T17:46:37.332042+00:00', 'humidity': 17.87, 'message': 'Humidity out of range (20%-80%)!'}[0m
[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Received data from topic 'building_sensors_goit_goit_de_hw_05': {'sensor_id': 5774, 'timestamp': '2024-11-21T17:46:37.333598+00:00', 'temperature': 31.58, 'humidity': 23.02}[0m
[32m2024-11-21 18:46:49 - KafkaLogger -

# Step 8: Alert Listener

In [10]:
# Read alerts from temperature_alerts and humidity_alerts topics
def alert_listener(temp_alert_topic, humidity_alert_topic):
    """Subscribe to both alert topics and log every alert that arrives.

    Args:
        temp_alert_topic: topic carrying temperature alerts.
        humidity_alert_topic: topic carrying humidity alerts.

    The consumer is closed when iteration ends or an exception propagates.
    """
    alert_topics = (temp_alert_topic, humidity_alert_topic)
    consumer = KafkaConsumer(
        *alert_topics,
        group_id=f"alert_listener_{unique_id}",
        **common_consumer_config,
    )

    try:
        for record in consumer:
            # record.topic tells which of the two alert topics delivered the message
            logger.info(f"Alert received from topic '{record.topic}': {record.value}")
    except KeyboardInterrupt:
        logger.warning("Alert reading stopped.")
    finally:
        consumer.close()

# Step 9: Start Alert Listener

In [None]:
# Bind the two alert topics the listener subscribes to
temp_alert_topic, humidity_alert_topic = topic_temperature_alerts, topic_humidity_alerts

# Run the listener on its own thread so this cell returns right after start()
listener_thread = threading.Thread(
    target=alert_listener,
    args=(temp_alert_topic, humidity_alert_topic),
)
listener_thread.start()

[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Alert received from topic 'humidity_alerts_goit_goit_de_hw_05': {'sensor_id': 9144, 'timestamp': '2024-11-21T17:46:37.332042+00:00', 'humidity': 17.87, 'message': 'Humidity out of range (20%-80%)!'}[0m
[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Alert received from topic 'temperature_alerts_goit_goit_de_hw_05': {'sensor_id': 9144, 'timestamp': '2024-11-21T17:46:37.332042+00:00', 'temperature': 43.6, 'message': 'Temperature exceeds 40°C!'}[0m
[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Alert received from topic 'temperature_alerts_goit_goit_de_hw_05': {'sensor_id': 9141, 'timestamp': '2024-11-21T17:46:37.476248+00:00', 'temperature': 42.72, 'message': 'Temperature exceeds 40°C!'}[0m
[32m2024-11-21 18:46:49 - KafkaLogger - INFO - Alert received from topic 'humidity_alerts_goit_goit_de_hw_05': {'sensor_id': 9144, 'timestamp': '2024-11-21T17:46:37.332042+00:00', 'humidity': 17.87, 'message': 'Humidity out of range (20%-80%)!'}[0m

# Step 10: List Topics with Unique Identifier

In [12]:
# Fetch every topic name from the cluster and log only those that carry
# this notebook's unique identifier.
topics = admin_client.list_topics()
logger.info("List of topics:")
for topic in topics:
    # Skip topics that do not carry the identifier
    if unique_id not in topic:
        continue
    logger.info(f"- {topic}")

[32m2024-11-21 18:46:42 - KafkaLogger - INFO - List of topics:[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - - temperature_alerts_goit_goit_de_hw_05[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - - building_sensors_goit_goit_de_hw_05[0m
[32m2024-11-21 18:46:42 - KafkaLogger - INFO - - humidity_alerts_goit_goit_de_hw_05[0m
