# Data Faker

In [1]:
import random
from datetime import datetime, timedelta
import numpy as np
import pandas as pd

from faker import Faker

In [2]:
# Initialize Faker and other settings
SEED = 42  # fixed seed so Restart & Run All regenerates the same sensor types/readings
random.seed(SEED)  # seeds random.choice() used to assign sensor types below
np.random.seed(SEED)  # seeds np.random.uniform() used by generate_sensor_data
# (timestamps still depend on datetime.now(), so they differ between runs)
fake = Faker()  # NOTE(review): `fake` is not used by any cell shown here

num_sensors = 10  # Number of sensors
start_time = datetime.now() - timedelta(days=1)  # Start from 24 hours ago
time_interval = timedelta(minutes=1)  # Time between sensor readings


# Define sensor types and their data ranges
sensor_types = {
    "temperature": {"min": 15, "max": 35, "unit": "Celsius"},
    "humidity": {"min": 30, "max": 70, "unit": "%"},
    "air_quality": {"min": 50, "max": 200, "unit": "AQI"},
}

# Create a list of sensors with random sensor types
sensors = [
    {
        "sensor_id": f"sensor_{i+1:03}",
        "sensor_type": random.choice(list(sensor_types.keys())),
    }
    for i in range(num_sensors)
]

In [4]:
# Generate synthetic data
def generate_sensor_data(
    num_readings, sensor_list=None, type_specs=None, start=None, interval=None
):
    """Build one record per (sensor, reading) with uniformly random fake values.

    Parameters
    ----------
    num_readings : int
        Number of readings to emit per sensor; ``0`` yields no rows.
    sensor_list : list of dict, optional
        Sensors to simulate; each dict needs ``"sensor_id"`` and
        ``"sensor_type"`` keys. Defaults to the notebook-level ``sensors``.
    type_specs : dict, optional
        Maps a sensor type to ``{"min": ..., "max": ..., "unit": ...}``.
        Defaults to the notebook-level ``sensor_types``.
    start : datetime, optional
        Timestamp of every sensor's first reading. Defaults to the
        notebook-level ``start_time``.
    interval : timedelta, optional
        Gap between consecutive readings of one sensor. Defaults to the
        notebook-level ``time_interval``.

    Returns
    -------
    list of dict
        Records with keys ``sensor_id``, ``timestamp``, ``sensor_type``,
        ``sensor_reading`` (drawn with ``np.random.uniform(min, max)`` and
        rounded to 2 decimals) and ``unit``, grouped by sensor in input order.

    Raises
    ------
    KeyError
        If a sensor's type is missing from ``type_specs``.
    """
    # Fall back to the notebook configuration for anything not passed in.
    # Resolve at call time, so callers see the current global values exactly
    # as the original global-reading version did.
    if sensor_list is None:
        sensor_list = sensors
    if type_specs is None:
        type_specs = sensor_types
    if start is None:
        start = start_time
    if interval is None:
        interval = time_interval

    data = []
    for sensor in sensor_list:
        spec = type_specs[sensor["sensor_type"]]  # invariant per sensor; look up once
        for step in range(num_readings):
            reading = np.random.uniform(spec["min"], spec["max"])
            data.append(
                {
                    "sensor_id": sensor["sensor_id"],
                    # Each sensor's series restarts at `start`; timedelta math is exact.
                    "timestamp": start + step * interval,
                    "sensor_type": sensor["sensor_type"],
                    "sensor_reading": round(reading, 2),
                    "unit": spec["unit"],
                }
            )
    return data


# A full 24-hour window at one reading per `time_interval` needs
# `readings_per_day` readings per sensor (1440 with the 1-minute interval).
# num_readings is deliberately kept small so the Kafka producer, which sleeps
# 0.1 s per message, streams quickly; set num_readings = readings_per_day to
# generate the whole day.
readings_per_day = timedelta(days=1) // time_interval
num_readings = 14
sensor_data = generate_sensor_data(num_readings)

# Convert to pandas DataFrame and save as CSV
df = pd.DataFrame(sensor_data)
df.to_csv("synthetic_iot_sensor_data.csv", index=False)

