In [0]:
import json
import os

import dlt
import pyspark.sql.functions as F
# --- Configuration ------------------------------------------------------------
# S3 credentials are read from the environment instead of being hard-coded, so they
# are never stored in (or leaked with) the notebook source. Provide them via the
# cluster / pipeline environment, e.g. from a Databricks secret reference.
access_key = os.environ.get("AWS_ACCESS_KEY_ID")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if access_key and secret_key:
    spark.conf.set("fs.s3a.access.key", access_key)
    spark.conf.set("fs.s3a.secret.key", secret_key)
# When no keys are supplied, nothing is set and S3A uses its default credential
# provider chain (e.g. an instance profile attached to the cluster).
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

# Source file and CSV reader options consumed by the bronze table.
file_location = "s3a://dbdata244/football_data.csv"
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

In [0]:
#Bronze Layer

@dlt.table
def bronze_football2():
    #This is RAW Ingestion data, adding only processing_time, source_file columns
    df = spark.read.format(file_type) \
        .option("header", first_row_is_header) \
        .option("quote", '"') \
        .option("escape", '"') \
        .option("multiline", "true") \
        .option("inferSchema",infer_schema) \
        .load(file_location)
    df = df.select(F.current_timestamp().alias("processing_time"), F.input_file_name().alias("source_file"), "*")
    return df

Name,Type
processing_time,timestamp
source_file,string
Unique_ID,string
Date,date
JSON_Data,string
Match_Performance,string
Injury_Status,boolean
Player_Position,string
Team_Name,string
Goals_Scored,int


In [0]:
# Silver keeps only rows whose Date is strictly after this day (ISO yyyy-MM-dd).
SILVER_MIN_DATE = "2023-07-07"


@dlt.table()
def silver_football2():
    """Flatten bronze football rows into typed player/team columns.

    Requirements:
      * Load only rows whose ``Date`` is strictly after ``SILVER_MIN_DATE``.
      * Flatten ``JSON_Data`` into player_name, position, is_captain, goals,
        assists, team_name and home_Stadium.
      * Keep Unique_ID, match_performance and injury_status.
      * Render ``Date`` as "MMM d, yyyy" in ``service_date``.
      * Carry the bronze lineage columns (processing_time, source_file) and
        ignore every other column.

    Bronze is defined by a batch ``spark.read`` query and is recomputed on each
    update. Streaming tables need an append-only source, so bronze is read in
    batch with ``dlt.read``, as gold does with silver. The date cut-off is
    applied in the query rather than with ``dlt.expect_or_drop``. It is a
    business rule, not a data-quality check, and DLT expectations take a SQL
    string, not a Column.
    """
    bronze = dlt.read("bronze_football2")

    def json_field(path):
        # get_json_object returns strings; numeric fields are cast by the caller.
        return F.get_json_object(F.col("json_data"), path)

    return (
        bronze
        .where(F.col("Date") > F.to_date(F.lit(SILVER_MIN_DATE)))
        .select(
            F.col("Unique_ID"),
            json_field("$.player_name").alias("player_name"),
            json_field("$.stats.position").alias("position"),
            json_field("$.is_captain").alias("is_captain"),
            json_field("$.stats.goals").cast("int").alias("goals"),
            json_field("$.stats.assists").cast("int").alias("assists"),
            json_field("$.stats.team.name").alias("team_name"),
            json_field("$.stats.team.stadium").alias("home_Stadium"),
            F.col("match_performance"),
            F.col("injury_status"),
            F.date_format(F.col("Date"), "MMM d, yyyy").alias("service_date"),
            F.col("processing_time"),
            F.col("source_file"),
        )
    )

Name,Type
Unique_ID,string
player_name,string
position,string
is_captain,string
goals,int
assists,int
team_name,string
home_Stadium,string
match_performance,string
injury_status,boolean


In [0]:
@dlt.table
def gold_football2():
    """Summarise ``silver_football2`` into a single row of totals and leaders.

    Output columns:
      total_goals, total_assists:
          Sums over all rows; 0 (not NULL) when there is nothing to sum.
      max_goals, max_assists:
          ``struct<goals|assists, player_name>`` of the top row. Structs compare
          field by field, so ties on the stat go to the greatest player_name.
      total_captain_goals, total_captain_assists:
          Sums over rows whose ``is_captain`` string equals "true"; 0 (not NULL)
          when no captain rows exist.
    """
    silver = dlt.read("silver_football2")
    is_captain = F.col("is_captain") == "true"

    def sum_or_zero(col):
        # SUM over zero qualifying rows is NULL; report 0 so the totals stay numeric.
        return F.coalesce(F.sum(col), F.lit(0))

    return silver.select(
        F.col("goals").cast("int"),
        F.col("assists").cast("int"),
        F.col("is_captain"),
        F.col("player_name"),
    ).agg(
        sum_or_zero(F.col("goals")).alias("total_goals"),
        sum_or_zero(F.col("assists")).alias("total_assists"),
        F.max(F.struct(F.col("goals"), F.col("player_name"))).alias("max_goals"),
        F.max(F.struct(F.col("assists"), F.col("player_name"))).alias("max_assists"),
        sum_or_zero(F.when(is_captain, F.col("goals"))).alias("total_captain_goals"),
        sum_or_zero(F.when(is_captain, F.col("assists"))).alias("total_captain_assists"),
    )

Name,Type
total_goals,bigint
total_assists,bigint
max_goals,"struct<goals:int,player_name:string>"
max_assists,"struct<assists:int,player_name:string>"
total_captain_goals,bigint
total_captain_assists,bigint
