# PySpark Leaderboard Analysis - Local Mode

Notebook này chạy PySpark ở local mode để gen snapshot leaderboard data từ parquet file.

## Pipeline:
1. Đọc data từ Parquet file
2. Transform thành Score objects với event time
3. Tính tổng điểm trong sliding window (kích thước cấu hình qua `window_size_minutes`; hiện đang đặt là 1 phút)
4. Tính TopN với retraction logic
5. Snapshot mỗi 7 phút theo event time


> **TODO:** kiểm tra lại TTL time sau.

In [5]:
# Import required libraries
import os
import sys
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from collections import defaultdict, deque
import math

# PySpark imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window


print("Libraries imported successfully!")


Libraries imported successfully!


In [6]:
# Initialize Spark Session for local mode
spark = (
    SparkSession.builder
    .appName("LeaderBoardAnalysis")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")
# Report the address Spark actually bound instead of assuming port 4040:
# 4040 is only the default, and Spark moves on to 4041, 4042, ... when it is
# already taken (e.g. by another notebook). uiWebUrl is None if the UI is disabled.
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")
print("Spark session created successfully!")


Spark version: 3.5.6
Spark UI: http://localhost:4040
Spark session created successfully!


In [7]:
# Helper functions
def parse_timestamp(timestamp_str: str) -> int:
    """Parse an ISO-8601 timestamp string into epoch milliseconds.

    A trailing ``Z`` is accepted as UTC. A timestamp without any UTC offset
    is interpreted as UTC, so the result does not depend on the machine's
    local time zone and round-trips with ``format_timestamp``.

    Args:
        timestamp_str: ISO-8601 text such as ``"2025-10-01T10:00:41.696Z"``.

    Returns:
        Milliseconds since the Unix epoch. If the input cannot be parsed
        (malformed text, ``None``, or a non-string), the current time is
        returned instead as a best-effort fallback.
    """
    try:
        # fromisoformat() only understands a literal "Z" suffix from
        # Python 3.11 on, so normalise it to an explicit offset first.
        dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        # Fallback to current time. Only parsing failures are caught here, so
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        return int(datetime.now().timestamp() * 1000)

    if dt.tzinfo is None:
        # Naive datetimes would otherwise be read in the local time zone.
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)

def format_timestamp(timestamp: int) -> str:
    """Render an epoch-millisecond timestamp as an ISO-8601 string in UTC."""
    epoch_seconds = timestamp / 1000
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()

print("Helper functions defined successfully!")


Helper functions defined successfully!


In [27]:
# Step 1: Read data from Parquet file
input_path = "fixed-dataset.parquet"

if not os.path.exists(input_path):
    print(f"File not found: {input_path}")
    print("Available files in app-python directory:")
    if os.path.exists("app-python"):
        for file in os.listdir("app-python"):
            print(f"  - {file}")
    # Stop here: without the dataset `user_data` is never defined, and every
    # later cell would fail with an unrelated NameError instead of this cause.
    raise FileNotFoundError(input_path)

print(f"Reading data from: {input_path}")
user_data = spark.read.parquet(input_path)
# count() runs a full scan of the file; it is kept as a sanity check on the load.
print(f"Data loaded successfully! Rows: {user_data.count()}")
user_data.show(10, False)
user_data.printSchema()


Reading data from: fixed-dataset.parquet
Data loaded successfully! Rows: 1000000
+-------+-----+------------+-------+------+---+------------------------------------+--------------------------------+---------------+-------------------------------------------------------------------+---------+--------------------------------+--------------------------------+-----+-------------+--------------------------------+----+
|uid    |email|authProvider|appId  |avatar|geo|role                                |lastLoginAt                     |name           |devices                                                            |resources|created_at                      |updated_at                      |level|previousLevel|updatedAt                       |team|
+-------+-----+------------+-------+------+---+------------------------------------+--------------------------------+---------------+-------------------------------------------------------------------+---------+--------------------------------+---

