In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Create (or reuse) the SparkSession used by every cell below.
# NOTE(review): the traceback recorded in this notebook's output
# (java.io.InvalidClassException on org.apache.spark.rdd.RDD with two different
# serialVersionUID values) usually means the driver's Spark/PySpark build does
# not match the build running on the executors -- confirm both sides run the
# same Spark version before re-running the analysis.
spark = (
    SparkSession.builder
    .appName("User Metrics Analysis")
    .getOrCreate()
)

In [12]:
# Explicit schema for the CSV input; every column is declared nullable.
# NOTE(review): "timestamp" is typed as DateType (a calendar date), not
# TimestampType -- confirm this matches the values in the file.
schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("page_views", IntegerType(), True)
    .add("time_on_page", IntegerType(), True)
    .add("referrer", StringType(), True)
    .add("device_type", StringType(), True)
    .add("browser", StringType(), True)
    .add("conversion", BooleanType(), True)
    .add("landing_page", StringType(), True)
    .add("location", StringType(), True)
    .add("timestamp", DateType(), True)
)

In [13]:
# Load the website metrics CSV from HDFS with the explicit schema above;
# the first line of the file is treated as a header row.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("hdfs://namenode:9000/user/jovyan/notebooks/work/data/website_data_1.csv")
)

In [15]:
# 1. Basic statistics
# One-row summary of the whole dataset.
# "total_users" counts DISTINCT user_ids: the same user_id can appear on
# several rows (the per-user aggregation below reports a session_count), so a
# plain count("user_id") would report rows, not users.
# Note: sum/avg/count here are the pyspark.sql.functions versions brought in by
# the star import, not the Python builtins.
basic_stats = df.select(
    countDistinct("user_id").alias("total_users"),
    sum("page_views").alias("total_page_views"),
    avg("time_on_page").alias("avg_time_on_page"),
    # A null "conversion" falls through to otherwise(0), same as a False one.
    sum(when(col("conversion"), 1).otherwise(0)).alias("total_conversions")
)

# 2. Device type distribution
# Number of rows per device type, most frequent first.
device_distribution = (
    df.groupBy("device_type")
    .count()
    .sort(col("count").desc())
)

# 3. Browser distribution
# Number of rows per browser, most frequent first.
browser_distribution = (
    df.groupBy("browser")
    .count()
    .orderBy(col("count"), ascending=False)
)

# 4. Conversion rate by device type
# Per device type: rows with a non-null user_id ("total_users"), rows flagged
# as converted ("conversions"), and the ratio of the two ("conversion_rate").
# NOTE(review): "total_users" is a row count, not a distinct-user count, so the
# rate is per row -- confirm that is the intended denominator.
conversion_by_device = (
    df.groupBy("device_type")
    .agg(
        count("user_id").alias("total_users"),
        sum(when(col("conversion"), 1).otherwise(0)).alias("conversions"),
    )
    .select(
        "*",
        (col("conversions") / col("total_users")).alias("conversion_rate"),
    )
)

# 5. Top referrers
# The ten referrers with the highest row counts.
top_referrers = (
    df.groupBy("referrer")
    .count()
    .sort(desc("count"))
    .limit(10)
)

# 6. User engagement metrics
# One row per user_id: summed page views, mean time on page, and the number of
# input rows for that user ("session_count"); heaviest viewers first.
user_engagement = (
    df.groupBy("user_id")
    .agg(
        sum("page_views").alias("total_page_views"),
        avg("time_on_page").alias("avg_time_on_page"),
        count("*").alias("session_count"),
    )
    .sort(col("total_page_views").desc())
)

# 7. Daily active users
# Distinct user_ids seen per "timestamp" value, in chronological order.
daily_active_users = (
    df.groupBy("timestamp")
    .agg(countDistinct(col("user_id")).alias("daily_active_users"))
    .sort(col("timestamp").asc())
)

# 8. Conversion funnel
# One-row summary; each stage is counted independently over all rows (a row is
# counted in a later stage whether or not it satisfied an earlier one).
# NOTE(review): "total_users" counts rows with a non-null user_id rather than
# distinct users -- confirm this is the intended funnel base.
conversion_funnel = df.groupBy().agg(
    count(col("user_id")).alias("total_users"),
    sum(when(col("page_views") > 0, 1).otherwise(0)).alias("viewed_page"),
    sum(when(col("time_on_page") > 60, 1).otherwise(0)).alias("engaged_users"),
    sum(when(col("conversion"), 1).otherwise(0)).alias("converted_users"),
)

# 9. User segmentation based on engagement
# Rank users by total page views (highest first) and bucket them on their
# percent_rank: below 0.2 -> High, below 0.6 -> Medium, everything else -> Low.
# NOTE(review): the window has no partitionBy, so the ranking is global over
# all users -- confirm this is acceptable at the expected data volume.
engagement_window = Window.orderBy(col("total_page_views").desc())
engagement_pct = percent_rank().over(engagement_window)
user_segments = user_engagement.withColumn(
    "segment",
    when(engagement_pct < 0.2, "High Engagement")
    .when(engagement_pct < 0.6, "Medium Engagement")
    .otherwise("Low Engagement"),
)

# 10. Location-based analysis
# Per location: rows with a non-null user_id, mean page views, mean time on
# page, and rows flagged as converted; busiest locations first.
location_analysis = (
    df.groupBy("location")
    .agg(
        count("user_id").alias("total_users"),
        avg("page_views").alias("avg_page_views"),
        avg("time_on_page").alias("avg_time_on_page"),
        sum(when(col("conversion"), 1).otherwise(0)).alias("conversions"),
    )
    .sort(col("total_users").desc())
)

In [16]:
# Display results
# Each entry is (heading to print, DataFrame to show, maximum rows to show).
# 20 is the default row limit of DataFrame.show(), spelled out here so every
# section goes through the same call.
report_sections = [
    ("Basic Statistics:", basic_stats, 20),
    ("Device Type Distribution:", device_distribution, 20),
    ("Browser Distribution:", browser_distribution, 20),
    ("Conversion Rate by Device Type:", conversion_by_device, 20),
    ("Top Referrers:", top_referrers, 20),
    ("User Engagement Metrics (Top 10):", user_engagement, 10),
    ("Daily Active Users:", daily_active_users, 20),
    ("Conversion Funnel:", conversion_funnel, 20),
    (
        "User Segmentation:",
        # Number of users that landed in each engagement segment.
        user_segments.groupBy("segment").count().orderBy("segment"),
        20,
    ),
    ("Location-based Analysis (Top 10):", location_analysis, 10),
]

for heading, frame, max_rows in report_sections:
    print(heading)
    frame.show(max_rows)



Basic Statistics:


Py4JJavaError: An error occurred while calling o287.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 15) (172.20.0.7 executor 0): java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = 823754013007382808, local class serialVersionUID = 3516924559342767982
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = 823754013007382808, local class serialVersionUID = 3516924559342767982
	at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
