In [0]:
from pyspark.sql import SparkSession  #df_1_csv

In [0]:
spark = SparkSession.builder.master('localhost[*]').appName('Players').getOrCreate()

In [0]:
spark

In [0]:
sc = spark.sparkContext

In [0]:
import pandas as pd



In [0]:
player_stats = spark.read.table('player_attributes_4_csv')
players = spark.read.table('player_12_csv')

In [0]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: integer (nullable = true)



In [0]:
player_stats.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: integer (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: integer (nullable = true)
 |-- sprint_speed: integer (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = t

In [0]:
players.count()
players.select('player_fifa_api_id').distinct().count()

Out[10]: 11060

In [0]:
player_stats.count()

Out[11]: 183978

In [0]:
player_stats.select('player_fifa_api_id').distinct().count()

Out[12]: 11062

In [0]:
player_stats.select('player_fifa_api_id', 'date', 'free_kick_accuracy').show()

+------------------+-------------------+------------------+
|player_fifa_api_id|               date|free_kick_accuracy|
+------------------+-------------------+------------------+
|            218353|2016-02-18 00:00:00|                39|
|            218353|2015-11-19 00:00:00|                39|
|            218353|2015-09-21 00:00:00|                39|
|            218353|2015-03-20 00:00:00|                38|
|            218353|2007-02-22 00:00:00|                38|
|            189615|2016-04-21 00:00:00|                69|
|            189615|2016-04-07 00:00:00|                69|
|            189615|2016-01-07 00:00:00|                69|
|            189615|2015-12-24 00:00:00|                69|
|            189615|2015-12-17 00:00:00|                69|
|            189615|2015-10-16 00:00:00|                69|
|            189615|2015-09-25 00:00:00|                69|
|            189615|2015-09-21 00:00:00|                69|
|            189615|2015-01-09 00:00:00|

Looking at the count of each tables, we can see players - 11062 (distinct id) while player_stats - 11062 (distinct id) 

but 183978 (count of the id and table which means, It has lots of duplicate id but maybe not same field info).
Hence, we need to be careful while joining both tables, and in a case like this, since it is player_stats that has the higher count and duplicate, we will join players table to it, 
cause if we join players table to it and player also have duplicate, it will multiply each duplicate of the table it is been join to twice. eg. Player_stats has and 5 duplicate id, and the related id on players_stats have two duplicate, it will be 10 duplicates join together...

In [0]:
# How to join tables. NOTE: By default how is inner/left join and you might not need to put it.

players_with_stats = player_stats.join(players, players.player_fifa_api_id == player_stats.player_fifa_api_id, how= 'left')

In [0]:
# Since the both tables has the same field name to join on, it is better to use in this format : 

players_with_stats = player_stats.join(players, ['player_fifa_api_id'])

In [0]:
players_with_stats.show()

+------------------+---+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+---+-------------+------------------+-------------------+------+------+
|player_fifa_api_id| id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|s

In [0]:
# It is good to count thereafter to know if it has more rows than than the table it been joined on. And if the rows are reduced, that means some id from the table joined has no id.

players_with_stats.count()

Out[17]: 183929

BROADCAST is a good way of joining whereby you wrap the join around the broadcast. broadcast the player dataset into all the executor and 
this will make the join with the player_Stats faster because the player_Stats each partition of it has in the memory of the executor already the entire player dataset.


IMPORT BOARDCAST FIRST

In [0]:
from pyspark.sql.functions import broadcast

In [0]:
# doing a broadcast of our player dataset

player_stats.join(broadcast(players), ['player_fifa_api_id']).show()

+------------------+---+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+---+-------------+------------------+-------------------+------+------+
|player_fifa_api_id| id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|s

In [0]:
player_stats.select('date').show(3)

+-------------------+
|               date|
+-------------------+
|2016-02-18 00:00:00|
|2015-11-19 00:00:00|
|2015-09-21 00:00:00|
+-------------------+
only showing top 3 rows



In [0]:
from pyspark.sql.functions import year

In [0]:
player_stats = player_stats.withColumn('Years', year(player_stats['date']))

In [0]:
player_stats.select('player_api_id', 'Years').show()

+-------------+-----+
|player_api_id|Years|
+-------------+-----+
|       505942| 2016|
|       505942| 2015|
|       505942| 2015|
|       505942| 2015|
|       505942| 2007|
|       155782| 2016|
|       155782| 2016|
|       155782| 2016|
|       155782| 2015|
|       155782| 2015|
|       155782| 2015|
|       155782| 2015|
|       155782| 2015|
|       155782| 2015|
|       155782| 2014|
|       155782| 2014|
|       155782| 2014|
|       155782| 2014|
|       155782| 2014|
|       155782| 2014|
+-------------+-----+
only showing top 20 rows



#### We wanna find who are the best attacking players, without using the overall rating from the dataset. Using field of:

 'sprint_speed',
 'finishing',
 'acceleration',
 'shot_power'.
 
#### To get the overall best players

In [0]:
player_stats.columns

Out[24]: ['id',
 'player_fifa_api_id',
 'player_api_id',
 'date',
 'overall_rating',
 'potential',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'finishing',
 'heading_accuracy',
 'short_passing',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'sprint_speed',
 'agility',
 'reactions',
 'balance',
 'shot_power',
 'jumping',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'Years']

In [0]:
#  We wanna create A dataframe whereby for each year, each player occur only once.

grouped_stats = player_stats.groupBy('player_api_id', 'Years').agg\
    ({'sprint_speed':'mean', 'finishing':'mean', 
      'acceleration':'mean', 'shot_power':'mean'})

In [0]:
grouped_stats.show()

+-------------+-----+------------------+-----------------+-----------------+-----------------+
|player_api_id|Years|    avg(finishing)|avg(acceleration)|avg(sprint_speed)|  avg(shot_power)|
+-------------+-----+------------------+-----------------+-----------------+-----------------+
|       191784| 2015|              21.6|             31.0|             55.0|             57.6|
|       209372| 2015| 65.66666666666667|71.66666666666667|             74.0|73.66666666666667|
|       170593| 2014|              42.0|             80.5|             82.0|             68.0|
|       185336| 2012|              63.0|             71.0|             68.0|             76.0|
|       243068| 2016|              66.0|             78.0|             76.0|             69.0|
|        47394| 2015|              70.8|             76.5|             69.3|             74.6|
|        39311| 2008|              92.0|             88.0|             84.0|             87.0|
|       182847| 2013|              38.0|          

In [0]:
# Create a new col to sum all the grouped_Stats to get the best players

grouped_stats = grouped_stats.withColumn('overall', (grouped_stats['avg(finishing)'] + grouped_stats['avg(acceleration)'] + grouped_stats['avg(sprint_speed)'] + grouped_stats['avg(shot_power)'])
                                        / 4)
# Doing a divide by 4 is like getting the percent of the matrics

In [0]:
# We want our best player by the overall criteria

grouped_stats.orderBy('overall', ascending = False).show()

+-------------+-----+--------------+-----------------+-----------------+-----------------+-----------------+
|player_api_id|Years|avg(finishing)|avg(acceleration)|avg(sprint_speed)|  avg(shot_power)|          overall|
+-------------+-----+--------------+-----------------+-----------------+-----------------+-----------------+
|        30893| 2015|          95.0|             91.0|             93.4|             94.0|            93.35|
|        30893| 2008|          92.0|             95.0|             94.0|             92.0|            93.25|
|        33639| 2007|          94.0|             95.0|             94.0|             89.5|           93.125|
|        30893| 2014|          93.5|             91.0|             94.0|             94.0|           93.125|
|        30893| 2013|          92.0|             91.0|             94.0|             94.4|            92.85|
|        30893| 2010|          89.5|             93.0|             94.0|             92.5|            92.25|
|        30893| 201

In [0]:
# We now have lesser rows compare to the main dataset
grouped_stats.count()

Out[29]: 73059

In [0]:
# We now join the players dataset to it.

final_best_players = grouped_stats.join(players, ['player_api_id']).orderBy('overall', ascending= False)

In [0]:
final_best_players.show()

+-------------+-----+--------------+-----------------+-----------------+-----------------+-----------------+-----+-----------------+------------------+-------------------+------+------+
|player_api_id|Years|avg(finishing)|avg(acceleration)|avg(sprint_speed)|  avg(shot_power)|          overall|   id|      player_name|player_fifa_api_id|           birthday|height|weight|
+-------------+-----+--------------+-----------------+-----------------+-----------------+-----------------+-----+-----------------+------------------+-------------------+------+------+
|        30893| 2015|          95.0|             91.0|             93.4|             94.0|            93.35| 1995|Cristiano Ronaldo|             20801|1985-02-05 00:00:00|185.42|   176|
|        30893| 2008|          92.0|             95.0|             94.0|             92.0|            93.25| 1995|Cristiano Ronaldo|             20801|1985-02-05 00:00:00|185.42|   176|
|        33639| 2007|          94.0|             95.0|             94.

In [0]:
final_best_players = final_best_players.select('Years', 'overall', 'player_name')
final_best_players.show()

+-----+-----------------+-----------------+
|Years|          overall|      player_name|
+-----+-----------------+-----------------+
| 2015|            93.35|Cristiano Ronaldo|
| 2008|            93.25|Cristiano Ronaldo|
| 2007|           93.125|     Samuel Eto'o|
| 2014|           93.125|Cristiano Ronaldo|
| 2013|            92.85|Cristiano Ronaldo|
| 2010|            92.25|Cristiano Ronaldo|
| 2011|           92.125|Cristiano Ronaldo|
| 2012|             92.0|Cristiano Ronaldo|
| 2009|           91.875|Cristiano Ronaldo|
| 2009|            91.75|  Fernando Torres|
| 2007|           91.625|    Thierry Henry|
| 2009|           91.625|     Samuel Eto'o|
| 2008|            91.25|     Samuel Eto'o|
| 2008|            90.75|  Fernando Torres|
| 2013|            90.65|     Lionel Messi|
| 2008|             90.5|   Lukas Podolski|
| 2007|             90.5|     Wayne Rooney|
| 2012|            90.25|     Lionel Messi|
| 2014|90.16666666666666|     Lionel Messi|
| 2011|           90.125|     Sa

In [0]:
# To get the top players for a certain year :

year = 2015

final_best_players.filter(final_best_players.Years == year).show()

+-----+-----------------+--------------------+
|Years|          overall|         player_name|
+-----+-----------------+--------------------+
| 2015|            93.35|   Cristiano Ronaldo|
| 2015|89.78571428571428|        Lionel Messi|
| 2015|             89.0|         Gareth Bale|
| 2015|88.91666666666666|        Arjen Robben|
| 2015|88.32499999999999|Pierre-Emerick Au...|
| 2015|            87.95|       Sergio Aguero|
| 2015|             87.5|        Theo Walcott|
| 2015|87.33333333333334|          Marco Reus|
| 2015|             87.0|                Hulk|
| 2015|          86.6875|        Carlos Tevez|
| 2015|             86.5|      Seydou Doumbia|
| 2015|          86.4375| Alexandre Lacazette|
| 2015|          86.3125|    Daniel Sturridge|
| 2015|             86.0|       Juan Cuadrado|
| 2015|           85.875|              Neymar|
| 2015|            85.75|          Ahmed Musa|
| 2015|85.64285714285714|         Luis Suarez|
| 2015|             85.5|         Eden Hazard|
| 2015|85.321

INTRODUCTION TO SPARK SQL (SQL LIKE)

In spark you can't directly do this:

SELECT * FROM players

The reason is we are referencing to the players table. 
Our first step to create a 'SQL LIKE' table which we can query with spark >>>

players.createOrReplaceTempView('players')

Here we are defining a sql view, which is coming from our df, so sql is interpreting the metadata of our df and creating a table called players.

In [0]:
# Here we are creating our temp 'SQL LIKE' table from the df and associating it with spark

players.createOrReplaceTempView('players')

In [0]:
#We can now communicate with sql with spark in this manner, whereby we wrap it with spark.sql

spark.sql('SELECT * FROM players').show()

+---+-------------+--------------------+------------------+-------------------+------+------+
| id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+--------------------+------------------+-------------------+------+------+
|  1|       505942|  Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|     Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|         Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|       Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|        Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
|  6|        27316|          Aaron Hunt|            158138|1986-09-04 00:00:00|182.88|   161|
|  7|       564793|          Aaron Kuhl|            221280|1996-01-30 00:00:00|172.72|   146|
|  8|        30895|        Aaron Lennon|            152747|1

In [0]:
df_players = spark.sql('SELECT *\
          FROM players\
          WHERE height > 170 ORDER BY height DESC')

In [0]:
 df_players.select('player_name').show()

+--------------------+
|         player_name|
+--------------------+
|    Kristof van Hout|
|        Bogdan Milic|
|   Costel Pantilimon|
|        Fejsal Mulic|
|       Jurgen Wevers|
|          Kevin Vink|
|       Lacina Traore|
|        Nikola Zigic|
|       Paolo Acerbis|
|       Pietro Marino|
|   Stefan Maierhofer|
|Vanja Milinkovic-...|
|        Zeljko Kalac|
|           Abdoul Ba|
|       Asmir Begovic|
|         Daniel Burn|
|      Danny Wintjens|
|      Fraser Forster|
|      Konrad Jalocha|
|        Peter Crouch|
+--------------------+
only showing top 20 rows



In [0]:
player_stats.createOrReplaceTempView('stats')

In [0]:
''' Here we use sql query and further combine with pyspark df operation , if you notice, the pyspark part here, we used player_stats rather than stats, 
#this is because we have moved outta the sql query.'''

#NOTE: Always remember to wrap your sql query in colon ''

#spark.sql('SELECT * FROM stats').filter(player_stats['Stamina'] > 50).select('Stamina').show()
spark.sql('SELECT * FROM stats WHERE stamina > 50').select('Stamina').show()

+-------+
|Stamina|
+-------+
|     54|
|     54|
|     54|
|     54|
|     54|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     79|
|     80|
|     80|
|     79|
+-------+
only showing top 20 rows



In [0]:
# I first select the first 50, and then groupby in pyspark and agg over the overall mean (agg the top 50), so it return the result from the top 20, which will be lesser than 50. 

goats = spark.sql('SELECT player_api_id, overall_rating FROM stats ORDER BY overall_rating DESC LIMIT 50').groupby('player_api_id').agg({'overall_rating' : 'mean'})

goats.createOrReplaceTempView('goats')

Above i saved it to Tempview 


JOINS WITH SPARK SQL

Below, then call sql like way.

In [0]:
spark.sql('SELECT * FROM goats INNER JOIN players USING(player_api_id)').show()

+-------------+-------------------+-----+-----------------+------------------+-------------------+------+------+
|player_api_id|avg(overall_rating)|   id|      player_name|player_fifa_api_id|           birthday|height|weight|
+-------------+-------------------+-----+-----------------+------------------+-------------------+------+------+
|        30723|               91.0|  388| Alessandro Nesta|              1088|1976-03-19 00:00:00|187.96|   174|
|        30955|               91.0|  742|   Andres Iniesta|                41|1984-05-11 00:00:00|170.18|   150|
|        30893|  92.05263157894737| 1995|Cristiano Ronaldo|             20801|1985-02-05 00:00:00|185.42|   176|
|        34520|               91.0| 3183|  Fabio Cannavaro|              1183|1973-09-13 00:00:00|175.26|   165|
|        30717|               92.0| 3826| Gianluigi Buffon|              1179|1978-01-28 00:00:00|193.04|   201|
|        39989|               92.0| 3994|   Gregory Coupet|              1747|1972-12-31 00:00:0

GROUP BY IS EASY TO MANIPULATE WITH SQL BUT IF I WANNA JOIN WITH BROADCAST I WILL RATHER USE PYSPARK DF LOGIC

WINDOW FUNCTIONS WITH SQL (SQL IS VERY POWERFUL WITH WINDOW FUNCTIONS)

Window functions operation allows all the rows that is in the column to get max of it without having to groupby, 

because if with groupby i will be loosing the detail of the year or other field selected, but we dont want that.

IN WINDOW FUNCTIONS you select the fields and Simply state the operation you want (maybe max()), THEN OVER what you want it 

(This means i want it to go through all the date/year and see for each player the best / max rating in his history) And 

make sure you always PARTITION the OVER field, so it can calculate the max id per player, or else it will give you

result of the max player and apply it to all players.

In [0]:
spark.sql('SELECT player_api_id, Years, MAX(overall_rating) OVER (PARTITION BY player_api_id ORDER BY date) AS Maximum_ever FROM stats').select('player_api_id', 'Maximum_ever').show()

+-------------+------------+
|player_api_id|Maximum_ever|
+-------------+------------+
|       101042|          64|
|       101042|          64|
|       101042|          64|
|       101042|          67|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          68|
|       101042|          69|
|       101042|          69|
|       101042|          69|
|       101042|          69|
|       101070|          68|
+-------------+------------+
only showing top 20 rows



PARTITION 
 
IT BREAKS UP THE FILES TO PARTITION IN THE SMARTEST WAY POSSIBLE TO BE ABLE TO MAKE THIS OPERATION AS ULTIMA AS POSSIBLE.
We will use aparche pauquet to do our partitioning.
Aparche Pauquet means column_oriented data file format designed for efficient data storage and retrival. By default we can't ctrl the partition.
We have RDD, so we can get the number of partition of a certain rdd/df. For our sample superstore because i got it from a csv file there were no partition involved. 
 

NOTE: if read csv from my computer i get one partition by default and so all operation will be carried out in 1 partition, but now that i repartition it and i store my file in parquet format in 6 partition, now when i read the parquet file it already 6 partition and so the data is very more optimized.

In [0]:
player_stats

Out[96]: DataFrame[id: string, player_fifa_api_id: string, player_api_id: string, date: string, overall_rating: string, potential: string, preferred_foot: string, attacking_work_rate: string, defensive_work_rate: string, crossing: string, finishing: int, heading_accuracy: string, short_passing: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: int, sprint_speed: int, agility: string, reactions: string, balance: string, shot_power: int, jumping: string, stamina: int, strength: string, long_shots: string, aggression: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string, Years: int]

In [0]:
player_stats.rdd.getNumPartitions()

# This has been partition if not it was meant to be 1

Out[98]: 8

In [0]:
# On the other hand i can now one operation called .... So by doing this i have repartition my df

player_stats = player_stats.repartition(6)

Out[100]: DataFrame[id: string, player_fifa_api_id: string, player_api_id: string, date: string, overall_rating: string, potential: string, preferred_foot: string, attacking_work_rate: string, defensive_work_rate: string, crossing: string, finishing: int, heading_accuracy: string, short_passing: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: int, sprint_speed: int, agility: string, reactions: string, balance: string, shot_power: int, jumping: string, stamina: int, strength: string, long_shots: string, aggression: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string, Years: int]

In [0]:
player_stats.rdd.getNumPartitions()

Out[101]: 8

In [0]:
# I can actually store my parquet file by writting them into parquet or store in csv.
player_stats.write.parquet('C:\Users\pc1\Desktop\Player_Attributes', encoding='windows-1252', mode = 'overwrite')


In [0]:
new_player_stats = spark.read.parquet('C:\Users\pc1\Desktop\Player_Attributes')
new_player_stats