In [30]:
# Show up to 100 (uid, level) rows that belong to user_1
user_1_levels = user_data.where(col("uid") == "user_1").select("uid", "level")
user_1_levels.show(100, truncate=False)

+------+-----+
|uid   |level|
+------+-----+
|user_1|2    |
|user_1|3    |
|user_1|7    |
|user_1|8    |
|user_1|12   |
|user_1|16   |
|user_1|24   |
|user_1|26   |
|user_1|29   |
|user_1|30   |
|user_1|39   |
|user_1|47   |
|user_1|48   |
|user_1|50   |
|user_1|57   |
|user_1|64   |
|user_1|65   |
|user_1|73   |
|user_1|82   |
|user_1|84   |
|user_1|89   |
|user_1|91   |
|user_1|95   |
|user_1|97   |
|user_1|99   |
|user_1|106  |
|user_1|111  |
|user_1|116  |
|user_1|118  |
|user_1|122  |
|user_1|123  |
|user_1|128  |
|user_1|129  |
|user_1|133  |
|user_1|143  |
|user_1|153  |
|user_1|157  |
|user_1|160  |
|user_1|161  |
|user_1|169  |
|user_1|173  |
|user_1|175  |
|user_1|182  |
|user_1|190  |
|user_1|195  |
|user_1|204  |
|user_1|207  |
|user_1|208  |
|user_1|209  |
|user_1|213  |
|user_1|214  |
|user_1|217  |
|user_1|219  |
|user_1|220  |
|user_1|221  |
|user_1|222  |
|user_1|223  |
|user_1|230  |
|user_1|233  |
|user_1|241  |
|user_1|244  |
|user_1|249  |
|user_1|250  |
|user_1|25

In [16]:
# Step 2: Turn raw user rows into (id, score, lastUpdateTime) score events
level_gain = col("level") - col("previousLevel")

user_scores = user_data.select(
    col("uid").alias("id"),
    # Only level-ups earn points; a drop, no change, or a NULL difference scores 0.
    when(level_gain > 0, level_gain).otherwise(0).alias("score"),
    # Event time as epoch milliseconds (to_timestamp already yields a timestamp,
    # so no extra cast to "timestamp" is needed).
    (to_timestamp(col("updatedAt")).cast("double") * 1000).cast("long").alias("lastUpdateTime"),
)

# show() prints the table itself and returns None; wrapping it in print()
# only added a stray "None" line to the cell output.
user_scores.show(n=20, truncate=False)

+-------+-----+--------------+
|id     |score|lastUpdateTime|
+-------+-----+--------------+
|user_2 |1    |1759312841696 |
|user_8 |6    |1759312841797 |
|user_8 |9    |1759312841897 |
|user_4 |1    |1759312841997 |
|user_14|1    |1759312842097 |
|user_16|3    |1759312842197 |
|user_20|4    |1759312842297 |
|user_20|2    |1759312842397 |
|user_16|2    |1759312842497 |
|user_20|8    |1759312842597 |
|user_15|2    |1759312842697 |
|user_17|3    |1759312842797 |
|user_19|8    |1759312842897 |
|user_9 |10   |1759312842997 |
|user_17|2    |1759312843097 |
|user_5 |2    |1759312843197 |
|user_20|2    |1759312843297 |
|user_14|7    |1759312843397 |
|user_10|1    |1759312843497 |
|user_11|5    |1759312843597 |
+-------+-----+--------------+
only showing top 20 rows

None


In [32]:
# Step 3: Calculate total scores in sliding window 
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum, lag, col, when

window_size_minutes = 1
window_size_ms = window_size_minutes * 60 * 1000

print(f"Calculating sliding window scores for {window_size_minutes} minute window...")

# Per-user ordering by event time; shared by the range frame and by lag().
window_spec = Window.partitionBy("id").orderBy("lastUpdateTime")

# Range frame: every event of the same user whose lastUpdateTime falls within
# the last `window_size_ms` milliseconds, up to and including the current row.
trailing_range = window_spec.rangeBetween(-window_size_ms, 0)

