
# **Section B.⚙️𝒟𝒶𝓉𝒶 𝐼𝓃𝓉𝑒𝑔𝓇𝒶𝓉𝒾𝑜𝓃 𝒫𝒾𝓅𝑒𝓁𝒾𝓃𝑒**


### Improvements: Streaming and Output Handling

To simulate a real-time streaming scenario and avoid reading stale or duplicated data, the following changes were made:
1. Added directory cleanup before each run.
2. Ensured input and output directories are separate.
3. Added a simulated streaming data producer using the existing JSON file.


In [None]:

import time
import json
import shutil
from pathlib import Path

# Simulate streaming by copying lines from MOCK_DATA.json into a stream directory
input_json_path = Path("MOCK_DATA.json")
streaming_input_dir = Path("streaming_input")
streaming_input_dir.mkdir(exist_ok=True)

# Clean previous streaming input
shutil.rmtree(streaming_input_dir)
streaming_input_dir.mkdir()

# Read the JSON data
with open(input_json_path, 'r') as file:
    records = json.load(file)

# Write each record as a separate line-delimited JSON file (1 file per record)
for i, record in enumerate(records[:20]):  # simulate 20 records
    with open(streaming_input_dir / f"event_{i}.json", 'w') as out_file:
        json.dump(record, out_file)
    time.sleep(1)  # simulate delay between events


In [None]:

from pyspark.sql.functions import col, to_timestamp, hour, dayofweek, dayofmonth, month, year
import shutil

# Define paths
input_path = "streaming_input"
output_path = "streaming_output"
checkpoint_path = "streaming_checkpoint"

# Clean old output and checkpoint
shutil.rmtree(output_path, ignore_errors=True)
shutil.rmtree(checkpoint_path, ignore_errors=True)

# Load streaming data
streaming_df = spark.readStream.schema(df.schema).json(input_path)

# Transform timestamp and extract features
streaming_df = streaming_df.withColumn("timestamp", to_timestamp("timestamp"))
streaming_df = streaming_df.withColumn("hour", hour(col("timestamp"))) \
                           .withColumn("weekday", dayofweek(col("timestamp"))) \
                           .withColumn("day", dayofmonth(col("timestamp"))) \
                           .withColumn("month", month(col("timestamp"))) \
                           .withColumn("year", year(col("timestamp")))

# Write stream with proper checkpointing
query = streaming_df.writeStream \
    .format("parquet") \
    .option("path", output_path) \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode("append") \
    .start()

query.awaitTermination(30)  # run streaming for 30 seconds then stop


# 🛠️ *Extraction*

###  📡Kafka Section

`Install Kafka & Zookeeper`

