In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, udf

In [45]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('BcAccum')
    .getOrCreate()
)

In [46]:
# Load the player master data; the first CSV line supplies the column names.
players = (
    spark.read
    .option('header', 'true')
    .csv('../dataset/player.csv')
)

In [47]:
# Inspect the column names and types of the players frame
# (players.show(5) and players.count() are other quick sanity checks).
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [48]:
# Load the per-player attribute records; the first CSV line supplies the column names.
player_attr = (
    spark.read
    .option('header', 'true')
    .csv('../dataset/player_attributes.csv')
)

In [49]:
player_attr.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true)

In [50]:
players.count(), player_attr.count()

(11060, 183978)

In [51]:
# How many different players have at least one attribute record?
(
    player_attr
    .select('player_api_id')
    .distinct()
    .count()
)

11060

In [52]:
# Keep only the columns this analysis needs, then discard every row that
# still contains a null in any remaining column.
players = players.drop('id', 'player_fifa_api_id').dropna()
player_attr = player_attr.drop(
    'id',
    'player_fifa_api_id',
    'preferred_foot',
    'attacking_work_rate',
    'defensive_work_rate',
    'crossing',
    'jumping',
    'sprint_speed',
    'balance',
    'aggression',
    'short_passing',
    'potential',
).dropna()

In [53]:
player_attr.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [54]:
# Replace the `date` column with a `year` column holding the text that
# precedes the first '-' in `date`.
# Spark's built-in split() is used instead of a Python UDF: it produces the
# same string for every non-null input, and it yields null (rather than
# raising inside a Python worker) if a `date` value is ever null.
# The resulting `year` column is still a string column.
player_attr = (
    player_attr
    .withColumn('year', split(player_attr['date'], '-').getItem(0))
    .drop('date')
)

In [55]:
# Keep only the attribute records whose `year` equals 2016.
pa_2016 = player_attr.where(player_attr['year'] == 2016)

In [56]:
# Number of different players that have at least one 2016 record.
(
    pa_2016
    .select('player_api_id')
    .distinct()
    .count()
)

5586

In [57]:
# Per player, average the three attributes that feed the striker grade over
# all of that player's 2016 records, then strip the 'avg(...)' wrapper from
# the generated column names.
pa_striker_2016 = (
    pa_2016
    .groupBy('player_api_id')
    .agg({
        'finishing': 'avg',
        'shot_power': 'avg',
        'acceleration': 'avg',
    })
    .withColumnRenamed('avg(finishing)', 'finishing')
    .withColumnRenamed('avg(shot_power)', 'shot_power')
    .withColumnRenamed('avg(acceleration)', 'acceleration')
)

In [58]:
pa_striker_2016.show(5)

+-------------+-----------------+-----------------+----------+
|player_api_id|        finishing|     acceleration|shot_power|
+-------------+-----------------+-----------------+----------+
|       309726|75.44444444444444|74.11111111111111|      76.0|
|        26112|             53.0|             51.0|      76.0|
|        38433|            68.25|             74.0|      74.0|
|       295060|             25.0|             62.0|      40.0|
|       161396|             29.0|             72.0|      69.0|
+-------------+-----------------+-----------------+----------+
only showing top 5 rows



In [59]:
# Relative importance of each attribute in the striker grade; shot power
# counts twice as much as finishing or acceleration.
weight_finishing, weight_shot_power, weight_acceleration = 1, 2, 1
# Divisor that turns the weighted sum into a weighted average.
total_weight = sum((weight_finishing, weight_shot_power, weight_acceleration))

In [60]:
# striker_grade = weighted average of finishing, acceleration and shot power.
strikers = pa_striker_2016.withColumn(
    'striker_grade',
    (
        pa_striker_2016['finishing'] * weight_finishing
        + pa_striker_2016['acceleration'] * weight_acceleration
        + pa_striker_2016['shot_power'] * weight_shot_power
    ) / total_weight,
)

In [61]:
strikers.show(5)

