In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
# from pyspark.sql.functions import col, window, explode , from_json , count ,approx_count_distinct,to_timestamp
from pyspark.sql.functions import *
# from pyspark.sql.types import StructType, StructField, StringType, IntegerType,TimestampType
from pyspark.sql.types import *

In [3]:
import os
import sys

# PySpark reads PYSPARK_PYTHON when the SparkContext is initialised, so it has to be
# set *before* getOrCreate(); setting it in a later cell does not affect this session.
# Point it at this kernel's interpreter so the driver and the Python workers use the
# same Python. setdefault() keeps any value the user exported explicitly.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

# Local Spark session with the Kafka source connector pulled in via spark.jars.packages.
# NOTE(review): the connector coordinates (Scala 2.12, Spark 3.1.2) must match the
# installed pyspark version — confirm with `pyspark.__version__`.
spark = (
    SparkSession.builder
    .appName("KafkaClickstream")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")
    .config("spark.hadoop.io.native.lib", "False")
    .master("local[*]")
    .getOrCreate()
)

In [4]:
import sys

# Show which interpreter backs this kernel (handy when diagnosing PYSPARK_PYTHON mismatches).
print(f"{sys.executable}")

C:\Users\hp\AppData\Local\Programs\Python\Python312\python.exe


In [5]:
import os

# NOTE(review): PySpark reads PYSPARK_PYTHON when a SparkContext is created, and the
# session was already built in an earlier cell, so this most likely only affects
# contexts created later in this kernel — confirm before relying on it.
os.environ.update(PYSPARK_PYTHON="python")

In [6]:
# Schema of the JSON clickstream events read from Kafka; every field is nullable.
# `timestamp` is declared as TimestampType so from_json yields a real event-time column.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("user_id", IntegerType()),
        ("timestamp", TimestampType()),
        ("event_type", StringType()),
        ("product_id", IntegerType()),
        ("session_id", StringType()),
        ("user_name", StringType()),
        ("user_email", StringType()),
        ("user_location", StringType()),
    ]
])

In [7]:
# Streaming source: the "realtimedashboard" topic on the local Kafka broker.
# startingOffsets="earliest" replays the retained topic when the query starts without a checkpoint.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "127.0.0.1:9092",
        "subscribe": "realtimedashboard",
        "startingOffsets": "earliest",
    })
    .load()
)

