In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.config("spark.jars", "/Drivers/SQL_Sever/jdbc/postgresql-42.7.3.jar")\
.getOrCreate()

In [4]:
players=spark.read.format('csv')\
    .option('header','True') \
    .option('inferSchema','True') \
    .load('../data/players.csv')

In [5]:
players.show()

+---------+---------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+
| LastName|FirstName|     Date|Start|Pos|Min|  G|  A| PK|PKA|  S|SoT| YK| RK|Touches|Tackles|Ints|Blocks| xG|npxG|xAG|Passes|PassesA|PrgPas|Carries|PrgCar|      Line|  C|
+---------+---------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+
| Bellerin|   Hector|8/11/2017|    1| WB| 90|  0|  0|  0|  0|  1|  1|  0|  0|     79|      3|   0|     0|0.3| 0.3|0.0|    61|     70|     3|     51|     1|  Defender|  0|
|   Elneny|  Mohamed|8/11/2017|    1| CM| 66|  0|  1|  0|  0|  1|  0|  0|  0|     82|      4|   0|     2|0.0| 0.0|0.1|    65|     72|     4|     57|     0|Midfielder|  0|
|  Holding|      Rob|8/11/2017|    1| CB| 66|  0|  0|  0|  0|  0|  0|  0|  0|     75|      1|   1|     0|0.0| 0.0|0.0|    50|     60|     4|     

In [6]:
players.columns

['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C']

In [7]:
players.schema