scores_with_total = user_scores.withColumn(
    "totalScore", spark_sum("score").over(trailing_range)
)
# The user's previous windowed total; 0.0 for the user's first event.
scores_with_previous = scores_with_total.withColumn(
    "previousTotalScore", lag("totalScore", 1, 0.0).over(window_spec)
)

total_scores = scores_with_previous.select(
    col("id").alias("userId"),
    col("totalScore"),
    col("previousTotalScore"),
    col("lastUpdateTime"),
)

print("done!")


Calculating sliding window scores for 1 minute window...
done!


In [25]:
print(f"Total scores calculated for {window_size_minutes} minute window!")
# Spot-check the windowed totals for a single user before counting all rows.
user_1_totals = total_scores.where(col("userId") == "user_1")
user_1_totals.show(100, False)
print(f"Total records: {total_scores.count()}")

Total scores calculated for 1 minute window!
+------+----------+------------------+--------------+
|userId|totalScore|previousTotalScore|lastUpdateTime|
+------+----------+------------------+--------------+
|user_1|2         |0                 |1759312851398 |
|user_1|3         |2                 |1759312853598 |
|user_1|7         |3                 |1759312856198 |
|user_1|8         |7                 |1759312857099 |
|user_1|12        |8                 |1759312857499 |
|user_1|16        |12                |1759312861399 |
|user_1|24        |16                |1759312863399 |
|user_1|26        |24                |1759312863499 |
|user_1|29        |26                |1759312866800 |
|user_1|30        |29                |1759312875400 |
|user_1|39        |30                |1759312876000 |
|user_1|47        |39                |1759312877300 |
|user_1|48        |47                |1759312879202 |
|user_1|50        |48                |1759312881902 |
|user_1|57        |50                

In [None]:
def calculate_leaderboard_at_snapshot(total_scores: DataFrame, snapshot_time: int, 
                                    top_n: int, cutoff_time: int) -> List[Dict]:
    """Build the Top-N leaderboard as of a single snapshot time.

    Rows of ``total_scores`` whose ``lastUpdateTime`` lies in the half-open
    interval ``(cutoff_time, snapshot_time]`` are collected to the driver.
    Only the most recent row per user is kept, and users are ranked by that
    row's ``totalScore`` (highest first).

    Args:
        total_scores: DataFrame with ``userId``, ``totalScore`` and
            ``lastUpdateTime`` (epoch milliseconds) columns.
        snapshot_time: Inclusive upper bound, epoch milliseconds.
        top_n: Maximum number of entries to return; values <= 0 yield an
            empty leaderboard.
        cutoff_time: Exclusive lower bound, epoch milliseconds.

    Returns:
        Up to ``top_n`` dicts with keys ``userId``, ``score`` and
        ``lastUpdateTime``, ordered by score descending. Equal scores are
        ordered by ``userId`` so the result does not depend on the order in
        which Spark happens to return rows.
    """
    print(f"  Snapshot at {format_timestamp(snapshot_time)}: using data from {format_timestamp(cutoff_time)} to {format_timestamp(snapshot_time)}")

    valid_scores = total_scores.filter(
        (col('lastUpdateTime') <= snapshot_time) &
        (col('lastUpdateTime') > cutoff_time)
    ).collect()

    print(f"    Found {len(valid_scores)} valid scores in cleanup interval")

    # Keep only the most recent row for each user.
    user_latest_scores = {}
    for score in valid_scores:
        user_id = score['userId']
        current = user_latest_scores.get(user_id)
        if current is None or score['lastUpdateTime'] > current['lastUpdateTime']:
            user_latest_scores[user_id] = score

    print(f"    Found {len(user_latest_scores)} unique users")

    # Rank by total score (descending) with userId as a deterministic tie-break.
    sorted_users = sorted(
        user_latest_scores.values(),
        key=lambda row: (-row['totalScore'], str(row['userId'])),
    )

    # A negative top_n would otherwise slice "all but the last n" users.
    top_users = sorted_users[:top_n] if top_n > 0 else []

    # Leaderboard entries for this snapshot (same shape as SnapshotTopNSink);
    # the windowed total is exposed under the key 'score'.
    snapshot_entries = [
        {
            'userId': user_score['userId'],
            'score': user_score['totalScore'],
            'lastUpdateTime': user_score['lastUpdateTime'],
        }
        for user_score in top_users
    ]

    print(f"    Generated {len(snapshot_entries)} leaderboard entries (Top-{top_n})")
    return snapshot_entries

