In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("soccer players dataset analysis").getOrCreate()

In [4]:
#load data from csv into spark dataframe
data = spark.read.format('csv').option('header', True).load('player.csv')

In [5]:
data

DataFrame[id: string, player_api_id: string, player_name: string, player_fifa_api_id: string, birthday: string, height: string, weight: string]

In [6]:
data.show()

+---+-------------+--------------------+------------------+-------------------+------+------+
| id|player_api_id|         player_name|player_fifa_api_id|           birthday|height|weight|
+---+-------------+--------------------+------------------+-------------------+------+------+
|  1|       505942|  Aaron Appindangoye|            218353|1992-02-29 00:00:00|182.88|   187|
|  2|       155782|     Aaron Cresswell|            189615|1989-12-15 00:00:00|170.18|   146|
|  3|       162549|         Aaron Doran|            186170|1991-05-13 00:00:00|170.18|   163|
|  4|        30572|       Aaron Galindo|            140161|1982-05-08 00:00:00|182.88|   198|
|  5|        23780|        Aaron Hughes|             17725|1979-11-08 00:00:00|182.88|   154|
|  6|        27316|          Aaron Hunt|            158138|1986-09-04 00:00:00|182.88|   161|
|  7|       564793|          Aaron Kuhl|            221280|1996-01-30 00:00:00|172.72|   146|
|  8|        30895|        Aaron Lennon|            152747|1

In [7]:
#print schema of the dataframe
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [8]:
#load data from player attributes
player_att = spark.read.format('csv').option('header', True).load('player_attributes.csv')

In [9]:
player_att.show()

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_posit

In [10]:
#get the count of players from both the dataset
[data.count() ,
player_att.count()]

[11060, 183978]

In [11]:
#get count of only distinct player_api_id
player_att.select('player_api_id').distinct().count()

11060

In [12]:
#drop not useful columns from players
data = data.drop('id', 'player_fifa_api_id')
data.columns

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [13]:
player_att = player_att.drop('id', 'player_fifa_api_id', 'preferred_foot', 'attacking_work_rate', 'defensive_work_rate',
                   'crossing', 'jumping', 'sprint_speed','balance','aggression','short_passing', 'potential')
player_att.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [14]:
#remove records with null values
data = data.dropna()
player_att =   player_att.dropna()

In [15]:
#get the count of players from both the dataset
[data.count() ,
player_att.count()]

[11060, 181265]

In [16]:
from pyspark.sql.functions import udf

In [17]:
year_extract_udf = udf(lambda date: date.split('-')[0])
player_att = player_att.withColumn('year', year_extract_udf(player_att.date))
player_att.select('year').show()

+----+
|year|
+----+
|2016|
|2015|
|2015|
|2015|
|2007|
|2016|
|2016|
|2016|
|2015|
|2015|
|2015|
|2015|
|2015|
|2015|
|2014|
|2014|
|2014|
|2014|
|2014|
|2014|
+----+
only showing top 20 rows



In [18]:
player_att = player_att.drop('date')
player_att.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [19]:
#get the count of distinct players in year 2016
pl_2016 = player_att.filter(player_att.year == 2016)
pl_2016.select('player_api_id').distinct().count()

5586

In [20]:
#get the striker characteristics 
pl_striker = pl_2016.groupBy('player_api_id').agg({'finishing' : 'avg', "shot_power":'avg','acceleration':'avg'})
pl_striker.show()

+-------------+-----------------+------------------+------------------+
|player_api_id|   avg(finishing)| avg(acceleration)|   avg(shot_power)|
+-------------+-----------------+------------------+------------------+
|       309726|75.44444444444444| 74.11111111111111|              76.0|
|        26112|             53.0|              51.0|              76.0|
|        38433|            68.25|              74.0|              74.0|
|       295060|             25.0|              62.0|              40.0|
|       161396|             29.0|              72.0|              69.0|
|        37774|             61.0|              64.0|              68.0|
|        41157|             81.0|              87.0|              80.0|
|        40740|             58.0|              73.5|              75.0|
|        31432|             14.0|              59.0|              65.0|
|       109653|             62.0|              65.0|              83.5|
|       282680|             12.0|              33.0|            

In [21]:
 pl_striker = pl_striker.withColumnRenamed('avg(finishing)', 'finishing')\
            .withColumnRenamed('avg(shot_power)', 'shot_p')\
            .withColumnRenamed('avg(acceleration)', 'acc')

In [22]:
  #give weights to these characteristics and find total capacity for each player
wt_shotp = 2
wt_finishing = 1
wt_acc = 1
total_wt = wt_shotp + wt_finishing + wt_acc
strikers = pl_striker.withColumn('striker_grade', (pl_striker.finishing * wt_finishing\
                                + pl_striker.shot_p * wt_shotp\
                                + pl_striker.acc * wt_acc) / total_wt)
strikers

DataFrame[player_api_id: string, finishing: double, acc: double, shot_p: double, striker_grade: double]

