In [4]:
from pyspark.sql import SparkSession

HDFS_WAREHOUSE_PATH = "hdfs://localhost:9000/user/dottier/spark-warehouse"

spark: SparkSession = SparkSession.builder \
                        .appName('Test Silver') \
                        .config("spark.sql.warehouse.dir", HDFS_WAREHOUSE_PATH) \
                        .config("spark.driver.memory", "8g") \
                        .config("spark.sql.shuffle.partitions", "400") \
                        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0") \
                        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                        .enableHiveSupport() \
                        .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
logger = spark._jvm.org.apache.log4j
logger.LogManager.getLogger("org.apache.hadoop.hive.metastore.ObjectStore").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("org.apache.hadoop.hive.conf.HiveConf").setLevel(logger.Level.ERROR)

25/09/06 05:21:26 WARN Utils: Your hostname, DESKTOP-9VM3SA1 resolves to a loopback address: 127.0.1.1; using 172.28.82.250 instead (on interface eth0)
25/09/06 05:21:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/dottier/big_data/venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/dottier/.ivy2/cache
The jars for the packages stored in: /home/dottier/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e37ea06e-3141-4f51-8ec9-214eab08477f;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.3.0 in central
	found io.delta#delta-storage;3.3.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 107ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.3.0 from central in [default]
	io.delta#delta-storage;3.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   

In [2]:
print(spark.sparkContext.master)

local[*]


In [2]:
print(spark.version)  # Spark version
print(spark.sparkContext.getConf().get("spark.scala.version")) # Scala version


In [32]:
spark._jsc.hadoopConfiguration().set("fs.defaultFS", "hdfs://localhost:9000")

In [2]:
print(spark._jsc.hadoopConfiguration().get("fs.defaultFS"))

In [3]:
spark.sql("CREATE DATABASE IF NOT EXISTS silver")

25/09/03 18:26:04 WARN ObjectStore: Failed to get database silver, returning NoSuchObjectException
25/09/03 18:26:04 WARN ObjectStore: Failed to get database silver, returning NoSuchObjectException
25/09/03 18:26:04 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
25/09/03 18:26:04 WARN ObjectStore: Failed to get database silver, returning NoSuchObjectException


DataFrame[]

In [2]:
spark.sql("SHOW TABLES IN silver").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|   silver|         dim_players|      false|
|   silver|          dim_stages|      false|
|   silver|           dim_teams|      false|
|   silver|   fct_match_summary|      false|
|   silver|fct_player_match_...|      false|
+---------+--------------------+-----------+



In [14]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|   silver|
+---------+



In [5]:
import re
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def camel_to_snake(name: str) -> str:
    if name is None:
        return None
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

In [6]:
import re
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# 1. The core logic remains the same, but it now operates on a Pandas Series.
def camel_to_snake_vectorized(series: pd.Series) -> pd.Series:
    # The .str accessor in Pandas applies the regex to the whole series at once.
    return series.str.replace(r'(?<!^)(?=[A-Z])', '_', regex=True).str.lower()

# 2. Register it as a Pandas UDF.
snake_case_pandas_udf = pandas_udf(camel_to_snake_vectorized, returnType=StringType())

In [None]:
# from pyspark.sql import DataFrame
# from pyspark.sql.functions import col, explode_outer
# from pyspark.sql.types import StructType, ArrayType

# # Recursively flattens a PySpark DataFrame
# def flatten_df(df: DataFrame, parent_prefix: str = "") -> DataFrame:
#     flat_cols = []
#     explode_cols = []
    
#     for field in df.schema.fields:
#         field_name = field.name
#         col_name = f"{parent_prefix}{field_name}" if parent_prefix else field_name
#         alias_name = camel_to_snake(col_name.replace(".", "_"))
#         data_type = field.dataType

#         if isinstance(data_type, StructType):
#             # Recurse into struct
#             for subfield in data_type.fields:
#                 sub_col_name = f"{col_name}.{subfield.name}"
#                 sub_alias = camel_to_snake(sub_col_name.replace(".", "_"))
#                 flat_cols.append(col(sub_col_name).alias(sub_alias))
        
#         elif isinstance(data_type, ArrayType) and isinstance(data_type.elementType, StructType):
#             # Explode arrays of structs
#             explode_cols.append((col_name, field_name))
        
#         else:
#             # Primitive column
#             flat_cols.append(col(col_name).alias(alias_name))
    
#     return df.select(flat_cols)


In [7]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, StringType

def flatten_df(df: DataFrame, cols_to_snake_case_values: set = None) -> DataFrame:
    if cols_to_snake_case_values is None:
        cols_to_snake_case_values = set()

    flat_cols = []
    
    def get_flat_cols(schema: StructType, prefix: str = ""):
        for field in schema.fields:
            full_col_name = f"{prefix}{field.name}"
            
            # Recurse into nested structs
            if isinstance(field.dataType, StructType):
                get_flat_cols(field.dataType, prefix=f"{full_col_name}.")
            else:
                # Create the final, clean snake_case alias
                alias_name = camel_to_snake(full_col_name.replace(".", "_"))
                
                # Check if this column's values should be cleaned
                if alias_name in cols_to_snake_case_values and isinstance(field.dataType, StringType):
                    flat_cols.append(
                        snake_case_pandas_udf(col(full_col_name)).alias(alias_name)
                    )
                else:
                    flat_cols.append(
                        col(full_col_name).alias(alias_name)
                    )

    # Start the recursion on the top-level schema
    get_flat_cols(df.schema)
    
    # Return the DataFrame with the flattened and aliased columns
    return df.select(flat_cols)


In [2]:
import os

BRONZE_PATH = "/user/dottier/bronze"
STAGE_PATH = os.path.join(BRONZE_PATH, "stage_data")
MATCH_PATH = os.path.join(BRONZE_PATH, "match_data")

In [3]:
from hdfs import InsecureClient, HdfsError
from datetime import datetime, timezone
import os

