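FACT TABLES: build fact_player_statistics and fact_team_statistics from the hockey source files and the dimension tables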


In [0]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

In [0]:
# Time dimension: CSV whose first row supplies the column names.
dim_time = spark.read.csv("/mnt/mount2/dim_time", header=True)

In [0]:
# Club, country, match-type and player dimensions, each read as a CSV
# whose first row supplies the column names.
dim_club = spark.read.csv("/mnt/mount2/dim_club", header=True)
dim_country = spark.read.csv("/mnt/mount2/dim_country", header=True)
dim_match_type = spark.read.csv("/mnt/mount2/dim_match_type", header=True)
dim_player = spark.read.csv("/mnt/mount2/dim_player", header=True)

In [0]:
# League dimension: CSV whose first row supplies the column names.
dim_league = spark.read.csv("/mnt/mount2/dim_league", header=True)

In [0]:
# Raw inputs: the two hockey source files and the date table, each read as a
# CSV whose first row supplies the column names.
df_src1 = spark.read.csv("/mnt/mount1/Hockey_Src1.csv", header=True)
df_src2 = spark.read.csv("/mnt/mount1/Hockey_Src2.csv", header=True)
df_date = spark.read.csv("/mnt/mount1/DIM.Date.Table.csv", header=True)

# Preview the first three rows of every input, in load order.
for preview_frame in (df_src1, df_src2, df_date):
    preview_frame.show(3)

+----+--------+-------------+-----------+---------+------------+--------+---------+---------+-----------+-----------+------------+------------+-----------+---------------+----------+--------------+-----------+--------+-----------+-----------------------+-------------+------------------+---------+
|Year|   Month|        Match|Player Name|Club Name|country Name|Position|Jersey_No| D. O. B.|Nationality|appearances|goals scored|goals assist|total shots|shots on target|fouls made|fouls suffered|yellow card|red card|goals saved|goals conceded(stopped)|total penalty|successful penalty|   salary|
+----+--------+-------------+-----------+---------+------------+--------+---------+---------+-----------+-----------+------------+------------+-----------+---------------+----------+--------------+-----------+--------+-----------+-----------------------+-------------+------------------+---------+
|2010| January|International|     Adrian|       NA| Netherlands|      LW|       88|30-Mar-85|Netherlands| 

In [0]:
# Build the player-level fact table: resolve each natural key in the player
# source (df_src1) to its dimension surrogate key, number the rows with a
# Fact_ID, keep only the keys and measures, and write the result as CSV.
#
# Every lookup is a left join, so a source row whose key has no match in a
# dimension is kept with a null surrogate key instead of being dropped.
# NOTE(review): the time lookup matches on Year only, although the source also
# carries a Month column; if dim_time holds more than one row per Year this
# join multiplies fact rows. Confirm the grain of dim_time.

# (source column, fact column) pairs for every measure carried into the fact.
player_measures = [
    ("appearances", "Appearances"),
    ("goals scored", "Goals_Scored"),
    ("goals assist", "Goals_Assist"),
    ("total shots", "Total_Shots"),
    ("shots on target", "Shots_on_Target"),
    ("fouls made", "Fouls_Made"),
    ("fouls suffered", "Fouls_Suffered"),
    ("yellow card", "Yellow_Card"),
    ("red card", "Red_Card"),
    ("goals saved", "Goals_Saved"),
    ("goals conceded(stopped)", "Goals_Conceded"),
    ("total penalty", "Total_Penalty"),
    ("successful penalty", "Successful_Penalty"),
    ("salary", "Salary"),
]

# Dimension columns are aliased to the source's spelling so each join can use
# a single shared column name (which also avoids duplicate join columns).
player_joined = (
    df_src1
    # Time
    .join(dim_time.select("Time_ID", "Year"), on="Year", how="left")
    # Match type
    .join(dim_match_type.select("Match_Type_ID", dim_match_type["Match_Name"].alias("Match")),
          on="Match", how="left")
    # Player
    .join(dim_player.select("Player_ID", "Player Name"), on="Player Name", how="left")
    # Club
    .join(dim_club.select("Club_ID", dim_club["Club_Name"].alias("Club Name")),
          on="Club Name", how="left")
    # Country
    .join(dim_country.select("Country_ID", dim_country["Country_Name"].alias("country Name")),
          on="country Name", how="left")
)

# Fact_ID is a row number. Ordering by Player_ID alone leaves ties (one player
# has many rows), and Spark breaks ties arbitrarily, so a re-run could hand the
# same row a different Fact_ID. Player_ID stays the leading sort key; the other
# keys and all measures are added as tie-breakers so that only rows identical
# in every output column can still tie, which makes the output reproducible.
# NOTE(review): a window without partitionBy is evaluated on a single
# partition; acceptable for small inputs, revisit if the source grows.
player_order_cols = ["Player_ID", "Time_ID", "Match_Type_ID", "Club_ID", "Country_ID"]
player_order_cols += [source_name for source_name, _ in player_measures]
window_spec = Window.orderBy(*player_order_cols)

fact_player = player_joined.withColumn("Fact_ID", row_number().over(window_spec)).select(
    "Fact_ID", "Time_ID", "Match_Type_ID", "Player_ID", "Club_ID", "Country_ID",
    *[col(source_name).alias(fact_name) for source_name, fact_name in player_measures]
)

# Write to ADLS (replaces any previous output at this path)
fact_player.write.option("header", True).mode("overwrite").csv("/mnt/mount2/fact_player_statistics")

In [0]:
# Build the team-level fact table: resolve each natural key in the team
# source (df_src2) to its dimension surrogate key, number the rows with a
# Fact_ID, keep only the keys and measures, and write the result as CSV.
#
# Every lookup is a left join, so a source row whose key has no match in a
# dimension is kept with a null surrogate key instead of being dropped.
# NOTE(review): the time lookup matches on Year only; if dim_time holds more
# than one row per Year this join multiplies fact rows. Confirm the grain of
# dim_time.

# (source column, fact column) pairs for every measure carried into the fact.
team_measures = [
    ("appearances", "Appearances"),
    ("wins", "Wins"),
    ("losts", "Lost"),
    ("drawn", "Drawn"),
    ("clean sheets", "Clean_Sheets"),
    ("Net Worth", "Net_Worth"),
]

team_joined = (
    df_src2
    # Time
    .join(dim_time.select("Time_ID", "Year"), on="Year", how="left")
    # Match type
    .join(dim_match_type.select("Match_Type_ID", "Match_Name"), on="Match_Name", how="left")
    # League
    .join(dim_league.select("League_ID", "League_Name"), on="League_Name", how="left")
    # Club
    .join(dim_club.select("Club_ID", "Club_Name"), on="Club_Name", how="left")
    # Country: the source spells this column "Country Name" (with a space), so
    # it is renamed to the dimension's "Country_Name" before joining on it.
    .withColumnRenamed("Country Name", "Country_Name")
    .join(dim_country.select("Country_ID", "Country_Name"), on="Country_Name", how="left")
)

# Fact_ID is a row number. Ordering by Club_ID alone leaves ties (one club has
# many rows), and Spark breaks ties arbitrarily, so a re-run could hand the
# same row a different Fact_ID. Club_ID stays the leading sort key; the other
# keys and all measures are added as tie-breakers so that only rows identical
# in every output column can still tie, which makes the output reproducible.
# NOTE(review): a window without partitionBy is evaluated on a single
# partition; acceptable for small inputs, revisit if the source grows.
team_order_cols = ["Club_ID", "Time_ID", "Match_Type_ID", "League_ID", "Country_ID"]
team_order_cols += [source_name for source_name, _ in team_measures]
window_spec = Window.orderBy(*team_order_cols)

fact_team = team_joined.withColumn("Fact_ID", row_number().over(window_spec)).select(
    "Fact_ID", "Time_ID", "Match_Type_ID", "League_ID", "Club_ID", "Country_ID",
    *[col(source_name).alias(fact_name) for source_name, fact_name in team_measures]
)

# Write to ADLS (replaces any previous output at this path)
fact_team.write.option("header", True).mode("overwrite").csv("/mnt/mount2/fact_team_statistics")