In [23]:
strikers.columns

['player_api_id', 'finishing', 'acc', 'shot_p', 'striker_grade']

In [24]:
strikers.select('striker_grade').show()

+-----------------+
|    striker_grade|
+-----------------+
|75.38888888888889|
|             64.0|
|          72.5625|
|            41.75|
|            59.75|
|            65.25|
|             82.0|
|           70.375|
|            50.75|
|             73.5|
|            23.25|
|             49.5|
|            74.25|
|47.75000000000001|
|             63.5|
|             68.5|
|          59.9375|
|73.33333333333334|
|            68.75|
|             46.5|
+-----------------+
only showing top 20 rows



In [25]:
#we can now drop unnecessary columns 
strikers.drop('finishing','acc','shot_p')

DataFrame[player_api_id: string, striker_grade: double]

In [26]:
strikers = strikers.filter(strikers.striker_grade > 70).sort(strikers.striker_grade.desc())

In [27]:
strikers.show()

+-------------+-----------------+-----+-----------------+-----------------+
|player_api_id|        finishing|  acc|           shot_p|    striker_grade|
+-------------+-----------------+-----+-----------------+-----------------+
|        20276|             85.0| 84.0|             94.0|            89.25|
|        37412|             90.0| 92.0|             87.0|             89.0|
|        38817|             88.0| 90.0|             88.5|            88.75|
|        32118|             85.0| 82.0|             93.0|            88.25|
|        31921|             81.0| 93.0|             87.0|             87.0|
|        30834|             85.0| 90.0|             86.0|            86.75|
|       303824|73.42857142857143| 91.0|             88.0|85.10714285714286|
|       129944|             83.0| 89.0|             84.0|             85.0|
|       158263|             77.0| 90.0|             86.0|            84.75|
|       150565|             88.0| 95.0|             78.0|            84.75|
|        257

In [28]:
#get the players details with their names using join
striker_details = data.join(strikers, data.player_api_id == strikers.player_api_id)
striker_details.columns

['player_api_id',
 'player_name',
 'birthday',
 'height',
 'weight',
 'player_api_id',
 'finishing',
 'acc',
 'shot_p',
 'striker_grade']

In [29]:
striker_details.select(strikers.player_api_id, 'player_name', 'striker_grade').show()

+-------------+--------------------+-----------------+
|player_api_id|         player_name|    striker_grade|
+-------------+--------------------+-----------------+
|        20276|                Hulk|            89.25|
|        37412|       Sergio Aguero|             89.0|
|        38817|        Carlos Tevez|            88.75|
|        32118|      Lukas Podolski|            88.25|
|        31921|         Gareth Bale|             87.0|
|        30834|        Arjen Robben|            86.75|
|       303824|       Memphis Depay|85.10714285714286|
|       129944|          Marco Reus|             85.0|
|       158263|        Dorlan Pabon|            84.75|
|       150565|Pierre-Emerick Au...|            84.75|
|        25759|     Gonzalo Higuain|84.66666666666667|
|       156726|       Douglas Costa|             84.5|
|       169193| Alexandre Lacazette|          84.4375|
|       286119|         Jamie Vardy|84.42857142857143|
|        30348|       Jermain Defoe|           84.375|
|        4

In [30]:
#get the players details with their names using join(in other way).Here join column will come only once
striker_details2 =  data.join(strikers, ['player_api_id'])

In [31]:
striker_details2.select('player_api_id', 'player_name', 'striker_grade').show()

+-------------+--------------------+-----------------+
|player_api_id|         player_name|    striker_grade|
+-------------+--------------------+-----------------+
|        20276|                Hulk|            89.25|
|        37412|       Sergio Aguero|             89.0|
|        38817|        Carlos Tevez|            88.75|
|        32118|      Lukas Podolski|            88.25|
|        31921|         Gareth Bale|             87.0|
|        30834|        Arjen Robben|            86.75|
|       303824|       Memphis Depay|85.10714285714286|
|       129944|          Marco Reus|             85.0|
|       158263|        Dorlan Pabon|            84.75|
|       150565|Pierre-Emerick Au...|            84.75|
|        25759|     Gonzalo Higuain|84.66666666666667|
|       156726|       Douglas Costa|             84.5|
|       169193| Alexandre Lacazette|          84.4375|
|       286119|         Jamie Vardy|84.42857142857143|
|        30348|       Jermain Defoe|           84.375|
|        4

In [32]:
from pyspark.sql.functions import broadcast

In [33]:
[data.count(), strikers.count()]

[11060, 1609]

In [34]:
#as strikers data is smaller we can broadcast it on worker nodes
striker_details3 = data.select('player_api_id', 'player_name')\
                    .join(
                    broadcast(strikers),
                    ['player_api_id'],
                    'inner'
                        )

In [35]:
striker_details3.sort(striker_details3.striker_grade.
                     desc()).show()