+-------------+-----------------+-----------------+----------+-----------------+
|player_api_id|        finishing|     acceleration|shot_power|    striker_grade|
+-------------+-----------------+-----------------+----------+-----------------+
|       309726|75.44444444444444|74.11111111111111|      76.0|75.38888888888889|
|        26112|             53.0|             51.0|      76.0|             64.0|
|        38433|            68.25|             74.0|      74.0|          72.5625|
|       295060|             25.0|             62.0|      40.0|            41.75|
|       161396|             29.0|             72.0|      69.0|            59.75|
+-------------+-----------------+-----------------+----------+-----------------+
only showing top 5 rows



In [62]:
strikers = strikers.drop('finishing', 'acceleration', 'shot_power')

In [63]:
# Keep players graded above 70 and list the best ones first.
strikers = (
    strikers
    .where(strikers['striker_grade'] > 70)
    .orderBy(strikers['striker_grade'].desc())
)

In [64]:
strikers.show(5)

+-------------+-------------+
|player_api_id|striker_grade|
+-------------+-------------+
|        20276|        89.25|
|        37412|         89.0|
|        38817|        88.75|
|        32118|        88.25|
|        31921|         87.0|
+-------------+-------------+
only showing top 5 rows



In [67]:
# Inner join on an explicit column-equality condition; because the key is
# given as an expression, the result carries the `player_api_id` column of
# both inputs.
striker_details = players.join(
    strikers,
    on=players['player_api_id'] == strikers['player_api_id'],
    how='inner',
)

In [68]:
striker_details.show(5)

+-------------+--------------+-------------------+------+------+-------------+-------------+
|player_api_id|   player_name|           birthday|height|weight|player_api_id|striker_grade|
+-------------+--------------+-------------------+------+------+-------------+-------------+
|        20276|          Hulk|1986-07-25 00:00:00|180.34|   187|        20276|        89.25|
|        37412| Sergio Aguero|1988-06-02 00:00:00|172.72|   163|        37412|         89.0|
|        38817|  Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        38817|        88.75|
|        32118|Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        32118|        88.25|
|        31921|   Gareth Bale|1989-07-16 00:00:00|182.88|   163|        31921|         87.0|
+-------------+--------------+-------------------+------+------+-------------+-------------+
only showing top 5 rows



In [69]:
from pyspark.sql.functions import broadcast

In [70]:
# Same join, but with `strikers` marked for broadcast and the key given by
# name, so the result contains a single `player_api_id` column.
player_names = players.select('player_api_id', 'player_name')
striker_details = player_names.join(
    broadcast(strikers),
    on=['player_api_id'],
    how='inner',
)

In [71]:
striker_details.show(5)

+-------------+-----------------+-------------+
|player_api_id|      player_name|striker_grade|
+-------------+-----------------+-------------+
|        27316|       Aaron Hunt|        74.75|
|        40719|     Aaron Niguez|        74.25|
|        75489|     Aaron Ramsey|       76.875|
|       120919|Aatif Chahechouhe|         78.0|
|        67334|Abdoul Karim Yoda|         74.0|
+-------------+-----------------+-------------+
only showing top 5 rows



In [72]:
# Attach the player master data (marked for broadcast) to every attribute
# record, so each row carries both `heading_accuracy` and `height`.
players_heading_acc = (
    player_attr
    .select('player_api_id', 'heading_accuracy')
    .join(
        broadcast(players),
        on=player_attr['player_api_id'] == players['player_api_id'],
    )
)

In [73]:
# One integer accumulator per height bucket, each starting at zero.
short_count, medium_low_count, medium_high_count, tail_count = (
    spark.sparkContext.accumulator(0) for _ in range(4)
)

In [77]:
def count_players_by_height(row):
    """Increment the height-bucket accumulator matching ``row.height``.

    ``row.height`` is converted with ``float`` and placed in one of four
    buckets: up to 175, above 175 up to 183, above 183 up to 195, and
    above 195. The corresponding module-level accumulator (``short_count``,
    ``medium_low_count``, ``medium_high_count`` or ``tail_count``) is
    increased by one. A value for which every comparison is false (NaN) is
    not counted. Returns ``None``.
    """
    player_height = float(row.height)
    # Each branch is only reached when the previous upper bound was exceeded,
    # so the lower bound of a bucket does not need to be tested again.
    if player_height <= 175:
        bucket = short_count
    elif player_height <= 183:
        bucket = medium_low_count
    elif player_height <= 195:
        bucket = medium_high_count
    elif player_height > 195:
        bucket = tail_count
    else:
        return
    bucket.add(1)

