In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, date_format


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, date_format


In [3]:
spark = SparkSession.builder.config("spark.jars", "/Drivers/SQL_Sever/jdbc/postgresql-42.7.3.jar")\
.getOrCreate()


## Loading Data From Postgres

In [4]:
tbl_list = ['arsenalmatches', 'arsenalplayers','arsenalgk']
dataframe = {}
for table in tbl_list:
   df = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"{table}") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .load()
   dataframe[table] = df

In [5]:
dataframe.keys()

dict_keys(['arsenalmatches', 'arsenalplayers', 'arsenalgk'])

In [6]:
print(dataframe)

{'arsenalmatches': DataFrame[Season: string, Tour: string, Date: string, Time: string, Opponent: string, HoAw: string, ArsenalScore: string, OpponentScore: string, Stadium: string, Attendance: string, Coach: string, Referee: string], 'arsenalplayers': DataFrame[LastName: string, FirstName: string, Date: string, Start: string, Pos: string, Min: string, G: string, A: string, PK: string, PKA: string, S: string, SoT: string, YK: string, RK: string, Touches: string, Tackles: string, Ints: string, Blocks: string, xG: string, npxG: string, xAG: string, Passes: string, PassesA: string, PrgPas: string, Carries: string, PrgCar: string, Line: string, C: string], 'arsenalgk': DataFrame[LastName: string, FirstName: string, Date: string, Start: string, Pos: string, Min: string, SoTA: string, GA: string, Saves: string, PSxG: string, PKatt: string, PKA: string, PKm: string, PassAtt: string, Throws: string, AvgLen: string, GKAtt: string, GKAvgLen: string, C: string]}


In [7]:
Matches = dataframe['arsenalmatches']
Matches.columns

['Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee']

In [8]:
Players = dataframe['arsenalplayers']
Players.columns

['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C']

In [10]:
GoalKeepers = dataframe['arsenalgk']
GoalKeepers.columns


['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C']

# Transforming Arsenal Matches Data

In [11]:
Matches.createOrReplaceTempView("Matches")

In [12]:
distinct_matches = spark.sql ("""
select Count(distinct Date) from Matches

""").show()

+--------------------+
|count(DISTINCT Date)|
+--------------------+
|                 214|
+--------------------+



#### Making sure that all matches are unique for each date

In [13]:
matches = spark.sql ("""
select Count(Date) from Matches

""").show()

+-----------+
|count(Date)|
+-----------+
|        642|
+-----------+



#### Know the Max and Min date to create the DimDate Later

In [14]:
Dates = spark.sql ("""
select Max(Date) as Max_date , MIN(Date) as Min_Date from Matches

""").show()

+----------+----------+
|  Max_date|  Min_Date|
+----------+----------+
|2023-02-25|2017-08-11|
+----------+----------+



##### Adding MatchID column to Matches DataFrame

In [15]:
DimMatch= Matches.withColumn("MatchID", monotonically_increasing_id())

In [16]:
DimMatch.columns

['Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee',
 'MatchID']

In [17]:
# DimMatch = DimMatch.drop('ArsenalScore',
#  'OpponentScore',)

In [18]:
# DimMatch.show()

In [19]:
# For DimMatch with original format yyyy-M-d
DimMatch = DimMatch.withColumn("FormattedDate", date_format(to_date("Date", "yyyy-M-d"), "yyyy-MM-dd"))

In [20]:
# DimMatch.show()

#### Loading the DimMatch for DWH Schema in ArsenalFC Database in Post

In [21]:
DimMatch.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimArsenalMatches") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


In [22]:
from pyspark.sql.functions import to_date, date_format

# Assuming `DimMatch` is already loaded with the schema you provided

# Update the `FormattedDate` in DimMatch to ensure it's in "yyyy-MM-dd" format
# Adjust the format in to_date() according to the actual format of your 'Date' string if needed
DimMatch = DimMatch.withColumn("FormattedDate", date_format(to_date("Date", "yyyy-MM-dd"), "yyyy-MM-dd"))


