In [1]:
!pip install kafka-python pyspark psycopg2-binary pandas


Collecting kafka-python
  Downloading kafka_python-2.1.5-py2.py3-none-any.whl.metadata (9.2 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Downloading kafka_python-2.1.5-py2.py3-none-any.whl (285 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m285.4/285.4 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python, psycopg2-binary
Successfully installed kafka-python-2.1.5 psycopg2-binary-2.9.10


In [2]:
import random
import json
import time

# Simulate product activity data
def generate_event():
    products = ['Shoes', 'Laptop', 'Headphones', 'Book']
    actions = ['click', 'view', 'purchase']
    return {
        "user_id": random.randint(1, 100),
        "product": random.choice(products),
        "action": random.choice(actions),
        "timestamp": time.time()
    }

# Generate 5 sample events
for _ in range(5):
    print(json.dumps(generate_event()))


{"user_id": 2, "product": "Laptop", "action": "view", "timestamp": 1744093333.5063}
{"user_id": 7, "product": "Book", "action": "click", "timestamp": 1744093333.506439}
{"user_id": 100, "product": "Shoes", "action": "purchase", "timestamp": 1744093333.5064688}
{"user_id": 32, "product": "Book", "action": "purchase", "timestamp": 1744093333.506485}
{"user_id": 93, "product": "Headphones", "action": "purchase", "timestamp": 1744093333.5064974}


In [3]:
import random
import json
import time
import pandas as pd

def generate_event():
    products = ['Shoes', 'Laptop', 'Headphones', 'Book', 'Watch']
    actions = ['click', 'view', 'purchase']
    return {
        "user_id": random.randint(1000, 9999),
        "product": random.choice(products),
        "action": random.choice(actions),
        "timestamp": pd.Timestamp.now().isoformat()
    }

# Generate 20 sample events and store in DataFrame
events = [generate_event() for _ in range(20)]
df_events = pd.DataFrame(events)
df_events.head()


Unnamed: 0,user_id,product,action,timestamp
0,1576,Laptop,purchase,2025-04-08T06:23:15.613178
1,6248,Headphones,purchase,2025-04-08T06:23:15.613374
2,4336,Laptop,view,2025-04-08T06:23:15.613405
3,4991,Watch,view,2025-04-08T06:23:15.613421
4,9698,Book,click,2025-04-08T06:23:15.613436


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Create a Spark session
spark = SparkSession.builder \
    .appName("ProductActivityStreaming") \
    .getOrCreate()

# Convert pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df_events)

# Show schema and first few rows
spark_df.printSchema()
spark_df.show(5)


root
 |-- user_id: long (nullable = true)
 |-- product: string (nullable = true)
 |-- action: string (nullable = true)
 |-- timestamp: string (nullable = true)

+-------+----------+--------+--------------------+
|user_id|   product|  action|           timestamp|
+-------+----------+--------+--------------------+
|   1576|    Laptop|purchase|2025-04-08T06:23:...|
|   6248|Headphones|purchase|2025-04-08T06:23:...|
|   4336|    Laptop|    view|2025-04-08T06:23:...|
|   4991|     Watch|    view|2025-04-08T06:23:...|
|   9698|      Book|   click|2025-04-08T06:23:...|
+-------+----------+--------+--------------------+
only showing top 5 rows



In [5]:
# 1. Count of each action type (view, click, purchase)
action_counts = spark_df.groupBy("action").count()
print("👉 Action Distribution:")
action_counts.show()

# 2. Most popular product (based on all actions)
popular_products = spark_df.groupBy("product").count().orderBy("count", ascending=False)
print("🔥 Most Popular Products:")
popular_products.show()

# 3. User engagement - how many actions each user performed
user_activity = spark_df.groupBy("user_id").count().orderBy("count", ascending=False)
print("🙋‍♂️ User Activity:")
user_activity.show()


👉 Action Distribution:
+--------+-----+
|  action|count|
+--------+-----+
|purchase|    9|
|    view|    5|
|   click|    6|
+--------+-----+

🔥 Most Popular Products:
+----------+-----+
|   product|count|
+----------+-----+
|    Laptop|    5|
|      Book|    4|
|     Watch|    4|
|Headphones|    4|
|     Shoes|    3|
+----------+-----+

🙋‍♂️ User Activity:
+-------+-----+
|user_id|count|
+-------+-----+
|   6445|    1|
|   4033|    1|
|   9698|    1|
|   1576|    1|
|   4336|    1|
|   6160|    1|
|   6248|    1|
|   4991|    1|
|   8817|    1|
|   2042|    1|
|   2517|    1|
|   9126|    1|
|   1404|    1|
|   6562|    1|
|   3913|    1|
|   7375|    1|
|   9078|    1|
|   7291|    1|
|   3140|    1|
|   3007|    1|
+-------+-----+



In [6]:
# Save raw simulated data
df_events.to_csv("raw_events.csv", index=False)

# Save processed Spark outputs (convert to pandas first)
action_counts.toPandas().to_csv("action_counts.csv", index=False)
popular_products.toPandas().to_csv("popular_products.csv", index=False)
user_activity.toPandas().to_csv("user_activity.csv", index=False)
