##  Install the Python modules

In [None]:
#pip install psycopg2
#pip install psycopg2-binary
#pip install sqlalchemy
#!pip install -U pandasql

In [1]:
import psycopg2
import pandas as pd

#### Connect to PostgreSQL

In [2]:
pgconn = psycopg2.connect(
    host="localhost",
    user="postgres",
    password="enteryourpassword")

In [3]:
pgcursor = pgconn.cursor()

#### Creating a PostgreSQL database using Psycopg2


In [4]:
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

pgconn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) 

pgcursor.execute('DROP DATABASE IF EXISTS fifapd')
pgcursor.execute('CREATE DATABASE fifapd')
 


In [5]:
pgconn.close()

####  Connect to PostgreSQL DBMS

In [6]:
pgconn = psycopg2.connect(
    host="localhost",
    database="fifapd",
    user="postgres",
    password="enteryourpassword")

# Save pandas DataFrame to PostgreSQL

#### 1. Create a pandas DataFrame

In [7]:
playerdf = pd.read_csv("data/playernames.csv", index_col = False)
playerdf.head()

Unnamed: 0,Name,url
0,Cristiano Ronaldo,/player/20801/cristiano-ronaldo/
1,Lionel Messi,/player/158023/lionel-messi/
2,Neymar,/player/190871/neymar/
3,Luis Suárez,/player/176580/luis-su%C3%A1rez/
4,Manuel Neuer,/player/167495/manuel-neuer/


#### 2. save the dataframe to PostgreSQL database.

In [8]:
from sqlalchemy import create_engine

# Create SQLAlchemy engine
engine = create_engine('postgresql+psycopg2://postgres:enteryourpassword@localhost/fifapd')

In [9]:
#Use the pandas to_sql() method to save the dataframe to a PostgreSQL table.
#create player table 

playerdf.to_sql('player', engine, if_exists='replace', index = False)

In [10]:
#create club table
clubdf = pd.read_csv("data/clubnames.csv", index_col = False)
clubdf.to_sql('club', engine, if_exists='replace', index = False)

In [11]:
#create national table
nationaldf = pd.read_csv("data/nationalnames.csv", index_col = False)
nationaldf.to_sql('national', engine, if_exists='replace', index = False)

In [12]:
#create fulldata table
fulldf = pd.read_csv("data/fulldata.csv", index_col = False)
fulldf.to_sql('fulldata', engine, if_exists='replace', index = False)

#  Load data from PostgreSQL in Pandas SQL

In [13]:
# check how many rows are in club table
pd.read_sql_query('select count(*) from club', engine)

Unnamed: 0,count
0,633


In [14]:
#Querying the database catalog, information_schema for fulldata table

pd.read_sql_query('''select ordinal_position, column_name, data_type  
                     from information_schema.columns 
                     where table_name = 'fulldata'
                ''', engine).head(10)

Unnamed: 0,ordinal_position,column_name,data_type
0,53,GK_Reflexes,bigint
1,41,Heading,bigint
2,42,Shot_Power,bigint
3,43,Finishing,bigint
4,44,Long_Shots,bigint
5,45,Curve,bigint
6,46,Freekick_Accuracy,bigint
7,47,Penalties,bigint
8,48,Volleys,bigint
9,49,GK_Positioning,bigint


In [125]:
#read fulldata table
fulldata_df= pd.read_sql('select * from fulldata', engine)
fulldata_df.head(3)

Unnamed: 0,Name,Nationality,National_Position,National_Kit,Club,Club_Position,Club_Kit,Club_Joining,Contract_Expiry,Rating,...,Long_Shots,Curve,Freekick_Accuracy,Penalties,Volleys,GK_Positioning,GK_Diving,GK_Kicking,GK_Handling,GK_Reflexes
0,Cristiano Ronaldo,Portugal,LS,7.0,Real Madrid,LW,7.0,07/01/2009,2021.0,94,...,90,81,76,85,88,14,7,15,11,11
1,Lionel Messi,Argentina,RW,10.0,FC Barcelona,RW,10.0,07/01/2004,2018.0,93,...,88,89,90,74,85,14,6,15,11,8
2,Neymar,Brazil,LW,10.0,FC Barcelona,LW,11.0,07/01/2013,2021.0,92,...,77,79,84,81,83,15,9,15,9,11


#  Pandas SQL

In [144]:
select_df= pd.read_sql('fulldata', engine, columns=['Name', 'Age' ,'Speed','Height','Weight'])
select_df.head(3)