In [23]:
DimMatch.schema

StructType([StructField('Season', StringType(), True), StructField('Tour', StringType(), True), StructField('Date', StringType(), True), StructField('Time', StringType(), True), StructField('Opponent', StringType(), True), StructField('HoAw', StringType(), True), StructField('ArsenalScore', StringType(), True), StructField('OpponentScore', StringType(), True), StructField('Stadium', StringType(), True), StructField('Attendance', StringType(), True), StructField('Coach', StringType(), True), StructField('Referee', StringType(), True), StructField('MatchID', LongType(), False), StructField('FormattedDate', StringType(), True)])

# Transforming Players Data & Creating the DimPlayers and Fact Players

In [24]:
Players.createOrReplaceTempView("Players")

#### Addomg fullname column to Players Table 

In [25]:
spark.sql("""
    select concat(firstname, " ", lastname) as fullname
    from Players

""").show()

+-------------------+
|           fullname|
+-------------------+
|    Hector Bellerin|
|     Mohamed Elneny|
|        Rob Holding|
|     Sead Kolasinac|
|Alexandre Lacazette|
|      Nacho Monreal|
|         Mesut Ozil|
|      Danny Welbeck|
|       Granit Xhaka|
|       Aaron Ramsey|
|    Hector Bellerin|
|     Sead Kolasinac|
|Alexandre Lacazette|
|      Nacho Monreal|
|   Shkodran Mustafi|
|         Mesut Ozil|
|       Aaron Ramsey|
|      Danny Welbeck|
|       Granit Xhaka|
|         Alex Iwobi|
+-------------------+
only showing top 20 rows



#### Checking all Distinct Players

In [26]:
distinct_players= spark.sql("""
    select distinct concat(firstname, " ", lastname) as fullname
    from Players

""")
distinct_players.show(10)

+----------------+
|        fullname|
+----------------+
|Emile Smith Rowe|
| Folarin Balogun|
| Hector Bellerin|
|     Joe Willock|
|  William Saliba|
|    Aaron Ramsey|
|     Bukayo Saka|
|  Kieran Tierney|
|Shkodran Mustafi|
| Daniel Ceballos|
+----------------+
only showing top 10 rows



#### Count the distinct date to check if it matche with DimMatches or not

In [27]:
players_Dates= spark.sql("""
    select count(distinct Date) 
    from players

""")
players_Dates.show()

+--------------------+
|count(DISTINCT Date)|
+--------------------+
|                 214|
+--------------------+



#### Adding New column to distinct Players as PlayerID

In [28]:
distinct_players= distinct_players.withColumn("PlayerID", monotonically_increasing_id())

#### Adding fullname column to Player so we can create the DimPlayers joinig it with Distinct_Player Table

In [29]:
Players= Players.withColumn('fullname', concat_ws(" ", col('FirstName'),col('LastName')))
Players.select("fullname").show(5, False)

+-------------------+
|fullname           |
+-------------------+
|Hector Bellerin    |
|Mohamed Elneny     |
|Rob Holding        |
|Sead Kolasinac     |
|Alexandre Lacazette|
+-------------------+
only showing top 5 rows



#### Creating DimPlayers

In [30]:
DimPlayers= Players.join(distinct_players, on ='fullname', how="inner")
DimPlayers.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'PlayerID']

In [31]:
# For DimPlayers with original format M/d/yyyy
DimPlayers = DimPlayers.withColumn("FormattedDate", date_format(to_date("Date", "M/d/yyyy"), "yyyy-MM-dd"))


In [32]:
DimMatch.show(1)

+-------+----+----------+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------+
| Season|Tour|      Date|    Time| Opponent|HoAw|ArsenalScore|OpponentScore|         Stadium|Attendance|        Coach|  Referee|MatchID|FormattedDate|
+-------+----+----------+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------+
|2017/18|   1|2017-08-11|20:45:00|Leicester|home|           4|            3|Emirates Stadium|     59387|Arsène Wenger|Mike Dean|      0|   2017-08-11|
+-------+----+----------+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------+
only showing top 1 row



