In [135]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, date_format


In [136]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, date_format


In [137]:
spark = SparkSession.builder.config("spark.jars", "/Drivers/SQL_Sever/jdbc/postgresql-42.7.3.jar")\
.getOrCreate()


## Loading Data From Postgres

In [138]:
tbl_list = ['arsenalmatches', 'arsenalPlayers','arsenalGK']
dataframe = {}
for table in tbl_list:
   df = spark.read.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"{table}") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .load()
   dataframe[table] = df

In [139]:
dataframe.keys()

dict_keys(['arsenalmatches', 'arsenalPlayers', 'arsenalGK'])

In [140]:
print(dataframe)

{'arsenalmatches': DataFrame[Season: string, Tour: string, Date: string, Time: string, Opponent: string, HoAw: string, ArsenalScore: string, OpponentScore: string, Stadium: string, Attendance: string, Coach: string, Referee: string], 'arsenalPlayers': DataFrame[LastName: string, FirstName: string, Date: string, Start: string, Pos: string, Min: string, G: string, A: string, PK: string, PKA: string, S: string, SoT: string, YK: string, RK: string, Touches: string, Tackles: string, Ints: string, Blocks: string, xG: string, npxG: string, xAG: string, Passes: string, PassesA: string, PrgPas: string, Carries: string, PrgCar: string, Line: string, C: string], 'arsenalGK': DataFrame[LastName: string, FirstName: string, Date: string, Start: string, Pos: string, Min: string, SoTA: string, GA: string, Saves: string, PSxG: string, PKatt: string, PKA: string, PKm: string, PassAtt: string, Throws: string, AvgLen: string, GKAtt: string, GKAvgLen: string, C: string]}


In [141]:
Matches = dataframe['arsenalmatches']
Matches.columns

['Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee']

In [142]:
Players = dataframe['arsenalPlayers']
Players.columns

['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C']

In [143]:
GoalKeepers = dataframe['arsenalGK']
GoalKeepers.columns


['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen',
 'C']

### Transforming Arsenal Matches Data

In [144]:
Matches.createOrReplaceTempView("Matches")

In [145]:
distinct_matches = spark.sql ("""
select Count(distinct Date) from Matches

""").show()

+--------------------+
|count(DISTINCT Date)|
+--------------------+
|                 214|
+--------------------+



#### Making sure that all matches are unique for each date

In [146]:
matches = spark.sql ("""
select Count(Date) from Matches

""").show()

+-----------+
|count(Date)|
+-----------+
|        214|
+-----------+



#### Know the Max and Min date to create the DimDate Later

In [147]:
Dates = spark.sql ("""
select Max(Date) as Max_date , MIN(Date) as Min_Date from Matches

""").show()
2017-8-11
2023-2-25

+----------+----------+
|  Max_date|  Min_Date|
+----------+----------+
|2023-02-25|2017-08-11|
+----------+----------+



1996

##### Adding MatchID column to Matches DataFrame

In [148]:
DimMatch= Matches.withColumn("MatchID", monotonically_increasing_id())

In [149]:
DimMatch.columns

['Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee',
 'MatchID']

In [150]:
# DimMatch = DimMatch.drop('ArsenalScore',
#  'OpponentScore',)

In [151]:
# For DimMatch with original format yyyy-M-d
# DimMatch = DimMatch.withColumn("FormattedDate", date_format(to_date("Date", "yyyy-M-d"), "yyyy-MM-dd"))
DimMatch.columns

['Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee',
 'MatchID']

#### Loading the DimMatch for DWH Schema in ArsenalFC Database in Post

In [152]:
!pip install psycopg2-binary




In [153]:
from pyspark.sql import SparkSession
import psycopg2

# Establecer la conexión
conn = psycopg2.connect(
    dbname="mydatabase", 
    user="postgres", 
    password="postgres", 
    host="postgres", 
    port="5432"
)
cursor = conn.cursor()

# Crear el esquema 'dwh' si no existe
cursor.execute("CREATE SCHEMA IF NOT EXISTS dwh;")
conn.commit()

# Cerrar la conexión
cursor.close()
conn.close()


In [154]:
DimMatch.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimArsenalMatches") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


In [155]:
from pyspark.sql.functions import to_date, date_format

# Assuming `DimMatch` is already loaded with the schema you provided

# Update the `FormattedDate` in DimMatch to ensure it's in "yyyy-MM-dd" format
# Adjust the format in to_date() according to the actual format of your 'Date' string if needed
DimMatch = DimMatch.withColumn("FormattedDate", date_format(to_date("Date", "yyyy-MM-dd"), "yyyy-MM-dd"))


In [156]:
DimMatch.schema

