In [4]:
import ast
from datetime import datetime, timedelta

from delta.tables import *
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, functions
from pyspark.sql.functions import (
    col, concat_ws, current_timestamp, date_format, dayofmonth, dayofweek, dayofyear,
    input_file_name, lit, monotonically_increasing_id, month, row_number, sha2, weekofyear, year,
)
from pyspark.sql.window import Window

In [3]:
# Parameters from pipeline
# `years` is overridden by the pipeline at run time and parsed with ast.literal_eval in the
# next cell, so it must be a Python literal, e.g. "[2022, 2023]".
# The empty default is only a placeholder: ast.literal_eval("") raises, so the notebook
# cannot run standalone without supplying a value here.
years=""

In [5]:
# `years` arrives from the pipeline as a Python-literal string, e.g. "[2022, 2023]" or "2023".
# Normalise it to a sorted list of ints so downstream code can treat year_list[0] / year_list[-1]
# as the earliest / latest year regardless of the order the pipeline supplied.
_years_parsed = ast.literal_eval(years) if isinstance(years, str) else years
if isinstance(_years_parsed, int):
    # A single year is accepted as shorthand for a one-element list.
    _years_parsed = [_years_parsed]
if not isinstance(_years_parsed, (list, tuple, set)) or not all(isinstance(y, int) for y in _years_parsed):
    raise ValueError(f"Parameter 'years' must be an int or a list of ints, got {years!r}")
year_list = sorted(_years_parsed)

In [6]:
# Every layer lives in the same abfss storage account, one container per layer
# ("silver" is read from, "gold" is written to), under one shared dataset prefix.
_LAKE_HOST = "datalakemotogp.dfs.core.windows.net"
_DATASET_PREFIX = "api-racing-mike/motogp"


def _lake_path(container, entity):
    """Return the abfss:// folder URI (with trailing slash) for ``entity`` in ``container``."""
    return f"abfss://{container}@{_LAKE_HOST}/{_DATASET_PREFIX}/{entity}/"


# Silver inputs
pathWSRSilver = _lake_path("silver", "world-standing-riders")
pathEventsSilver = _lake_path("silver", "events")
pathFRSilver = _lake_path("silver", "full-results")

# Gold outputs
pathRaceFacts = _lake_path("gold", "races")
pathEventsDim = _lake_path("gold", "events")
pathRidersDim = _lake_path("gold", "riders")
pathCircuitsDim = _lake_path("gold", "circuits")
pathTeamsDim = _lake_path("gold", "teams")
pathStandingsDim = _lake_path("gold", "standings")
pathDateDim = _lake_path("gold", "date")

# Gold paths keyed by their variable name.
# NOTE(review): pathDateDim is not part of this dict — confirm that is intentional.
paths_dict = {
    "pathRaceFacts": pathRaceFacts,
    "pathEventsDim": pathEventsDim,
    "pathRidersDim": pathRidersDim,
    "pathCircuitsDim": pathCircuitsDim,
    "pathTeamsDim": pathTeamsDim,
    "pathStandingsDim": pathStandingsDim,
}

### ***Common functions***

In [7]:
def add_load_date_to_df(df) -> DataFrame:
    """Stamp ``df`` with a ``load_date`` audit column unless it already has one.

    Args:
        df: Spark DataFrame to stamp.

    Returns:
        ``df`` itself when a ``load_date`` column is already present; otherwise a new
        DataFrame with ``load_date`` set to ``current_timestamp()``.
    """
    already_stamped = "load_date" in df.columns
    if already_stamped:
        return df
    return df.withColumn("load_date", current_timestamp())

In [8]:
# Turn on Delta "optimize write" for every write in this Spark session
# (Microsoft/Synapse-specific config key — see the Synapse optimize-write docs).
_OPTIMIZE_WRITE_CONF = "spark.microsoft.delta.optimizeWrite.enabled"
spark.conf.set(_OPTIMIZE_WRITE_CONF, "true")

### ***Race Facts***