#### Creating the FactPlayers

In [33]:
# Convert 'Date' column from string to date type in DimMatch
DimMatch = DimMatch.withColumn("Date", to_date("Date", "yyyy-M-d"))

# Convert 'Date' column from string to date type in DimPlayers
DimPlayers = DimPlayers.withColumn("Date", to_date("Date", "yyyy-M-d"))


In [34]:
DimPlayers.schema

StructType([StructField('fullname', StringType(), False), StructField('LastName', StringType(), True), StructField('FirstName', StringType(), True), StructField('Date', DateType(), True), StructField('Start', StringType(), True), StructField('Pos', StringType(), True), StructField('Min', StringType(), True), StructField('G', StringType(), True), StructField('A', StringType(), True), StructField('PK', StringType(), True), StructField('PKA', StringType(), True), StructField('S', StringType(), True), StructField('SoT', StringType(), True), StructField('YK', StringType(), True), StructField('RK', StringType(), True), StructField('Touches', StringType(), True), StructField('Tackles', StringType(), True), StructField('Ints', StringType(), True), StructField('Blocks', StringType(), True), StructField('xG', StringType(), True), StructField('npxG', StringType(), True), StructField('xAG', StringType(), True), StructField('Passes', StringType(), True), StructField('PassesA', StringType(), True), 

In [35]:
FactPlayers = DimMatch.join(DimPlayers, on='FormattedDate', how= 'left')
FactPlayers.show()


+-------------+-------+----+----------+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------------+---------+---------+----+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+--------+
|FormattedDate| Season|Tour|      Date|    Time| Opponent|HoAw|ArsenalScore|OpponentScore|         Stadium|Attendance|        Coach|  Referee|MatchID|           fullname| LastName|FirstName|Date|Start|Pos|Min|  G|  A| PK|PKA|  S|SoT| YK| RK|Touches|Tackles|Ints|Blocks| xG|npxG|xAG|Passes|PassesA|PrgPas|Carries|PrgCar|      Line|  C|PlayerID|
+-------------+-------+----+----------+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------------+---------+---------+----+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+

In [36]:
FactPlayers = FactPlayers.drop('Date')

In [37]:
# Register the DataFrame as a temporary view
FactPlayers.createOrReplaceTempView("fact_players")

# SQL query to select rows with any null values
query = "SELECT * FROM fact_players WHERE " + ' OR '.join([f"{col} IS NULL" for col in FactPlayers.columns])

# Execute the query
rows_with_nulls_sql = spark.sql(query)

rows_with_nulls_sql.show()


+-------------+------+----+----+--------+----+------------+-------------+-------+----------+-----+-------+-------+--------+--------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----+---+--------+
|FormattedDate|Season|Tour|Time|Opponent|HoAw|ArsenalScore|OpponentScore|Stadium|Attendance|Coach|Referee|MatchID|fullname|LastName|FirstName|Start|Pos|Min|  G|  A| PK|PKA|  S|SoT| YK| RK|Touches|Tackles|Ints|Blocks| xG|npxG|xAG|Passes|PassesA|PrgPas|Carries|PrgCar|Line|  C|PlayerID|
+-------------+------+----+----+--------+----+------------+-------------+-------+----------+-----+-------+-------+--------+--------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----+---+--------+
+-------------+------+----+----+--------+----+------------+-------------+-------+----------+-----+-------+-------+--------+--------+---------+---

In [38]:
FactPlayers.show()

+-------------+-------+----+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------------+---------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+--------+
|FormattedDate| Season|Tour|    Time| Opponent|HoAw|ArsenalScore|OpponentScore|         Stadium|Attendance|        Coach|  Referee|MatchID|           fullname| LastName|FirstName|Start|Pos|Min|  G|  A| PK|PKA|  S|SoT| YK| RK|Touches|Tackles|Ints|Blocks| xG|npxG|xAG|Passes|PassesA|PrgPas|Carries|PrgCar|      Line|  C|PlayerID|
+-------------+-------+----+--------+---------+----+------------+-------------+----------------+----------+-------------+---------+-------+-------------------+---------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+--------+
|   2017-08-11|2

In [39]:
FactPlayers = FactPlayers.drop('Season',
 'Tour',
 'Time',
 'Opponent',
 'HoAw',
 'Stadium','Coach',
 'Referee',
   'fullname',
 'LastName',
 'FirstName','Line')

In [40]:
FactPlayers.columns

['FormattedDate',
 'ArsenalScore',
 'OpponentScore',
 'Attendance',
 'MatchID',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'C',
 'PlayerID']

#### Loading the FactPlayers for DWH Schema in ArsenalFC Database in PostgreSql

In [41]:
FactPlayers.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.FactArsenalPlayers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


In [42]:
DimPlayers.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'PlayerID',
 'FormattedDate']

In [43]:
DimPlayers= DimPlayers.drop('Date',
 'Start',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C','FormattedDate', 'Pos')



In [44]:
DimPlayers.createOrReplaceTempView("DimPlayers")

In [45]:
DimPlayers.columns

['fullname', 'LastName', 'FirstName', 'PlayerID']

In [46]:
DimPlayers.show()

+----------------+----------+---------+--------+
|        fullname|  LastName|FirstName|PlayerID|
+----------------+----------+---------+--------+
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Smith Rowe|    Emile|       0|
|Emile Smith Rowe|Sm

#### Loading the DimPlayers for DWH Schema in ArsenalFC Database in PostgreSql

In [47]:
DimPlayers.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimArsenalPlayers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


# Dim GoalKeepers

In [60]:
GoalKeepers.createOrReplaceTempView("GK")

#### Adding fullname to GoalKeepers so we can Join later creating the DimGoalKeepers

In [61]:
GoalKeepers= GoalKeepers.withColumn('fullname', concat_ws(" ", col('FirstName'),col('LastName')))
# GoalKeepers.select("fullname").show(5, False)

In [62]:
GoalKeepers.columns

['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C',
 'fullname']

In [63]:
GoalKeepers_f = spark.sql("""
    select distinct concat(firstname, " ", lastname) as fullname
    from GK

""")


In [64]:
GoalKeepers_f.columns

['fullname']

In [65]:
GK_ = spark.sql("""
    select Count(fullname) from GK
""").show()

+---------------+
|count(fullname)|
+---------------+
|            218|
+---------------+



In [66]:
GoalKeepers_f.show()

+-----------------+
|         fullname|
+-----------------+
|Emiliano Martinez|
|   Aaron Ramsdale|
|       Bernd Leno|
|        Petr Cech|
|  Runar Runarsson|
|      Mathew Ryan|
|     David Ospina|
+-----------------+



In [67]:
GoalKeepers_f= GoalKeepers_f.withColumn('GkID',monotonically_increasing_id()+1)


In [68]:
DimGoalKeepers= GoalKeepers.join(GoalKeepers_f, on ='fullname', how="inner")

In [69]:
DimGoalKeepers.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C',
 'GkID']

In [70]:
DimGoalKeepers = DimGoalKeepers.withColumn("FormattedDate", date_format(to_date("Date", "M/d/yyyy"), "yyyy-MM-dd"))


In [71]:
DimGoalKeepers.show()

+-----------------+--------+---------+---------+-----+---+---+----+---+-----+----+-----+---+---+-------+------+------+-----+--------+---+----+-------------+
|         fullname|LastName|FirstName|     Date|Start|Pos|Min|SoTA| GA|Saves|PSxG|PKatt|PKA|PKm|PassAtt|Throws|AvgLen|GKAtt|GKAvgLen|  C|GkID|FormattedDate|
+-----------------+--------+---------+---------+-----+---+---+----+---+-----+----+-----+---+---+-------+------+------+-----+--------+---+----+-------------+
|Emiliano Martinez|Martinez| Emiliano|7/26/2020|    1| GK| 90|   6|  2|    4| 3.1|    1|  1|  0|     26|     8|  29.9|    7|    41.6|  0|   1|   2020-07-26|
|Emiliano Martinez|Martinez| Emiliano|7/21/2020|    1| GK| 90|   3|  1|    2| 0.2|    0|  0|  0|     20|     5|  17.6|    5|      15|  0|   1|   2020-07-21|
|Emiliano Martinez|Martinez| Emiliano|7/15/2020|    1| GK| 90|   8|  1|    7|   2|    0|  0|  0|     32|     7|    40|   15|    51.1|  0|   1|   2020-07-15|
|Emiliano Martinez|Martinez| Emiliano|7/12/2020|    1| GK|

#### Creating FactGoalKeepers

In [72]:
FactGk = DimMatch.join(DimGoalKeepers, on='FormattedDate', how='left')
FactGk.columns

['FormattedDate',
 'Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee',
 'MatchID',
 'fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C',
 'GkID']

In [73]:
FactGk.show()

+-------------+-------+----+----------+--------+--------------+----+------------+-------------+-----------------+----------+-------------+---------------+-------+---------+--------+---------+----------+-----+---+---+----+---+-----+----+-----+---+---+-------+------+------+-----+--------+---+----+
|FormattedDate| Season|Tour|      Date|    Time|      Opponent|HoAw|ArsenalScore|OpponentScore|          Stadium|Attendance|        Coach|        Referee|MatchID| fullname|LastName|FirstName|      Date|Start|Pos|Min|SoTA| GA|Saves|PSxG|PKatt|PKA|PKm|PassAtt|Throws|AvgLen|GKAtt|GKAvgLen|  C|GkID|
+-------------+-------+----+----------+--------+--------------+----+------------+-------------+-----------------+----------+-------------+---------------+-------+---------+--------+---------+----------+-----+---+---+----+---+-----+----+-----+---+---+-------+------+------+-----+--------+---+----+
|   2017-08-11|2017/18|   1|2017-08-11|20:45:00|     Leicester|home|           4|            3| Emirates Stad

In [74]:
FactGk = FactGk.drop( 
 'Season',
 'Tour',
 'Time',
 'Opponent',
 'HoAw',
 'Stadium',
 'Coach',
 'Referee',
  'Pos',
'fullname',
 'LastName',
 'FirstName','Date'
 
)
FactGk.columns

['FormattedDate',
 'ArsenalScore',
 'OpponentScore',
 'Attendance',
 'MatchID',
 'Start',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C',
 'GkID']

In [75]:
FactGk.columns

['FormattedDate',
 'ArsenalScore',
 'OpponentScore',
 'Attendance',
 'MatchID',
 'Start',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C',
 'GkID']

#### Loading the FactGoalkeepers for DWH Schema in ArsenalFC Database in PostgreSql

In [76]:
FactGk.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.FactArsenalGoalKeepers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


In [77]:
DimGoalKeepers.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C',
 'GkID',
 'FormattedDate']