StructType([StructField('Season', StringType(), True), StructField('Tour', StringType(), True), StructField('Date', StringType(), True), StructField('Time', StringType(), True), StructField('Opponent', StringType(), True), StructField('HoAw', StringType(), True), StructField('ArsenalScore', StringType(), True), StructField('OpponentScore', StringType(), True), StructField('Stadium', StringType(), True), StructField('Attendance', StringType(), True), StructField('Coach', StringType(), True), StructField('Referee', StringType(), True), StructField('MatchID', LongType(), False), StructField('FormattedDate', StringType(), True)])

### Transforming Players Data & Creating the DimPlayers and Fact Players

In [157]:
Players.createOrReplaceTempView("Players")

#### Addomg fullname column to Players Table 

In [158]:
spark.sql("""
    select concat(firstname, " ", lastname) as fullname
    from Players

""").show()

+-------------------+
|           fullname|
+-------------------+
|    Hector Bellerin|
|     Mohamed Elneny|
|        Rob Holding|
|     Sead Kolasinac|
|Alexandre Lacazette|
|      Nacho Monreal|
|         Mesut Ozil|
|      Danny Welbeck|
|       Granit Xhaka|
|       Aaron Ramsey|
|    Hector Bellerin|
|     Sead Kolasinac|
|Alexandre Lacazette|
|      Nacho Monreal|
|   Shkodran Mustafi|
|         Mesut Ozil|
|       Aaron Ramsey|
|      Danny Welbeck|
|       Granit Xhaka|
|         Alex Iwobi|
+-------------------+
only showing top 20 rows



#### Checking all Distinct Players

In [None]:
distnict_players= spark.sql("""
    select distinct concat(firstname, " ", lastname) as fullname
    from Players

""")
distnict_players.show(10)

+----------------+
|        fullname|
+----------------+
|Emile Smith Rowe|
| Folarin Balogun|
| Hector Bellerin|
|     Joe Willock|
|  William Saliba|
|    Aaron Ramsey|
|     Bukayo Saka|
|  Kieran Tierney|
|Shkodran Mustafi|
| Daniel Ceballos|
+----------------+
only showing top 10 rows



#### Count the distinct date to check if it matche with DimMatches or not

In [None]:
players_Dates= spark.sql("""
    select count(distinct Date) 
    from players

""")
players_Dates.show()

#### Adding New column to distinct Players as PlayerID

In [None]:
distnict_players= distnict_players.withColumn("PlayerID", monotonically_increasing_id())

#### Adding fullname column to Player so we can create the DimPlayers joinig it with Distinct_Player Table

In [None]:
Players= Players.withColumn('fullname', concat_ws(" ", col('FirstName'),col('LastName')))
Players.select("fullname").show(5, False)

#### Creating DimPlayers

In [None]:
DimPlayers= Players.join(distnict_players, on ='fullname', how="inner")
DimPlayers.columns

In [None]:
# For DimPlayers with original format M/d/yyyy
DimPlayers = DimPlayers.withColumn("FormattedDate", date_format(to_date("Date", "M/d/yyyy"), "yyyy-MM-dd"))


In [None]:
DimMatch.show(1)

#### Creating the FactPlayers

In [None]:
# Convert 'Date' column from string to date type in DimMatch
DimMatch = DimMatch.withColumn("Date", to_date("Date", "yyyy-M-d"))

# Convert 'Date' column from string to date type in DimPlayers
DimPlayers = DimPlayers.withColumn("Date", to_date("Date", "yyyy-M-d"))


In [None]:
DimPlayers.schema

In [None]:
FactPlayers = DimMatch.join(DimPlayers, on='FormattedDate', how= 'left')
FactPlayers.show()


In [None]:
FactPlayers = FactPlayers.drop('Date')

In [None]:
# Register the DataFrame as a temporary view
FactPlayers.createOrReplaceTempView("fact_players")

# SQL query to select rows with any null values
query = "SELECT * FROM fact_players WHERE " + ' OR '.join([f"{col} IS NULL" for col in FactPlayers.columns])

# Execute the query
rows_with_nulls_sql = spark.sql(query)

rows_with_nulls_sql.show()


In [None]:
FactPlayers.show()

In [None]:
FactPlayers = FactPlayers.drop('Season',
 'Tour',
 'Time',
 'Opponent',
 'HoAw',
 'Stadium','Coach',
 'Referee',
   'fullname',
 'LastName',
 'FirstName','Line')

In [None]:
FactPlayers.columns

#### Loading the FactPlayers for DWH Schema in ArsenalFC Database in PostgreSql

In [None]:
FactPlayers.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.FactArsenalPlayers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


In [None]:
DimPlayers.columns