def generate_snapshots(total_scores: DataFrame, top_n: int, 
                                            cleanup_interval_minutes: int = 5,
                                            snapshot_interval_minutes: int = 7) -> List[Dict]:
    """Generate periodic Top-N leaderboard snapshots over the event-time range.

    Snapshots are taken every ``snapshot_interval_minutes`` starting one
    interval after the earliest ``lastUpdateTime``. For each snapshot the
    leaderboard is computed from rows newer than a cutoff that emulates
    periodic state cleanup: the first cleanup fires two cleanup intervals
    after the first event (per the original author's note, mirroring the
    Flink job), then once per interval, and each cleanup discards rows older
    than one cleanup interval.

    Args:
        total_scores: DataFrame with ``userId``, ``totalScore`` and
            ``lastUpdateTime`` (epoch milliseconds) columns.
        top_n: Number of leaderboard entries per snapshot.
        cleanup_interval_minutes: Spacing of the emulated cleanups; must be > 0.
        snapshot_interval_minutes: Spacing of the snapshots; must be > 0.

    Returns:
        One dict per snapshot, ``{'_id': snapshot_time_ms, 'users': [...]}``,
        in chronological order. Empty when ``total_scores`` has no rows.

    Raises:
        ValueError: If either interval is not positive (the time loops below
            would otherwise never terminate).

    Note:
        Every snapshot runs its own Spark job through
        ``calculate_leaderboard_at_snapshot``; caching ``total_scores`` before
        calling this function probably avoids recomputing it per snapshot.
    """
    import bisect
    # Explicit alias: the notebook star-imports pyspark.sql.functions, which
    # shadows the builtins min/max. F.min / F.max are unambiguous either way.
    from pyspark.sql import functions as F

    if cleanup_interval_minutes <= 0 or snapshot_interval_minutes <= 0:
        raise ValueError(
            "cleanup_interval_minutes and snapshot_interval_minutes must be positive"
        )

    # Only the first and last event times are needed, so aggregate in Spark
    # rather than collecting every distinct timestamp to the driver.
    bounds = total_scores.agg(
        F.min('lastUpdateTime').alias('first_ts'),
        F.max('lastUpdateTime').alias('last_ts'),
    ).first()

    if bounds is None or bounds['first_ts'] is None:
        return []

    first_timestamp = bounds['first_ts']
    last_timestamp = bounds['last_ts']

    # Convert intervals to milliseconds
    snapshot_interval_ms = snapshot_interval_minutes * 60 * 1000
    cleanup_interval_ms = cleanup_interval_minutes * 60 * 1000

    print(f"Data range: {format_timestamp(first_timestamp)} to {format_timestamp(last_timestamp)}")
    print(f"Cleanup interval: {cleanup_interval_minutes} minutes")
    print(f"Snapshot interval: {snapshot_interval_minutes} minutes")

    # Generate snapshot times
    snapshot_times = []
    current_snapshot_time = first_timestamp + snapshot_interval_ms
    while current_snapshot_time <= last_timestamp:
        snapshot_times.append(current_snapshot_time)
        current_snapshot_time += snapshot_interval_ms

    # Generate cleanup times (ascending by construction). The first cleanup
    # fires at first_timestamp + 2 * cleanup interval.
    cleanup_times = []
    current_cleanup_time = first_timestamp + cleanup_interval_ms * 2
    while current_cleanup_time <= last_timestamp:
        cleanup_times.append(current_cleanup_time)
        current_cleanup_time += cleanup_interval_ms

    print(f"\nCleanup times (for reference):")
    for i, ts in enumerate(cleanup_times[:10]):  # Show first 10
        print(f"  Cleanup {i+1}: {format_timestamp(ts)}")
    if len(cleanup_times) > 10:
        print(f"  ... and {len(cleanup_times) - 10} more")

    # Compute the leaderboard at each snapshot time
    all_snapshots = []
    for snapshot_time in snapshot_times:
        # Number of cleanups that fired strictly before this snapshot.
        idx = bisect.bisect_left(cleanup_times, snapshot_time)
        if idx == 0:
            # Nothing has been cleaned up yet. The lower bound applied
            # downstream is exclusive, so step back 1 ms to keep the very
            # first event in the snapshot.
            cutoff_time = first_timestamp - 1
        else:
            # The latest cleanup kept only rows newer than one cleanup
            # interval before it fired.
            cutoff_time = cleanup_times[idx - 1] - cleanup_interval_ms

        snapshot_entries = calculate_leaderboard_at_snapshot(
            total_scores, snapshot_time, top_n, cutoff_time
        )

        # Same shape as SnapshotTopNSink: Tuple2<timestamp, List<Score>>
        all_snapshots.append({
            '_id': snapshot_time,
            'users': snapshot_entries
        })

    return all_snapshots

