In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, TimestampType

spark = SparkSession.builder.appName("UserActivityMonitoring").getOrCreate()

# Define schema for incoming data
schema = StructType() \
    .add("user_id", StringType()) \
    .add("event_type", StringType()) \
    .add("page", StringType()) \
    .add("timestamp", TimestampType())

# Read from Kafka or socket
activity_df = spark.readStream.format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")


In [4]:
from pyspark.sql.functions import window

# Active users in the last 5 minutes
active_users = activity_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "5 minutes"), "user_id") \
    .count()

# Most visited pages in the last 10 minutes
page_views = activity_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "10 minutes"), "page") \
    .count() #\
    #.orderBy("count", ascending=False)


In [5]:
# Output active users to console
active_users.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Output page views to console
page_views.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()


<pyspark.sql.streaming.StreamingQuery at 0x26a754c32e0>

In [7]:
historical_df = spark.read.csv("./dataset/airports.csv", header=True, schema=schema)

# Total unique users
total_users = historical_df.select("user_id").distinct().count()

# Popular pages
popular_pages = historical_df.groupBy("page").count().orderBy("count", ascending=False)


In [8]:
historical_df.createOrReplaceTempView("activity")

# Query for specific insights
popular_pages_sql = spark.sql("SELECT page, COUNT(*) AS views FROM activity GROUP BY page ORDER BY views DESC")
popular_pages_sql.show()


+----+-----+
|page|views|
+----+-----+
|  CA|   32|
|  TX|   29|
|  AK|   22|
|  FL|   20|
|  NY|   14|
|  MI|   12|
|  NC|   11|
|  CO|   11|
|  GA|    8|
|  WI|    8|
|  IL|    8|
|  MT|    8|
|  OR|    7|
|  PA|    7|
|  MO|    7|
|  WA|    7|
|  LA|    7|
|  MS|    7|
|  HI|    7|
|  OH|    6|
+----+-----+
only showing top 20 rows



In [None]:
#Concepts Applied
#Structured Streaming: For real-time user activity monitoring.
#Batch Processing: For analyzing historical data.
#Spark SQL: For querying data and aggregations.
#DataFrame and RDD Operations: Used to load and transform data.
#Window Operations: To group data by time intervals.
#File Systems: For storing historical data and intermediate outputs.
#Fault Tolerance and Checkpointing: Ensuring resilience in the streaming pipeline.