In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import to_timestamp

schema = StructType([
    StructField("player_id", StringType(), True),
    StructField("player_name", StringType(), True),
    StructField("country", StringType(), True),
    StructField("session_id", StringType(), True),
    StructField("task", StringType(), True),
    StructField("amount_spent", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

df = (spark.read
      .option("header", True)
      .schema(schema)
      .csv("/Volumes/gaming/gamingengagement/raw_data/player_engagement.csv")
      .withColumn("timestamp", to_timestamp("timestamp", "dd-MM-yyyy HH:mm"))
)

display(df.limit(5))


player_id,player_name,country,session_id,task,amount_spent,timestamp
P016,Trevor,Brazil,S026,logout,0.0,2025-08-16T15:33:00.000Z
P019,Melissa,Brazil,S021,login,0.0,2025-08-23T05:02:00.000Z
P005,Kimberly,Brazil,S029,login,0.0,2025-08-11T10:45:00.000Z
P017,Joyce,India,S028,logout,0.0,2025-08-14T04:20:00.000Z
P002,Vanessa,Germany,S012,logout,0.0,2025-08-07T13:19:00.000Z


In [0]:
from pyspark.sql.functions import sum as spark_sum

total_purchases = (df.filter(df.task == "purchase")
                     .groupBy("player_id", "player_name")
                     .agg(spark_sum("amount_spent").alias("total_spent"))
                     .orderBy("total_spent", ascending=False))

display(total_purchases)


player_id,player_name,total_spent
P017,Joyce,1530.0
P020,Stephanie,867.0
P007,Anthony,606.0
P014,Doris,602.0
P001,Timothy,566.0
P015,Robert,543.0
P012,Tommy,487.0
P010,Kelly,375.0
P019,Melissa,369.0
P006,Jessica,280.0


In [0]:
from pyspark.sql.functions import count

# Count number of events per session
session_lengths = (df.groupBy("player_id", "player_name", "session_id")
                     .agg(
                         spark_min("timestamp").alias("session_start"),
                         spark_max("timestamp").alias("session_end"),
                         count("*").alias("event_count")
                     )
                     .withColumn(
                         "session_length_hours", 
                         (col("session_end").cast("long") - col("session_start").cast("long")) / 3600
                     )
                  )

# Filter sessions with at least 2 events
session_lengths_filtered = session_lengths.filter(col("event_count") > 1)

# Compute max session length per player
max_session_length = (session_lengths_filtered.groupBy("player_id", "player_name")
                        .agg({"session_length_hours": "max"})
                        .withColumnRenamed("max(session_length_hours)", "max_session_length_hours")
                        .orderBy("max_session_length_hours", ascending=False))

display(max_session_length)


player_id,player_name,max_session_length_hours
P017,Joyce,537.8166666666667
P005,Kimberly,423.81666666666666
P006,Jessica,280.25
P009,Larry,28.65
P013,Jason,10.55


In [0]:
from pyspark.sql.functions import when

session_lengths = (df.groupBy("player_id", "player_name", "session_id")
                     .agg(
                         spark_min("timestamp").alias("session_start"),
                         spark_max("timestamp").alias("session_end"),
                         count("*").alias("event_count")
                     )
                     .withColumn(
                         "session_length_hours", 
                         (col("session_end").cast("long") - col("session_start").cast("long")) / 3600
                     )
                     # Assign minimum 0.1 hours if only 1 event
                     .withColumn(
                         "session_length_hours",
                         when(col("event_count") == 1, 0.1).otherwise(col("session_length_hours"))
                     )
                  )

max_session_length = (session_lengths.groupBy("player_id", "player_name")
                        .agg({"session_length_hours": "max"})
                        .withColumnRenamed("max(session_length_hours)", "max_session_length_hours")
                        .orderBy("max_session_length_hours", ascending=False))

display(max_session_length)


player_id,player_name,max_session_length_hours
P017,Joyce,537.8166666666667
P005,Kimberly,423.81666666666666
P006,Jessica,280.25
P009,Larry,28.65
P013,Jason,10.55
P019,Melissa,0.1
P003,Danny,0.1
P002,Vanessa,0.1
P015,Robert,0.1
P010,Kelly,0.1


In [0]:
from pyspark.sql.functions import when, col, min as spark_min, max as spark_max, count

session_lengths = (df.groupBy("player_id", "player_name", "session_id")
                     .agg(
                         spark_min("timestamp").alias("session_start"),
                         spark_max("timestamp").alias("session_end"),
                         count("*").alias("event_count")
                     )
                     .withColumn(
                         "session_length_minutes", 
                         (col("session_end").cast("long") - col("session_start").cast("long")) / 60
                     )
                     # Assign minimum 5 minutes if only 1 event
                     .withColumn(
                         "session_length_minutes",
                         when(col("event_count") == 1, 5).otherwise(col("session_length_minutes"))
                     )
                  )

max_session_length = (session_lengths.groupBy("player_id", "player_name")
                        .agg({"session_length_minutes": "max"})
                        .withColumnRenamed("max(session_length_minutes)", "max_session_length_minutes")
                        .orderBy("max_session_length_minutes", ascending=False))

display(max_session_length)


player_id,player_name,max_session_length_minutes
P017,Joyce,32269.0
P005,Kimberly,25429.0
P006,Jessica,16815.0
P009,Larry,1719.0
P013,Jason,633.0
P019,Melissa,5.0
P003,Danny,5.0
P002,Vanessa,5.0
P015,Robert,5.0
P010,Kelly,5.0


In [0]:
df.createOrReplaceTempView("player_events")


In [0]:
top5_spenders = spark.sql("""
    SELECT player_id, player_name, SUM(amount_spent) AS total_spent
    FROM player_events
    WHERE task = 'purchase'
    GROUP BY player_id, player_name
    ORDER BY total_spent DESC
    LIMIT 5
""")
display(top5_spenders)


player_id,player_name,total_spent
P017,Joyce,1530.0
P020,Stephanie,867.0
P007,Anthony,606.0
P014,Doris,602.0
P001,Timothy,566.0


In [0]:
# Create a month column
events_with_month = spark.sql("""
    SELECT *, DATE_FORMAT(timestamp, 'yyyy-MM') AS year_month
    FROM player_events
""")
events_with_month.createOrReplaceTempView("player_events_month")

# Active players by month
active_players = spark.sql("""
    SELECT DISTINCT player_id, year_month
    FROM player_events_month
    WHERE task = 'login'
""")
active_players.createOrReplaceTempView("active_players")


In [0]:
churn = spark.sql("""
WITH months AS (
    SELECT DISTINCT year_month FROM active_players
    ORDER BY year_month DESC
    LIMIT 2
),
last_month AS (
    SELECT year_month FROM months ORDER BY year_month ASC LIMIT 1
),
this_month AS (
    SELECT year_month FROM months ORDER BY year_month DESC LIMIT 1
),
active_last AS (
    SELECT player_id FROM active_players WHERE year_month = (SELECT year_month FROM last_month)
),
active_this AS (
    SELECT player_id FROM active_players WHERE year_month = (SELECT year_month FROM this_month)
),
churned AS (
    SELECT l.player_id
    FROM active_last l
    LEFT JOIN active_this t ON l.player_id = t.player_id
    WHERE t.player_id IS NULL
)
SELECT 
    COUNT(DISTINCT c.player_id) * 100.0 / COUNT(DISTINCT l.player_id) AS churn_rate
FROM active_last l
LEFT JOIN churned c ON l.player_id = c.player_id
""")

display(churn)


churn_rate
83.33333333333333


In [0]:
events_df = df 
events_df.createOrReplaceTempView("player_events")


In [0]:
%sql
-- COMMAND ----------
-- SQL cell: Top 5 spenders
SELECT player_id, player_name, SUM(amount_spent) AS total_spent
FROM player_events
WHERE task = 'purchase'
GROUP BY player_id, player_name
ORDER BY total_spent DESC
LIMIT 5;


player_id,player_name,total_spent
P017,Joyce,1530.0
P020,Stephanie,867.0
P007,Anthony,606.0
P014,Doris,602.0
P001,Timothy,566.0


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- COMMAND ----------
WITH monthly_logins AS (
  SELECT DISTINCT player_id, date_format(timestamp, 'yyyy-MM') AS ym
  FROM player_events
  WHERE task = 'login'
),
months AS (
  SELECT DISTINCT ym
  FROM monthly_logins
),
months_ordered AS (
  SELECT ym, lead(ym) OVER (ORDER BY ym) AS next_ym
  FROM (SELECT DISTINCT ym FROM months) t
)
SELECT
  m.ym AS month,
  -- players active in this month
  (SELECT COUNT(DISTINCT player_id) FROM monthly_logins WHERE ym = m.ym) AS active_count,
  -- players active in this month but NOT in next month
  (SELECT COUNT(DISTINCT ml1.player_id) 
     FROM monthly_logins ml1
     LEFT JOIN monthly_logins ml2
     ON ml1.player_id = ml2.player_id AND ml2.ym = m.next_ym
     WHERE ml1.ym = m.ym AND ml2.player_id IS NULL
  ) AS churn_count,
  -- churn rate %
  ROUND(
    100.0 * (SELECT COUNT(DISTINCT ml1.player_id) 
             FROM monthly_logins ml1
             LEFT JOIN monthly_logins ml2
             ON ml1.player_id = ml2.player_id AND ml2.ym = m.next_ym
             WHERE ml1.ym = m.ym AND ml2.player_id IS NULL
    ) / NULLIF((SELECT COUNT(DISTINCT player_id) FROM monthly_logins WHERE ym = m.ym), 0)
  , 2) AS churn_rate_percent
FROM months_ordered m
WHERE m.next_ym IS NOT NULL
ORDER BY month;

month,active_count,churn_count,churn_rate_percent
2025-08,12,10,83.33


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- COMMAND ----------
SELECT country, SUM(amount_spent) AS total_spend, COUNT(DISTINCT player_id) AS players
FROM player_events
WHERE task = 'purchase'
GROUP BY country
ORDER BY total_spend DESC;


country,total_spend,players
Spain,1966.0,4
India,1905.0,2
Brazil,1362.0,4
Germany,1147.0,2
UK,602.0,1
USA,490.0,2


Databricks visualization. Run in Databricks to view.