In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import * 


spark = SparkSession \
    .builder \
    .appName("Analyzing soccer players") \
    .getOrCreate()

spark

# Loading DataFrames

In [32]:
player_df =spark.read.option('header', True).option('inferSchema', True).csv('gs://dataproc-staging-us-east1-548550014762-rie5an4z/notebooks/jupyter/player.csv')
player_df.printSchema()
  
player_attr_df =spark.read.option('header', True).option('inferSchema', True).csv('gs://dataproc-staging-us-east1-548550014762-rie5an4z/notebooks/jupyter/player_attributes.csv')
player_attr_df.printSchema()
  

root
 |-- id: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: integer (nullable = true)
 |-- birthday: timestamp (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: integer (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- player_fifa_api_id: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- overall_rating: integer (nullable = true)
 |-- potential: integer (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: integer (nullable = true)
 |-- finishing: integer (nullable = true)
 |-- heading_accuracy: integer (nullable = true)
 |-- short_passing: integer (nullable = true)
 |-- volleys: integer (nullable = true)
 |-- dribbling: integer (nullable = true)
 |-- curve: integer (nulla

# Selecting  columns

In [33]:
df = player_df.select('player_api_id')
df.show(5,False)
player_df.select('id','player_api_id').show(5,False)

+-------------+
|player_api_id|
+-------------+
|505942       |
|155782       |
|162549       |
|30572        |
|23780        |
+-------------+
only showing top 5 rows

+---+-------------+
|id |player_api_id|
+---+-------------+
|1  |505942       |
|2  |155782       |
|3  |162549       |
|4  |30572        |
|5  |23780        |
+---+-------------+
only showing top 5 rows



# Add and update columns 
#### withColumn() is a DataFrame function that is used to add a new column to DataFrame, change the value of an existing column, convert the datatype of a column, derive a new column from an existing column


In [34]:
  
player_df = player_df.withColumn("country", lit("USA"))
player_df.show(5,False)  
player_df = player_df.withColumn("height", col('height')/2.54)
player_df.show(5,False)  
  



+---+-------------+------------------+------------------+-------------------+------+------+-------+
|id |player_api_id|player_name       |player_fifa_api_id|birthday           |height|weight|country|
+---+-------------+------------------+------------------+-------------------+------+------+-------+
|1  |505942       |Aaron Appindangoye|218353            |1992-02-29 00:00:00|182.88|187   |USA    |
|2  |155782       |Aaron Cresswell   |189615            |1989-12-15 00:00:00|170.18|146   |USA    |
|3  |162549       |Aaron Doran       |186170            |1991-05-13 00:00:00|170.18|163   |USA    |
|4  |30572        |Aaron Galindo     |140161            |1982-05-08 00:00:00|182.88|198   |USA    |
|5  |23780        |Aaron Hughes      |17725             |1979-11-08 00:00:00|182.88|154   |USA    |
+---+-------------+------------------+------------------+-------------------+------+------+-------+
only showing top 5 rows

+---+-------------+------------------+------------------+------------------

# Rename columns 
#### withColumnRenamed() function on DataFrame to change a column name. This is the most straight forward approach; this function takes two parameters; the first is your existing column name and the second is the new column name you wish for.


In [35]:
player_df = player_df.withColumnRenamed('height', 'height_in_inches')
player_df.show(5,False)  
  
 

+---+-------------+------------------+------------------+-------------------+----------------+------+-------+
|id |player_api_id|player_name       |player_fifa_api_id|birthday           |height_in_inches|weight|country|
+---+-------------+------------------+------------------+-------------------+----------------+------+-------+
|1  |505942       |Aaron Appindangoye|218353            |1992-02-29 00:00:00|72.0            |187   |USA    |
|2  |155782       |Aaron Cresswell   |189615            |1989-12-15 00:00:00|67.0            |146   |USA    |
|3  |162549       |Aaron Doran       |186170            |1991-05-13 00:00:00|67.0            |163   |USA    |
|4  |30572        |Aaron Galindo     |140161            |1982-05-08 00:00:00|72.0            |198   |USA    |
|5  |23780        |Aaron Hughes      |17725             |1979-11-08 00:00:00|72.0            |154   |USA    |
+---+-------------+------------------+------------------+-------------------+----------------+------+-------+
only showi

# Drop columns 

In [36]:
player_df.printSchema()
player_df = player_df.drop('id', 'player_fifa_api_id')
player_df.printSchema()
  
player_attr_df = player_attr_df.drop(
    'id', 
    'player_fifa_api_id', 
    'preferred_foot',
    'attacking_work_rate',
    'defensive_work_rate',
    'crossing',
    'jumping',
    'sprint_speed',
    'balance',
    'aggression',
    'short_passing',
    'potential'
   )
player_attr_df.columns

root
 |-- id: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: integer (nullable = true)
 |-- birthday: timestamp (nullable = true)
 |-- height_in_inches: double (nullable = true)
 |-- weight: integer (nullable = true)
 |-- country: string (nullable = false)

root
 |-- player_api_id: integer (nullable = true)
 |-- player_name: string (nullable = true)
 |-- birthday: timestamp (nullable = true)
 |-- height_in_inches: double (nullable = true)
 |-- weight: integer (nullable = true)
 |-- country: string (nullable = false)



['player_api_id',
 'date',
 'overall_rating',
 'finishing',
 'heading_accuracy',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'agility',
 'reactions',
 'shot_power',
 'stamina',
 'strength',
 'long_shots',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes']

# When and otherwise function

In [37]:
player_df = player_df.withColumn("region", when(col("country") == "USA","North America").otherwise("Unknown"))
player_df.show(5,False)
player_df.count()

+-------------+------------------+-------------------+----------------+------+-------+-------------+
|player_api_id|player_name       |birthday           |height_in_inches|weight|country|region       |
+-------------+------------------+-------------------+----------------+------+-------+-------------+
|505942       |Aaron Appindangoye|1992-02-29 00:00:00|72.0            |187   |USA    |North America|
|155782       |Aaron Cresswell   |1989-12-15 00:00:00|67.0            |146   |USA    |North America|
|162549       |Aaron Doran       |1991-05-13 00:00:00|67.0            |163   |USA    |North America|
|30572        |Aaron Galindo     |1982-05-08 00:00:00|72.0            |198   |USA    |North America|
|23780        |Aaron Hughes      |1979-11-08 00:00:00|72.0            |154   |USA    |North America|
+-------------+------------------+-------------------+----------------+------+-------+-------------+
only showing top 5 rows



11060

 # Where and Filter

In [46]:
  
player_filtered_df = player_df.filter(col('weight')> 190)
player_filtered_df.count()
  

790

# Distinct

In [39]:
 
player_attr_df.select('player_api_id')\
                   .distinct()\
                   .count()
  

11060

# Sort

In [40]:

player_df = player_df.orderBy('player_api_id')
player_df.show(5, False)
  

+-------------+-----------------+-------------------+----------------+------+-------+-------------+
|player_api_id|player_name      |birthday           |height_in_inches|weight|country|region       |
+-------------+-----------------+-------------------+----------------+------+-------+-------------+
|2625         |Patryk Rachwal,18|1981-01-27 00:00:00|69.0            |154   |USA    |North America|
|2752         |Diego Mainz      |1982-12-29 00:00:00|74.0            |174   |USA    |North America|
|2768         |Jose Dorado      |1982-07-10 00:00:00|71.0            |154   |USA    |North America|
|2770         |Ignacio Gonzalez |1982-05-14 00:00:00|71.0            |161   |USA    |North America|
|2790         |Alberto Rey      |1974-02-15 00:00:00|69.0            |163   |USA    |North America|
+-------------+-----------------+-------------------+----------------+------+-------+-------------+
only showing top 5 rows



# Aggregation fucntions

In [41]:
player_attr_df_agg = player_attr_df.groupBy('player_api_id')\
                       .agg({
                           'finishing':"avg",
                           "shot_power":"avg",
                           "acceleration":"avg"
                       })
player_attr_df_agg.show(25, False)
 

+-------------+------------------+------------------+------------------+
|player_api_id|avg(finishing)    |avg(acceleration) |avg(shot_power)   |
+-------------+------------------+------------------+------------------+
|114503       |71.0              |80.0625           |74.4375           |
|244128       |42.0              |65.33333333333333 |58.0              |
|38395        |25.0              |47.0              |25.0              |
|40574        |34.69230769230769 |60.0              |40.46153846153846 |
|562698       |20.166666666666668|64.5              |40.166666666666664|
|171094       |38.1              |63.15             |35.1              |
|34239        |21.25             |58.0              |54.0              |
|27484        |64.8695652173913  |75.30434782608695 |71.65217391304348 |
|357220       |21.333333333333332|54.666666666666664|46.333333333333336|
|166648       |82.3076923076923  |83.65384615384616 |77.1923076923077  |
|265363       |53.0              |86.11764705882354

# join

In [42]:
#cond= player_df.player_api_id == player_attr_df.player_api_id and id ==id or date=date
join_df = player_df.join( player_attr_df, player_df.player_api_id == player_attr_df.player_api_id , 'inner')
join_df.show(25, False) 
  

+-------------+------------------+-------------------+----------------+------+-------+-------------+-------------+-------------------+--------------+---------+----------------+-------+---------+-----+------------------+------------+------------+------------+-------+---------+----------+-------+--------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+
|player_api_id|player_name       |birthday           |height_in_inches|weight|country|region       |player_api_id|date               |overall_rating|finishing|heading_accuracy|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|agility|reactions|shot_power|stamina|strength|long_shots|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_positioning|gk_reflexes|
+-------------+------------------+-------------------+----------------+------+------