print("Retractable TopN snapshot functions defined successfully!")

Retractable TopN snapshot functions defined successfully!


In [34]:
# Step 5: Generate leaderboard snapshots
top_n = 10
cleanup_interval_minutes = 5
snapshot_interval_minutes = 7

print("Generating leaderboard snapshots with:")
print(f"  Top N: {top_n}")
print(f"  TTL: {cleanup_interval_minutes} minutes")
print(f"  Snapshot interval: {snapshot_interval_minutes} minutes")

snapshots = generate_snapshots(
    total_scores,
    top_n,
    cleanup_interval_minutes=cleanup_interval_minutes,
    snapshot_interval_minutes=snapshot_interval_minutes,
)

# len(snapshots) is the number of snapshots, not the number of leaderboard
# entries (each snapshot holds up to top_n entries), so the message says so.
print(f"\nGenerated {len(snapshots)} leaderboard snapshots.")


Generating leaderboard snapshots with:
  Top N: 10
  TTL: 5 minutes
  Snapshot interval: 7 minutes
Data range: 2025-10-01T10:00:41.696000+00:00 to 2025-10-02T13:47:31.286000+00:00
Cleanup interval: 5 minutes
Snapshot interval: 7 minutes

Generated 238 snapshot times:
  Snapshot 1: 2025-10-01T10:07:41.696000+00:00
  Snapshot 2: 2025-10-01T10:14:41.696000+00:00
  Snapshot 3: 2025-10-01T10:21:41.696000+00:00
  Snapshot 4: 2025-10-01T10:28:41.696000+00:00
  Snapshot 5: 2025-10-01T10:35:41.696000+00:00
  Snapshot 6: 2025-10-01T10:42:41.696000+00:00
  Snapshot 7: 2025-10-01T10:49:41.696000+00:00
  Snapshot 8: 2025-10-01T10:56:41.696000+00:00
  Snapshot 9: 2025-10-01T11:03:41.696000+00:00
  Snapshot 10: 2025-10-01T11:10:41.696000+00:00
  Snapshot 11: 2025-10-01T11:17:41.696000+00:00
  Snapshot 12: 2025-10-01T11:24:41.696000+00:00
  Snapshot 13: 2025-10-01T11:31:41.696000+00:00
  Snapshot 14: 2025-10-01T11:38:41.696000+00:00
  Snapshot 15: 2025-10-01T11:45:41.696000+00:00
  Snapshot 16: 2025-1

In [38]:
# Inspect the first snapshot as a sanity check; indexing raises IndexError
# when no snapshots were generated.
snapshots[0]