print(
    f"Generated {len(df)} rows of sensor data and saved to 'synthetic_iot_sensor_data.csv'."
)

Generated 140 rows of sensor data and saved to 'synthetic_iot_sensor_data.csv'.


In [5]:
df.head(n=5)  # preview the first five generated rows

Unnamed: 0,sensor_id,timestamp,sensor_type,sensor_reading,unit
0,sensor_001,2024-11-23 22:15:21.019651,temperature,18.93,Celsius
1,sensor_001,2024-11-23 22:16:21.019651,temperature,28.06,Celsius
2,sensor_001,2024-11-23 22:17:21.019651,temperature,28.84,Celsius
3,sensor_001,2024-11-23 22:18:21.019651,temperature,31.73,Celsius
4,sensor_001,2024-11-23 22:19:21.019651,temperature,30.01,Celsius


# Kafka Producer

In [6]:
import time
import json
import pandas as pd
from kafka import KafkaProducer

In [7]:
# Reload the CSV written by the Data Faker section and preview it
csv_path = "synthetic_iot_sensor_data.csv"
df = pd.read_csv(csv_path)
df.head()

Unnamed: 0,sensor_id,timestamp,sensor_type,sensor_reading,unit
0,sensor_001,2024-11-23 22:15:21.019651,temperature,18.93,Celsius
1,sensor_001,2024-11-23 22:16:21.019651,temperature,28.06,Celsius
2,sensor_001,2024-11-23 22:17:21.019651,temperature,28.84,Celsius
3,sensor_001,2024-11-23 22:18:21.019651,temperature,31.73,Celsius
4,sensor_001,2024-11-23 22:19:21.019651,temperature,30.01,Celsius


In [8]:
# Kafka producer whose message values are dicts, sent as UTF-8 encoded JSON
def _to_json_bytes(value):
    """Serialize one message value to UTF-8 encoded JSON bytes."""
    encoded = json.dumps(value)
    return encoded.encode("utf-8")


producer = KafkaProducer(
    value_serializer=_to_json_bytes,
    bootstrap_servers=["localhost:9092"],
)

In [9]:
from tqdm import tqdm

In [10]:
# Publish each CSV row as one JSON message on the "iot-sensor-data" topic
MESSAGE_FIELDS = ("sensor_id", "timestamp", "sensor_type", "sensor_reading", "unit")

for _row_index, row in tqdm(df.iterrows(), total=len(df)):
    data = {field: row[field] for field in MESSAGE_FIELDS}
    producer.send("iot-sensor-data", value=data)
    time.sleep(0.1)  # throttle (~10 msg/s) to mimic a live sensor stream

# Push out anything still buffered before moving on
producer.flush()

100%|██████████| 140/140 [00:14<00:00,  9.83it/s]


# Kafka Consumer → Cassandra: Ingest and Query

In [2]:
from kafka import KafkaConsumer
from cassandra.cluster import Cluster
import json

In [3]:
# Contact the local Cassandra node and bind the session to the `iot_data`
# keyspace so unqualified table names resolve there
CASSANDRA_CONTACT_POINTS = ["127.0.0.1"]
cluster = Cluster(CASSANDRA_CONTACT_POINTS)
session = cluster.connect("iot_data")

In [4]:
# Create Kafka consumer to listen to "iot-sensor-data" topic.
# consumer_timeout_ms makes `for message in consumer` stop after 10 s without
# new records; with the default (block forever) the insert loop below would
# never finish and its closing print would never run.
# NOTE(review): no group_id is set, so kafka-python commits no offsets
# (enable_auto_commit has no effect) and every run re-reads from the earliest
# offset — confirm that is intended.
consumer = KafkaConsumer(
    "iot-sensor-data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    consumer_timeout_ms=10_000,
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)

In [4]:
# Sanity check without consuming anything: iterating `consumer` here (even with
# an immediate `break`) advances its position, and the insert loop would then
# silently skip the record read here.
consumer.subscription()

