In [0]:
# Make sure the input directory exists
dbutils.fs.mkdirs("dbfs:/tmp/clickstream/input/")

True

In [0]:
import json
import random
import time
import os
from datetime import datetime

input_dir = "/dbfs/tmp/clickstream/input/"
os.makedirs(input_dir, exist_ok=True)

users = ["alice", "bob", "charlie", "david"]
pages = ["home", "about", "product", "cart", "checkout"]

for i in range(20):
    event = {
        "user_id": random.choice(users),
        "page": random.choice(pages),
        "timestamp": datetime.utcnow().isoformat()
    }
    file_path = os.path.join(input_dir, f"event_{int(time.time())}.json")
    with open(file_path, "w") as f:
        f.write(json.dumps(event))
    print(f"Wrote: {file_path}")
    time.sleep(2)


Wrote: /dbfs/tmp/clickstream/input/event_1754157849.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157851.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157853.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157855.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157857.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157859.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157861.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157863.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157866.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157868.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157870.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157872.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157874.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157876.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157878.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157880.json
Wrote: /dbfs/tmp/clickstream/input/event_1754157882.json
Wrote: /dbfs/tmp/clickstream/in

In [0]:
from pyspark.sql.types import StructType, StringType, TimestampType
from pyspark.sql.functions import col

# Define the schema for the JSON input
schema = StructType() \
    .add("user_id", StringType()) \
    .add("page", StringType()) \
    .add("timestamp", StringType())  # We'll cast this to TimestampType

# Read streaming input
input_path = "/tmp/clickstream/input/"

stream_df = (
    spark.readStream
    .format("json")
    .schema(schema)
    .load(input_path)
)


In [0]:
# Convert timestamp string to proper timestamp type
clean_df = stream_df.withColumn("timestamp", col("timestamp").cast(TimestampType()))


In [0]:
output_path = "/tmp/clickstream/output/"
checkpoint_path = "/tmp/clickstream/checkpoint/"

query = (
    clean_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .start(output_path)
)


In [0]:
from pyspark.sql.functions import count

df = spark.read.format("delta").load("/tmp/clickstream/output/")
df.createOrReplaceTempView("clickstream_data")


In [0]:
%sql
SELECT page, COUNT(*) AS total_clicks
FROM clickstream_data
GROUP BY page
ORDER BY total_clicks DESC


page,total_clicks
checkout,12
home,11
cart,6
about,6
product,5


In [0]:
%sql
SELECT user_id, COUNT(*) AS events
FROM clickstream_data
GROUP BY user_id
ORDER BY events DESC


user_id,events
david,12
charlie,12
alice,9
bob,7


In [0]:
%sql
SELECT *
FROM clickstream_data
ORDER BY timestamp DESC
LIMIT 10


user_id,page,timestamp
charlie,checkout,2025-08-02T18:04:48.96236Z
david,checkout,2025-08-02T18:04:46.883422Z
alice,about,2025-08-02T18:04:44.791391Z
alice,product,2025-08-02T18:04:42.724713Z
charlie,home,2025-08-02T18:04:40.649998Z
bob,home,2025-08-02T18:04:38.575405Z
alice,about,2025-08-02T18:04:36.512612Z
charlie,about,2025-08-02T18:04:34.411231Z
bob,cart,2025-08-02T18:04:32.255589Z
bob,home,2025-08-02T18:04:30.19083Z


In [0]:
df.orderBy("timestamp", ascending=False).show(10, truncate=False)


+-------+--------+--------------------------+
|user_id|page    |timestamp                 |
+-------+--------+--------------------------+
|charlie|checkout|2025-08-02 18:04:48.96236 |
|david  |checkout|2025-08-02 18:04:46.883422|
|alice  |about   |2025-08-02 18:04:44.791391|
|alice  |product |2025-08-02 18:04:42.724713|
|charlie|home    |2025-08-02 18:04:40.649998|
|bob    |home    |2025-08-02 18:04:38.575405|
|alice  |about   |2025-08-02 18:04:36.512612|
|charlie|about   |2025-08-02 18:04:34.411231|
|bob    |cart    |2025-08-02 18:04:32.255589|
|bob    |home    |2025-08-02 18:04:30.19083 |
+-------+--------+--------------------------+
only showing top 10 rows