In [21]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Connecting to security.ubuntu.com (91.189.91.81)] [Connected to cloud.r-pro                                                                               Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
0% [Connecting to security.ubuntu.com (91.189.91.81)] [Connected to cloud.r-pro                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:6 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRel

`Simulated Streaming Data`

In [22]:
# Simulate streaming input files
import os
import json
import time
import shutil

# Make directories
input_dir = '/content/input_stream/'
os.makedirs(input_dir, exist_ok=True)

# Load MOCK data
import pandas as pd
data = pd.read_json('/content/MOCK_DATA.json')

# Stream data: one row at a time into new files
for idx, row in data.iterrows():
    single_record = pd.DataFrame([row])
    single_record.to_json(f'/content/input_stream/record_{idx}.json', orient='records', lines=True)
    time.sleep(0.5)  # simulate 0.5 seconds between events

print("Simulated streaming data created.")

Simulated streaming data created.


`Creating Sparksession`

In [23]:
# Start SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("ColabStreamingMockData").master("local[*]").getOrCreate()

print("Spark Session started.")

Spark Session started.


`Defining Schema`

In [24]:

from pyspark.sql.functions import col, to_timestamp, hour, dayofweek, dayofmonth, month, year

# Convert timestamp column to actual timestamp type
df = df.withColumn("timestamp", to_timestamp("timestamp"))

# Extract time-based features
df = df.withColumn("hour", hour(col("timestamp"))) \
       .withColumn("weekday", dayofweek(col("timestamp"))) \
       .withColumn("day", dayofmonth(col("timestamp"))) \
       .withColumn("month", month(col("timestamp"))) \
       .withColumn("year", year(col("timestamp"))) \
       .withColumn("is_weekend", (dayofweek(col("timestamp")) >= 6).cast("int")) \
       .withColumn("is_night_user", ((hour(col("timestamp")) < 6) | (hour(col("timestamp")) > 20)).cast("int"))


`Read Streaming JSON Data into Spark DataFrame`

In [25]:
# Read stream from input directory

df_stream = (
    spark.readStream
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .json("/content/input_stream/")
)
df_stream.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- country: string (nullable = true)
 |-- event_type: string (nullable = true)



# 🌱Transformation

### 🐍🔥 PySpark

`Importing important libraries and functions`

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, sum as spark_sum, count, countDistinct,
    to_date, date_format, to_timestamp,
    hour, dayofweek, month, year, when
)
from pyspark.sql.types import StructType, StringType, TimestampType, IntegerType


In [None]:
query = (
    df_stream.writeStream
    .format("console")        # output to console, like .show()
    .outputMode("append")     # or "complete" depending on your use case
    .start()
)

query.awaitTermination()

`Dropping Null Values`

In [27]:
df_stream = df_stream.na.drop()


There was not much null values. Only one null value

`Converting timestamp and extract features from it`

In [29]:
df_stream = df_stream.withColumn(
    "timestamp",
    to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss'Z'")
)
df_stream = df_stream.withColumn("date", to_date("timestamp")) \
    .withColumn("time", date_format("timestamp", "HH:mm:ss")) \
    .withColumn("date", to_date("timestamp")) \
    .withColumn("time", date_format("timestamp", "HH:mm:ss")) \
    .withColumn("hour", hour("timestamp")) \
    .withColumn("weekday", dayofweek("timestamp")) \
    .withColumn("month", month("timestamp")) \
    .withColumn("year", year("timestamp")) \
    .withColumn("is_weekend", when(col("weekday").isin(1, 7), 1).otherwise(0)) \
    .withColumn("is_night_user", when((col("hour") >= 0) & (col("hour") <= 6), 1).otherwise(0))

* The above line fo code generates the new coloumns in table which are date,time, hour, weekday, month, year , is_weekend and is_night_user.
* Please remember that the is_weekend the coloumn with binary values for 1 as weekend meaning its saturday and sunday
* Is_night_user the coloumn with hours between 0-6 which is the night time.  
* weekday is the coloumn with the numbers from 1-7 and the 1 showing Sunday and 7 as saturday.

`Event Type Count`

In [31]:
event_counts = df_stream.groupBy("event_type").agg(count("*").alias("count"))
event_counts_query = event_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

`Count per Gender`

In [32]:
gender_counts = df_stream.groupBy("gender").count()
gender_counts_query = gender_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

`Count per Country`

In [33]:
country_counts = df_stream.groupBy("country").count()
country_counts_query = country_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

`Unique users per country`

In [34]:
from pyspark.sql.functions import approx_count_distinct

unique_users = df_stream.groupBy("country").agg(approx_count_distinct("id").alias("unique_users"))

unique_users_query = unique_users.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

`Hourly distribution`

In [35]:
hourly = df_stream.groupBy("hour").count()
hourly_query = hourly.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

`Weekday distribution`

In [36]:
weekday = df_stream.groupBy("weekday").count()
weekday_query = weekday.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

`Country vs Event Type`

In [37]:
country_event = df_stream.groupBy("country", "event_type").count()
country_event_query = country_event.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

` Gender vs Event Type`

In [38]:
gender_event = df_stream.groupBy("gender", "event_type").count()
gender_event_query = gender_event.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# 🚚Load(Data Lake Storage)

In [39]:
query = df_stream.writeStream \
    .format("parquet") \
    .option("path", "/content/output_stream/") \
    .option("checkpointLocation", "/output/_checkpoint/") \
    .trigger(once=True) \
    .start()
query.awaitTermination()

In [40]:
spark.read.parquet("/content/output_stream/").show()

+---+----------+--------------+--------------------+-----------+---------------+---------+--------------------+----------+----+----+----+-------+-----+----+----------+-------------+
| id|first_name|     last_name|               email|     gender|     ip_address|timestamp|             country|event_type|date|time|hour|weekday|month|year|is_weekend|is_night_user|
+---+----------+--------------+--------------------+-----------+---------------+---------+--------------------+----------+----+----+----+-------+-----+----+----------+-------------+
|835|    Corene|  Dmitrichenko|cdmitrichenkon6@c...|     Female|174.241.178.239|     null|Republic of the C...|     share|null|null|null|   null| null|null|         0|            0|
|509|    Barbie|    Komorowski|bkomorowskie4@goo...|     Female|   150.57.58.54|     null|  Dominican Republic|  purchase|null|null|null|   null| null|null|         0|            0|
| 46|      Tish|      Christou|tchristou19@marke...|     Female| 218.227.108.73|     null|

# MLIB