Unnamed: 0,Name,Age,Speed,Height,Weight
0,Cristiano Ronaldo,32,92,185 cm,80 kg
1,Lionel Messi,29,87,170 cm,72 kg
2,Neymar,25,90,174 cm,68 kg


In [145]:
from pandasql import sqldf

In [146]:
pysqldf = lambda q: sqldf(q, globals())
query = 'SELECT * FROM select_df LIMIT 3'
pysqldf(query)

Unnamed: 0,Name,Age,Speed,Height,Weight
0,Cristiano Ronaldo,32,92,185 cm,80 kg
1,Lionel Messi,29,87,170 cm,72 kg
2,Neymar,25,90,174 cm,68 kg


In [148]:
query = 'SELECT Age, AVG("Speed") AS mean_Speed FROM select_df GROUP BY Age LIMIT 10' 
pysqldf(query)

Unnamed: 0,Age,mean_Speed
0,17,62.280255
1,18,63.440901
2,19,64.053785
3,20,65.536424
4,21,66.754181
5,22,67.62198
6,23,68.19469
7,24,68.244599
8,25,68.352453
9,26,68.457741


# Load data from PostgreSQL in Spark 

In [15]:
import findspark
findspark.init()

In [16]:
from pyspark.sql import SparkSession

In [17]:
spark=SparkSession.builder.master('local').appName('Spark_PostgresSQL').getOrCreate()

In [18]:

#engine = create_engine('postgresql+psycopg2://postgres:enteryourpassword@localhost/fifapd')
df= pd.read_sql('select * from player', engine)


In [19]:
df.head()

Unnamed: 0,Name,url
0,Cristiano Ronaldo,/player/20801/cristiano-ronaldo/
1,Lionel Messi,/player/158023/lionel-messi/
2,Neymar,/player/190871/neymar/
3,Luis Suárez,/player/176580/luis-su%C3%A1rez/
4,Manuel Neuer,/player/167495/manuel-neuer/


In [20]:

# Convert Pandas dataframe to spark DataFrame
player_spark = spark.createDataFrame(df)
print(player_spark.schema)


StructType(List(StructField(Name,StringType,true),StructField(url,StringType,true)))


In [21]:
player_spark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- url: string (nullable = true)



In [22]:
player_spark.show()

+------------------+--------------------+
|              Name|                 url|
+------------------+--------------------+
| Cristiano Ronaldo|/player/20801/cri...|
|      Lionel Messi|/player/158023/li...|
|            Neymar|/player/190871/ne...|
|       Luis Suárez|/player/176580/lu...|
|      Manuel Neuer|/player/167495/ma...|
|            De Gea|/player/193080/de...|
|Robert Lewandowski|/player/188545/ro...|
|       Gareth Bale|/player/173731/ga...|
|Zlatan Ibrahimović|/player/41236/zla...|
|  Thibaut Courtois|/player/192119/th...|
|    Jérôme Boateng|/player/183907/j%...|
|       Eden Hazard|/player/183277/ed...|
|       Luka Modrić|/player/177003/lu...|
|        Mesut Özil|/player/176635/me...|
|   Gonzalo Higuaín|/player/167664/go...|
|      Thiago Silva|/player/164240/th...|
|      Sergio Ramos|/player/155862/se...|
|     Sergio Agüero|/player/153079/se...|
|        Paul Pogba|/player/195864/pa...|
| Antoine Griezmann|/player/194765/an...|
+------------------+--------------

In [23]:
player_spark.describe('Name').show()


+-------+----------------+
|summary|            Name|
+-------+----------------+
|  count|           17588|
|   mean|            null|
| stddev|            null|
|    min|  A.J. DeLaGarza|
|    max|Željko Filipović|
+-------+----------------+



In [24]:
player_spark.describe('url').show()

+-------+--------------------+
|summary|                 url|
+-------+--------------------+
|  count|               17588|
|   mean|                null|
| stddev|                null|
|    min|/player/100557/br...|
|    max|/player/9833/died...|
+-------+--------------------+



# Spark SQL

In [25]:
#loading fulldata table 
df_1= pd.read_sql('select * from fulldata', engine)
fulldata_spark= spark.createDataFrame(df_1)
print(fulldata_spark.printSchema())


