In [20]:
!pip install kafka-python==2.0.2
!pip show kafka-python
!pip install mysql-connector-python


Name: kafka-python
Version: 2.0.2
Summary: Pure Python client for Apache Kafka
Home-page: https://github.com/dpkp/kafka-python
Author: Dana Powers
Author-email: dana.powers@gmail.com
License: Apache License 2.0
Location: /usr/local/lib/python3.11/dist-packages
Requires: 
Required-by: 


In [21]:
import mysql.connector
from google.colab import userdata

# Load credentials from secrets

KAFKA_HOST = userdata.get("KAFKA_HOST")
MYSQL_HOST = userdata.get("MYSQL_HOST")
MYSQL_USER = userdata.get("MYSQL_USER")
MYSQL_PASS = userdata.get("MYSQL_PASS")

# Replace with your MySQL server details
conn = mysql.connector.connect(
    host=MYSQL_HOST,
    user=MYSQL_USER,
    password=MYSQL_PASS,
    database="iot"
)

cursor = conn.cursor()
print("Connected to MySQL!")


Connected to MySQL!


In [22]:
# Create database if it doesn't exist
cursor.execute("CREATE DATABASE IF NOT EXISTS iot")
cursor.execute("USE iot")

# Create table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS sensor_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        sensor_id VARCHAR(50),
        temperature FLOAT,
        pressure FLOAT,
        humidity FLOAT,
        battery_level FLOAT,
        location VARCHAR(100),
        sensor_type VARCHAR(50),
        device_status VARCHAR(50),
        timestamp DOUBLE
    )
""")

print("Database and table created successfully!")


Database and table created successfully!


In [23]:
import json
from kafka import KafkaConsumer


KAFKA_BROKER = f"{KAFKA_HOST}:9093"

# Kafka Consumer
consumer = KafkaConsumer(
    "iot-sensors",
    bootstrap_servers= KAFKA_BROKER,
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Convert JSON to dict
)

print("Listening for IoT Sensor Data...")

for message in consumer:
    data = message.value
    print(f"Received: {data}")

    # Insert Data into MySQL
    sql = """
    INSERT INTO sensor_data (sensor_id, temperature, pressure, humidity, battery_level, location, sensor_type, device_status, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    values = (
        data["sensor_id"],
        data["temperature"],
        data["pressure"],
        data["humidity"],
        data["battery_level"],
        data["location"],
        data["sensor_type"],
        data["device_status"],
        data["timestamp"],
    )

    cursor.execute(sql, values)
    conn.commit()

print("Data successfully stored in MySQL!")



[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Received: {'sensor_id': 'sensor-04', 'temperature': 27.98, 'pressure': 916.77, 'humidity': 54.95, 'battery_level': 37.41, 'location': 'Factory Floor', 'sensor_type': 'Motion', 'device_status': 'offline', 'timestamp': 1742090007.6009681}
Received: {'sensor_id': 'sensor-04', 'temperature': 24.86, 'pressure': 920.71, 'humidity': 46.2, 'battery_level': 70.01, 'location': 'Outdoor', 'sensor_type': 'Proximity', 'device_status': 'offline', 'timestamp': 1742090007.601042}
Received: {'sensor_id': 'sensor-03', 'temperature': 22.05, 'pressure': 1069.42, 'humidity': 44.67, 'battery_level': 69.22, 'location': 'Warehouse', 'sensor_type': 'Proximity', 'device_status': 'inactive', 'timestamp': 1742090007.6011117}
Received: {'sensor_id': 'sensor-02', 'temperature': 24.36, 'pressure': 977.85, 'humidity': 51.84, 'battery_level': 3.58, 'location': 'Warehouse', 'sensor_type': 'Temperature', 'device_status': 'maintenance', 'timestamp': 1742090

ERROR:kafka.conn:<BrokerConnection node_id=0 host=35.224.95.121:9093 <connected> [IPv4 ('35.224.95.121', 9093)]>: socket disconnected
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconnecting.


Received: {'sensor_id': 'sensor-01', 'temperature': 27.51, 'pressure': 961.63, 'humidity': 57.16, 'battery_level': 92.72, 'location': 'Room B', 'sensor_type': 'Humidity', 'device_status': 'maintenance', 'timestamp': 1742090008.1295395}


ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconnecting.
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=35.224.95.121:9093 <connecting> [IPv4 ('35.224.95.121', 9093)]> returned error 111. Disconn

KeyboardInterrupt: 