StructType([StructField('LastName', StringType(), True), StructField('FirstName', StringType(), True), StructField('Date', StringType(), True), StructField('Start', IntegerType(), True), StructField('Pos', StringType(), True), StructField('Min', IntegerType(), True), StructField('G', IntegerType(), True), StructField('A', IntegerType(), True), StructField('PK', IntegerType(), True), StructField('PKA', IntegerType(), True), StructField('S', IntegerType(), True), StructField('SoT', IntegerType(), True), StructField('YK', IntegerType(), True), StructField('RK', IntegerType(), True), StructField('Touches', IntegerType(), True), StructField('Tackles', IntegerType(), True), StructField('Ints', IntegerType(), True), StructField('Blocks', IntegerType(), True), StructField('xG', DoubleType(), True), StructField('npxG', DoubleType(), True), StructField('xAG', DoubleType(), True), StructField('Passes', IntegerType(), True), StructField('PassesA', IntegerType(), True), StructField('PrgPas', Intege

In [8]:
players.createOrReplaceTempView("players")

In [9]:
spark.sql('''
        select concat(firstname," ",lastname) as fullname
        from players
          
          ''').count()

2741

In [10]:
distinct_players=spark.sql('''
        select distinct concat(firstname," ",lastname) as fullname
        from players
          
          ''')

In [11]:
players_dates=spark.sql('''
    select count(distinct Date)
    from players

''')

In [12]:
players_dates.show()

+--------------------+
|count(DISTINCT Date)|
+--------------------+
|                 214|
+--------------------+



## Here we make sure that the distinct count date for matches in DimMatches Equal the distinct count Date for DimPlayers 214

In [13]:
distinct_players=distinct_players.withColumn('PlayersID',monotonically_increasing_id())

In [14]:
distinct_players.show()

+-------------------+---------+
|           fullname|PlayersID|
+-------------------+---------+
|   Emile Smith Rowe|        0|
|    Folarin Balogun|        1|
|    Hector Bellerin|        2|
|        Joe Willock|        3|
|     William Saliba|        4|
|       Aaron Ramsey|        5|
|        Bukayo Saka|        6|
|     Kieran Tierney|        7|
|   Shkodran Mustafi|        8|
|    Daniel Ceballos|        9|
|Alexandre Lacazette|       10|
|     Kieran Willian|       11|
|      Gabriel Jesus|       12|
|      Danny Welbeck|       13|
| Gabriel Marquinhos|       14|
|         Pablo Mari|       15|
|          Ben White|       16|
|      Ethan Nwaneri|       17|
|     Sead Kolasinac|       18|
|     Calum Chambers|       19|
+-------------------+---------+
only showing top 20 rows



In [15]:
players=players.withColumn('fullname',concat_ws(' ',col('FirstName'),col('LastName')))

In [16]:
players.columns

['LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'fullname']

In [17]:
players.select('fullname').show(5)

+-------------------+
|           fullname|
+-------------------+
|    Hector Bellerin|
|     Mohamed Elneny|
|        Rob Holding|
|     Sead Kolasinac|
|Alexandre Lacazette|
+-------------------+
only showing top 5 rows



In [18]:
dimplayer=players.join(distinct_players,on='fullname',how='inner')

In [19]:
dimplayer.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'PlayersID']

In [19]:
dimplayer.write.csv('../data/Dimplayers.csv', header=True)

In [20]:
dimplayer.select('PlayersID',
'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA' 
).show()

+---------+---------+---------+---------+-----+---+---+---+---+---+---+
|PlayersID| LastName|FirstName|     Date|Start|Pos|Min|  G|  A| PK|PKA|
+---------+---------+---------+---------+-----+---+---+---+---+---+---+
|        2| Bellerin|   Hector|8/11/2017|    1| WB| 90|  0|  0|  0|  0|
|       49|   Elneny|  Mohamed|8/11/2017|    1| CM| 66|  0|  1|  0|  0|
|       26|  Holding|      Rob|8/11/2017|    1| CB| 66|  0|  0|  0|  0|
|       18|Kolasinac|     Sead|8/11/2017|    1| CB| 90|  0|  1|  0|  0|
|       10|Lacazette|Alexandre|8/11/2017|    1| FW| 90|  1|  0|  0|  0|
|       21|  Monreal|    Nacho|8/11/2017|    1| CB| 90|  0|  0|  0|  0|
|       53|     Ozil|    Mesut|8/11/2017|    1| AM| 90|  0|  0|  0|  0|
|       13|  Welbeck|    Danny|8/11/2017|    1| AM| 74|  1|  0|  0|  0|
|       44|    Xhaka|   Granit|8/11/2017|    1| CM| 90|  0|  2|  0|  0|
|        5|   Ramsey|    Aaron|8/11/2017|    0| DM| 24|  1|  0|  0|  0|
|        2| Bellerin|   Hector|8/19/2017|    1| WB| 90|  0|  0| 

### Creating the FactPlayers

In [21]:
dimmatch=spark.read.format('csv')\
    .option('header','True')\
    .option('inferSchema','True')\
    .load('../data/DimMatches/DimMatches.csv')

In [22]:
dimmatch.columns

['Season',
 'Tour',
 'Date',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee',
 'MatchID']

In [23]:
dimplayers=spark.read.format('csv')\
    .option('header','True')\
    .option('inferSchema','True')\
    .load("../data/Dimplayers/DimPlayers.csv")

In [24]:
dimplayers.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'PlayersID']

In [25]:
factplayers=dimmatch.join(dimplayers,on='Date',how='left')

In [26]:
factplayers.show()

+----------+-------+----+-------------------+--------------+----+------------+-------------+-----------------+----------+-------------+---------------+-------+--------+--------+---------+-----+----+----+----+----+----+----+----+----+----+----+-------+-------+----+------+----+----+----+------+-------+------+-------+------+----+----+---------+
|      Date| Season|Tour|               Time|      Opponent|HoAw|ArsenalScore|OpponentScore|          Stadium|Attendance|        Coach|        Referee|MatchID|fullname|LastName|FirstName|Start| Pos| Min|   G|   A|  PK| PKA|   S| SoT|  YK|  RK|Touches|Tackles|Ints|Blocks|  xG|npxG| xAG|Passes|PassesA|PrgPas|Carries|PrgCar|Line|   C|PlayersID|
+----------+-------+----+-------------------+--------------+----+------------+-------------+-----------------+----------+-------------+---------------+-------+--------+--------+---------+-----+----+----+----+----+----+----+----+----+----+----+-------+-------+----+------+----+----+----+------+-------+------+----

In [27]:
factplayers.columns

['Date',
 'Season',
 'Tour',
 'Time',
 'Opponent',
 'HoAw',
 'ArsenalScore',
 'OpponentScore',
 'Stadium',
 'Attendance',
 'Coach',
 'Referee',
 'MatchID',
 'fullname',
 'LastName',
 'FirstName',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'PlayersID']

In [28]:
factplayers=factplayers.drop(
     'fullname',
 'LastName',
 'FirstName',
 'Line',
 'Stadium',
 'Coach',
 'Referee',
 'Opponent',
 'HoAw',
 'Season',
 'Tour',
 'Time',
 'C'
)

In [29]:
factplayers.columns

['Date',
 'ArsenalScore',
 'OpponentScore',
 'Attendance',
 'MatchID',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'PlayersID']

In [32]:
factplayers.write.csv('../data/Factplayers', header=True)

### Transforming Players Data & Creating the DimPlayers and Fact Players

In [30]:
dimplayer=spark.read.csv('../data/Dimplayers/DimPlayers.csv',header=True)

In [31]:
dimplayer.show()

+-------------------+---------+---------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+---------+
|           fullname| LastName|FirstName|     Date|Start|Pos|Min|  G|  A| PK|PKA|  S|SoT| YK| RK|Touches|Tackles|Ints|Blocks| xG|npxG|xAG|Passes|PassesA|PrgPas|Carries|PrgCar|      Line|  C|PlayersID|
+-------------------+---------+---------+---------+-----+---+---+---+---+---+---+---+---+---+---+-------+-------+----+------+---+----+---+------+-------+------+-------+------+----------+---+---------+
|    Hector Bellerin| Bellerin|   Hector|8/11/2017|    1| WB| 90|  0|  0|  0|  0|  1|  1|  0|  0|     79|      3|   0|     0|0.3| 0.3|0.0|    61|     70|     3|     51|     1|  Defender|  0|        2|
|     Mohamed Elneny|   Elneny|  Mohamed|8/11/2017|    1| CM| 66|  0|  1|  0|  0|  1|  0|  0|  0|     82|      4|   0|     2|0.0| 0.0|0.1|    65|     72|     4|     57|     0|Midfielder|  0|      

In [32]:
dimplayer.columns

['fullname',
 'LastName',
 'FirstName',
 'Date',
 'Start',
 'Pos',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C',
 'PlayersID']

In [33]:
dimplayer=dimplayer.drop('Date',
 'Start',
 'Min',
 'G',
 'A',
 'PK',
 'PKA',
 'S',
 'SoT',
 'YK',
 'RK',
 'Touches',
 'Tackles',
 'Ints',
 'Blocks',
 'xG',
 'npxG',
 'xAG',
 'Passes',
 'PassesA',
 'PrgPas',
 'Carries',
 'PrgCar',
 'Line',
 'C','FormattedDate', 'Pos')

In [34]:
dimplayer.columns

['fullname', 'LastName', 'FirstName', 'PlayersID']

In [35]:
dimplayer.show()

+-------------------+---------+---------+---------+
|           fullname| LastName|FirstName|PlayersID|
+-------------------+---------+---------+---------+
|    Hector Bellerin| Bellerin|   Hector|        2|
|     Mohamed Elneny|   Elneny|  Mohamed|       49|
|        Rob Holding|  Holding|      Rob|       26|
|     Sead Kolasinac|Kolasinac|     Sead|       18|
|Alexandre Lacazette|Lacazette|Alexandre|       10|
|      Nacho Monreal|  Monreal|    Nacho|       21|
|         Mesut Ozil|     Ozil|    Mesut|       53|
|      Danny Welbeck|  Welbeck|    Danny|       13|
|       Granit Xhaka|    Xhaka|   Granit|       44|
|       Aaron Ramsey|   Ramsey|    Aaron|        5|
|    Hector Bellerin| Bellerin|   Hector|        2|
|     Sead Kolasinac|Kolasinac|     Sead|       18|
|Alexandre Lacazette|Lacazette|Alexandre|       10|
|      Nacho Monreal|  Monreal|    Nacho|       21|
|   Shkodran Mustafi|  Mustafi| Shkodran|        8|
|         Mesut Ozil|     Ozil|    Mesut|       53|
|       Aaro

#### Loading the DimPlayers for DWH Schema in ArsenalFC Database in PostgreSql

In [36]:
dimplayer.schema

StructType([StructField('fullname', StringType(), True), StructField('LastName', StringType(), True), StructField('FirstName', StringType(), True), StructField('PlayersID', StringType(), True)])

In [37]:
dimplayer.show()

+-------------------+---------+---------+---------+
|           fullname| LastName|FirstName|PlayersID|
+-------------------+---------+---------+---------+
|    Hector Bellerin| Bellerin|   Hector|        2|
|     Mohamed Elneny|   Elneny|  Mohamed|       49|
|        Rob Holding|  Holding|      Rob|       26|
|     Sead Kolasinac|Kolasinac|     Sead|       18|
|Alexandre Lacazette|Lacazette|Alexandre|       10|
|      Nacho Monreal|  Monreal|    Nacho|       21|
|         Mesut Ozil|     Ozil|    Mesut|       53|
|      Danny Welbeck|  Welbeck|    Danny|       13|
|       Granit Xhaka|    Xhaka|   Granit|       44|
|       Aaron Ramsey|   Ramsey|    Aaron|        5|
|    Hector Bellerin| Bellerin|   Hector|        2|
|     Sead Kolasinac|Kolasinac|     Sead|       18|
|Alexandre Lacazette|Lacazette|Alexandre|       10|
|      Nacho Monreal|  Monreal|    Nacho|       21|
|   Shkodran Mustafi|  Mustafi| Shkodran|        8|
|         Mesut Ozil|     Ozil|    Mesut|       53|
|       Aaro

In [38]:
dimplayer.write.format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/mydatabase") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", "dwh.DimArsenalPlayers") \
        .option("user", "postgres") \
        .option("password", "postgres") \
        .mode("overwrite") \
        .save()