In [9]:
def process_race_facts():
    """Upsert the gold race fact table from the silver full-results table.

    Each row is keyed by ``pk_race_id``, a SHA-256 hash of the event, result, rider,
    circuit and team ids. Those ids are exposed as ``fk_*`` columns pointing at the
    dimension tables. On the first run (no Delta table at ``pathRaceFacts``) the table is
    created. Afterwards rows are merged on ``pk_race_id``. Matched rows get every column
    except ``load_date`` overwritten, so ``load_date`` keeps its first-insert value.
    Unmatched rows are inserted.
    """
    df = spark.read.format("delta").load(pathFRSilver)
    dfRace = df.select(
        "event_id", "result_id", "rider_id", "circuit_id", "team_id",
        "position", "points", "time", "gap_first", "gap_lap", "total_laps", "average_speed", "rider_type",
        "ground_condition", "humidity_condition", "weather_condition", "track_condition", "date", "month", "year",
    )
    # NOTE(review): concat_ws skips NULL inputs, so rows that differ only in which id is NULL
    # can hash to the same key. Changing the key would orphan existing gold rows, so it is
    # kept as-is — confirm these id columns are never NULL in silver.
    dfRace = (
        dfRace.withColumn(
            "pk_race_id",
            sha2(concat_ws("_", "event_id", "result_id", "rider_id", "circuit_id", "team_id"), 256),
        )
        .withColumnRenamed("event_id", "fk_event_id")
        .withColumnRenamed("result_id", "fk_result_id")
        .withColumnRenamed("rider_id", "fk_rider_id")
        .withColumnRenamed("circuit_id", "fk_circuit_id")
        .withColumnRenamed("team_id", "fk_team_id")
    )
    # MERGE requires at most one source row per key.
    dfRace = dfRace.dropDuplicates(["pk_race_id"])
    dfRace = add_load_date_to_df(dfRace)

    # Check for the table explicitly instead of string-matching an exception message,
    # which also no longer hides genuine merge failures behind a broad `except`.
    if DeltaTable.isDeltaTable(spark, pathRaceFacts):
        race_gold = DeltaTable.forPath(spark, pathRaceFacts)
        update_columns = {c: f"new.{c}" for c in dfRace.columns if c != "load_date"}
        (
            race_gold.alias("existing")
            .merge(dfRace.alias("new"), "existing.pk_race_id=new.pk_race_id")
            .whenMatchedUpdate(set=update_columns)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        dfRace.write.format("delta").save(pathRaceFacts)

### ***Event Dimension***

In [10]:
def process_event_dim():
    """Upsert the gold event dimension from the silver events table.

    Keeps one row per ``pk_event_id`` (the renamed ``event_id``). On the first run (no
    Delta table at ``pathEventsDim``) the table is created. Afterwards rows are merged on
    ``pk_event_id``. Matched rows get every column except ``load_date`` overwritten, so
    ``load_date`` keeps its first-insert value. Unmatched rows are inserted.
    """
    df = spark.read.format("delta").load(pathEventsSilver)
    dfEvents = (
        df.select("event_id", "event_name", "short_name", "sponsored_name", "date_start", "date_end",
                  "country_iso", "country_name", "year")
        .withColumnRenamed("event_id", "pk_event_id")
        # MERGE requires at most one source row per key.
        .dropDuplicates(["pk_event_id"])
    )
    dfEvents = add_load_date_to_df(dfEvents)

    # Explicit existence check instead of string-matching an exception message.
    if DeltaTable.isDeltaTable(spark, pathEventsDim):
        events_gold = DeltaTable.forPath(spark, pathEventsDim)
        update_columns = {c: f"new.{c}" for c in dfEvents.columns if c != "load_date"}
        (
            events_gold.alias("existing")
            .merge(dfEvents.alias("new"), "existing.pk_event_id = new.pk_event_id")
            .whenMatchedUpdate(set=update_columns)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        dfEvents.write.format("delta").save(pathEventsDim)

### ***Riders Dimension***

In [11]:
def process_riders_dim():
    """Upsert the gold rider dimension from the silver full-results table.

    Keeps one row per ``pk_rider_id`` (the renamed ``rider_id``). Which full-results row
    supplies the attributes for a duplicated rider is not controlled. On the first run (no
    Delta table at ``pathRidersDim``) the table is created. Afterwards rows are merged on
    ``pk_rider_id``. Matched rows get every column except ``load_date`` overwritten.
    Unmatched rows are inserted.
    """
    df = spark.read.format("delta").load(pathFRSilver)
    dfRiders = (
        df.select("rider_id", "rider_full_name", "rider_number", "rider_country_iso", "rider_country_name",
                  "years_old", "birth_date", "birth_city", "profile_picture_url", "portrait_picture_url",
                  "number_picture_url", "helmet_picture_url", "country_flag_url")
        .withColumnRenamed("rider_id", "pk_rider_id")
        # MERGE requires at most one source row per key.
        .dropDuplicates(["pk_rider_id"])
    )
    dfRiders = add_load_date_to_df(dfRiders)

    # Explicit existence check instead of string-matching an exception message.
    if DeltaTable.isDeltaTable(spark, pathRidersDim):
        riders_gold = DeltaTable.forPath(spark, pathRidersDim)
        update_columns = {c: f"new.{c}" for c in dfRiders.columns if c != "load_date"}
        (
            riders_gold.alias("existing")
            .merge(dfRiders.alias("new"), "existing.pk_rider_id = new.pk_rider_id")
            .whenMatchedUpdate(set=update_columns)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        dfRiders.write.format("delta").save(pathRidersDim)

### ***Circuits Dimension***

In [12]:
def process_circuits_dim():
    """Upsert the gold circuit dimension from the silver full-results table.

    Keeps one row per ``pk_circuit_id`` (the renamed ``circuit_id``). On the first run (no
    Delta table at ``pathCircuitsDim``) the table is created. Afterwards rows are merged
    on ``pk_circuit_id``. Matched rows get every column except ``load_date`` overwritten.
    Unmatched rows are inserted.
    """
    df = spark.read.format("delta").load(pathFRSilver)
    dfCircuits = (
        df.select("circuit_id", "circuit_name", "circuit_nation", "circuit_place",
                  "circuit_country_iso", "circuit_country_name")
        .withColumnRenamed("circuit_id", "pk_circuit_id")
        # dropDuplicates (not the drop_duplicates alias) to match the other loaders;
        # MERGE requires at most one source row per key.
        .dropDuplicates(["pk_circuit_id"])
    )
    dfCircuits = add_load_date_to_df(dfCircuits)

    # Explicit existence check instead of string-matching an exception message.
    if DeltaTable.isDeltaTable(spark, pathCircuitsDim):
        circuits_gold = DeltaTable.forPath(spark, pathCircuitsDim)
        update_columns = {c: f"new.{c}" for c in dfCircuits.columns if c != "load_date"}
        (
            circuits_gold.alias("existing")
            .merge(dfCircuits.alias("new"), "existing.pk_circuit_id = new.pk_circuit_id")
            .whenMatchedUpdate(set=update_columns)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        dfCircuits.write.format("delta").save(pathCircuitsDim)

### ***Teams Dimension***

In [13]:
def process_teams_dim():
    """Upsert the gold team dimension from the silver full-results table.

    Keeps one row per ``pk_team_id`` (the renamed ``team_id``). On the first run (no Delta
    table at ``pathTeamsDim``) the table is created. Afterwards rows are merged on
    ``pk_team_id``. Matched rows get every column except ``load_date`` overwritten.
    Unmatched rows are inserted.
    """
    df = spark.read.format("delta").load(pathFRSilver)
    dfTeams = (
        df.select("team_id", "team_name", "team_color", "sponsored_team", "constructor_name",
                  "picture_url", "bike_picture_url")
        .withColumnRenamed("team_id", "pk_team_id")
        # MERGE requires at most one source row per key.
        .dropDuplicates(["pk_team_id"])
    )
    dfTeams = add_load_date_to_df(dfTeams)

    # Explicit existence check instead of string-matching an exception message.
    if DeltaTable.isDeltaTable(spark, pathTeamsDim):
        teams_gold = DeltaTable.forPath(spark, pathTeamsDim)
        update_columns = {c: f"new.{c}" for c in dfTeams.columns if c != "load_date"}
        (
            teams_gold.alias("existing")
            .merge(dfTeams.alias("new"), "existing.pk_team_id = new.pk_team_id")
            .whenMatchedUpdate(set=update_columns)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        dfTeams.write.format("delta").save(pathTeamsDim)

### ***Date Dimension***

In [14]:
def process_date_dim():
    """Upsert the gold calendar dimension covering every day of the requested years.

    Builds one row per day from 1 January of the earliest year in ``year_list`` to
    31 December of the latest. Each row carries day/week/month/year/quarter numbers and
    day/month names. The result is merged into the gold date table on ``date``: all
    columns are updated on match and inserted otherwise. The table is created on the
    first run.

    Raises:
        ValueError: if ``year_list`` contains no years.
    """
    # Accept a single year as well as a list of years.
    years_seq = [year_list] if isinstance(year_list, int) else list(year_list)
    if not years_seq:
        raise ValueError("year_list is empty: pass at least one year via the 'years' parameter")
    # min/max rather than [0]/[-1]: an unsorted list (e.g. [2024, 2022]) previously gave a
    # negative day span and silently produced an empty calendar.
    first_day = datetime(min(years_seq), 1, 1)
    last_day = datetime(max(years_seq), 12, 31)
    day_count = (last_day - first_day).days + 1
    date_strings = [(first_day + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(day_count)]

    # Dates are unique by construction, so no dropDuplicates is needed. The old
    # orderBy was discarded by that dropDuplicates shuffle anyway.
    df = (
        spark.createDataFrame(date_strings, "string").toDF("date")
        .withColumn("date", col("date").cast("date"))
        .withColumn("day", dayofmonth(col("date")))
        .withColumn("week", weekofyear(col("date")))
        .withColumn("month", month(col("date")))
        .withColumn("year", year(col("date")))
        .withColumn("day_of_week", dayofweek(col("date")))
        .withColumn("day_of_year", dayofyear(col("date")))
        .withColumn("quarter", functions.quarter(col("date")))
        .withColumn("week_day_name", date_format(col("date"), "EEEE"))
        .withColumn("month_name", date_format(col("date"), "MMMM"))
    )

    # Explicit existence check instead of string-matching an exception message.
    if DeltaTable.isDeltaTable(spark, pathDateDim):
        date_gold = DeltaTable.forPath(spark, pathDateDim)
        (
            date_gold.alias("existing")
            .merge(df.alias("new"), "existing.date = new.date")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        df.write.format("delta").save(pathDateDim)

### ***Standing Facts***

In [15]:
# Must run after process_race_facts() and process_riders_dim(): it reads both gold tables.
def process_standing_facts():
    """Upsert the gold rider-standings table.

    Silver world-standing-riders rows are matched to the gold rider dimension by
    ``rider_full_name`` to obtain ``fk_rider_id``. Each standing is then given the rider's
    team for that year from the gold race facts: the max ``fk_team_id`` when the rider has
    several teams in a year. Rows are merged on ``pk_classification_id``. Matched rows get
    every column except ``load_date`` overwritten; unmatched rows are inserted. The table
    is created on the first run.
    """
    # WSR joined with the riders dimension to resolve the rider key.
    df = spark.read.format("delta").load(pathWSRSilver)
    dfRiders = spark.read.format("delta").load(pathRidersDim)
    # NOTE(review): riders are matched by display name, not id. Standings with no name match
    # get a NULL fk_rider_id and are then dropped by the inner team join below.
    dfStand = (
        df.alias("stand")
        .join(dfRiders.alias("riders"), on="rider_full_name", how="left")
        .select("stand.classification_id", "riders.pk_rider_id", "stand.position", "stand.points", "stand.year")
        .withColumnRenamed("classification_id", "pk_classification_id")
        .withColumnRenamed("pk_rider_id", "fk_rider_id")
    )

    # One team per (rider, year): the max team id when the rider raced for several teams.
    dfRace = spark.read.format("delta").load(pathRaceFacts)
    dfTPS = dfRace.groupBy("fk_rider_id", "year").agg(functions.max("fk_team_id").alias("fk_team_id"))
    # Join on column names so Spark keeps a single fk_rider_id/year pair (no ambiguous
    # duplicate columns). The select restores the original output column order.
    dfStand = dfStand.join(dfTPS, on=["fk_rider_id", "year"], how="inner").select(
        "pk_classification_id", "fk_rider_id", "position", "points", "year", "fk_team_id"
    )

    # MERGE requires at most one source row per key.
    dfStand = dfStand.dropDuplicates(["pk_classification_id"])
    dfStand = add_load_date_to_df(dfStand)

    # Explicit existence check instead of string-matching an exception message.
    if DeltaTable.isDeltaTable(spark, pathStandingsDim):
        standing_gold = DeltaTable.forPath(spark, pathStandingsDim)
        update_columns = {c: f"new.{c}" for c in dfStand.columns if c != "load_date"}
        (
            standing_gold.alias("existing")
            .merge(dfStand.alias("new"), "existing.pk_classification_id = new.pk_classification_id")
            .whenMatchedUpdate(set=update_columns)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        # First run: create the gold table.
        dfStand.write.format("delta").save(pathStandingsDim)

### ***Final processing of facts and dimensions***

In [16]:
# Run each load in sequence: race facts, then the event/rider/circuit/team dimensions,
# then the calendar dimension.
for _load_step in (
    process_race_facts,
    process_event_dim,
    process_riders_dim,
    process_circuits_dim,
    process_teams_dim,
    process_date_dim,
):
    _load_step()

In [17]:
# Kept in its own cell after the loads above: standings read the gold riders and
# race-facts tables that those loads write.
process_standing_facts()