+-------------+--------------------+-----------------+-----+-----------------+-----------------+
|player_api_id|         player_name|        finishing|  acc|           shot_p|    striker_grade|
+-------------+--------------------+-----------------+-----+-----------------+-----------------+
|        20276|                Hulk|             85.0| 84.0|             94.0|            89.25|
|        37412|       Sergio Aguero|             90.0| 92.0|             87.0|             89.0|
|        38817|        Carlos Tevez|             88.0| 90.0|             88.5|            88.75|
|        32118|      Lukas Podolski|             85.0| 82.0|             93.0|            88.25|
|        31921|         Gareth Bale|             81.0| 93.0|             87.0|             87.0|
|        30834|        Arjen Robben|             85.0| 90.0|             86.0|            86.75|
|       303824|       Memphis Depay|73.42857142857143| 91.0|             88.0|85.10714285714286|
|       129944|          Marco

In [36]:
#get count of player and player attributes tables 
[data.count(), player_att.count()]

[11060, 181265]

In [37]:
#as player's data is less, broadcast it all worker nodes
player_heading_accuracy = player_att.select('player_api_id', 'heading_accuracy')\
                    .join(
                    broadcast(data),
                    ['player_api_id'],
                    'inner'
                        )

In [38]:
player_heading_accuracy.show()

+-------------+----------------+------------------+-------------------+------+------+
|player_api_id|heading_accuracy|       player_name|           birthday|height|weight|
+-------------+----------------+------------------+-------------------+------+------+
|       505942|              71|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              71|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              71|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              70|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       505942|              70|Aaron Appindangoye|1992-02-29 00:00:00|182.88|   187|
|       155782|              58|   Aaron Cresswell|1989-12-15 00:00:00|170.18|   146|
|       155782|              58|   Aaron Cresswell|1989-12-15 00:00:00|170.18|   146|
|       155782|              57|   Aaron Cresswell|1989-12-15 00:00:00|170.18|   146|
|       155782|              57|   Aaron Cresswell|198

In [39]:
#set accumulators to be broadcasted for computing across worker nodes for range of heights
short_count = spark.sparkContext.accumulator(0)
med_low_count = spark.sparkContext.accumulator(0)
med_high_count = spark.sparkContext.accumulator(0)
high_count = spark.sparkContext.accumulator(0)

In [40]:
def count_players_by_ht(row):
    ht = float(row.height)
    if (ht <= 175):
        short_count.add(1)
    elif (ht <= 183 and ht > 175):
        med_low_count.add(1)
    elif (ht <= 195 and ht > 183):
        med_high_count.add(1)
    elif (ht > 195):
        high_count.add(1)

In [41]:
#apply count_players_by_ht function to all records in player_heading_accuracy
player_heading_accuracy.foreach(lambda x: count_players_by_ht(x))
all_players = [short_count, med_low_count, med_high_count, high_count]
all_players

[Accumulator<id=0, value=18977>,
 Accumulator<id=1, value=97399>,
 Accumulator<id=2, value=61518>,
 Accumulator<id=3, value=3371>]

In [42]:
#set accumulators for heading accuracy
short_count_ha = spark.sparkContext.accumulator(0)
med_low_count_ha = spark.sparkContext.accumulator(0)
med_high_count_ha = spark.sparkContext.accumulator(0)
high_count_ha = spark.sparkContext.accumulator(0)

In [46]:
def count_players_by_ht_ha(row, threshold_score):
    ht = float(row.height)
    ha = float(row.heading_accuracy)
    if (ha <= threshold_score):
        return
    if (ht <= 175):
        short_count_ha.add(1)
    elif (ht <= 183 and ht > 175):
        med_low_count_ha.add(1)
    elif (ht <= 195 and ht > 183):
        med_high_count_ha.add(1)
    elif (ht > 195):
        high_count_ha.add(1)

In [47]:
player_heading_accuracy.foreach(lambda x: count_players_by_ht_ha(x,60))
all_players_above_ha = [short_count_ha, med_low_count_ha, med_high_count_ha, high_count_ha]
all_players_above_ha

[Accumulator<id=4, value=3653>,
 Accumulator<id=5, value=41448>,
 Accumulator<id=6, value=40270>,
 Accumulator<id=7, value=1573>]

In [50]:
#percentage of players with high heading accuracy bucketed by height
percentage_value = [short_count_ha.value/short_count.value * 100,
                    med_low_count_ha.value/med_low_count.value * 100,
                    med_high_count_ha.value/med_high_count.value * 100,
                    high_count_ha.value/ high_count.value * 100]
percentage_value

[16.14228899690676, 29.851563231470614, 39.56262034817464, 31.8163430420712]

In [51]:
pl_2016.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [52]:
#store players 2016 dataframe into .json and .csv files --(coalesce for repartitioning data into no. of partition files we want)
pl_2016.select('player_api_id', 'overall_rating').coalesce(1).write\
        .option('header', 'true').csv('player_rating.csv')

In [53]:
pl_2016.select('player_api_id', 'overall_rating').write\
        .option('header', 'true').csv('player_rating.json')