In [78]:
# Run the counter over every joined row; the accumulators collect the totals.
players_heading_acc.foreach(count_players_by_height)

In [79]:
# Bucket totals in the order short, medium-low, medium-high, tall.
# These count rows of `players_heading_acc`, i.e. one per attribute record.
all_players = tuple(
    bucket.value
    for bucket in (short_count, medium_low_count, medium_high_count, tail_count)
)

In [80]:
all_players

(18977, 97399, 61518, 3371)

In [81]:
# A second set of per-bucket accumulators, used for rows whose heading
# accuracy is above a threshold.
short_ha_count, medium_low_ha_count, medium_high_ha_count, tail_ha_count = (
    spark.sparkContext.accumulator(0) for _ in range(4)
)

In [84]:
def count_players_by_height_and_ha(row, threshold_score):
    """Count ``row`` in its height bucket if its heading accuracy is high enough.

    Args:
        row: record exposing ``height`` and ``heading_accuracy``; both are
            converted with ``float``.
        threshold_score: rows whose heading accuracy is less than or equal
            to this value are ignored.

    Rows that pass the threshold increment one of the module-level
    accumulators ``short_ha_count`` (height up to 175),
    ``medium_low_ha_count`` (above 175 up to 183), ``medium_high_ha_count``
    (above 183 up to 195) or ``tail_ha_count`` (above 195). Returns ``None``.
    """
    player_height = float(row.height)
    heading_score = float(row.heading_accuracy)

    # Guard clause: nothing to count at or below the threshold.
    if heading_score <= threshold_score:
        return

    # Upper bounds are tested in ascending order, so each branch implies the
    # previous bound was exceeded.
    if player_height <= 175:
        bucket = short_ha_count
    elif player_height <= 183:
        bucket = medium_low_ha_count
    elif player_height <= 195:
        bucket = medium_high_ha_count
    elif player_height > 195:
        bucket = tail_ha_count
    else:
        return
    bucket.add(1)

In [85]:
# Count, per height bucket, the rows whose heading accuracy exceeds 60.
players_heading_acc.foreach(
    lambda record: count_players_by_height_and_ha(record, threshold_score=60)
)

In [86]:
# Above-threshold totals in the order short, medium-low, medium-high, tall.
all_players_above_th = tuple(
    bucket.value
    for bucket in (
        short_ha_count,
        medium_low_ha_count,
        medium_high_ha_count,
        tail_ha_count,
    )
)

In [87]:
all_players_above_th

(3653, 41448, 40270, 1573)

In [88]:
# Per height bucket: share (in percent) of rows above the heading-accuracy
# threshold, in the order short, medium-low, medium-high, tall.
perc_vals = tuple(
    above.value / total.value * 100
    for above, total in (
        (short_ha_count, short_count),
        (medium_low_ha_count, medium_low_count),
        (medium_high_ha_count, medium_high_count),
        (tail_ha_count, tail_count),
    )
)

In [89]:
perc_vals

(19.249617958581442, 42.55485169252251, 65.46051562144413, 46.66271136161376)

In [91]:
# Export the 2016 overall ratings as CSV with a header line.
# The chain is wrapped in parentheses instead of being joined with
# backslashes: Python rejects a comment placed after a line-continuation
# backslash, whereas inside parentheses an inline comment is allowed.
# NOTE(review): no save mode is set, so this presumably fails if the target
# path already exists — confirm before re-running.
(
    pa_2016
    .select('player_api_id', 'overall_rating')
    .coalesce(1)  # repartition into a single partition
    .write
    .option('header', 'true')
    .csv('/tmp/players_overall.csv')
)

In [94]:
type(pa_2016.select('player_api_id', 'overall_rating')\
            .coalesce(1).write)

pyspark.sql.readwriter.DataFrameWriter

In [96]:
# Export the same two columns as JSON, keeping the frame's current partitioning.
(
    pa_2016
    .select('player_api_id', 'overall_rating')
    .write
    .json('/tmp/players_overall.json')
)

In [97]:
from pyspark.accumulators import AccumulatorParam

