## Accumulators and Broadcast Variables
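Spark offers two kinds of shared variables for passing data between the driver and the tasks running on worker nodes: broadcast variables and accumulators.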

Broadcast variables: read-only variables. Spark sends one copy to each worker node and caches it in memory there, and all tasks running on that node read that copy instead of each receiving their own. Use them when tasks across several stages all need the same data, for example to share a lookup table or a training dataset (in ML) with all nodes.


Accumulators: shared variables that tasks on the worker nodes can update, but only by adding to them, and the add operation must be associative and commutative. Tasks cannot read an accumulator; only the driver can read its value. Spark natively supports accumulators of type Long, Double and collections, and you can define your own by subclassing AccumulatorV2 (Scala/Java; in PySpark, subclass AccumulatorParam).


In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("Analysing soccer players").getOrCreate()

In [3]:
players = spark.read.format('csv').option("header", "true").load('player.csv')

In [4]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [5]:
player_attributes = spark.read.format('csv').option("header", "true").load('player_attributes.csv')

In [6]:
player_attributes.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true)
 |-- ...

In [7]:
players.count() , player_attributes.count()

(11060, 183978)

In [10]:
player_attributes.select('player_api_id').distinct().count()

11060

In [11]:
players = players.drop('id', 'player_fifa_api_id')
players.columns

['player_api_id', 'player_name', 'birthday', 'height', 'weight']

In [12]:
player_attributes = player_attributes.drop('id', 'player_fifa_api_id',
                                          'preferred_foot', 'attacking_work_rate',
                                          'defensive_work_rate','crossing',
                                          'jumping','sprint_speed',
                                          'balance','aggression','short_passing',
                                          'potential')

In [13]:
player_attributes.columns

['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

In [14]:
player_attributes = player_attributes.dropna()
players = players.dropna()

User-defined function (UDF): extract the year from the date column

In [15]:
from pyspark.sql.functions import udf

In [16]:
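# date strings start with YYYY-, so the first '-'-separated piece is the year (returned as a string)
year_extract_udf = udf(lambda date: date.split('-')[0])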

In [17]:
player_attributes = player_attributes.withColumn("year", year_extract_udf(player_attributes.date))

In [18]:
player_attributes = player_attributes.drop('date')

In [20]:
player_attributes.columns

['player_api_id',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

Find the best striker based on specific attributes (finishing, shot power and acceleration in 2016)

In [21]:
pa_2016 = player_attributes.filter(player_attributes.year == 2016)

In [22]:
pa_2016.count()

14098

In [23]:
pa_2016.select(pa_2016.player_api_id).distinct().count()

5586

A few aggregations: average each attribute per player over all of that player's 2016 records

In [24]:
pa_stricker_2016 = pa_2016.groupBy('player_api_id').agg({"finishing": "avg", "shot_power":"avg", "acceleration":"avg"})

In [25]:
pa_stricker_2016.count()

5586

In [26]:
pa_stricker_2016.show(5)

+-------------+-----------------+-----------------+---------------+
|player_api_id|   avg(finishing)|avg(acceleration)|avg(shot_power)|
+-------------+-----------------+-----------------+---------------+
|       309726|75.44444444444444|74.11111111111111|           76.0|
|        26112|             53.0|             51.0|           76.0|
|        38433|            68.25|             74.0|           74.0|
|       295060|             25.0|             62.0|           40.0|
|       161396|             29.0|             72.0|           69.0|
+-------------+-----------------+-----------------+---------------+
only showing top 5 rows



In [27]:
pa_stricker_2016 = (pa_stricker_2016
                    .withColumnRenamed("avg(finishing)", "finishing")
                    .withColumnRenamed("avg(acceleration)", "acceleration")
                    .withColumnRenamed("avg(shot_power)", "shot_power"))

Assign a weight to each attribute (shot power counts double)

In [28]:
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weight = weight_finishing + weight_shot_power + weight_acceleration

In [29]:
# weighted average of the three averaged attributes
strickers = pa_stricker_2016.withColumn("stricker_grade",
                                        (pa_stricker_2016.finishing * weight_finishing +
                                         pa_stricker_2016.shot_power * weight_shot_power +
                                         pa_stricker_2016.acceleration * weight_acceleration) / total_weight)
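A plain-Python check of the weighted average, using the weights set above and two rows from the `pa_stricker_2016.show(5)` output (player 26112 and player 38433). `weighted_grade` is a hypothetical helper, not part of the notebook.

```python
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1
total_weight = weight_finishing + weight_shot_power + weight_acceleration  # 4

def weighted_grade(finishing, shot_power, acceleration):
    """Hypothetical helper: same formula as the stricker_grade column."""
    return (finishing * weight_finishing
            + shot_power * weight_shot_power
            + acceleration * weight_acceleration) / total_weight

# Player 26112: (53 + 2*76 + 51) / 4 = 64.0, dropped by the > 70 filter
print(weighted_grade(finishing=53.0, shot_power=76.0, acceleration=51.0))    # 64.0
# Player 38433: (68.25 + 2*74 + 74) / 4 = 72.5625, kept
print(weighted_grade(finishing=68.25, shot_power=74.0, acceleration=74.0))   # 72.5625
```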

In [30]:
strickers = strickers.drop('finishing', 'acceleration', 'shot_power')

In [31]:
strickers = strickers.filter(strickers.stricker_grade > 70).sort(strickers.stricker_grade.desc())


strickers.show(10)

+-------------+-----------------+
|player_api_id|   stricker_grade|
+-------------+-----------------+
|        20276|            89.25|
|        37412|             89.0|
|        38817|            88.75|
|        32118|            88.25|
|        31921|             87.0|
|        30834|            86.75|
|       303824|85.10714285714286|
|       129944|             85.0|
|       158263|            84.75|
|       150565|            84.75|
+-------------+-----------------+
only showing top 10 rows



Let's get the strikers' details

In [33]:
stricker_details = players.join(strickers, players.player_api_id == strickers.player_api_id)

# or, to keep a single player_api_id column: stricker_details = players.join(strickers, ['player_api_id'])

In [34]:
stricker_details.count()

1609

In [35]:
stricker_details.show()

+-------------+--------------------+-------------------+------+------+-------------+-----------------+
|player_api_id|         player_name|           birthday|height|weight|player_api_id|   stricker_grade|
+-------------+--------------------+-------------------+------+------+-------------+-----------------+
|        20276|                Hulk|1986-07-25 00:00:00|180.34|   187|        20276|            89.25|
|        37412|       Sergio Aguero|1988-06-02 00:00:00|172.72|   163|        37412|             89.0|
|        38817|        Carlos Tevez|1984-02-05 00:00:00|172.72|   157|        38817|            88.75|
|        32118|      Lukas Podolski|1985-06-04 00:00:00|182.88|   183|        32118|            88.25|
|        31921|         Gareth Bale|1989-07-16 00:00:00|182.88|   163|        31921|             87.0|
|        30834|        Arjen Robben|1984-01-23 00:00:00|180.34|   176|        30834|            86.75|
|       303824|       Memphis Depay|1994-02-13 00:00:00|175.26|   172|       303824|85.10714285714286|
...

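When one side of a .join() is small, we can broadcast it: Spark sends a full copy of the smaller DataFrame to every executor and performs a broadcast hash join, so the larger DataFrame does not have to be shuffled across the network. Spark does this automatically for tables below spark.sql.autoBroadcastJoinThreshold (10 MB by default); the broadcast() hint requests it explicitly.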

In [37]:
# import the broadcast() join hint (a DataFrame hint, not SparkContext.broadcast)

from pyspark.sql.functions import broadcast

In [39]:
striker_details = players.select("player_api_id", "player_name").join(broadcast(strickers), ['player_api_id'],'inner')

In [40]:
striker_details = striker_details.sort(striker_details.stricker_grade.desc())

In [41]:
striker_details.show(5)

+-------------+--------------+--------------+
|player_api_id|   player_name|stricker_grade|
+-------------+--------------+--------------+
|        20276|          Hulk|         89.25|
|        37412| Sergio Aguero|          89.0|
|        38817|  Carlos Tevez|         88.75|
|        32118|Lukas Podolski|         88.25|
|        31921|   Gareth Bale|          87.0|
+-------------+--------------+--------------+
only showing top 5 rows

