In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

# Local Spark session using 2 worker threads. getOrCreate() returns the
# already-active session if this cell is re-run in the same kernel.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Exemple SparkSession")
    .getOrCreate()
)

print(spark.version)

In [None]:
# Load every file matching the 2025-10-* pattern for events and transactions.
# Only the header option is set (no schema / inferSchema), so all columns
# come back as strings; downstream cells must cast numeric columns explicitly.
df_events = spark.read.option("header", True).csv("data/events_2025-10-*")
df_events.show(5, truncate=False)

df_transactions = spark.read.option("header", True).csv("data/transactions_2025-10-*")
df_transactions.show(5, truncate=False)

In [None]:
# Attach event data to each transaction. The left join keeps transactions
# that have no matching event (their event-side columns are null), and a
# transaction matching several events appears once per match.
df_joined = df_transactions.join(df_events, "transaction_id", "left")
df_joined.show(n=3, truncate=False)

In [None]:
# Daily data-quality metrics per (load_date, file_name).
# A row is invalid when its status is anything other than "OK" -- including a
# missing status -- or its amount is missing, non-numeric, or negative.
# Why the explicit handling:
#   * `status != "OK"` evaluates to null for a null status, and when(null, 1)
#     falls through to otherwise(0), so such rows were silently counted as
#     valid; eqNullSafe never returns null.
#   * the CSVs are read without a schema, so `amount` is a string; it is cast
#     to double explicitly instead of relying on implicit coercion in `< 0`.
#     Values that cannot be parsed become null and are counted as invalid.
# NOTE(review): after the left join, transactions without an event may have a
# null load_time -> null load_date; confirm which side these columns come from.
amount_num = F.col("amount").cast("double")
is_invalid_row = (
    ~F.col("status").eqNullSafe("OK")
    | amount_num.isNull()
    | (amount_num < 0)
)

df_aggregation = (
    df_joined
    .withColumn("load_date", F.to_date("load_time"))
    .groupBy("load_date", "file_name")
    .agg(
        F.count("*").alias("nb_total"),
        F.sum(F.when(is_invalid_row, 1).otherwise(0)).alias("nb_invalid"),
    )
    # nb_total >= 1 for every group, so the ratio is never a division by zero.
    .withColumn("error_rate", F.col("nb_invalid") / F.col("nb_total"))
)

df_aggregation.show(truncate=False, n=3)


In [None]:
# 7-day rolling error rate per file.
# The frame is measured in calendar days (rangeBetween over a day index), not
# rows. With rowsBetween(-6, 0), a file with missing load dates would have its
# "7-day" window silently cover more than 7 days.
# NOTE(review): this is the mean of the daily error rates (each day weighted
# equally, regardless of volume). A volume-weighted alternative is
# sum(nb_invalid) / sum(nb_total) over the same window.
day_index = F.datediff(F.col("load_date"), F.to_date(F.lit("1970-01-01")))
rolling_window = (
    Window.partitionBy("file_name")
    .orderBy(day_index)
    .rangeBetween(-6, 0)  # current day plus the 6 preceding calendar days
)
df_rolling = df_aggregation.withColumn(
    "rolling_error_rate", F.avg("error_rate").over(rolling_window)
)
df_rolling.show(3)

In [None]:
# For each load_date, keep the files with the highest error rate.
# rank() gives tied files the same rank (leaving gaps after ties), so a date
# can keep more than 3 files when error rates tie.
file_rank_window = Window.partitionBy("load_date").orderBy(F.desc("error_rate"))
df_rolling_with_ranks = (
    df_rolling
    .withColumn("file_rank", F.rank().over(file_rank_window))
    .where(F.col("file_rank") <= 3)
)
df_rolling_with_ranks.show(3)