In [None]:
consumer.assignment()  # partitions assigned so far (empty until the consumer has polled)

In [None]:
# Insert data from Kafka into Cassandra.
# The statement is prepared once and each message's values are bound to it,
# so field contents are never spliced into the CQL text (no quoting breakage
# or injection from message payloads).
from datetime import datetime

from tqdm import tqdm  # imported here so this section also runs on a fresh kernel

insert_stmt = session.prepare(
    """
    INSERT INTO iot_data.sensor_readings
    (sensor_id, timestamp, sensor_type, sensor_reading, unit)
    VALUES (?, ?, ?, ?, ?)
    """
)

# The loop ends only when the consumer's iterator stops (e.g. when a
# consumer_timeout_ms is configured); otherwise this cell blocks indefinitely.
for message in tqdm(consumer):
    data = message.value
    session.execute(
        insert_stmt,
        (
            str(data["sensor_id"]),
            # Bound timestamp values must be datetime objects, not ISO strings.
            # NOTE(review): the driver treats naive datetimes as UTC, whereas the
            # old string literal was interpreted in the coordinator's timezone —
            # confirm the intended zone.
            datetime.fromisoformat(data["timestamp"]),
            data["sensor_type"],
            data["sensor_reading"],
            str(data["unit"]),
        ),
    )

print("Data inserted into Cassandra from Kafka")

0it [00:00, ?it/s]

In [1]:
from cassandra.cluster import Cluster

# Reconnect to the local Cassandra node, defaulting to the `iot_data` keyspace
cluster = Cluster(contact_points=["127.0.0.1"])
session = cluster.connect(keyspace="iot_data")

NoHostAvailable: ("Unable to connect to any servers using keyspace 'iot_data'", ['127.0.0.1'])

In [9]:
# Fetch every stored reading for one sensor. Values are passed as driver-bound
# %s parameters rather than formatted into the CQL string, so quotes or other
# CQL syntax in sensor_id / sensor_type cannot break or alter the statement.
sensor_id = "sensor_005"
# NOTE(review): sensor types are assigned randomly at generation time, so this
# must match the type actually generated for sensor_id or no rows come back.
sensor_type = "air_quality"
query = "SELECT * FROM iot_data.sensor_readings WHERE sensor_id = %s AND sensor_type = %s"

rows = session.execute(query, (sensor_id, sensor_type))
result = [
    {
        "sensor_id": row.sensor_id,
        "timestamp": row.timestamp,
        "sensor_type": row.sensor_type,
        "sensor_reading": row.sensor_reading,
        "unit": row.unit,
    }
    for row in rows
]

In [10]:
# Display the list of reading dicts built by the query cell above
result

[{'sensor_id': 'sensor_005',
  'timestamp': datetime.datetime(2024, 11, 23, 22, 15, 21, 19000),
  'sensor_type': 'air_quality',
  'sensor_reading': 99.9800033569336,
  'unit': 'AQI'},
 {'sensor_id': 'sensor_005',
  'timestamp': datetime.datetime(2024, 11, 23, 22, 16, 21, 19000),
  'sensor_type': 'air_quality',
  'sensor_reading': 78.23999786376953,
  'unit': 'AQI'},
 {'sensor_id': 'sensor_005',
  'timestamp': datetime.datetime(2024, 11, 23, 22, 17, 21, 19000),
  'sensor_type': 'air_quality',
  'sensor_reading': 61.459999084472656,
  'unit': 'AQI'},
 {'sensor_id': 'sensor_005',
  'timestamp': datetime.datetime(2024, 11, 23, 22, 18, 21, 19000),
  'sensor_type': 'air_quality',
  'sensor_reading': 92.37000274658203,
  'unit': 'AQI'},
 {'sensor_id': 'sensor_005',
  'timestamp': datetime.datetime(2024, 11, 23, 22, 19, 21, 19000),
  'sensor_type': 'air_quality',
  'sensor_reading': 89.95999908447266,
  'unit': 'AQI'},
 {'sensor_id': 'sensor_005',
  'timestamp': datetime.datetime(2024, 11, 23,