In [8]:
# Decode the Kafka message payload to a string, parse it as JSON with `schema`,
# and flatten the parsed struct into top-level columns.
clickstream_df = (
    kafka_stream_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

In [9]:
# Keep only the two event types the dashboard reports on: clicks and purchases.
filtered_df = clickstream_df.where(col("event_type").isin("click", "purchase"))

In [10]:
# `timestamp` is already TimestampType per `schema`, so this should be a pass-through;
# the pattern only matters if the field is ever switched back to a string.
parsed_df = filtered_df.withColumn(
    "timestamp",
    to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"),
)

In [11]:
# Static product dimension (header row, column types inferred) used to enrich the stream.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
product_csv_path = r"C:/Users/hp/Kafka poc/Real_Time_Streaming/dim_products.csv"

product_df = spark.read.options(header="true", inferSchema=True).csv(product_csv_path)

In [12]:
# Peek at the first few product rows (long values truncated for readability).
product_df.show(n=5, truncate=True)

+----------+--------------+--------+--------------------+--------------------+------+-----+--------+------------+--------------------+--------------+--------+----------+----------+
|product_id|  product_name|category|               brand|       supplier_name| price|color|    size|release_date|         description|stock_quantity|discount|created_at|updated_at|
+----------+--------------+--------+--------------------+--------------------+------+-----+--------+------------+--------------------+--------------+--------+----------+----------+
|      3778|Life President|  Beauty|        Hall-Mccarty|     Johnson-Johnson|869.03| Blue|  Medium|  2024-01-18|Follow suggest it...|           345|    0.43|2025-02-07|2025-02-03|
|      9871|   Bring Woman|    Home|         Baldwin PLC|       Rodriguez Inc|697.09|Green|One size|  2021-11-05|Alone left throug...|           318|    0.42|2025-02-08|2025-02-18|
|      3441|     Door Side|    Home|Taylor, Gutierrez...|           Singh LLC| 495.1|White|One 

In [27]:
# Attach product attributes (category, price, discount, ...) to every event.
# Inner join: events whose product_id is missing from the product CSV are dropped.
enriched_df = parsed_df.join(product_df, on="product_id", how="inner")

Total Revenue by Category

In [28]:
# Per-event revenue. Only purchases generate money: a purchase contributes the discounted
# price, price * (1 - discount) (discount is a fraction, e.g. 0.43 = 43% off), and clicks
# contribute 0 so they no longer inflate the category totals.
# Re-running this cell is safe: withColumn replaces an existing "revenue" column.
enriched_df = enriched_df.withColumn(
    "revenue",
    when(
        col("event_type") == "purchase",
        col("price") * (1 - col("discount")),
    ).otherwise(lit(0.0)),
)

# Accept events up to 5 seconds late on the `timestamp` event-time column; this is what
# lets append-mode sinks finalise (and Spark drop state for) closed windows.
enriched_df_with_watermark = enriched_df.withWatermark("timestamp", "5 seconds")

# Total revenue per category over 5-second tumbling event-time windows.
total_revenue_by_category = (
    enriched_df_with_watermark
    .withColumn("window", window("timestamp", "5 seconds"))
    .groupBy("category", "window")
    .agg(sum("revenue").alias("total_revenue"))
)

In [16]:
def process_batch(batch_df, batch_id):
    """foreachBatch debug sink: report a micro-batch's row count and preview its rows.

    Args:
        batch_df: the micro-batch DataFrame handed over by Structured Streaming.
        batch_id: the id of that micro-batch.

    The batch is used by two actions (``count`` and ``show``). It is persisted for the
    duration of the call so the second action does not recompute the whole micro-batch
    (including re-reading its input). It is always unpersisted again, even if an action fails.
    """
    print(f"Processing batch {batch_id}")
    batch_df.persist()
    try:
        batch_count = batch_df.count()
        print(f"Number of rows in batch {batch_id}: {batch_count}")
        batch_df.show(5, truncate=False)  # preview only the first rows
    finally:
        batch_df.unpersist()

In [17]:
# Preview the per-category revenue totals every 5 seconds via the debug sink.
# `total_revenue_by_category` is built on the watermarked frame, so no separate
# "with_watermark" variable is needed (the old name was never defined).
# "complete" mode re-emits the whole aggregated result table on each trigger.
query = (
    total_revenue_by_category
    .writeStream
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .foreachBatch(process_batch)
    .start()
)

Processing batch 0
Number of rows in batch 0: 488
+--------+------------------------------------------+-------------+
|category|window                                    |total_revenue|
+--------+------------------------------------------+-------------+
|NULL    |{2025-03-04 18:22:10, 2025-03-04 18:22:15}|NULL         |
|NULL    |{2025-03-04 18:16:25, 2025-03-04 18:16:30}|NULL         |
|NULL    |{2025-03-04 18:46:00, 2025-03-04 18:46:05}|NULL         |
|NULL    |{2025-03-04 18:39:35, 2025-03-04 18:39:40}|NULL         |
|NULL    |{2025-03-04 18:26:50, 2025-03-04 18:26:55}|NULL         |
+--------+------------------------------------------+-------------+
only showing top 5 rows

Processing batch 1
Number of rows in batch 1: 495
+--------+------------------------------------------+-------------+
|category|window                                    |total_revenue|
+--------+------------------------------------------+-------------+
|NULL    |{2025-03-04 18:22:10, 2025-03-04 18:22:15}|NULL  

In [18]:
# Stop the revenue preview query before starting the next analysis.
query.stop()

Top Products by Purchases

In [20]:
# Purchase count per product, most-purchased first. Sorting a streaming result is only
# supported after an aggregation in complete output mode.
top_products = (
    enriched_df
    .where(col("event_type") == "purchase")
    .groupBy("product_id", "product_name")
    .agg(count(col("event_type")).alias("purchase_count"))
    .orderBy(col("purchase_count").desc())
)

In [21]:
def process_batch(batch_df, batch_id):
    """foreachBatch callback: announce the micro-batch id and show its first 5 rows untruncated."""
    header = "Processing batch {}".format(batch_id)
    print(header)
    batch_df.show(n=5, truncate=False)

# Print the purchase leaderboard every 5 seconds; complete mode re-emits the full sorted table.
query = (
    top_products.writeStream
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .foreachBatch(process_batch)
    .start()
)

In [22]:
# Stop the top-products leaderboard query.
query.stop()

Average Discount per Category

In [14]:
# Mean product discount per category, averaged over the enriched click/purchase events.
average_discount_by_category = (
    enriched_df
    .groupBy(col("category"))
    .agg(avg(col("discount")).alias("avg_discount"))
)

In [15]:
def process_batch(batch_df, batch_id):
    """Print which micro-batch arrived, then display up to five of its rows in full."""
    preview_rows = 5
    print("Processing batch", batch_id)
    batch_df.show(preview_rows, truncate=False)

# Print the per-category average discount every 5 seconds (complete mode: full table each time).
average_discount_by_category_query = (
    average_discount_by_category.writeStream
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .foreachBatch(process_batch)
    .start()
)

Processing batch 0
+-----------+-------------------+
|category   |avg_discount       |
+-----------+-------------------+
|Home       |0.2555555555555556 |
|Sports     |0.21875            |
|Electronics|0.24285714285714285|
|Clothing   |0.24400000000000005|
|Beauty     |0.2285             |
+-----------+-------------------+
only showing top 5 rows

Processing batch 1
+-----------+-------------------+
|category   |avg_discount       |
+-----------+-------------------+
|Home       |0.2555555555555556 |
|Sports     |0.21875            |
|Electronics|0.24285714285714285|
|Clothing   |0.24400000000000005|
|Beauty     |0.2285             |
+-----------+-------------------+
only showing top 5 rows

Processing batch 2
+-----------+-------------------+
|category   |avg_discount       |
+-----------+-------------------+
|Home       |0.2555555555555556 |
|Sports     |0.21875            |
|Electronics|0.24285714285714285|
|Clothing   |0.2554545454545455 |
|Beauty     |0.2285             |
+--------

In [16]:
# Stop the average-discount streaming query.
average_discount_by_category_query.stop()

Most Active Users (by Event Count)

In [17]:
# Number of events per user, most active first. Only events that matched a product
# in the product dimension are present in enriched_df (inner join).
user_activity = (
    enriched_df
    .groupBy("user_id", "user_name", "user_location")
    .agg(count(col("event_type")).alias("event_count"))
    .orderBy(col("event_count").desc())
)

In [18]:
def process_batch(batch_df, batch_id):
    """Minimal foreachBatch sink: log the batch id and display the first five rows untruncated."""
    message = f"Processing batch {batch_id}"
    print(message)
    batch_df.show(5, False)

# Print the most-active-users ranking every 5 seconds (complete mode: full sorted table).
user_activity_query = (
    user_activity.writeStream
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .foreachBatch(process_batch)
    .start()
)

In [19]:
# Stop the user-activity streaming query.
user_activity_query.stop()

In [29]:
# Persist the windowed revenue totals as Parquet.
# In "append" mode each 5-second window is written once, after the upstream watermark
# passes the window's end, so written rows are final. The checkpoint directory stores the
# consumed Kafka offsets and aggregation state so the query can resume where it stopped.
# The StreamingQuery handle is kept so the sink can be stopped/inspected directly
# (revenue_parquet_query.stop(), .status) instead of scanning spark.streams.active.
# NOTE(review): absolute local paths — consider making them configurable.
revenue_parquet_query = (
    total_revenue_by_category.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "C:/kafka-poc/output/revenue_by_category")
    .option("checkpointLocation", "C:/kafka-poc/checkpoint/revenue_by_category")
    .start()
)


