In [0]:
# Import the PySpark SQL column functions (col, split, to_date, ...) used by the cells below
from pyspark.sql.functions import * 

In [0]:
# Load the raw match records from the Bronze layer. The result is bound to
# df_silver because the following cells reshape it into the Silver schema.
bronze_table_name = "football_matches_catalog.bronze.football_matches"
df_silver = spark.read.table(bronze_table_name)

In [0]:
# Inspect the column names and types as they arrive from the Bronze table,
# before any casting or renaming is applied.
df_silver.printSchema()

In [0]:
# Preview the last 10 rows of the Bronze data.
# Note: DataFrame.tail() collects the rows to the driver and returns a list of Row objects.
display(df_silver.tail(10))

In [0]:
# Build the Silver projection: cast numeric fields, parse the date, and give
# the goal/result columns descriptive names. Output column order matches the
# order of this list.
silver_columns = [
    # Match identifier as an integer.
    col("Match_ID").cast("int").alias("Match_ID"),
    # Division code, passed through unchanged.
    col("Div"),
    # Season is split on "-" and the second piece (index 1) is kept as an integer.
    split(col("Season"), "-")[1].cast("int").alias("Season"),
    # Date string parsed with the pattern dd/MM/yyyy.
    to_date(col("Date"), "dd/MM/yyyy").alias("Date"),
    # Team names, passed through unchanged.
    col("HomeTeam"),
    col("AwayTeam"),
    # Goal counts (FTHG / FTAG) as integers under readable names.
    col("FTHG").cast("int").alias("HomeTeamGoals"),
    col("FTAG").cast("int").alias("AwayTeamGoals"),
    # FTR renamed to FinalResult.
    col("FTR").alias("FinalResult"),
]
df_silver = df_silver.select(*silver_columns)


In [0]:
# Render the transformed DataFrame to eyeball the casts, parsed dates and renamed columns.
display(df_silver)

In [0]:
# Show the columns and their data types after the transformation, to confirm
# the integer casts, the parsed Date column and the renamed columns.
df_silver.printSchema()

In [0]:
# Keep only rows whose division code is 'D1' (Bundesliga first division),
# then show how many matches remain.
is_bundesliga = col("Div") == "D1"
df_silver = df_silver.where(is_bundesliga)
display(df_silver.count())

In [0]:
# Persist the result as a Delta table in the Silver layer.
silver_writer = (
    df_silver.write
    .format("delta")
    # Replace any existing table contents on every run.
    .mode("overwrite")
    # Delta option that lets the overwrite also replace the stored table schema.
    .option("overwriteSchema", "true")
    # One partition per Season value.
    .partitionBy("Season")
)
silver_writer.saveAsTable("football_matches_catalog.silver.football_matches")