## PySpark Data Engineering Practice (Sandboxing)
### Olympic Athlete Data

This notebook is a data engineering practice exercise.
In it I explore a dataset while learning PySpark.
The data is from: https://www.kaggle.com/mysarahmadbhat/120-years-of-olympic-history

In [1]:
## Imports
from pyspark.sql import SparkSession ## Create session
from pyspark.sql.types import StructType, StructField, StringType, IntegerType ## Create schema

In [2]:
## Create Spark session
spark = (SparkSession.builder.appName("AthletesAnalytics").getOrCreate())

### Import the data

In [3]:
## Create schema
schema = StructType([
    StructField("ID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Height", StringType(), True),
    StructField("Weight", StringType(), True),
    StructField("Team", StringType(), True),
    StructField("NOC", StringType(), True),
    StructField("Games", StringType(), True),
    StructField("Year", StringType(), True),
    StructField("Season", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Sport", StringType(), True),
    StructField("Event", StringType(), True),
    StructField("Medal", StringType(), True),
])

In [4]:
## Read CSV into dataframe
file_path = "./data/athlete_events.csv"
athletes_df = (spark.read.format("csv")
            .option("header", True)
            .schema(schema)
            .load(file_path))

In [5]:
## Showing first 10 rows
athletes_df.show(10, False)

+---+------------------------+---+---+------+------+--------------+---+-----------+----+------+-----------+-------------+----------------------------------+-----+
|ID |Name                    |Sex|Age|Height|Weight|Team          |NOC|Games      |Year|Season|City       |Sport        |Event                             |Medal|
+---+------------------------+---+---+------+------+--------------+---+-----------+----+------+-----------+-------------+----------------------------------+-----+
|1  |A Dijiang               |M  |24 |180   |80    |China         |CHN|1992 Summer|1992|Summer|Barcelona  |Basketball   |Basketball Men's Basketball       |NA   |
|2  |A Lamusi                |M  |23 |170   |60    |China         |CHN|2012 Summer|2012|Summer|London     |Judo         |Judo Men's Extra-Lightweight      |NA   |
|3  |Gunnar Nielsen Aaby     |M  |24 |NA    |NA    |Denmark       |DEN|1920 Summer|1920|Summer|Antwerpen  |Football     |Football Men's Football           |NA   |
|4  |Edgar Lindenau Aa

In [6]:
## Print out schema details
athletes_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)



In [7]:
athletes_df.show(3, vertical=True)

-RECORD 0----------------------
 ID     | 1                    
 Name   | A Dijiang            
 Sex    | M                    
 Age    | 24                   
 Height | 180                  
 Weight | 80                   
 Team   | China                
 NOC    | CHN                  
 Games  | 1992 Summer          
 Year   | 1992                 
 Season | Summer               
 City   | Barcelona            
 Sport  | Basketball           
 Event  | Basketball Men's ... 
 Medal  | NA                   
-RECORD 1----------------------
 ID     | 2                    
 Name   | A Lamusi             
 Sex    | M                    
 Age    | 23                   
 Height | 170                  
 Weight | 60                   
 Team   | China                
 NOC    | CHN                  
 Games  | 2012 Summer          
 Year   | 2012                 
 Season | Summer               
 City   | London               
 Sport  | Judo                 
 Event  | Judo Men's Extra-... 
 Medal  

### Exploration & Cleansing

In [8]:
### Check for NA values by exploring columns
from pyspark.sql.functions import col
athletes_df.filter(col("Medal") == "NA").show(10)
## Columns found to contain NA values:
## Age, Height, Weight, Team, NOC (National Olympic Committee) and Medal.

+---+--------------------+---+---+------+------+-------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
| ID|                Name|Sex|Age|Height|Weight|         Team|NOC|      Games|Year|Season|       City|               Sport|               Event|Medal|
+---+--------------------+---+---+------+------+-------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
|  1|           A Dijiang|  M| 24|   180|    80|        China|CHN|1992 Summer|1992|Summer|  Barcelona|          Basketball|Basketball Men's ...|   NA|
|  2|            A Lamusi|  M| 23|   170|    60|        China|CHN|2012 Summer|2012|Summer|     London|                Judo|Judo Men's Extra-...|   NA|
|  3| Gunnar Nielsen Aaby|  M| 24|    NA|    NA|      Denmark|DEN|1920 Summer|1920|Summer|  Antwerpen|            Football|Football Men's Fo...|   NA|
|  5|Christine Jacoba ...|  F| 21|   185|    82|  Netherlands|NED|1988 Winter|1988|Winter|    

#### Drop rows where age, height or weight have NA values.

In [9]:
athletes_df = athletes_df.filter((col("Age") != "NA") & (col("Height") != "NA") & (col("Weight") != "NA"))

In [10]:
## Verify that no NA values remain in Age, Height or Weight
athletes_df.filter((col("Age") == "NA")).show(5)
athletes_df.filter((col("Height") == "NA")).show(5)
athletes_df.filter((col("Weight") == "NA")).show(5)

+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
| ID|Name|Sex|Age|Height|Weight|Team|NOC|Games|Year|Season|City|Sport|Event|Medal|
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+

+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
| ID|Name|Sex|Age|Height|Weight|Team|NOC|Games|Year|Season|City|Sport|Event|Medal|
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+

+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
| ID|Name|Sex|Age|Height|Weight|Team|NOC|Games|Year|Season|City|Sport|Event|Medal|
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+



#### Check if other columns have the right values

In [11]:
### Check that ID, Age, Height, Weight and Year contain only integer values.
### Start with ID: a cast to integer yields null for non-numeric values.
test_df = athletes_df.select('ID',col('ID').cast(IntegerType()).isNotNull().alias("Value"))
test_df.filter((col("Value") == False)).show(5)

+---+-----+
| ID|Value|
+---+-----+
+---+-----+



In [12]:
### Check Age for non-numeric values
test_df = athletes_df.select('Age',col('Age').cast(IntegerType()).isNotNull().alias("Value"))
test_df.filter((col("Value") == False)).show(5)

+------------+-----+
|         Age|Value|
+------------+-----+
|           M|false|
|           M|false|
|           M|false|
| -Andersen)"|false|
|           M|false|
+------------+-----+
only showing top 5 rows
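
The check above relies on a cast to integer producing null for text that is not a number (the behaviour seen here; with Spark's ANSI mode enabled, a failed cast raises an error instead). The plain-Python sketch below mimics that idea on a few values taken from the output above. `cast_to_int_or_none` is a hypothetical helper and only an approximation of Spark's cast, not a reimplementation.

```python
def cast_to_int_or_none(value):
    """Return int(value) for integer-like strings, else None (standing in for null)."""
    if value is None:
        return None
    try:
        return int(value.strip())
    except ValueError:
        return None


# "M" and ' -Andersen)"' appear in the Age column above; "24", "31" and "NA" are ordinary values.
sample_ages = ["24", "NA", "M", ' -Andersen)"', "31"]

# Keep only the values that fail the cast, like filtering on Value == False.
broken = [v for v in sample_ages if cast_to_int_or_none(v) is None]
print(broken)
```
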



In [13]:
### Something is wrong: Age contains sex values and even fragments of names.
### First, count the rows where Age is a valid integer.
test_df.filter((col("Value") == True)).count()

205688

In [14]:
### Now count the broken rows: 500 out of 206188 rows have this problem
test_df.filter((col("Value") == False)).count()

500

In [15]:
### Percentage of broken rows
print(f"{500 / 206188 * 100:.2f}%")

0.24%


In [16]:
athletes_df.filter((col("Age") == "M")).show(5)

+----+--------------------+-----+---+------+------+----+-------------+-----+-----------+------+------+---------+----------+--------------------+
|  ID|                Name|  Sex|Age|Height|Weight|Team|          NOC|Games|       Year|Season|  City|    Sport|     Event|               Medal|
+----+--------------------+-----+---+------+------+----+-------------+-----+-----------+------+------+---------+----------+--------------------+
|2781|"Robert Jeffrey "...|  II"|  M|    22|   185|  91|United States|  USA|1992 Summer|  1992|Summer|Barcelona|  Baseball|Baseball Men's Ba...|
|3874|"William Lloyd ""...| Jr."|  M|    21|   200|  86|United States|  USA|1988 Summer|  1988|Summer|    Seoul|Basketball|Basketball Men's ...|
|4361|"Joseph ""Joe"" A...| Jr."|  M|    31|   170|  66|United States|  USA|1948 Summer|  1948|Summer|   London|    Rowing|Rowing Men's Doub...|
|6270|"Arthur DeLancey ...| Jr."|  M|    21|   193|  86|United States|  USA|1956 Summer|  1956|Summer|Melbourne|    Rowing|Rowing 

In [17]:
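### The cause appears to be names that contain a comma (for example before "Jr." or "II")
### inside a quoted field that also uses doubled quotes (""). With the reader options used above,
### the field is split at that comma and every later column shifts one position to the right.
### For now I'll drop these rows, which can be done with the following filter.
athletes_df = athletes_df.filter("CAST(Age AS INTEGER) IS NOT NULL")
athletes_df.filter(col("Age") == "M").show()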

+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
| ID|Name|Sex|Age|Height|Weight|Team|NOC|Games|Year|Season|City|Sport|Event|Medal|
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
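
The column shift can be reproduced outside Spark. The sketch below uses Python's standard `csv` module on a made-up row (the name is hypothetical, not taken from the dataset) to show how a quoted name with doubled quotes and a comma stays intact when `""` is treated as an escaped quote, and falls apart when it is not. In Spark, the reader's `escape` option defaults to a backslash, so setting it to `"` is the usual remedy. That was not tried in this notebook, so treat it as an assumption to verify.

```python
import csv
import io

# Hypothetical row in the same shape as the dataset's first columns: ID, Name, Sex, Age.
# The name is quoted, contains doubled quotes ("") and a comma.
raw_line = '9999,"Jane ""JJ"" Doe, Jr.",F,30'

# Naive split on commas: the comma inside the name creates an extra field,
# so the sex value ends up in the position where Age is expected.
naive_fields = raw_line.split(",")

# csv.reader treats "" inside a quoted field as one literal quote
# (doublequote=True is its default), so the name stays in a single field.
parsed_fields = next(csv.reader(io.StringIO(raw_line)))

print(naive_fields)
print(parsed_fields)
```
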



In [18]:
### After dropping those rows, Height no longer contains non-numeric values either
test_df = athletes_df.select('Height',col('Height').cast(IntegerType()).isNotNull().alias("Value"))
test_df.filter((col("Value") == False)).show(5)

+------+-----+
|Height|Value|
+------+-----+
+------+-----+



In [19]:
### As you can see, 500 rows were deleted.
athletes_df.count()

205688

In [20]:
### Check the distinct values for seasons.
### As seen there are no odd values in this column.
athletes_df.select("Season").distinct().show()

+------+
|Season|
+------+
|Summer|
|Winter|
+------+



In [21]:
### Check the length of NOC, as seen in the result this is always 3, so that is good.
from pyspark.sql.functions import length
test_df = athletes_df.withColumn("length_NOC",  length("NOC")).filter((col("length_NOC") != 3))
test_df.show()

+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+----------+
| ID|Name|Sex|Age|Height|Weight|Team|NOC|Games|Year|Season|City|Sport|Event|Medal|length_NOC|
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+----------+
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+----------+



In [22]:
### Check if sex is only M and F, as seen this is correct.
athletes_df.filter((col("Sex")!="F") & (col("Sex")!="M")).show()

+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
| ID|Name|Sex|Age|Height|Weight|Team|NOC|Games|Year|Season|City|Sport|Event|Medal|
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+
+---+----+---+---+------+------+----+---+-----+----+------+----+-----+-----+-----+



### Masking the name

To practice handling personal information, I want to explore masking the athlete's name.

#### Masking 

In [23]:
### Mask a name, keeping only the first two and last two characters visible.
### Names shorter than 5 characters keep only their first character.
from pyspark.sql.functions import udf

def mask_name(columnValue):
    if len(columnValue) < 5:
        # Short name: keep the first character, mask the rest
        return columnValue[:1] + 'x' * (len(columnValue) - 1)
    # Otherwise keep the first two and last two characters, mask the middle
    return columnValue[:2] + 'x' * (len(columnValue) - 4) + columnValue[-2:]
    
### Make the function work with PySpark
mask_name_udf = udf(mask_name, StringType())

### Test function
athletes_df.select("Name",mask_name_udf(athletes_df["Name"])).distinct().show(5, truncate=False)

+-------------------------+-------------------------+
|Name                     |mask_name(Name)          |
+-------------------------+-------------------------+
|Shakeel Abbasi           |Shxxxxxxxxxxsi           |
|Raouf Abdelraouf         |Raxxxxxxxxxxxxuf         |
|Sara Helena berg         |Saxxxxxxxxxxxxrg         |
|Jos Eugenio Acosta       |Joxxxxxxxxxxxxxxta       |
|"Frances ""Fran"" Adcock"|"Fxxxxxxxxxxxxxxxxxxxxxk"|
+-------------------------+-------------------------+
only showing top 5 rows
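
A UDF like this receives `None` for null values, and calling `len()` on `None` raises a `TypeError`. The sketch below is a null-tolerant variant of the same masking rule in plain Python, so it can be tried without a Spark session. `mask_name_safe` is a hypothetical name, and "Li" is a made-up short name. The other two names come from the output in this notebook.

```python
def mask_name_safe(value):
    """Null-tolerant variant of the masking rule (hypothetical helper)."""
    if value is None:
        # Pass nulls through unchanged instead of failing in len()
        return None
    if len(value) < 5:
        # Short name: keep the first character only
        return value[:1] + "x" * (len(value) - 1)
    # Keep the first two and last two characters, mask everything in between
    return value[:2] + "x" * (len(value) - 4) + value[-2:]


for name in ["Shakeel Abbasi", "A Dijiang", "Li", "", None]:
    print(repr(name), "->", repr(mask_name_safe(name)))
```

Note that this kind of masking still reveals the length of the name and up to four of its characters, so it is a practice exercise rather than real anonymisation.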



In [24]:
athletes_df = athletes_df.withColumn("MaskedName",mask_name_udf(athletes_df["Name"])).drop(col("Name"))

In [25]:
athletes_df.show(1,vertical=True)

-RECORD 0--------------------------
 ID         | 1                    
 Sex        | M                    
 Age        | 24                   
 Height     | 180                  
 Weight     | 80                   
 Team       | China                
 NOC        | CHN                  
 Games      | 1992 Summer          
 Year       | 1992                 
 Season     | Summer               
 City       | Barcelona            
 Sport      | Basketball           
 Event      | Basketball Men's ... 
 Medal      | NA                   
 MaskedName | A xxxxxng            
only showing top 1 row



### Fixing Schema

In [26]:
athletes_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Height: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)
 |-- MaskedName: string (nullable = true)



In [27]:
### ID, Age, Height, Weight and Year should be integers.
### ID is recast as PlayerID and MaskedName is copied to Name.
athletes_final_df = (athletes_df.withColumn("PlayerID", col("ID").cast(IntegerType()))
                    .drop(col("ID"))
                    .withColumn("Name", col("MaskedName").cast(StringType()))
                    .withColumn("Age", col("Age").cast(IntegerType()))
                    .withColumn("Height", col("Height").cast(IntegerType()))
                    .withColumn("Weight", col("Weight").cast(IntegerType()))
                    .withColumn("Year", col("Year").cast(IntegerType()))
                    )
athletes_final_df.printSchema()

root
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)
 |-- MaskedName: string (nullable = true)
 |-- PlayerID: integer (nullable = true)
 |-- Name: string (nullable = true)



In [28]:
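### Reorder the columns: PlayerID and Name first, then the rest.
### The slice [:-3] leaves out MaskedName, which is now duplicated by Name.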
athletes_sorted_df = athletes_final_df.select(
    [athletes_final_df.columns[-2]]
    + [athletes_final_df.columns[-1]]
    + athletes_final_df.columns[:-3])

athletes_sorted_df.show(1, vertical=True)

-RECORD 0------------------------
 PlayerID | 1                    
 Name     | A xxxxxng            
 Sex      | M                    
 Age      | 24                   
 Height   | 180                  
 Weight   | 80                   
 Team     | China                
 NOC      | CHN                  
 Games    | 1992 Summer          
 Year     | 1992                 
 Season   | Summer               
 City     | Barcelona            
 Sport    | Basketball           
 Event    | Basketball Men's ... 
 Medal    | NA                   
only showing top 1 row
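
The reordering depends on the position of the last three columns, which is easy to get wrong. The plain-Python sketch below applies the same slicing to the column names printed by `printSchema()` in cell 27, to show which columns are kept and in what order.

```python
# Column order of athletes_final_df, as printed by printSchema() in cell 27
columns = [
    "Sex", "Age", "Height", "Weight", "Team", "NOC", "Games", "Year",
    "Season", "City", "Sport", "Event", "Medal",
    "MaskedName", "PlayerID", "Name",
]

# Same expression as in the select(): second-to-last, last, then all but the last three
reordered = [columns[-2]] + [columns[-1]] + columns[:-3]

print(reordered)
print("MaskedName" in reordered)  # False: the slice drops it
```

Selecting the columns by name would be less fragile if more columns are added later.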



In [29]:
athletes_sorted_df.printSchema()

root
 |-- PlayerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)



### Save to parquet

In [30]:
## Write to Parquet, partitioned by Games. Left commented out because the write crashed my laptop.
#output_path = './output/athlete_data'
#athletes_sorted_df.write.partitionBy("Games").mode("overwrite").parquet(output_path)

### Aggregations

In [31]:
from pyspark.sql.functions import col, count

#### Medals per year

In [32]:
### Get year and medal
medals_per_year_df = athletes_sorted_df.select(
                    col("Year"),
                    col("Medal")
                    )
medals_per_year_df.show(5)

+----+-----+
|Year|Medal|
+----+-----+
|1992|   NA|
|2012|   NA|
|1988|   NA|
|1988|   NA|
|1992|   NA|
+----+-----+
only showing top 5 rows



In [33]:
### Filter out all rows with NA
medals_per_year_df = medals_per_year_df.filter(col("Medal")!="NA")
medals_per_year_df.show(5)

+----+------+
|Year| Medal|
+----+------+
|2014|Bronze|
|1948|Bronze|
|1948|  Gold|
|1948|  Gold|
|1948|  Gold|
+----+------+
only showing top 5 rows



In [34]:
### Show the number of medals per year, most recent first
medals_per_year_df.groupBy("Year").agg(count("Medal").alias("Medals Amount")).orderBy("Year", ascending=False).show(5)

+----+-------------+
|Year|Medals Amount|
+----+-------------+
|2016|         2013|
|2014|          568|
|2012|         1914|
|2010|          511|
|2008|         2035|
+----+-------------+
only showing top 5 rows



#### Medals per country

In [35]:
### Show distinct medal values.
athletes_sorted_df.select("Medal").distinct().show()

+------+
| Medal|
+------+
|    NA|
|Silver|
|  Gold|
|Bronze|
+------+



In [36]:
### Create a new dataframe and filter out NA values in the Medal column.
medals_per_country_df = athletes_sorted_df.select(
                    col("Team"),
                    col("Medal")
                    )

medals_per_country_df = medals_per_country_df.filter(col("Medal")!="NA")

medals_per_country_df.show(5)

+-------+------+
|   Team| Medal|
+-------+------+
|Finland|Bronze|
|Finland|Bronze|
|Finland|  Gold|
|Finland|  Gold|
|Finland|  Gold|
+-------+------+
only showing top 5 rows



In [37]:
### Aggregate and order by medal amount
medals_per_country_df = medals_per_country_df.groupBy("Team","Medal").agg(count("Medal").alias("Amount")).orderBy("Amount", ascending=False)
medals_per_country_df.show(10)

+-------------+------+------+
|         Team| Medal|Amount|
+-------------+------+------+
|United States|  Gold|  1995|
|United States|Silver|  1230|
| Soviet Union|  Gold|   961|
|United States|Bronze|   909|
| Soviet Union|Silver|   629|
| Soviet Union|Bronze|   613|
|      Germany|Bronze|   538|
|      Germany|  Gold|   507|
|    Australia|Bronze|   471|
|      Germany|Silver|   470|
+-------------+------+------+
only showing top 10 rows
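
The result above is split by medal type, so it does not directly give one total per team. As a small worked example, the plain-Python sketch below sums the amounts shown in the top-10 output for the three teams whose Gold, Silver and Bronze rows all appear there. Other teams are left out because their remaining rows are not visible in the output.

```python
from collections import defaultdict

# (team, medal, amount) rows copied from the top-10 output above
rows = [
    ("United States", "Gold", 1995),
    ("United States", "Silver", 1230),
    ("United States", "Bronze", 909),
    ("Soviet Union", "Gold", 961),
    ("Soviet Union", "Silver", 629),
    ("Soviet Union", "Bronze", 613),
    ("Germany", "Bronze", 538),
    ("Germany", "Gold", 507),
    ("Germany", "Silver", 470),
]

# Sum the amounts per team, ignoring the medal type
totals = defaultdict(int)
for team, _medal, amount in rows:
    totals[team] += amount

# Sort teams by total, highest first
ranking = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranking)
```

In PySpark the same total would come from grouping by Team alone instead of by Team and Medal.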



#### Show information about height and weight

In [38]:
### This could also be used to make sure there are no odd values in the columns
athletes_sorted_df.select("Height", "Weight").describe().show()

+-------+------------------+------------------+
|summary|            Height|            Weight|
+-------+------------------+------------------+
|  count|            205688|            205688|
|   mean|175.36527167360273| 70.67846933219245|
| stddev|10.544748708871328|14.335718186598772|
|    min|               127|                25|
|    max|               226|               214|
+-------+------------------+------------------+



In [39]:
### A weight of only 25 kg? Let's check which record that is.
athletes_sorted_df.select("Weight","Height","Age","PlayerID","Name","Team").filter(col("Weight")==25).distinct().show()

+------+------+---+--------+--------------+-----------+
|Weight|Height|Age|PlayerID|          Name|       Team|
+------+------+---+--------+--------------+-----------+
|    25|   135| 14|   21049|Chxxxxxxxxxxui|North Korea|
+------+------+---+--------+--------------+-----------+



#### Which country has the most medals in basketball?

In [40]:
athletes_sorted_df.show(2)

+--------+---------+---+---+------+------+-----+---+-----------+----+------+---------+----------+--------------------+-----+
|PlayerID|     Name|Sex|Age|Height|Weight| Team|NOC|      Games|Year|Season|     City|     Sport|               Event|Medal|
+--------+---------+---+---+------+------+-----+---+-----------+----+------+---------+----------+--------------------+-----+
|       1|A xxxxxng|  M| 24|   180|    80|China|CHN|1992 Summer|1992|Summer|Barcelona|Basketball|Basketball Men's ...|   NA|
|       2| A xxxxsi|  M| 23|   170|    60|China|CHN|2012 Summer|2012|Summer|   London|      Judo|Judo Men's Extra-...|   NA|
+--------+---------+---+---+------+------+-----+---+-----------+----+------+---------+----------+--------------------+-----+
only showing top 2 rows



In [41]:
best_in_basketball_df = athletes_sorted_df.select(
                    col("Team"),
                    col("Sport"),
                    col("Medal")
                    )

best_in_basketball_df = best_in_basketball_df.filter(col("Sport")=="Basketball")

best_in_basketball_df.show(3)

+-----+----------+-----+
| Team|     Sport|Medal|
+-----+----------+-----+
|China|Basketball|   NA|
|Spain|Basketball|   NA|
|Italy|Basketball|   NA|
+-----+----------+-----+
only showing top 3 rows



In [42]:
best_in_basketball_df = best_in_basketball_df.groupBy("Team","Sport").agg(count("Medal").alias("Amount")).orderBy("Amount", ascending=False)
best_in_basketball_df.show(5)

+-------------+----------+------+
|         Team|     Sport|Amount|
+-------------+----------+------+
|United States|Basketball|   331|
|    Australia|Basketball|   248|
|       Brazil|Basketball|   211|
|        China|Basketball|   200|
|        Spain|Basketball|   180|
+-------------+----------+------+
only showing top 5 rows



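As you might expect, the United States tops this list. Note, however, that Medal was not filtered on "NA" for this dataframe, so Amount counts basketball entries per team rather than medals won. To answer the question in the heading, the "NA" rows would have to be removed before grouping, as was done for the medals-per-country aggregation.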
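
Because "NA" is stored as an ordinary string, `count("Medal")` counts every row, medal or not. The plain-Python sketch below uses made-up rows (the team names and values are hypothetical) to show how counting entries and counting actual medals can rank teams differently.

```python
from collections import Counter

# Hypothetical (team, medal) rows for one sport; "NA" means no medal was won
rows = [
    ("Team A", "NA"), ("Team A", "NA"), ("Team A", "NA"), ("Team A", "Gold"),
    ("Team B", "Gold"), ("Team B", "Silver"), ("Team B", "NA"),
]

# Counting every row: this is what happens when "NA" is not filtered out
entries_per_team = Counter(team for team, _medal in rows)

# Counting only rows that carry an actual medal
medals_per_team = Counter(team for team, medal in rows if medal != "NA")

print(entries_per_team.most_common(1))
print(medals_per_team.most_common(1))
```
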