<pyspark.sql.streaming.query.StreamingQuery at 0x2cb93fe47a0>

In [30]:
# List the streaming queries still running in this Spark session.
active_queries = spark.streams.active

if active_queries:
    print("There are active streaming queries:")
    # Dedicated loop name so the notebook-level `query` handle is not silently rebound.
    for active_query in active_queries:
        print(f"Query ID: {active_query.id}, Status: {active_query.status}")
else:
    print("No active streaming queries.")


There are active streaming queries:
Query ID: fa1ea7dd-1def-430a-9d00-1709378535a5, Status: {'message': 'Getting offsets from KafkaV2[Subscribe[realtimedashboard]]', 'isDataAvailable': False, 'isTriggerActive': True}


In [31]:
# Stop every streaming query still running in this session.
# Dedicated loop name so the notebook-level `query` handle is not silently rebound.
for active_query in spark.streams.active:
    print(f"Stopping query with ID: {active_query.id}")
    active_query.stop()

Stopping query with ID: fa1ea7dd-1def-430a-9d00-1709378535a5


In [32]:
# Confirm that no streaming queries are left running after the stop-all cell.
active_queries = spark.streams.active

if active_queries:
    print("There are active streaming queries:")
    # Dedicated loop name so the notebook-level `query` handle is not silently rebound.
    for active_query in active_queries:
        print(f"Query ID: {active_query.id}, Status: {active_query.status}")
else:
    print("No active streaming queries.")


No active streaming queries.
