In [None]:
#Src
# Read Kafka stream -- Target Postgres DB
# Read Batch file -- Target Postgres DB

In [1]:
pip install psycopg2-binary==2.9.9

Note: you may need to restart the kernel to use updated packages.


In [3]:
import psycopg2

# Connect to the database
conn = psycopg2.connect(
    host="127.0.0.1",
    port=5433,
    database="de_proj",
    user="postgres",
    password="postgres"
)

# Create a cursor object
cur = conn.cursor()


def try_execute_sql(sql: str):
    try:
        cur.execute(sql)
        conn.commit()
        print(f"Executed table creation successfully")
    except Exception as e:
        print(f"Couldn't execute table creation due to exception: {e}")
        conn.rollback()
    # Close the cursor and connection
    cur.close()
    conn.close()

def create_table():
    """
    Creates the sensorInfo table and its columns.
    """
    # SQL to create a table
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS stg_SensorInfo (
            record JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        CREATE TABLE IF NOT EXISTS stg_SensorInfo_json (
            record JSONB,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        
        CREATE TABLE IF NOT EXISTS SensorInfo (
            id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
            timestamp TIMESTAMP NOT NULL,
            sensor_id INTEGER,
            value DOUBLE PRECISION,
            city VARCHAR(50),
            country VARCHAR(50),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

    try_execute_sql(create_table_sql)

if __name__ == "__main__":
    create_table()

Executed table creation successfully


In [5]:
import psycopg2

def postgres_conn():
    # Database connection settings
    conn = psycopg2.connect(
        host="127.0.0.1",
        port=5433,
        database="de_proj",
        user="postgres",
        password="postgres"
    )
    
    return conn

In [7]:
pip install kafka-python




In [8]:
from kafka import KafkaProducer
import json
import time
import random

# Kafka Producer Configuration
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # broker IP
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Simulate producing data
def generate_data():
    location_dict = {
    'ON': 'Ajax',
    'ON': 'Milton',
    'ON': 'Oakville',
    'ON': 'Whitby',
    'BC': 'Port Moody',
    'BC': 'Delta',
    'BC': 'Burnaby',
    'BC': 'Langley'
    }

    selected_key, selected_value = random.choice(list(location_dict.items()))
    
    data = {
        'timestamp': int(time.time()),
        'sensor_id': random.randint(1, 5),
        'value': random.random() * 100,
        'city': selected_value,
        'country': selected_key
    }
    return data

# Produce data to input-topic
def produce_data():
    count=0
    #while True:
    while count<10:
        data = generate_data()
        print(f"Sending: {data}")
        producer.send('sensorInfo', value=data)
        time.sleep(1)
        count +=1

if __name__ == '__main__':
    produce_data()

Sending: {'timestamp': 1747800450, 'sensor_id': 3, 'value': 38.10958339372211, 'city': 'Langley', 'country': 'BC'}
Sending: {'timestamp': 1747800451, 'sensor_id': 5, 'value': 86.47944019318841, 'city': 'Whitby', 'country': 'ON'}
Sending: {'timestamp': 1747800452, 'sensor_id': 5, 'value': 55.68616192337594, 'city': 'Whitby', 'country': 'ON'}
Sending: {'timestamp': 1747800453, 'sensor_id': 3, 'value': 72.73500269331272, 'city': 'Whitby', 'country': 'ON'}
Sending: {'timestamp': 1747800454, 'sensor_id': 5, 'value': 60.816383297303844, 'city': 'Whitby', 'country': 'ON'}
Sending: {'timestamp': 1747800455, 'sensor_id': 5, 'value': 39.659161419315446, 'city': 'Whitby', 'country': 'ON'}
Sending: {'timestamp': 1747800456, 'sensor_id': 5, 'value': 26.367020153334806, 'city': 'Langley', 'country': 'BC'}
Sending: {'timestamp': 1747800457, 'sensor_id': 1, 'value': 19.305507618278938, 'city': 'Whitby', 'country': 'ON'}
Sending: {'timestamp': 1747800458, 'sensor_id': 1, 'value': 75.43290730206103, 'ci

In [9]:
from kafka import KafkaConsumer
import time

# Connect to Kafka and subscribe to topic
consumer = KafkaConsumer(
    'sensorInfo',                           # Replace with your topic
    bootstrap_servers='localhost:9092',     # Match your Docker setup
    auto_offset_reset='earliest',           # Start from the beginning if no offset
    group_id='consumer-group',           # Consumer group ID
    enable_auto_commit=True,                # Commit offsets automatically
    value_deserializer=lambda x: x.decode('utf-8')  # Decode bytes to string
)

# Continuously listen for messages
def de_kafka_consumer():
    # List to store messages
    messages = []
    
    # Poll for messages during a time window (e.g., 10 seconds)
    timeout_secs = 10
    end_time = time.time() + timeout_secs
    
    print(f"Polling messages for {timeout_secs} seconds...")
    
    while time.time() < end_time:
        # Poll with short timeout to collect messages incrementally
        polled = consumer.poll(timeout_ms=10)
    
        for topic_partition, records in polled.items():
            for record in records:
                #print(f"Received: {record.value}")
                messages.append(record.value)

    print(f"Done. Collected {len(messages)} messages.")

    import json

    filepath="data/kafka_messages.json"
    with open(filepath, "w") as f:
        json.dump(messages, f, indent=2)
        
    print(f"Data written to file: ",{filepath})
    
if __name__ == '__main__':
    de_kafka_consumer()

Polling messages for 10 seconds...
Done. Collected 10 messages.
Data written to file:  {'data/kafka_messages.json'}


In [10]:
#TRANSFORMATION

In [53]:
import json

def transform_data():
    filepath="data/kafka_messages.json"
    with open(filepath, "r") as f:
        content = f.read()
        data = json.loads(content)
   
    #print(data)  # This will be a Python list
    
    conn = postgres_conn()
    cur= conn.cursor()
    
    #Insert data into table
    for item in data:
        record = json.loads(item)  # parse each string in the list
        print(record['timestamp'],record['sensor_id'], record['value'],record['city'], record['country'])
        cur.execute(
            "INSERT INTO SensorInfo (timestamp,sensor_id,value,city,country) VALUES (TO_TIMESTAMP(%s), %s, %s, %s, %s) ON CONFLICT (id) DO NOTHING",
            (record['timestamp'],record['sensor_id'], record['value'],record['city'], record['country'])
        )
    
    #Commit and close
    conn.commit()
    cur.close()
    conn.close()
    
    print("Data inserted successfully.")

if __name__ == '__main__':
    transform_data()

1747787759 4 56.77640368566098 Whitby ON
1747787761 2 59.12027390887851 Langley BC
1747787762 2 62.66116270625716 Langley BC
1747787763 5 2.343682598834418 Whitby ON
1747787764 1 87.08696766999357 Whitby ON
1747787765 4 20.357644775637162 Whitby ON
1747787766 5 1.047685939513776 Langley BC
1747787767 4 97.82147529631257 Langley BC
1747787768 5 92.92027866265691 Whitby ON
1747787769 4 71.83390551365967 Langley BC
1747789394 3 49.278125440360796 Whitby ON
1747789396 5 93.89550366396267 Langley BC
1747789397 5 73.54078772029678 Whitby ON
1747789398 1 97.1560479452384 Whitby ON
1747789399 3 69.78298885556862 Whitby ON
1747789400 5 35.483795180988544 Whitby ON
1747789401 2 36.9241120094266 Whitby ON
1747789402 2 77.28465396865352 Whitby ON
1747789403 4 19.165706730030664 Whitby ON
1747789404 5 56.65409248907063 Whitby ON
Data inserted successfully.


In [15]:
#VALIDATION

conn = postgres_conn()
cur= conn.cursor()
cur.execute(
            "SELECT * FROM SensorInfo;"
        )
result = cur.fetchall()
# Print the results to validate the insertion
print("Validation Result:")
for row in result:
    print(row)
    
cur.close()
conn.close()

Validation Result:
(1, datetime.datetime(2025, 5, 21, 0, 35, 59), 4, 56.77640368566098, 'Whitby', 'ON', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(2, datetime.datetime(2025, 5, 21, 0, 36, 1), 2, 59.12027390887851, 'Langley', 'BC', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(3, datetime.datetime(2025, 5, 21, 0, 36, 2), 2, 62.66116270625716, 'Langley', 'BC', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(4, datetime.datetime(2025, 5, 21, 0, 36, 3), 5, 2.343682598834418, 'Whitby', 'ON', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(5, datetime.datetime(2025, 5, 21, 0, 36, 4), 1, 87.08696766999357, 'Whitby', 'ON', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(6, datetime.datetime(2025, 5, 21, 0, 36, 5), 4, 20.357644775637162, 'Whitby', 'ON', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(7, datetime.datetime(2025, 5, 21, 0, 36, 6), 5, 1.047685939513776, 'Langley', 'BC', datetime.datetime(2025, 5, 21, 1, 4, 56, 324946))
(8, datetime.datetime(2025, 5, 21, 0, 

In [1]:
#PYSPARK

In [3]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [61]:
import pyspark
print(pyspark.__version__)

3.5.0


In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import col

import os
os.environ["HADOOP_HOME"] = "C:/hadoop/hadoop-3.3.5"

def create_spark_session() -> SparkSession:
    spark = (
        SparkSession.builder.appName("Transform data with PySpark")
        .config("spark.streaming.stopGracefullyOnShutdown", True) 
        .master("local[*]") 
        .getOrCreate()
    )

    print("Spark session created successfully")
    return spark

    
def spark_transform_data_load_table(filepath,spark_session):
    import os
    print(os.getcwd()) 

    # To allow automatic schemaInference while reading
    spark.conf.set("spark.sql.streaming.schemaInference", True)
    
    # Create the streaming_df to read from input directory
    streaming_df = (
        spark
        .readStream
        .format("json")
        .load(filepath)
    )

    # To the schema of the data, place a sample json file and change readStream to read 
    streaming_df.printSchema()

    # Lets explode the data as devices contains list/array of device reading    
    exploded_df = streaming_df.withColumn("data_devices", explode("data.devices"))
    
    # Check the schema of the exploded_df, place a sample json file and change readStream to read 
    exploded_df.printSchema()

    # Flatten the exploded df    
    flattened_df = (
        exploded_df
        .drop("data")
        .withColumn("deviceId", col("data_devices.deviceId"))
        .withColumn("measure", col("data_devices.measure"))
        .withColumn("status", col("data_devices.status"))
        .withColumn("temperature", col("data_devices.temperature"))
        .drop("data_devices")
    )
    # Check the schema of the flattened_df, place a sample json file and change readStream to read 
    flattened_df.printSchema()
    
    # Write the output to console sink to check the output
    flattened_df.writeStream \
        .format("csv") \
        .option("path", "data/device_files/output/") \
        .option("checkpointLocation", "output/stream_parquet/checkpoints/") \
        .outputMode("append") \
        .start()
    
    print('write completed')
    
if __name__ == '__main__':
    filepath="data/device_files/input/device_data.json"
    spark=create_spark_session()
    spark_transform_data_load_table(filepath,spark)
    #transform_data_load_table(filepath)

    # Stop the Spark session when done
    spark.stop()

Spark session created successfully
C:\Users\russe\Desktop\JaniceProj\GitProjects\de_e2e
root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    