In [None]:
DimPlayers= DimPlayers.drop('Date',
 'Start',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C','FormattedDate', 'Pos')



In [None]:
DimPlayers.createOrReplaceTempView("DimPlayers")

In [None]:
DimPlayers.columns

In [None]:
DimPlayers.show()

#### Loading the DimPlayers for DWH Schema in ArsenalFC Database in PostgreSql

In [None]:
DimPlayers.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimArsenalPlayers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


### Dim GoalKeepers

In [None]:
GoalKeepers.createOrReplaceTempView("GK")

#### Adding fullname to GoalKeepers so we can Join later creating the DimGoalKeepers

In [None]:
GoalKeepers= GoalKeepers.withColumn('fullname', concat_ws(" ", col('FirstName'),col('LastName')))
# GoalKeepers.select("fullname").show(5, False)

In [None]:
GoalKeepers.columns

In [None]:
GoalKeepers_f = spark.sql("""
    select distinct concat(firstname, " ", lastname) as fullname
    from GK

""")


In [None]:
GoalKeepers_f.columns

In [None]:
# Crear la columna 'fullname' en el DataFrame
GoalKeepers = GoalKeepers.withColumn('fullname', concat_ws(" ", col('FirstName'), col('LastName')))

# Registrar GoalKeepers como una vista temporal
GoalKeepers.createOrReplaceTempView("GK")

# Ahora puedes ejecutar las consultas SQL
GoalKeepers_f = spark.sql("""
    select distinct concat(firstname, " ", lastname) as fullname
    from GK
""")

GK_ = spark.sql("""
    select Count(fullname) from GK
""")


In [None]:
GK_ = spark.sql ("""
select Count(fullname) from GK
""")



In [None]:
GoalKeepers_f.show()

In [None]:
GoalKeepers_f= GoalKeepers_f.withColumn('GkID',monotonically_increasing_id()+1)


In [None]:
DimGoalKeepers= GoalKeepers.join(GoalKeepers_f, on ='fullname', how="inner")

In [None]:
DimGoalKeepers.columns

In [None]:
DimGoalKeepers = DimGoalKeepers.withColumn("FormattedDate", date_format(to_date("Date", "M/d/yyyy"), "yyyy-MM-dd"))


In [None]:
DimGoalKeepers.show()

#### Creating FactGoalKeepers

In [None]:
FactGk = DimMatch.join(DimGoalKeepers, on='FormattedDate', how='left')
FactGk.columns

In [None]:
FactGk.show()

In [None]:
FactGk = FactGk.drop( 
 'Season',
 'Tour',
 'Time',
 'Opponent',
 'HoAw',
 'Stadium',
 'Coach',
 'Referee',
  'Pos',
'fullname',
 'LastName',
 'FirstName','Date'
 
)
FactGk.columns

In [None]:
FactGk.columns

#### Loading the FactGoalkeepers for DWH Schema in ArsenalFC Database in PostgreSql

In [None]:
FactGk.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.FactArsenalGoalKeepers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


In [None]:
DimGoalKeepers.columns

#### Loading the DimGoalKeepers for DWH Schema in ArsenalFC Database in PostgreSql

In [None]:
DimGoalKeepers= DimGoalKeepers.drop('Min','Start',
 'SoTA',
 'GA',
 'Saves',
 'PSxG',
 'PKatt',
 'PKA',
 'PKm',
 'PassAtt',
 'Throws',
 'AvgLen',
 'GKAtt',
 'GKAvgLen','Date','C','FormattedDate')

DimGoalKeepers.columns

In [None]:
DimGoalKeepers.createOrReplaceTempView("DimGoalKeepers")

In [None]:
DimGoalKeepers = spark.sql(""" select distinct * from DimGoalKeepers""")
DimGoalKeepers.show()

In [None]:
DimGoalKeepers =DimGoalKeepers.dropDuplicates()
DimGoalKeepers.show()

In [None]:
DimGoalKeepers.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimArsenalGoalKeepers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()


### Creating DimDate and Loading into PostgreSQL

In [None]:
Dates = spark.sql ("""
select Max(Date) as Max_date , MIN(Date) as Min_Date from Matches

""").show()

In [None]:
date_df = spark.sql("SELECT * FROM Matches")

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, quarter

dim_date_df = date_df.select(
    "Date",
    year("Date").alias("Year"),
    month("Date").alias("Month"),
    dayofmonth("Date").alias("Day"),
    dayofweek("Date").alias("Weekday"),
    quarter("Date").alias("Quarter")
)

dim_date_df.show()


### Loading DimDate to DWH

In [None]:
dim_date_df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "dwh.DimDate") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .mode("overwrite") \
    .save()