root
 |-- Name: string (nullable = true)
 |-- Nationality: string (nullable = true)
 |-- National_Position: string (nullable = true)
 |-- National_Kit: double (nullable = true)
 |-- Club: string (nullable = true)
 |-- Club_Position: string (nullable = true)
 |-- Club_Kit: double (nullable = true)
 |-- Club_Joining: string (nullable = true)
 |-- Contract_Expiry: double (nullable = true)
 |-- Rating: long (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Preffered_Foot: string (nullable = true)
 |-- Birth_Date: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Preffered_Position: string (nullable = true)
 |-- Work_Rate: string (nullable = true)
 |-- Weak_foot: long (nullable = true)
 |-- Skill_Moves: long (nullable = true)
 |-- Ball_Control: long (nullable = true)
 |-- Dribbling: long (nullable = true)
 |-- Marking: long (nullable = true)
 |-- Sliding_Tackle: long (nullable = true)
 |-- Standing_Tackle: long (nullable = true)


In [26]:
# Selecting by column name
select_df=fulldata_spark.select('Name','Nationality','Club_Joining','Height','Weight','Age','Speed','Reactions')
select_df.show()

+------------------+-----------+------------+------+------+---+-----+---------+
|              Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|
+------------------+-----------+------------+------+------+---+-----+---------+
| Cristiano Ronaldo|   Portugal|  07/01/2009|185 cm| 80 kg| 32|   92|       96|
|      Lionel Messi|  Argentina|  07/01/2004|170 cm| 72 kg| 29|   87|       95|
|            Neymar|     Brazil|  07/01/2013|174 cm| 68 kg| 25|   90|       88|
|       Luis Suárez|    Uruguay|  07/11/2014|182 cm| 85 kg| 30|   77|       93|
|      Manuel Neuer|    Germany|  07/01/2011|193 cm| 92 kg| 31|   61|       85|
|            De Gea|      Spain|  07/01/2011|193 cm| 82 kg| 26|   56|       88|
|Robert Lewandowski|     Poland|  07/01/2014|185 cm| 79 kg| 28|   82|       88|
|       Gareth Bale|      Wales|  09/02/2013|183 cm| 74 kg| 27|   95|       87|
|Zlatan Ibrahimović|     Sweden|  07/01/2016|195 cm| 95 kg| 35|   74|       85|
|  Thibaut Courtois|    Belgium|  07/26/

In [27]:
# Selecting by Age of player and speed 
select_df.where((fulldata_spark.Age < 25) & (fulldata_spark.Speed > 80)).show()


+--------------------+-----------+------------+------+------+---+-----+---------+
|                Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|
+--------------------+-----------+------------+------+------+---+-----+---------+
|        Paulo Dybala|  Argentina|  07/01/2015|177 cm| 74 kg| 23|   86|       85|
|         David Alaba|    Austria|  02/10/2010|180 cm| 76 kg| 24|   86|       84|
|       Romelu Lukaku|    Belgium|  07/30/2014|190 cm| 94 kg| 23|   89|       77|
|    Yannick Carrasco|    Belgium|  07/10/2015|180 cm| 66 kg| 23|   89|       83|
|      Raphaël Varane|     France|  07/01/2011|191 cm| 78 kg| 23|   83|       79|
|         Eric Bailly|Ivory Coast|  07/01/2016|187 cm| 77 kg| 22|   83|       75|
|     Raheem Sterling|    England|  07/14/2015|170 cm| 69 kg| 22|   92|       79|
|     Anthony Martial|     France|  09/01/2015|184 cm| 76 kg| 21|   91|       81|
|       Mohamed Salah|      Egypt|  01/26/2014|175 cm| 72 kg| 24|   92|       80|
|          Sadio

In [28]:
# SQL aggregate function for speed (return AVG of speed)
select_df.agg({'Speed':'avg'}).show()

+-----------------+
|       avg(Speed)|
+-----------------+
|65.48385262679099|
+-----------------+



In [29]:
# aggregate function by group

select_df.groupBy('Weight').agg({'Speed':'avg'}).orderBy('avg(Speed)', ascending=False).show()

+------+-----------------+
|Weight|       avg(Speed)|
+------+-----------------+
| 49 kg|             85.0|
| 50 kg|             80.5|
| 59 kg|79.34782608695652|
| 56 kg|             77.1|
| 61 kg|75.70833333333333|
| 58 kg|75.47826086956522|
| 62 kg| 75.1592356687898|
| 60 kg|73.97435897435898|
| 57 kg|             73.2|
| 63 kg|73.09895833333333|
| 66 kg|           72.875|
| 65 kg|72.42163355408388|
| 64 kg|71.76530612244898|
| 68 kg|71.35082458770614|
| 67 kg|71.24511930585683|
| 55 kg|           70.625|
| 71 kg|70.52202283849918|
| 69 kg|70.31856540084388|
| 53 kg|             70.0|
| 54 kg|69.66666666666667|
+------+-----------------+
only showing top 20 rows



#### User-Defined Functions (UDF)

In [89]:
select_df.describe('Speed').show()

+-------+------------------+
|summary|             Speed|
+-------+------------------+
|  count|             17588|
|   mean| 65.48385262679099|
| stddev|14.100614851107773|
|    min|                11|
|    max|                96|
+-------+------------------+



In [90]:
select_df.describe('Age').show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|             17588|
|   mean|25.460313850352513|
| stddev| 4.680217413869141|
|    min|                17|
|    max|                47|
+-------+------------------+



In [112]:
def age_group(age):
    
    if age < 20:
         return '10–20'
    elif age < 30:
         return '20–30'
    elif age < 40:
        return '30–40'
    else:
        return '40+'

In [113]:
from pyspark.sql.functions import udf
agegroup_udf = udf(lambda z: age_group(z))
userDFAgeGroup =select_df.withColumn('AgeGroup',agegroup_udf(F.col('age')))

In [114]:
userDFAgeGroup.show()

+------------------+-----------+------------+------+------+---+-----+---------+--------+
|              Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|AgeGroup|
+------------------+-----------+------------+------+------+---+-----+---------+--------+
| Cristiano Ronaldo|   Portugal|  07/01/2009|185 cm| 80 kg| 32|   92|       96|   30–40|
|      Lionel Messi|  Argentina|  07/01/2004|170 cm| 72 kg| 29|   87|       95|   20–30|
|            Neymar|     Brazil|  07/01/2013|174 cm| 68 kg| 25|   90|       88|   20–30|
|       Luis Suárez|    Uruguay|  07/11/2014|182 cm| 85 kg| 30|   77|       93|   30–40|
|      Manuel Neuer|    Germany|  07/01/2011|193 cm| 92 kg| 31|   61|       85|   30–40|
|            De Gea|      Spain|  07/01/2011|193 cm| 82 kg| 26|   56|       88|   20–30|
|Robert Lewandowski|     Poland|  07/01/2014|185 cm| 79 kg| 28|   82|       88|   20–30|
|       Gareth Bale|      Wales|  09/02/2013|183 cm| 74 kg| 27|   95|       87|   20–30|
|Zlatan Ibrahimović| 

In [120]:
def speed_group(speed):
    if speed < 50:
        return 'low speed'
    elif speed < 80:
        return 'average speed'
    else:
        return 'high speed'
    

In [121]:
from pyspark.sql.functions import udf
speedgroup_udf = udf(lambda z: speed_group(z))
AgeSpeedGroup =userDFAgeGroup.withColumn('SpeedGroup',speedgroup_udf(F.col('speed')))

In [122]:
AgeSpeedGroup.show()

+------------------+-----------+------------+------+------+---+-----+---------+--------+-------------+
|              Name|Nationality|Club_Joining|Height|Weight|Age|Speed|Reactions|AgeGroup|   SpeedGroup|
+------------------+-----------+------------+------+------+---+-----+---------+--------+-------------+
| Cristiano Ronaldo|   Portugal|  07/01/2009|185 cm| 80 kg| 32|   92|       96|   30–40|   high speed|
|      Lionel Messi|  Argentina|  07/01/2004|170 cm| 72 kg| 29|   87|       95|   20–30|   high speed|
|            Neymar|     Brazil|  07/01/2013|174 cm| 68 kg| 25|   90|       88|   20–30|   high speed|
|       Luis Suárez|    Uruguay|  07/11/2014|182 cm| 85 kg| 30|   77|       93|   30–40|average speed|
|      Manuel Neuer|    Germany|  07/01/2011|193 cm| 92 kg| 31|   61|       85|   30–40|average speed|
|            De Gea|      Spain|  07/01/2011|193 cm| 82 kg| 26|   56|       88|   20–30|average speed|
|Robert Lewandowski|     Poland|  07/01/2014|185 cm| 79 kg| 28|   82|    

#### Closing the connection

In [23]:
pgconn.close()
engine.dispose()