In [98]:
class VectorAccumulatorParam(AccumulatorParam):
    """Accumulator behaviour for list-like vectors that are summed element-wise."""

    def zero(self, value):
        """Return a list of ``0.0`` with as many entries as ``value``."""
        return [0.0 for _ in range(len(value))]

    def addInPlace(self, v1, v2):
        """Add ``v2`` onto ``v1`` position by position and return ``v1``.

        ``v1`` is modified in place; ``v2`` is indexed at every position of
        ``v1``.
        """
        for position, _ in enumerate(v1):
            v1[position] += v2[position]

        return v1

In [106]:
# Vector-valued accumulator with a non-zero starting value; element-wise
# addition is supplied by VectorAccumulatorParam.
vector_accum = spark.sparkContext.accumulator(
    [10.0, 20.0, 30.0],
    accum_param=VectorAccumulatorParam(),
)

In [107]:
vector_accum.value

[10.0, 20.0, 30.0]

In [108]:
# Add a vector on the driver and display the updated accumulator value.
vector_accum.add([1, 2, 3])
vector_accum.value

[11.0, 22.0, 33.0]

In [129]:
# (name, salary) sample rows for the join demonstrations.
valuesA = [
    ('Imaya', 5000),
    ('Chitra', 4000),
    ('Karthik', 4000),
    ('Kamal', 10),
]

In [130]:
# Salary table built from the sample rows.
emps_sal = spark.createDataFrame(valuesA, schema=['name', 'salary'])

In [131]:
emps_sal.show()

+-------+------+
|   name|salary|
+-------+------+
|  Imaya|  5000|
| Chitra|  4000|
|Karthik|  4000|
|  Kamal|    10|
+-------+------+



In [132]:
# (name, empid) sample rows; only 'Imaya' and 'Chitra' also appear in valuesA.
valuesB = [
    ('Imaya', 1),
    ('Chitra', 2),
    ('Karthik I', 3),
    ('Kamal I', 4),
]

In [133]:
# Employee-id table built from the sample rows, displayed right away.
emp = spark.createDataFrame(valuesB, schema=['name', 'empid'])
emp.show()

+---------+-----+
|     name|empid|
+---------+-----+
|    Imaya|    1|
|   Chitra|    2|
|Karthik I|    3|
|  Kamal I|    4|
+---------+-----+



In [134]:
# Default (inner) join: only names present in both tables survive.
inner_join = emps_sal.join(emp, on=emps_sal['name'] == emp['name'])

In [135]:
inner_join.show()

+------+------+------+-----+
|  name|salary|  name|empid|
+------+------+------+-----+
| Imaya|  5000| Imaya|    1|
|Chitra|  4000|Chitra|    2|
+------+------+------+-----+



In [136]:
# Left join: every salary row is kept; unmatched employee columns are null.
left_join = emps_sal.join(
    emp,
    on=emps_sal['name'] == emp['name'],
    how='left',
)

In [137]:
left_join.show()

+-------+------+------+-----+
|   name|salary|  name|empid|
+-------+------+------+-----+
|  Kamal|    10|  null| null|
|Karthik|  4000|  null| null|
|  Imaya|  5000| Imaya|    1|
| Chitra|  4000|Chitra|    2|
+-------+------+------+-----+



In [138]:
# Right join: every employee row is kept; unmatched salary columns are null.
right_join = emps_sal.join(
    emp,
    on=emps_sal['name'] == emp['name'],
    how='right',
)

In [139]:
right_join.show()

+------+------+---------+-----+
|  name|salary|     name|empid|
+------+------+---------+-----+
|  null|  null|  Kamal I|    4|
| Imaya|  5000|    Imaya|    1|
|  null|  null|Karthik I|    3|
|Chitra|  4000|   Chitra|    2|
+------+------+---------+-----+



In [140]:
# Full outer join: rows from both tables are kept, with nulls on the side
# that has no match.
full_outer_join = emps_sal.join(
    emp,
    on=emps_sal['name'] == emp['name'],
    how='full',
)

In [141]:
full_outer_join.show()

+-------+------+---------+-----+
|   name|salary|     name|empid|
+-------+------+---------+-----+
|  Kamal|    10|     null| null|
|   null|  null|  Kamal I|    4|
|Karthik|  4000|     null| null|
|  Imaya|  5000|    Imaya|    1|
|   null|  null|Karthik I|    3|
| Chitra|  4000|   Chitra|    2|
+-------+------+---------+-----+