{'_id': 1759313261696,
 'users': [{'userId': 'user_17',
   'score': 170,
   'lastUpdateTime': 1759313261133},
  {'userId': 'user_10', 'score': 148, 'lastUpdateTime': 1759313260333},
  {'userId': 'user_19', 'score': 132, 'lastUpdateTime': 1759313260933},
  {'userId': 'user_16', 'score': 131, 'lastUpdateTime': 1759313257733},
  {'userId': 'user_1', 'score': 130, 'lastUpdateTime': 1759313256433},
  {'userId': 'user_11', 'score': 130, 'lastUpdateTime': 1759313261033},
  {'userId': 'user_8', 'score': 128, 'lastUpdateTime': 1759313261233},
  {'userId': 'user_15', 'score': 124, 'lastUpdateTime': 1759313260433},
  {'userId': 'user_20', 'score': 121, 'lastUpdateTime': 1759313256133},
  {'userId': 'user_12', 'score': 120, 'lastUpdateTime': 1759313261533}]}

In [None]:
# Step 6: Display results
# Flatten to one row per (snapshot, user) so the snapshots can be shown and
# aggregated as a DataFrame.
snapshot_data = [
    {
        'userId': user['userId'],
        'score': user['score'],
        'lastUpdateTime': user['lastUpdateTime'],
        'snapshotTime': snapshot['_id'],
        'snapshotTimeFormatted': format_timestamp(snapshot['_id']),
        'lastUpdateTimeFormatted': format_timestamp(user['lastUpdateTime'])
    }
    for snapshot in snapshots
    for user in snapshot['users']
]

# Guard on the flattened rows rather than on `snapshots`: createDataFrame
# cannot infer a schema from an empty list, which happens when every
# snapshot has an empty 'users' list.
if snapshot_data:
    snapshot_df = spark.createDataFrame(snapshot_data)

    print("\n=== LEADERBOARD SNAPSHOTS ===")
    # Highest score first within each snapshot, as a leaderboard reads.
    snapshot_df.orderBy(col("snapshotTime"), desc("score")).show(50, False)

    # Show summary by snapshot time.
    # NOTE: count/max/min/avg here are the Spark column functions brought in by
    # the notebook's `from pyspark.sql.functions import *`, not Python builtins.
    print("\n=== SNAPSHOT SUMMARY ===")
    snapshot_summary = snapshot_df.groupBy("snapshotTimeFormatted") \
        .agg(
            count("userId").alias("userCount"),
            max("score").alias("maxScore"),
            min("score").alias("minScore"),
            avg("score").alias("avgScore")
        ) \
        .orderBy("snapshotTimeFormatted")

    snapshot_summary.show(20, False)

    # Show top users across all snapshots
    print("\n=== TOP USERS ACROSS ALL SNAPSHOTS ===")
    top_users = snapshot_df.groupBy("userId") \
        .agg(
            count("score").alias("snapshotCount"),
            max("score").alias("maxScore"),
            avg("score").alias("avgScore")
        ) \
        .orderBy(desc("maxScore"), desc("avgScore"))

    top_users.show(20, False)

else:
    print("No snapshots generated!")

In [54]:
# Step 7: Save results to file
import json
import os

output_path = "leaderboard_snapshots/raw_snapshots.json"
# Alias kept because the original cell exposed this name as well.
raw_snapshots_path = output_path

if snapshots:
    # Create output directory if it doesn't exist
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Save raw snapshots in MongoDB format: one {'_id': ..., 'users': [...]}
    # document per snapshot. The flattened Spark DataFrame this cell used to
    # build was never written anywhere, so it is no longer created here.
    with open(raw_snapshots_path, 'w', encoding='utf-8') as f:
        json.dump(snapshots, f, indent=2)
    print(f"Raw snapshots (MongoDB format) saved to: {raw_snapshots_path}")

else:
    print("No snapshots to save!")

Raw snapshots (MongoDB format) saved to: leaderboard_snapshots/raw_snapshots.json


In [56]:
# Clean up: stop the Spark session. Run this cell last; cells that use `spark`
# cannot be re-run afterwards until the session is created again.
print("Stopping Spark session...")
spark.stop()
print("Spark session stopped successfully!")


Stopping Spark session...
Spark session stopped successfully!