def read_last_crawl_time():
    client = InsecureClient("http://localhost:9870", user="dottier")
    file_path = os.path.join(BRONZE_PATH, "last_crawl_timestamp.txt")
    try:
        with client.read(file_path, encoding="utf-8") as f:
            last_crawl_str = f.read().strip()
            return datetime.strptime(last_crawl_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    except (FileNotFoundError, HdfsError):
        return datetime(1970, 1, 1, tzinfo=timezone.utc)
    
last_crawl_time = read_last_crawl_time()
print(last_crawl_time)

2025-08-15 17:16:35+00:00


In [10]:
import os
import datetime
from datetime import datetime, timezone

def read_last_crawl_time():
    try:
        with open(os.path.join(BRONZE_PATH, 'last_crawl_timestamp.txt'), 'r') as f:
            last_crawl_str = f.read().strip()
            return datetime.strptime(last_crawl_str, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    except FileNotFoundError:
        return datetime(1970, 1, 1, tzinfo=timezone.utc)
    
last_crawl_time = read_last_crawl_time()
print(last_crawl_time)

1970-01-01 00:00:00+00:00


In [8]:
def json_to_df(
    schema: StructType,
    crawled_after: datetime,
    path: str,
    glob_pattern: str | None = None
) -> DataFrame:
    reader = (
        spark.read
        .schema(schema)
        .option("modifiedAfter", crawled_after.strftime("%Y-%m-%d %H:%M:%S"))
    )

    if glob_pattern:
        reader = reader.option("pathGlobFilter", glob_pattern)
    
    return reader.json(path)

In [8]:
from pyspark.sql.functions import current_timestamp, date_format, lit
load_date_hour = date_format(current_timestamp(), "yyyy-MM-dd-HH")
print(lit(load_date_hour))

Column<'date_format(current_timestamp(), yyyy-MM-dd-HH)'>


In [9]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
from delta.tables import DeltaTable

# Create Delta table if not exists, else merge
def write_to_silver(
    df: DataFrame,
    table_name: str,
    primary_keys: list[str], # MERGE requires a list of keys to join on
    partition_by: list[str] | None = None,
    load_date_hour=None 
):
    full_table_name = f"silver.{table_name}"

    # partition by load_date_hour (yyyy-MM-dd-HH)
    df_to_write = df
    if partition_by and "load_date_hour" in partition_by and partition_by not in df.columns:
        df_to_write = df.withColumn("load_date_hour", lit(load_date_hour))

    # check if table exists in spark catalog
    if not spark.catalog.tableExists(full_table_name):
        # Create delta table
        print(f"Table '{full_table_name}' does not exist. Creating new table...")

        warehouse_path = spark.conf.get("spark.sql.warehouse.dir")
        table_path = f"{warehouse_path}/{full_table_name.replace('.', '.db/')}"

        writer = df_to_write.write \
            .mode("overwrite") \
            .format("delta") \
            .option("mergeSchema", "true")

        if partition_by:
            writer = writer.partitionBy(partition_by)
        
        writer.save(table_path)
        spark.sql(f"CREATE TABLE {full_table_name} USING DELTA LOCATION '{table_path}'")
        print("Table successfully created and registered.")

    else:
        # merge into table
        print(f"Table '{full_table_name}' exists. Merging new data...")
        
        delta_table = DeltaTable.forName(spark, full_table_name)
        merge_condition = " AND ".join([f"target.{key} = source.{key}" for key in primary_keys])
        
        (delta_table.alias("target")
            .merge(
                source=df_to_write.alias("source"),
                condition=merge_condition
            )
            .whenMatchedUpdateAll() 
            .whenNotMatchedInsertAll()
            .execute()
        )
        print("Merge complete.")

In [13]:
spark.sql("SHOW TABLES") 

DataFrame[namespace: string, tableName: string, isTemporary: boolean]

In [9]:
from delta.tables import DeltaTable

def read_from_silver(table_name: str):
    full_table_name = f"silver.{table_name}"

    try:
        df = spark.read.table(full_table_name)
        print(f"Successfully read {full_table_name}")
        return df
    except Exception as e:
        print(f"Error reading {full_table_name}: {e}")
        raise

In [10]:
stage = read_from_silver("dim_stages")
stage.show(1000)

Successfully read silver.dim_stages


25/09/06 05:21:54 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+---------+-------------+-------------+--------------------+---------+-----------+--------------------+-------------------+--------------------+---------+--------+--------------+
|region_id|  region_name|tournament_id|     tournament_name|season_id|season_name|          stage_name|         crawled_at|              league|   season|stage_id|load_date_hour|
+---------+-------------+-------------+--------------------+---------+-----------+--------------------+-------------------+--------------------+---------+--------+--------------+
|      247|International|          124|European Champion...|     3164|       2012| EURO Quarter finals|2025-09-05 07:06:28|international-eur...|     2012|    5975| 2025-09-05-12|
|      247|International|          124|European Champion...|     4246|       2016|         EURO Grp. B|2025-09-05 07:04:27|international-eur...|     2016|    8988| 2025-09-05-12|
|      247|International|           67| FIFA Club World Cup|    10539|       2025|FIFA Club World C...|20

In [11]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import asc, desc, col

stage_schemas = StructType([
    StructField("metadata", StructType([
        StructField("crawled_at", StringType(), True),
    ]), True),
    
    StructField("data", StructType([
        # Your original schema, now nested under the 'data' field
        StructField("regionId", IntegerType(), True),
        StructField("regionName", StringType(), True),
        StructField("tournamentId", IntegerType(), True),
        StructField("tournamentName", StringType(), True),
        StructField("seasonId", IntegerType(), True),
        StructField("seasonName", StringType(), True),
        StructField("stageName", StringType(), True)
    ]), True)
])

dim_stages_df = spark.read \
    .option("modifiedAfter", last_crawl_time) \
    .schema(stage_schemas) \
    .json(STAGE_PATH)

dim_stages_df = flatten_df(
    dim_stages_df
    .select(
        "data.*",
        "metadata.crawled_at",
        "league",
        "season",
        "stage_id"
    )
)

In [12]:
dim_stages_df.printSchema()
print(dim_stages_df.count())
dim_stages_df.show(truncate=False)

root
 |-- region_id: integer (nullable = true)
 |-- region_name: string (nullable = true)
 |-- tournament_id: integer (nullable = true)
 |-- tournament_name: string (nullable = true)
 |-- season_id: integer (nullable = true)
 |-- season_name: string (nullable = true)
 |-- stage_name: string (nullable = true)
 |-- crawled_at: string (nullable = true)
 |-- league: string (nullable = true)
 |-- season: string (nullable = true)
 |-- stage_id: integer (nullable = true)



                                                                                

426
+---------+-----------+-------------+----------------+---------+-----------+----------------+-------------------+-----------------------+---------+--------+
|region_id|region_name|tournament_id|tournament_name |season_id|season_name|stage_name      |crawled_at         |league                 |season   |stage_id|
+---------+-----------+-------------+----------------+---------+-----------+----------------+-------------------+-----------------------+---------+--------+
|250      |Europe     |12           |Champions League|10903    |2025/2026  |Champions League|2025-09-04 19:12:16|europe-champions-league|2025-2026|24796   |
|108      |Italy      |5            |Serie A         |7928     |2019/2020  |Serie A         |2025-09-04 18:24:31|italy-serie-a          |2019-2020|17835   |
|206      |Spain      |4            |LaLiga          |7889     |2019/2020  |LaLiga          |2025-09-04 19:08:44|spain-laliga           |2019-2020|17702   |
|252      |England    |2            |Premier League  |

In [13]:
from pyspark.sql.functions import current_timestamp, date_format, lit
JOB_RUN_DATE = date_format(current_timestamp(), "yyyy-MM-dd-HH")

In [60]:
write_to_silver(
    df=dim_stages_df,
    table_name="dim_stages",
    primary_keys=["stage_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

Table 'silver.dim_stages' does not exist. Creating new table...


                                                                                

Table successfully created and registered.


25/09/05 12:05:46 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`silver`.`dim_stages` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


- Dimension Team Table

In [61]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, expr

team_schema = StructType([
    StructField("data", StructType([
        StructField("homeTeamId", IntegerType(), False),
        StructField("homeTeamName", StringType(), False),
        StructField("homeTeamCountryName", StringType(), False),
        StructField("awayTeamId", IntegerType(), False),
        StructField("awayTeamName", StringType(), False),
        StructField("awayTeamCountryName", StringType(), False),
    ]), True)
])

dim_teams_df = json_to_df(
    schema=team_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_preview.json"
)

home_teams_df = dim_teams_df.select(
    col("data.homeTeamId").alias("team_id"),
    col("data.homeTeamName").alias("team_name"),
    col("data.homeTeamCountryName").alias("country_name")
)

away_teams_df = dim_teams_df.select(
    col("data.awayTeamId").alias("team_id"),
    col("data.awayTeamName").alias("team_name"),
    col("data.awayTeamCountryName").alias("country_name")
)

dim_teams_df = home_teams_df.union(away_teams_df).dropDuplicates(["team_id"])

In [31]:
print(dim_teams_df.count())



389


                                                                                

In [32]:
dim_teams_df.printSchema()
dim_teams_df.show(10)

root
 |-- team_id: integer (nullable = true)
 |-- team_name: string (nullable = true)
 |-- country_name: string (nullable = true)





+-------+-----------------+------------+
|team_id|        team_name|country_name|
+-------+-----------------+------------+
|     15|          Chelsea|     England|
|     16|       Sunderland|     England|
|     23|        Newcastle|     England|
|     27|          Watford|     England|
|     31|          Everton|     England|
|     37|    Bayern Munich|     Germany|
|     39|       Schalke 04|     Germany|
|     41|    VfB Stuttgart|     Germany|
|     44|Borussia Dortmund|     Germany|
|     52|      Real Madrid|       Spain|
+-------+-----------------+------------+
only showing top 10 rows



                                                                                

In [62]:
write_to_silver(
    df=dim_teams_df,
    table_name="dim_teams",
    primary_keys=["team_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

Table 'silver.dim_teams' does not exist. Creating new table...


                                                                                

Table successfully created and registered.


25/09/05 12:06:32 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`silver`.`dim_teams` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [63]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.functions import desc, col, regexp_extract

match_preview_schema = StructType([
    StructField("metadata", StructType([
        StructField("crawled_at", StringType(), True),
    ]), True),

    StructField("data", StructType([
        StructField(("homeTeamId"), IntegerType(), True),
        StructField(("awayTeamId"), IntegerType(), True),
        StructField(("startTimeUtc"), TimestampType(), True)
    ]), True)
])

match_preview_df = json_to_df(
    schema=match_preview_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_preview.json"
)

match_preview_df = flatten_df(
    match_preview_df
    .select(
        "match_id",
        "stage_id",
        "data.*",
        "metadata.crawled_at"
    )
)

In [35]:
print(match_preview_df.count())
match_preview_df.printSchema()



25081
root
 |-- match_id: integer (nullable = true)
 |-- stage_id: integer (nullable = true)
 |-- home_team_id: integer (nullable = true)
 |-- away_team_id: integer (nullable = true)
 |-- start_time_utc: timestamp (nullable = true)
 |-- crawled_at: string (nullable = true)



                                                                                

In [36]:
match_preview_df.orderBy(
    # col("stage_id").asc(),
    col("match_id").desc()
).show()



+--------+--------+------------+------------+-------------------+-------------------+
|match_id|stage_id|home_team_id|away_team_id|     start_time_utc|         crawled_at|
+--------+--------+------------+------------+-------------------+-------------------+
| 1946663|   24798|         446|         128|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946662|   24798|         446|         395|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946661|   24798|         446|         223|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946660|   24798|         446|         306|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946659|   24798|         140|         493|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946658|   24798|         140|         401|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946657|   24798|         140|         994|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946656|   24798|         140|          54|2025-09-24 19:00:00|2025-08-30 15:17:29|
| 1946655|   24798|         128|         140|2025-09-2

                                                                                

In [64]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import desc, col, regexp_extract, split, regexp_extract

match_data_schema = StructType([
    StructField("matchCentreData", StructType([
        StructField(("score"), StringType(), False),
        StructField(("htScore"), StringType(), False),
        StructField(("ftScore"), StringType(), False),
        StructField(("etScore"), StringType(), False),
        StructField(("pkScore"), StringType(), False),
    ]))
])

number_pattern = r"(\d+)"

match_data_df = json_to_df(
    schema=match_data_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_data.json"
)

# Convert from score string (3 : 2) to home score (3) and away score (2)
match_data_df = match_data_df.select(
    col("match_id"),
    regexp_extract(split(col("matchCentreData.score"), ":").getItem(0), number_pattern, 1).cast("integer").alias("home_score"),
    regexp_extract(split(col("matchCentreData.score"), ":").getItem(1), number_pattern, 1).cast("integer").alias("away_score"),
    regexp_extract(split(col("matchCentreData.htScore"), ":").getItem(0), number_pattern, 1).cast("integer").alias("home_ht_score"),
    regexp_extract(split(col("matchCentreData.htScore"), ":").getItem(1), number_pattern, 1).cast("integer").alias("away_ht_score"),
    regexp_extract(split(col("matchCentreData.ftScore"), ":").getItem(0), number_pattern, 1).cast("integer").alias("home_ft_score"),
    regexp_extract(split(col("matchCentreData.ftScore"), ":").getItem(1), number_pattern, 1).cast("integer").alias("away_ft_score"),
    regexp_extract(split(col("matchCentreData.etScore"), ":").getItem(0), number_pattern, 1).cast("integer").alias("home_et_score"),
    regexp_extract(split(col("matchCentreData.etScore"), ":").getItem(1), number_pattern, 1).cast("integer").alias("away_et_score"),
    regexp_extract(split(col("matchCentreData.pkScore"), ":").getItem(0), number_pattern, 1).cast("integer").alias("home_pk_score"),
    regexp_extract(split(col("matchCentreData.pkScore"), ":").getItem(1), number_pattern, 1).cast("integer").alias("away_pk_score"),
    col("stage_id")
)

In [38]:
match_data_df.printSchema()
# print(match_data_df.count())
match_data_df.show()

root
 |-- match_id: integer (nullable = true)
 |-- home_score: integer (nullable = true)
 |-- away_score: integer (nullable = true)
 |-- home_ht_score: integer (nullable = true)
 |-- away_ht_score: integer (nullable = true)
 |-- home_ft_score: integer (nullable = true)
 |-- away_ft_score: integer (nullable = true)
 |-- home_et_score: integer (nullable = true)
 |-- away_et_score: integer (nullable = true)
 |-- home_pk_score: integer (nullable = true)
 |-- away_pk_score: integer (nullable = true)
 |-- stage_id: integer (nullable = true)

+--------+----------+----------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------+
|match_id|home_score|away_score|home_ht_score|away_ht_score|home_ft_score|away_ft_score|home_et_score|away_et_score|home_pk_score|away_pk_score|stage_id|
+--------+----------+----------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+------

In [None]:
match_data_df.filter(col("away_pk_score").isNotNull()).show()

In [None]:
match_data_df.filter(col("home_pk_score").isNotNull()).show()

In [65]:
fct_match_summary_df = match_preview_df.join(
    other=match_data_df,
    on=["match_id", "stage_id"],
    how="left_outer")

In [40]:
print(fct_match_summary_df.count())
fct_match_summary_df.printSchema()

fct_match_summary_df.orderBy(
    col("match_id")
).show(10)

                                                                                

25081
root
 |-- match_id: integer (nullable = true)
 |-- stage_id: integer (nullable = true)
 |-- home_team_id: integer (nullable = true)
 |-- away_team_id: integer (nullable = true)
 |-- start_time_utc: timestamp (nullable = true)
 |-- crawled_at: string (nullable = true)
 |-- home_score: integer (nullable = true)
 |-- away_score: integer (nullable = true)
 |-- home_ht_score: integer (nullable = true)
 |-- away_ht_score: integer (nullable = true)
 |-- home_ft_score: integer (nullable = true)
 |-- away_ft_score: integer (nullable = true)
 |-- home_et_score: integer (nullable = true)
 |-- away_et_score: integer (nullable = true)
 |-- home_pk_score: integer (nullable = true)
 |-- away_pk_score: integer (nullable = true)



                                                                                

+--------+--------+------------+------------+-------------------+-------------------+----------+----------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|match_id|stage_id|home_team_id|away_team_id|     start_time_utc|         crawled_at|home_score|away_score|home_ht_score|away_ht_score|home_ft_score|away_ft_score|home_et_score|away_et_score|home_pk_score|away_pk_score|
+--------+--------+------------+------------+-------------------+-------------------+----------+----------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+
|  566036|    5980|         342|         323|2012-06-08 16:00:00|2025-08-30 14:58:11|         1|         1|            1|            0|            1|            1|         NULL|         NULL|         NULL|         NULL|
|  566037|    5980|         326|         332|2012-06-08 18:45:00|2025-08-30 14:58:11|         4|         1|            2

In [66]:
write_to_silver(
    df=fct_match_summary_df,
    table_name="fct_match_summary",
    primary_keys=["match_id"],
    partition_by="load_date_hour",
    load_date_hour=JOB_RUN_DATE
)

Table 'silver.fct_match_summary' does not exist. Creating new table...


                                                                                

Table successfully created and registered.


25/09/05 12:08:11 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`silver`.`fct_match_summary` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [None]:
# from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# from pyspark.sql.functions import col, expr

# team_stage_schema = StructType([
#     StructField("homeTeamId", IntegerType(), False),
#     StructField("awayTeamId", IntegerType(), False),
# ])

# fct_team_stage_participation_df = spark.read \
#     .schema(team_stage_schema) \
#     .option("multiLine", True) \
#     .option("pathGlobFilter", "*match_preview.json") \
#     .json(MATCH_PATH)

# # or select home_df and away_df then join
# fct_team_stage_participation_df = fct_team_stage_participation_df.select(
#     expr("stack(2, homeTeamId, awayTeamId) as (team_id)"),
#     col("stage_id")
# ).distinct()

In [None]:
# print(fct_team_stage_participation_df.count())
# fct_team_stage_participation_df.printSchema()
# fct_team_stage_participation_df.show()

In [None]:
# (
#     fct_team_stage_participation_df
#     .write
#     .mode("append")
#     .format("delta")
#     .save(f"{SILVER_PATH}/fct_team_stage_partcipation")
# )

In [51]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql.functions import col, expr, explode

player_schema = StructType([
    StructField("matchCentreData", StructType([
        StructField("home", StructType([
            StructField("players", ArrayType(
                StructType([
                    StructField("playerId", IntegerType(), False),
                    StructField("name", StringType(), False)
                ])
            ))
        ])),
        StructField("away", StructType([
            StructField("players", ArrayType(
                StructType([
                    StructField("playerId", IntegerType(), False),
                    StructField("name", StringType(), False)
                ])
            ))
        ]))
    ]))
])

dim_players_df = json_to_df(
    schema=player_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_data.json"
)

home_players_df = (
    dim_players_df
    .select(
        explode("matchCentreData.home.players").alias("player")
    )
    .select(
        "player.playerId",
        "player.name"
    )
)

away_players_df = (
    dim_players_df
    .select(
        explode("matchCentreData.away.players").alias("player")
    )
    .select(
        "player.playerId",
        "player.name"
    )
)

dim_players_df = home_players_df.union(away_players_df)

dim_players_df = dim_players_df.select(
    col("playerId").alias("player_id"),
    col("name").alias("player_name")
).dropDuplicates(["player_id"])

In [52]:
print(dim_players_df.count())
dim_players_df.printSchema()
dim_players_df.show()

ERROR:root:KeyboardInterrupt while sending command.           (200 + 16) / 1706]
Traceback (most recent call last):
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

KeyboardInterrupt: 



In [None]:
write_to_silver(dim_players_df, "dim_players")

In [None]:
view = dim_players_df.filter(col("player_name").contains("Mus"))
view.show()

In [67]:
def extract_side(df: DataFrame, side):
    return df.select(
        explode(f"matchCentreData.{side}.players").alias("player"),
        col(f"matchCentreData.{side}.teamId").alias("team_id"),
        col("match_id"),
        col("stage_id")
    )
    # .withColumnRenamed(
    #     "player.playerId", "player_id"
    # ).withColumnRenamed(
    #     "player.shirtNo", "player_id"
    # ).withColumnRenamed(
    #     "player.playerId", "player_id"
    # )

In [68]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, BooleanType
from pyspark.sql.functions import col, expr, explode

player_match_schema = StructType([
    StructField("matchCentreData", StructType([
        StructField("home", StructType([
            StructField("teamId", IntegerType(), False),
            StructField("players", ArrayType(
                StructType([
                    StructField("playerId", IntegerType(), False),
                    StructField("name", StringType(), False),
                    StructField("shirtNo", IntegerType(), False),
                    StructField("isFirstEleven", BooleanType(), False),
                    StructField("position", StringType(), False),
                ])
            ), True)
        ]), True),
        StructField("away", StructType([
            StructField("teamId", IntegerType(), False),
            StructField("players", ArrayType(
                StructType([
                    StructField("playerId", IntegerType(), False),
                    StructField("name", StringType(), False),
                    StructField("shirtNo", IntegerType(), False),
                    StructField("isFirstEleven", BooleanType(), False),
                    StructField("position", StringType(), False),
                ])
            ), True)
        ]), True)
    ]))
])

fct_player_match_participation_df = json_to_df(
    schema=player_match_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_data.json"
)

home_pm_df = extract_side(fct_player_match_participation_df, "home")
away_pm_df = extract_side(fct_player_match_participation_df, "away")
joined_pm_df = home_pm_df.union(away_pm_df)

flatten_pm_df = (
    joined_pm_df.select(
        col("match_id"),
        col("team_id"),
        col("player.playerId").alias("player_id"),
        col("player.name").alias("player_name"),
        col("player.shirtNo").alias("shirt_no"),
        col("player.isFirstEleven").alias("is_starting_eleven"),
        col("player.position").alias("position")
    )
)

fct_player_match_participation_df = (
    flatten_pm_df
    .drop("player_name")
    .fillna(False, ["is_starting_eleven"])
)

                                                                                

In [15]:
fct_player_match_participation_df.count()

                                                                                

922314

In [69]:
# Construct dim_players_df from fct_player_match_participation_df
dim_players_df = (
    flatten_pm_df.select(
        "player_id",
        "player_name",
    )
).dropDuplicates(["player_id"])

In [30]:
print(fct_player_match_participation_df.count())
fct_player_match_participation_df.printSchema()
fct_player_match_participation_df.show()

                                                                                

922314
root
 |-- match_id: integer (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- shirt_no: integer (nullable = true)
 |-- is_starting_eleven: boolean (nullable = false)
 |-- position: string (nullable = true)

+--------+-------+---------+--------+------------------+--------+
|match_id|team_id|player_id|shirt_no|is_starting_eleven|position|
+--------+-------+---------+--------+------------------+--------+
| 1872043|    341|   141646|      16|              true|      GK|
| 1872043|    341|   301019|       5|              true|      DR|
| 1872043|    341|   327721|       4|              true|      DC|
| 1872043|    341|   361822|      17|              true|      DC|
| 1872043|    341|   303115|      22|              true|      DL|
| 1872043|    341|   353423|       8|              true|     DMC|
| 1872043|    341|   369570|      13|              true|     DMC|
| 1872043|    341|   299513|       7|              true|     AMR|
| 187

In [45]:
print(dim_players_df.count())
dim_players_df.printSchema()
dim_players_df.show()

                                                                                

18822
root
 |-- player_id: integer (nullable = true)
 |-- player_name: string (nullable = true)





+---------+------------------+
|player_id|       player_name|
+---------+------------------+
|     1019|         Wes Brown|
|     1212|   Faryd Mondragón|
|     2180|       Joseph Yobo|
|     2235|     Lars Jacobsen|
|     2247|    Daniele Bonera|
|     2479|    Leon Andreasen|
|     2778|   Giampiero Pinzi|
|     2842|    Emre Belozoglu|
|     3841|       John O'Shea|
|     4368| Sergio Pellissier|
|     4773|   Steffen Hofmann|
|     5480|    Jermaine Jones|
|     5759|    Markus Feulner|
|     6040|    Lukas Podolski|
|     6128|  Steven Whittaker|
|     6145|    Angelo Palombo|
|     6176|José Antonio Reyes|
|     6301|      Julien Cardy|
|     6319|       David Villa|
|     6695|      Jonas Olsson|
+---------+------------------+
only showing top 20 rows



                                                                                

In [70]:
write_to_silver(
    df=dim_players_df,
    table_name="dim_players",
    primary_keys=["player_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

Table 'silver.dim_players' does not exist. Creating new table...


                                                                                

Table successfully created and registered.


25/09/05 12:10:01 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`silver`.`dim_players` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [None]:
write_to_silver(
    df=fct_player_match_participation_df,
    table_name="fct_player_match_participation",
    primary_keys=["player_id", "match_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

In [24]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType, BooleanType
from pyspark.sql.functions import col, explode_outer, asc, desc, row_number, monotonically_increasing_id, explode
from pyspark.sql.window import Window

match_event_schema = StructType([
    StructField("matchCentreData", StructType([
        StructField("home", StructType([
            StructField("teamId", IntegerType(), False),
            StructField("formations", ArrayType(
                StructType([
                    StructField("formationName", StringType(), False),
                    StructField("playerIds", ArrayType(IntegerType()), False)
                ])
            ), True)
        ]), True),
        StructField("away", StructType([
            StructField("teamId", IntegerType(), False),
            StructField("formations", ArrayType(
                StructType([
                    StructField("formationName", StringType(), False),
                    StructField("playerIds", ArrayType(IntegerType()), False)
                ])
            ), True)
        ]), True)
    ]))
])

fct_match_events_df = json_to_df(
    schema=match_event_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_data.json"
)

# # repartition before exploding for low shuffle cost later
# num_partitions = 200
# fct_match_events_df = fct_match_events_df.repartition(num_partitions, "match_id")

# exploding column "event"
fct_match_events_df = fct_match_events_df.withColumn("event", explode_outer(col("matchCentreData.events"))).drop("matchCentreData")

# Since event_id is unique for each team per match
# We create an unique _match_id to preserve the original event order of every match
fct_match_events_df = fct_match_events_df.withColumn(
    "surrogate_event_id", monotonically_increasing_id()
)
w = Window.partitionBy("match_id").orderBy("surrogate_event_id")
fct_match_events_df = fct_match_events_df.withColumn("_event_id", row_number().over(w))

base_match_events_df = (
    flatten_df(
        fct_match_events_df.select(
            col("surrogate_event_id"),
            col("match_id"),
            col("_event_id"),
            "event.*",
            col("league"),
            col("season"),
            col("stage_id")
        )
    )
)

exploded_match_events_df = base_match_events_df.select(
    col("surrogate_event_id"),
    explode(col("satisfied_events_types")).alias("event_type_id")
)

# fct_match_events_df = fct_match_events_df.orderBy(
#     asc("match_id"),
#     asc("_event_id")
# )

In [71]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType, BooleanType
from pyspark.sql.functions import col, explode_outer, asc, desc, row_number, monotonically_increasing_id, explode, transform, filter, regexp_replace, lower, when, arrays_zip
from pyspark.sql.window import Window

match_formations_schema = StructType([
    StructField("matchCentreData", StructType([
        StructField("home", StructType([
            StructField("teamId", IntegerType(), False),
            StructField("formations", ArrayType(
                StructType([
                    StructField("formationName", StringType(), True),
                    StructField("playerIds", ArrayType(IntegerType(), True), True),
                    StructField("formationSlots", ArrayType(IntegerType(), True), True),
                    StructField("startMinuteExpanded", IntegerType(), True)
                ])
            ), True)
        ]), True),
        StructField("away", StructType([
            StructField("teamId", IntegerType(), False),
            StructField("formations", ArrayType(
                StructType([
                    StructField("formationName", StringType(), True),
                    StructField("playerIds", ArrayType(IntegerType(), True), True),
                    StructField("formationSlots", ArrayType(IntegerType(), True), True),
                    StructField("startMinuteExpanded", IntegerType(), True)
                ])
            ), True)
        ]), True)
    ]))
])

fct_match_formations_df = json_to_df(
    schema=match_formations_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_data.json"
)


home_formations = fct_match_formations_df.select(
    explode(f"matchCentreData.home.formations").alias("formation"),
    col(f"matchCentreData.home.teamId").alias("team_id"),
    col("match_id"),
    col("stage_id")
)
away_formations = fct_match_formations_df.select(
    explode(f"matchCentreData.away.formations").alias("formation"),
    col(f"matchCentreData.away.teamId").alias("team_id"),
    col("match_id"),
    col("stage_id")
)

joined_formations = home_formations.union(away_formations)

# # exploding column "formations"
# fct_match_formations_df = joined_formations.withColumn(
#     "formation",
#     explode_outer(col("formations"))
# ).drop("formations")

granular_player_slots_df = (
    joined_formations
    .withColumn("player_slot_pairs", arrays_zip(
        col("formation.formationSlots"), 
        col("formation.playerIds")
    ))
    .withColumn("player_slot", explode("player_slot_pairs"))
    .select(
        col("match_id"),
        col("stage_id"),
        col("team_id"),
        col("formation.formationName").alias("formation_name"),
        col("formation.startMinuteExpanded").alias("start_minute"),
        col("player_slot.playerIds").alias("player_id"),
        col("player_slot.formationSlots").alias("slot_id")
    )
    .filter(col("slot_id") > 0) # Only keep players on the pitch
)

In [33]:
granular_player_slots_df.filter(col("formation_name") == "532").select("match_id").show()

+--------+
|match_id|
+--------+
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1699239|
| 1829434|
| 1829434|
| 1829434|
| 1829434|
| 1829434|
| 1829434|
| 1829434|
| 1829434|
| 1829434|
+--------+
only showing top 20 rows



                                                                                

In [55]:
from pyspark.sql import functions as sf
# granular_player_slots_df.filter(col("start_minute") == 0).select(sf.count_distinct("formation_name", "slot_id")).show()
granular_player_slots_df.filter(col("start_minute") == 0).dropDuplicates(["formation_name", "slot_id"]).filter(col("slot_id") == 1).show(25)

[Stage 652:>                                                        (0 + 1) / 1]

+--------+--------+-------+--------------+------------+---------+-------+
|match_id|stage_id|team_id|formation_name|start_minute|player_id|slot_id|
+--------+--------+-------+--------------+------------+---------+-------+
| 1811958|   22712|    413|          3142|           0|   401633|      1|
| 1809772|   22687|    249|          3511|           0|   145427|      1|
| 1627572|   20266|     45|           343|           0|    66424|      1|
| 1697767|   18657|    337|           433|           0|   119120|      1|
| 1604536|   20266|    288|          3421|           0|   296521|      1|
| 1694396|   20979|    248|           442|           0|   260770|      1|
| 1789439|   22686|     63|           352|           0|    76662|      1|
| 1872043|   23839|    341|          4231|           0|   141646|      1|
| 1809759|   22686|    167|          4141|           0|   121774|      1|
| 1697443|   18657|    335|          3412|           0|   252239|      1|
| 1096711|    8996|    340|          4

                                                                                

In [26]:
from pyspark.sql.functions import col, first, broadcast

enriched_slots_df = granular_player_slots_df.join(
    broadcast(fct_player_match_participation_df),
    ["match_id", "player_id"],
    "inner" # Use inner join because we only care about players with known starting positions
)

golden_matches_df = (
    enriched_slots_df
    .filter(col("start_minute") == 0)
    .select("formation_name", "match_id")
    .dropDuplicates(["formation_name"])
)

formation_slot_mapping_df = (
    enriched_slots_df
    # Filter 1: Only keep starting players
    .filter(col("start_minute") == 0)
    # Filter 2: Only keep data from our "golden matches"
    .join(
        broadcast(golden_matches_df),
        ["formation_name", "match_id"],
        "inner"
    )
    # Now, aggregate to build the final map
    .groupBy("formation_name", "slot_id")
    .agg(first("position").alias("inferred_position"))
    .orderBy("formation_name", "slot_id")
)

In [27]:
# Assume you have already built your mapping.
# 'formation_slot_mapping_df' is the DataFrame with columns:
# ["formation_name", "slot_id", "inferred_position"]

# --- Step 1: Get the PRIMARY position for every player in every match ---
# We need to find the first formation/slot each player was in for each match.
# This correctly handles starters (start_minute=0) and subs (start_minute > 0).

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Define a window to partition by match and player, ordering by when they came on
window_spec = Window.partitionBy("match_id", "player_id").orderBy(col("start_minute").asc())

# 'granular_player_slots_df' is the big DataFrame with all players from all formation periods
player_first_appearance_df = (
    granular_player_slots_df
    .withColumn("appearance_rank", row_number().over(window_spec))
    .filter(col("appearance_rank") == 1) # Keep only the first row for each player in each match
    .select("match_id", "player_id", "formation_name", "slot_id")
)


# --- Step 2: Join with your mapping to get the position string ---
# This is the final join that brings it all together.

final_player_positions_df = player_first_appearance_df.join(
    formation_slot_mapping_df,
    ["formation_name", "slot_id"],
    "left" # Use a left join to see if any positions are unmapped
).select(
    col("match_id"),
    col("player_id"),
    col("inferred_position").alias("position") # Rename the column to your final desired name
)


# --- Final Output ---
# This is the table you wanted.
print("Final Player Positions Table:")
display(final_player_positions_df)

# You would then write this to your final Delta table
# final_player_positions_df.write.mode("overwrite").saveAsTable("workspace.gold.player_match_positions")

In [30]:
final_player_positions_df.filter(
    col("match_id") == 1697443
).show(50)

# final_player_positions_df.printSchema()

In [23]:
from pyspark.sql.functions import first

# Assume you have a DataFrame 'player_match_participation_df'
# with columns: match_id, player_id, team_id, position (the known starting position)

# --- Join the granular slot data with the known starting positions ---
# We only care about the starting period (start_minute = 0)
starting_slots_df = granular_player_slots_df.filter(
    col("start_minute") == 0
).dropDuplicates(["formation_name", "slot_id"])

# Join to get the known position string
mapping_builder_df = starting_slots_df.join(
    fct_player_match_participation_df,
    ["match_id", "team_id", "player_id"],
    how="left"
)

formation_slot_mapping_df = mapping_builder_df.select(
    "formation_name",
    "slot_id",
    "position",
)

player_match_position_df = granular_player_slots_df.join(
    formation_slot_mapping_df,
    on=["formation_name", "slot_id"],
)

fct_player_match_participation_df = (
    fct_player_match_participation_df
    .join(
        player_match_position_df,
        on=["match_id", "team_id", "player_id"],
    )
)

NameError: name 'granular_player_slots_df' is not defined

In [63]:
granular_player_slots_df.printSchema()

root
 |-- match_id: integer (nullable = true)
 |-- stage_id: integer (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- formation_name: string (nullable = true)
 |-- start_minute: integer (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- slot_id: integer (nullable = true)



In [20]:
"""
Now since we don't have the mapping (formation + slot_id) -> played_position,  
we have to build it from existing data

The source data only shows Sub position for players not in the starting lineup,  
so this block purpose is to build actual position for them. You can modify this  
to consider Sub a real position like what Whoscored did
"""
from pyspark.sql.functions import col, first, broadcast
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Get all distinct formation_name + slot_id to build mapping
# from starting lineups (start_min = 0)
formation_starter_slots_df = granular_player_slots_df.filter(
    col("start_minute") == 0
).dropDuplicates(
    ["formation_name", "slot_id"]
)

formation_starter_slots_df.cache()
print(formation_starter_slots_df.count())

# Join the existing data
# to the original fct_player_match_participation table with registered positions
mapping_builder_df = (
    broadcast(formation_starter_slots_df)
    .join(
        fct_player_match_participation_df,
        ["match_id", "player_id", "team_id"],
    )
)

print(mapping_builder_df.count())

mapping_builder_df = mapping_builder_df.select(
    "formation_name",
    "slot_id",
    "position"
)

import json

rows = [row.asDict() for row in mapping_builder_df.collect()]
print(json.dumps(rows, indent=2))

                                                                                

253


                                                                                

253




[
  {
    "formation_name": "4231",
    "slot_id": 1,
    "position": "GK"
  },
  {
    "formation_name": "4231",
    "slot_id": 2,
    "position": "DR"
  },
  {
    "formation_name": "4231",
    "slot_id": 5,
    "position": "DC"
  },
  {
    "formation_name": "4231",
    "slot_id": 6,
    "position": "DC"
  },
  {
    "formation_name": "4231",
    "slot_id": 3,
    "position": "DL"
  },
  {
    "formation_name": "4231",
    "slot_id": 8,
    "position": "DMC"
  },
  {
    "formation_name": "4231",
    "slot_id": 4,
    "position": "DMC"
  },
  {
    "formation_name": "4231",
    "slot_id": 7,
    "position": "AMR"
  },
  {
    "formation_name": "4231",
    "slot_id": 10,
    "position": "AMC"
  },
  {
    "formation_name": "4231",
    "slot_id": 11,
    "position": "AML"
  },
  {
    "formation_name": "4231",
    "slot_id": 9,
    "position": "FW"
  },
  {
    "formation_name": "352",
    "slot_id": 1,
    "position": "GK"
  },
  {
    "formation_name": "352",
    "slot_id": 5,
    "

                                                                                

In [21]:
formation_starter_slots_df.unpersist()

DataFrame[match_id: int, stage_id: int, team_id: int, formation_name: string, start_minute: int, player_id: int, slot_id: int]

In [None]:
from pyspark.sql import Row

print(len(rows))

mapping_position_df = spark.createDataFrame([Row(**row) for row in rows])

mapping_position_df.show()
mapping_position_df.select("formation_name").distinct().show(23)

165


In [72]:
import os
import json
from pyspark.sql.functions import col, first, broadcast, row_number
from pyspark.sql.window import Window
from pyspark.sql import Row

mapping_file = "mapping/formation_slot_mapping.json"

if not os.path.exists(mapping_file):
    print(f"{mapping_file} doesn't exist. Creating...")
    # Get all distinct formation_name + slot_id to build mapping
    formation_starter_slots_df = (
        granular_player_slots_df
        .filter(col("start_minute") == 0)
        .dropDuplicates(["formation_name", "slot_id"])
    )

    print(formation_starter_slots_df.count())

    # Join the existing data to fct_player_match_participation table with registered positions
    mapping_position_df = (
        broadcast(formation_starter_slots_df)
        .join(
            fct_player_match_participation_df,
            ["match_id", "player_id", "team_id"],
        )
        .select("formation_name", "slot_id", "position")
    )

    # Store position_mapping table to json file
    rows = [row.asDict() for row in mapping_position_df.collect()]
    os.makedirs(os.path.dirname(mapping_file), exist_ok=True)
    with open(mapping_file, "w", encoding="utf-8") as f:
        json.dump(rows, f, indent=2)

    print(f"Successfully written mapping_position_df to file {mapping_file}")

else:
    print(f"Reading mapping_position_df from file {mapping_file}")
    with open(mapping_file, "r", encoding="utf-8") as f:
        rows = json.load(f)
    
    mapping_position_df = spark.createDataFrame([Row(**row) for row in rows])
    mapping_position_df.show()

Reading mapping_position_df from file mapping/formation_slot_mapping.json


[Stage 656:>                                                        (0 + 1) / 1]

+--------------+-------+--------+
|formation_name|slot_id|position|
+--------------+-------+--------+
|          4231|      1|      GK|
|          4231|      2|      DR|
|          4231|      5|      DC|
|          4231|      6|      DC|
|          4231|      3|      DL|
|          4231|      8|     DMC|
|          4231|      4|     DMC|
|          4231|      7|     AMR|
|          4231|     10|     AMC|
|          4231|     11|     AML|
|          4231|      9|      FW|
|           352|      1|      GK|
|           352|      5|      DC|
|           352|      6|      DC|
|           352|      4|      DC|
|           352|      2|     DMR|
|           352|      3|     DML|
|           352|     11|      MC|
|           352|      7|      MC|
|           352|      8|      MC|
+--------------+-------+--------+
only showing top 20 rows



                                                                                

In [73]:
# Take the first position of a player in a match as their played_position (appearance_rank = 1)
window_spec = Window.partitionBy("match_id", "player_id").orderBy(col("start_minute").asc())

player_primary_slot_df = (
    granular_player_slots_df
    .withColumn("appearance_rank", row_number().over(window_spec))
    .filter(col("appearance_rank") == 1)
    .select("match_id", "player_id", "team_id", "formation_name", "slot_id")
)

# join to get position data from mapping
player_final_position_df = player_primary_slot_df.join(
    broadcast(mapping_position_df),
    ["formation_name", "slot_id"],
    "left"
).select("match_id", "player_id", "team_id", "position")


# join back to final fct_player_match_participation table
enriched_fct_player_match_participation_df = (
    fct_player_match_participation_df
    .join(
        player_final_position_df,
        ["match_id", "player_id", "team_id"],
        "left"
    ).select(
        "match_id",
        "team_id",
        "player_id",
        "is_starting_eleven",
        "shirt_no",
        fct_player_match_participation_df.position
    )
)

In [36]:
enriched_fct_player_match_participation_df.printSchema()
enriched_fct_player_match_participation_df.show()

root
 |-- match_id: integer (nullable = true)
 |-- team_id: integer (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- is_starting_eleven: boolean (nullable = false)
 |-- shirt_no: integer (nullable = true)
 |-- position: string (nullable = true)



[Stage 275:>                                                      (0 + 16) / 16]

+--------+-------+---------+------------------+--------+--------+
|match_id|team_id|player_id|is_starting_eleven|shirt_no|position|
+--------+-------+---------+------------------+--------+--------+
|  566045|    338|     2987|             false|      23|     Sub|
|  566045|    338|     4056|             false|       9|     Sub|
|  566045|    338|     8040|              true|      10|     FWR|
|  566045|    338|    14102|              true|      21|      FW|
|  566050|    331|       88|              true|      10|      FW|
|  566050|    338|     2302|              true|       8|      MC|
|  566050|    338|     9484|             false|      12|     Sub|
|  566050|    338|    13361|             false|      19|     Sub|
|  566057|    462|     9674|              true|       9|      DR|
|  789613|    409|     4871|             false|      14|     Sub|
|  789613|    409|    17108|             false|      18|     Sub|
|  789613|    409|    36399|              true|       8|     DMC|
|  789613|

                                                                                

In [74]:
write_to_silver(
    df=enriched_fct_player_match_participation_df,
    table_name="fct_player_match_participation",
    primary_keys=["match_id", "team_id", "player_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

Table 'silver.fct_player_match_participation' does not exist. Creating new table...


                                                                                

Table successfully created and registered.


25/09/05 12:13:16 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`silver`.`fct_player_match_participation` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In [None]:
# # Assume 'new_granular_player_slots_df' contains ONLY today's new data from Auto Loader

# # --- Step 1: Read the pre-built, complete mapping table ---
# formation_map = spark.read.table("gold.dim_formation_slot_map").cache()

# # --- Step 2: Find the primary slot for players in the NEW matches ---
# window_spec = Window.partitionBy("match_id", "player_id").orderBy(col("start_minute").asc())
# new_player_primary_slots_df = (
#     new_granular_player_slots_df
#     .withColumn("appearance_rank", row_number().over(window_spec))
#     .filter(col("appearance_rank") == 1)
#     .select("match_id", "player_id", "team_id", "formation_name", "slot_id")
# )

# # --- Step 3: Join the NEW data with the COMPLETE map ---
# new_player_positions_df = new_player_primary_slots_df.join(
#     broadcast(formation_map),
#     ["formation_name", "slot_id"],
#     "left"
# ).select("match_id", "player_id", "team_id", "played_position")

# # --- Step 4: MERGE the new, enriched data into the final Gold table ---
# # Assume 'new_player_positions_df' is now joined with all other new participation facts
# new_enriched_participation_df.createOrReplaceTempView("daily_updates")

# spark.sql("""
#     MERGE INTO gold.fct_player_match_participation AS target
#     USING daily_updates AS source
#     ON target.match_id = source.match_id AND target.player_id = source.player_id
#     WHEN NOT MATCHED THEN INSERT *
# """)

In [12]:
from pyspark.sql.functions import countDistinct

# joined_formations.agg(countDistinct("formation.formationName")).show()

joined_formations.select(
    "formation.formationName"
).distinct().show(25)

In [14]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType, BooleanType
from pyspark.sql.functions import col, explode_outer, asc, desc, row_number, monotonically_increasing_id, explode, transform, filter, regexp_replace, lower, when
from pyspark.sql.window import Window

match_event_schema = StructType([
    StructField("matchCentreData", StructType([
        StructField("events", ArrayType(
            StructType([
                StructField("eventId", IntegerType(), False),
                StructField("minute", IntegerType(), False),
                StructField("second", IntegerType(), False),
                StructField("teamId", IntegerType(), False),
                StructField("playerId", IntegerType(), False),
                StructField("x", FloatType(), False),
                StructField("y", FloatType(), False),
                StructField("endX", FloatType(), True),
                StructField("endY", FloatType(), True),
                StructField("period", StructType([
                    StructField("value", IntegerType(), False),
                    StructField("displayName", StringType(), False),
                ])),
                StructField("type", StructType([
                    StructField("displayName", StringType(), False),
                ])),
                StructField("outcomeType", StructType([
                    StructField("value", IntegerType(), False),
                ])),
                StructField("qualifiers", ArrayType(
                    StructType([
                        StructField("type", StructType([
                            StructField("value", IntegerType(), False),
                            StructField("displayName", StringType(), False),
                        ])),
                        StructField("value", StringType(), True)
                    ])
                ), True),
                StructField("satisfiedEventsTypes", ArrayType(IntegerType()), True)
            ])
        ), False)
    ]))
])

fct_match_events_df = json_to_df(
    schema=match_event_schema,
    crawled_after=last_crawl_time,
    path=MATCH_PATH,
    glob_pattern="*match_data.json"
)

# exploding column "event"
fct_match_events_df = fct_match_events_df.withColumn("event", explode_outer(col("matchCentreData.events"))).drop("matchCentreData")

# Explode and filter qualifiers to get qualifiers mapping
# Example: "StandingSave" -> 178
qualifier_mapping_df = (
    fct_match_events_df
    .select("event.qualifiers")
    .filter(col("qualifiers").isNotNull())
    
    # Keep only qualifiers without values
    .select(explode("qualifiers").alias("q"))
    .filter(col("q.value").isNull())
    
    # remove dups
    .select(
        col("q.type.value").alias("qualifier_id"),
        col("q.type.displayName").alias("qualifier_name_raw")
    )
    .distinct()
    
    # to snake_case
    .withColumn(
        "qualifier_name",
        lower(regexp_replace(col("qualifier_name_raw"), r"([a-z])([A-Z])", r"$1_$2"))
    )
    .select("qualifier_name", "qualifier_id")
)


fct_match_events_df = fct_match_events_df.withColumn(
    "qualifiers_display_names",
    transform(
        # Filter the array to keep only flags
        filter(
            col("event.qualifiers"),
            lambda q: q.value.isNull()
        ),
        # Transform the filtered array to get the names
        lambda q: lower(
            regexp_replace(q.type.displayName, r"([a-z])([A-Z])", r"$1_$2")
        )
    )
).withColumn(
    "qualifiers_values",
    transform(
        filter(
            col("event.qualifiers"),
            lambda q: q.value.isNull()
        ),
        lambda q: (q.type.value)
    )
).withColumn(
    "event",
    col("event").dropFields("qualifiers")
)

# Since event_id is unique for each team per match
# We create an unique _event_id to preserve the original event order of every match
fct_match_events_df = fct_match_events_df.withColumn(
    "surrogate_event_id", monotonically_increasing_id()
)
w = Window.partitionBy("match_id").orderBy("surrogate_event_id")
fct_match_events_df = fct_match_events_df.withColumn("_event_id", row_number().over(w))

df_to_flatten = fct_match_events_df.select(
    col("surrogate_event_id"),
    col("match_id"),
    col("_event_id"),
    "event.*",
    col("qualifiers_display_names"),
    col("qualifiers_values"),
    col("league"),
    col("season"),
    col("stage_id")
)

# convert text to snake case
categorical_cols_to_clean = {
    "period_display_name",
    "type_display_name"
}

base_match_events_df = flatten_df(
    df_to_flatten, categorical_cols_to_clean
)

base_match_events_df = flatten_df(
    df_to_flatten, categorical_cols_to_clean
).withColumn(
    "is_successful", when(col("outcome_type_value") == 1, True).otherwise(False)
).drop(
    "outcome_type_value", "surrogate_event_id"
)


# exploded_match_events_df = base_match_events_df.select(
#     col("surrogate_event_id"),
#     explode(col("satisfied_events_types")).alias("event_type_id")
# )

# fct_match_events_df = fct_match_events_df.orderBy(
#     asc("match_id"),
#     asc("_event_id")
# )

25/09/05 20:30:30 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.


In [13]:
write_to_silver(qualifier_mapping_df, "qualifier_mapping")

TypeError: write_to_silver() missing 1 required positional argument: 'primary_keys'

In [15]:
import json
from pyspark.sql import Row

def get_event_type_mapping():
    event_type_mapping_file = "mapping/event_type_mapping.json"

    print(f"Reading mapping_event_type_df from file {event_type_mapping_file}")
    with open(event_type_mapping_file, "r", encoding="utf-8") as f:
        event_type_rows = json.load(f)
    
    # need the reversed
    mapping_event_type_df = spark.createDataFrame([Row(event_type_id=id, event_type=camel_to_snake(name)) for name, id in event_type_rows.items()])
    return mapping_event_type_df

mapping_event_type_df = get_event_type_mapping()
mapping_event_type_df.show(10)

Reading mapping_event_type_df from file mapping/event_type_mapping.json
+-------------+------------------+
|event_type_id|        event_type|
+-------------+------------------+
|            0| shot_six_yard_box|
|            1| shot_penalty_area|
|            2|   shot_obox_total|
|            3|    shot_open_play|
|            4|      shot_counter|
|            5|    shot_set_piece|
|            6|shot_direct_corner|
|            7|   shot_off_target|
|            8|      shot_on_post|
|            9|    shot_on_target|
+-------------+------------------+
only showing top 10 rows



In [27]:
from pyspark.sql import Row
import pyspark.sql.functions as sf

str_to_num_mapping_rows = [Row(event_type_value=camel_to_snake(v), event_type_id=k) for k, v in global_mapping.items()]
num_to_str_mapping_rows = [Row(event_type_id=k, event_type=camel_to_snake(v)) for k, v in global_mapping.items()]

global_mapping_df = spark.createDataFrame(num_to_str_mapping_rows)
global_mapping_df.show(10)

In [None]:
from pyspark.sql.functions import col

# Count total rows
total_rows = base_match_events_df.count()
half_rows = total_rows // 2

df_first_half, df_second_half = base_match_events_df.randomSplit([0.5, 0.5], seed=42)

In [16]:
from pyspark.sql import functions as sf

# Lookup map for converting array of ids into value
lookup_map_df = mapping_event_type_df.groupBy().agg(
    sf.map_from_entries(
        sf.collect_list(sf.struct("event_type_id", "event_type"))
    ).alias("map")
)

fct_match_events_df = (
    base_match_events_df.crossJoin(lookup_map_df)
    .withColumn(
        "satisfied_events_types_names",
        sf.transform(
            "satisfied_events_types",
            lambda x: sf.element_at(sf.col("map"), x)
        )
    )
    .drop("map")
)


In [17]:
from helpers.expression_helpers import CONFIG, get_select_expressions

fct_me_select_expressions = get_select_expressions(layer="silver_layer", table_name="fct_match_events")
fct_match_events_df.select(*fct_me_select_expressions)

DataFrame[match_id: int, _event_id: int, event_id: int, team_id: int, player_id: int, minute: int, second: int, period_value: int, period_display_name: string, type_display_name: string, is_successful: boolean, x: float, y: float, end_x: float, end_y: float, qualifiers_display_names: array<string>, satisfied_events_types_names: array<string>, qualifiers_values: array<int>, satisfied_events_types: array<int>, league: string, season: string, stage_id: int]

In [None]:
fct_match_events_df.count()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

[Stage 74:>                                                      (0 + 16) / 928]

In [19]:
fct_match_events_df.show(truncate=False)

25/09/05 16:18:29 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Traceback (most recent call last):5:> (0 + 0) / 928][Stage 76:>  (0 + 0) / 16]
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 199, in manager
    while sock.recv(1024):
          ^^^^^^^^^^^^^^^
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 199, in manager
    while sock.recv(1024):
          ^^^^^^^^^^^^^^^
  File "/home/dottier/big_data/venv/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 199, in manager
    while sock.recv(1024):
          ^^^^^^^^^^^^^^^
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
  F

KeyboardInterrupt: 

[Stage 74:(16 + 16) / 928][Stage 75:> (0 + 0) / 928][Stage 76:>  (0 + 0) / 16]

[Stage 74:(64 + 16) / 928][Stage 75:> (0 + 0) / 928][Stage 76:>  (0 + 0) / 16]

In [18]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Define a window over the DataFrame
w = Window.orderBy("match_id", "event_id")  # or surrogate_event_id if available

# Add row index
df_with_index = fct_match_events_df.withColumn("row_idx", row_number().over(w))

total_rows = df_with_index.count()
half_rows = total_rows // 2

df_first_half = df_with_index.filter(col("row_idx") <= half_rows).drop("row_idx")
df_second_half = df_with_index.filter(col("row_idx") > half_rows).drop("row_idx")



                                                                                

In [None]:
write_to_silver(
    df=df_first_half,
    table_name="fct_match_events",
    primary_keys=["match_id", "team_id", "event_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)




Table 'silver.fct_match_events' does not exist. Creating new table...


25/09/05 20:41:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/05 20:41:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/05 20:41:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/05 20:41:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/09/05 20:43:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/09/05 20:43:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance

In [None]:
write_to_silver(
    df=df_second_half,
    table_name="fct_match_events",
    primary_keys=["match_id", "team_id", "event_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

In [None]:
write_to_silver(
    df=fct_match_events_df,
    table_name="fct_match_events",
    primary_keys=["match_id", "team_id", "event_id"],
    partition_by=["load_date_hour"],
    load_date_hour=JOB_RUN_DATE
)

Table 'silver.fct_match_events' does not exist. Creating new table...


25/09/05 16:46:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 76:>                                                       (0 + 16) / 56]

In [None]:
from pyspark.sql.functions import max as spark_max, min as spark_min

fct_match_events_df.filter(
    col("match_id") == 1834247
).orderBy(
    col("_event_id").asc(),
).show(n=20)

Check if match event mapping matches global_mapping

In [None]:
# think this is not really needed but can't hurt to check just in case
from pyspark.sql.types import StructType, StructField, MapType, StringType, IntegerType, MapType
from pyspark.sql.functions import broadcast

mapping_path = "/home/dottier/big_data/bronze/match_data"

mapping_schema = StructType([
    StructField("matchCentreEventTypeJson", MapType(StringType(), IntegerType()), True)
])

mapping_df = spark.read \
    .schema(mapping_schema) \
    .option("multiLine", True) \
    .option("pathGlobFilter", "*match_data.json") \
    .json(mapping_path)


exploded_mapping_df = mapping_df.select(
    "match_id", "league", "season", "stage_id",
    explode("matchCentreEventTypeJson").alias("event_type", "event_type_id")
)
exploded_mapping_df.printSchema()

joined = exploded_mapping_df.join(
    broadcast(global_mapping_df),
    exploded_mapping_df["event_type"] == global_mapping_df["event_type"],
    how="left"
).select(
    exploded_mapping_df["*"],
    global_mapping_df["event_type_id"].alias("global_value")
)

mismatches = joined.filter(
    (col("event_type_id") != col("global_value")) | col("global_value").isNull()
)

if mismatches.limit(1).count() > 0:
    mismatches.show(truncate=False)
    raise ValueError("❌ Found mismatches in matchCentreEventTypeJson")
else:
    print("✅ All matchCentreEventTypeJson match the global mapping.")


In [None]:
from pyspark.sql import functions as sf

# make it a list of tuple
df = spark.createDataFrame([("4 : 1",), ("2 : 1",), ("5 : 0",)], ["score"])
df.show()

df.select(sf.regexp_extract(sf.split(sf.col("score"), ":").getItem(0))).show()

In [None]:
from pyspark.sql import functions as sf

# make it a list of tuple
df = spark.createDataFrame([(1, 2, 3, 4), (5, 6, 7, 8)], ["val1", "val2", "val3", "val4"])
df.show()


# flattened_df = df.select(sf.explode(sf.array("val1", "val2")).alias("value"), sf.col("val3"))
# flattened_df.show()

In [None]:
from pyspark.sql import functions as sf

# make it a list of tuple
df = spark.createDataFrame([([1, 2, 3],), ([4, 5, 6],)], ["val1"])
df.show()


# flattened_df = df.select(sf.explode(sf.array("val1", "val2")).alias("value"), sf.col("val3"))
# flattened_df.show()

In [None]:
map_df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c'),
                                (4, 'd'), (5, 'e'), (6, 'f')], ["key", "val"])
map_df.show()

In [None]:
from pyspark.sql import functions as sf

# map_df: (key:int, val:string)
# df: val1: array<int>

# 1) Aggregate map_df into a single-row map column
lookup_map_df = map_df.groupBy().agg(
    sf.map_from_entries(sf.collect_list(sf.struct("key", "val"))).alias("m")
)

lookup_map_df.show()

# 2) Cross join once to make the map available, then transform the array
result_df = (
    df.crossJoin(sf.broadcast(lookup_map_df))
      .withColumn("val1", sf.transform("val1", lambda x: sf.element_at(sf.col("m"), x)))
      .select(df['*'])
)

result_df.show(truncate=False)


In [None]:
def mapping(key):
    return global_mapping.get(key, -1)

print(mapping(1))

In [None]:
from pyspark.sql.functions import transform, lit
df.select(transform("val1", lambda x: lit(global_mapping[x]))).show()

In [None]:
import json

base_path = "/home/dottier/big_data/bronze/season_data/league=germany_bundesliga/season=2023_2024/season_info.json"

with open(base_path, "r", encoding="utf-8") as f:
    json_object = json.load(f)
    print(json_object)

In [None]:
from pyspark.sql import functions as F

mapping = {0: "a", 1: "b", 2: "c"}

# Your DataFrame has a column "nums" like: [0, 1, 2]
df = spark.createDataFrame(
    [([0, 1, 2],), ([1, 0],)],
    ["nums"]
)

# Transform each element in the array
df = df.withColumn(
    "letters",
    F.expr(f"transform(nums, x -> '{mapping[0]}' if x = 0 else '{mapping[1]}' if x = 1 else '{mapping[2]}')")
)

df.show(truncate=False)


In [None]:
from pyspark.sql import Row
import pyspark.sql.functions as sf

mapping_rows = [Row(event_type_id=k, event_type=v) for k, v in global_mapping.items()]

global_mapping_df = spark.createDataFrame(mapping_rows)
global_mapping_df = global_mapping_df.groupBy().agg(
    sf.map_from_entries(sf.collect_list(sf.struct("event_type_id", "event_type"))).alias("map")
)

fct_match_events_df = (
    fct_match_events_df
    .crossJoin(sf.broadcast(global_mapping_df))
    .withColumn("satisfied_events_types", sf.transform("satisfied_events_types", lambda x: sf.element_at(sf.col("map"), x)))
    .drop("map")
)

In [None]:
fct_match_events = read_from_silver("fct_match_events")

In [None]:


events_for_join = fct_match_events_df.select(
    "match_id",
    "_event_id",
    "minute",
    "second",
    "type_display_name",
    "outcome_type_display_name",
    "player_id",
    "team_id",
    "qualifiers_list"
)

event1 = events_for_join.alias("event1")
event2 = events_for_join.alias("event2")

join_conditions = [
    col("event1.match_id") == col("event2.match_id"), # Must be in the same match
    col("event1.minute") == col("event2.minute"),
    col("event1.second") == col("event2.second"),
    col("event1.team_id") < col("event2.team_id"),
    col("event1._event_id") < col("event2._event_id")
]

event_pairs_df = event1.join(event2, on=join_conditions, how="inner")

print(event_pairs_df.count())
event_pairs_df.show()

In [None]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# Define a window that partitions by match and orders by our _event_id
# This puts the events in the sequence we want to test.
window_spec = Window.partitionBy("match_id").orderBy("_event_id")

# Create columns for the minute and second of the *previous* event
events_with_previous_time = fct_match_events.withColumn(
    "previous_minute", lag("minute").over(window_spec)
).withColumn(
    "previous_second", lag("second").over(window_spec)
).withColumn(
    "previous_period", lag("period_value").over(window_spec)
)

# Now, find the rows where the time goes "backwards".
# This is our alarm for incorrect ordering.
ordering_errors = events_with_previous_time.filter(
    (col("minute") < col("previous_minute")) | 
    ((col("minute") == col("previous_minute")) & (col("second") < col("previous_second")))
).filter(
    col("previous_period") == col("period_value")
)

# The assertion is a simple count.
error_count = ordering_errors.count()

if error_count > 0:
    print(f"DATA QUALITY FAILED: Found {error_count} events that are out of chronological order!")
    # Show the specific events that broke the rule
    ordering_errors.select(
        "match_id", "_event_id", "minute", "second", "previous_minute", "previous_second"
    ).show()
else:
    print("DATA QUALITY PASSED: Event order is chronologically correct.")

In [None]:
from pyspark.sql.functions import array_contains, col

fct_match_events.filter(
    col("match_id") == 959673
    # (col("_event_id") == 1) &
    # (col("type_display_name") != "Start") &
    # (col("second").isNotNull())
    # (array_contains(col("qualifiers_list"), "redCard"))
).orderBy(
    "_event_id"
).show(n=10000)