#### Loading the DimGoalKeepers for DWH Schema in ArsenalFC Database in PostgreSql

In [78]:
DimGoalKeepers= DimGoalKeepers.drop('Min','Start',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen','Date','C','FormattedDate')

DimGoalKeepers.columns

['fullname', 'LastName', 'FirstName', 'Pos', 'GkID']

In [79]:
DimGoalKeepers.createOrReplaceTempView("DimGoalKeepers")

In [80]:
DimGoalKeepers = spark.sql(""" select distinct * from DimGoalKeepers""")
DimGoalKeepers.show()

+-----------------+---------+---------+----+----+
|         fullname| LastName|FirstName| Pos|GkID|
+-----------------+---------+---------+----+----+
|Emiliano Martinez| Martinez| Emiliano|  GK|   1|
|   Aaron Ramsdale| Ramsdale|    Aaron|  GK|   2|
|       Bernd Leno|     Leno|    Bernd|  GK|   3|
|        Petr Cech|     Cech|     Petr|  GK|   4|
|  Runar Runarsson|Runarsson|    Runar|GKDM|   5|
|      Mathew Ryan|     Ryan|   Mathew|  GK|   6|
|     David Ospina|   Ospina|    David|  GK|   7|
+-----------------+---------+---------+----+----+



In [81]:
DimGoalKeepers =DimGoalKeepers.dropDuplicates()
DimGoalKeepers.show()

+-----------------+---------+---------+----+----+
|         fullname| LastName|FirstName| Pos|GkID|
+-----------------+---------+---------+----+----+
|Emiliano Martinez| Martinez| Emiliano|  GK|   1|
|   Aaron Ramsdale| Ramsdale|    Aaron|  GK|   2|
|       Bernd Leno|     Leno|    Bernd|  GK|   3|
|        Petr Cech|     Cech|     Petr|  GK|   4|
|  Runar Runarsson|Runarsson|    Runar|GKDM|   5|
|      Mathew Ryan|     Ryan|   Mathew|  GK|   6|
|     David Ospina|   Ospina|    David|  GK|   7|
+-----------------+---------+---------+----+----+



In [82]:
DimGoalKeepers.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimArsenalGoalKeepers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


### Creating DimDate and Loading into PostgreSQL

In [87]:
Dates = spark.sql ("""
select Max(Date) as Max_date , MIN(Date) as Min_Date from Matches

""").show()

+----------+----------+
|  Max_date|  Min_Date|
+----------+----------+
|2023-02-25|2017-08-11|
+----------+----------+



In [88]:
date_df = spark.sql("SELECT * FROM Matches")

In [89]:
# date_df.show()

In [90]:
# date_df = spark.range(date_diff + 1).select(to_date(expr(f"date_add(to_date('{min_date}', 'yyyy-MM-dd'), cast(id as int))"), "yyyy-MM-dd").alias("Date"))

# date_df.show()

TypeError: unsupported operand type(s) for +: 'function' and 'int'

In [66]:
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, quarter

dim_date_df = date_df.select(
    "Date",
    year("Date").alias("Year"),
    month("Date").alias("Month"),
    dayofmonth("Date").alias("Day"),
    dayofweek("Date").alias("Weekday"),
    quarter("Date").alias("Quarter")
)

dim_date_df.show()

+----------+----+-----+---+-------+-------+
|      Date|Year|Month|Day|Weekday|Quarter|
+----------+----+-----+---+-------+-------+
|2017-08-11|2017|    8| 11|      6|      3|
|2017-08-19|2017|    8| 19|      7|      3|
|2017-08-27|2017|    8| 27|      1|      3|
|2017-09-09|2017|    9|  9|      7|      3|
|2017-09-17|2017|    9| 17|      1|      3|
|2017-09-25|2017|    9| 25|      2|      3|
|2017-10-01|2017|   10|  1|      1|      4|
|2017-10-14|2017|   10| 14|      7|      4|
|2017-10-22|2017|   10| 22|      1|      4|
|2017-10-28|2017|   10| 28|      7|      4|
|2017-11-05|2017|   11|  5|      1|      4|
|2017-11-18|2017|   11| 18|      7|      4|
|2017-11-26|2017|   11| 26|      1|      4|
|2017-11-29|2017|   11| 29|      4|      4|
|2017-12-02|2017|   12|  2|      7|      4|
|2017-12-10|2017|   12| 10|      1|      4|
|2017-12-13|2017|   12| 13|      4|      4|
|2017-12-16|2017|   12| 16|      7|      4|
|2017-12-22|2017|   12| 22|      6|      4|
|2017-12-28|2017|   12| 28|     

### Loading DimDate to DWH

In [195]:
dim_date_df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/arsenalfc") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimDate") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()
