# Assignment 2

## Due date/time: February 13, 2023 at 11:59 PM
## Submit Jupyter notebook to class Gradescope

The variable `data` holds the location of the data. Modify it as needed.

In [1]:
data = "gs://pstat135-bh/notebooks/jupyter/data/"

## Data

This is a historical dataset on the modern Olympic Games, including all the Games from Athens 1896 to Rio 2016. The data was taken from Kaggle. The `athlete_events` dataset contains $271,116$ rows and $15$ columns, and the `noc_regions` dataset contains $230$ rows and $3$ columns. The two are merged on the National Olympic Committee (NOC) code. Both files are comma-separated.

**Source:**

Griffin, R, H (2018) 120 years of Olympic history: athletes and results, athlete_events, Found at: https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results#athlete_events.csv

Griffin, R, H (2018) 120 years of Olympic history: athletes and results, noc_regions, Found at: https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results#noc_regions.csv

**ATTRIBUTES:**

**athlete_events.csv**

| Column Name | Data Type | Description/Notes |
|:----:|:----:|:----|
| ID |  integer | Unique number for each athlete |
| Name | string | Athlete’s name |
| Sex | string | M or F |
| Age | integer |  |
| Height | integer | In centimeters |
| Weight | integer | In kilograms |
| Team | string | Team name |
| NOC | string | National Olympic Committee, 3 letter code (Matches with `NOC` from noc_regions.csv) |
| Games | string | Year and season |
| Year | integer |  |
| Season | string | Summer or Winter |
| City | string | Host city |
| Sport | string |  |
| Event | string |  |
| Medal | string | Gold, Silver, Bronze, or NA |

**noc_regions.csv**

| Column Name | Data Type | Description/Notes |
|:--|--|:--|
| NOC | string | National Olympic Committee, 3 letter code (Matches with `NOC` from athlete_events.csv) |
| Region | string |  |
| notes | string |  |

## Upload the data into Google Cloud Storage

Use the paths above to download our two files and upload them to your Google bucket. For consistency use the following path:

`gs://<BUCKET-NAME>/notebooks/jupyter/data/olympics-analysis`

and upload the files into the *olympics-analysis* directory.

Confirm that the files were uploaded successfully and are accessible from the notebook with the following `gsutil` command:

In [2]:
!gsutil ls {data + "olympics-analysis"}

gs://pstat135-bh/notebooks/jupyter/data/olympics-analysis/athlete_events.csv
gs://pstat135-bh/notebooks/jupyter/data/olympics-analysis/noc_regions.csv


## Load the data into Spark

We can either ask Spark to infer the schema or specify it explicitly ourselves. For this example we need to specify the schema explicitly, since the default inference would not convert every column the way we want.

As a reminder, here is how to define a schema consisting of two columns, one integer and one string:

```python
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("ID", LongType(), True),
  StructField("name", StringType(), True)
])

df = spark.read.format("csv")\
  .schema(myManualSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load("gs/path/to/file")
```

Modify this code to load athlete_events.csv. Call this DataFrame `athlete_events`:

**Note:** Our data contains "NA" values, which can cause issues when loading. To avoid this, we need to tell Spark which string represents `null` in the data. Use the `nullValue` option (as in the sample code above) and set it to "NA".

In [3]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

mySchema1 = StructType([
  StructField("ID", LongType(), True),
  StructField("Name", StringType(), True),
  StructField("Sex", StringType(), True),
  StructField("Age", LongType(), True),
  StructField("Height", LongType(), True),
  StructField("Weight", LongType(), True),
  StructField("Team", StringType(), True),
  StructField("NOC", StringType(), True),
  StructField("Games", StringType(), True),
  StructField("Year", LongType(), True),
  StructField("Season", StringType(), True),
  StructField("City", StringType(), True),
  StructField("Sport", StringType(), True),
  StructField("Event", StringType(), True),
  StructField("Medal", StringType(), True)
])

athlete_events = spark.read.format("csv")\
  .schema(mySchema1)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load(data + "olympics-analysis/athlete_events.csv")

Print the schema of this DataFrame:

In [4]:
athlete_events.schema

StructType(List(StructField(ID,LongType,true),StructField(Name,StringType,true),StructField(Sex,StringType,true),StructField(Age,LongType,true),StructField(Height,LongType,true),StructField(Weight,LongType,true),StructField(Team,StringType,true),StructField(NOC,StringType,true),StructField(Games,StringType,true),StructField(Year,LongType,true),StructField(Season,StringType,true),StructField(City,StringType,true),StructField(Sport,StringType,true),StructField(Event,StringType,true),StructField(Medal,StringType,true)))

Print the first 5 rows:

In [5]:
athlete_events.show(5)

+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+---------+-------------+--------------------+-----+
| ID|                Name|Sex|Age|Height|Weight|          Team|NOC|      Games|Year|Season|     City|        Sport|               Event|Medal|
+---+--------------------+---+---+------+------+--------------+---+-----------+----+------+---------+-------------+--------------------+-----+
|  1|           A Dijiang|  M| 24|   180|    80|         China|CHN|1992 Summer|1992|Summer|Barcelona|   Basketball|Basketball Men's ...| null|
|  2|            A Lamusi|  M| 23|   170|    60|         China|CHN|2012 Summer|2012|Summer|   London|         Judo|Judo Men's Extra-...| null|
|  3| Gunnar Nielsen Aaby|  M| 24|  null|  null|       Denmark|DEN|1920 Summer|1920|Summer|Antwerpen|     Football|Football Men's Fo...| null|
|  4|Edgar Lindenau Aabye|  M| 34|  null|  null|Denmark/Sweden|DEN|1900 Summer|1900|Summer|    Paris|   Tug-Of-War|Tug-Of-War Men's ...| Gold|

                                                                                

We won't use the following columns, so let's drop them from the DataFrame permanently:

* ID
* Games
* Event

In [6]:
athlete_events = athlete_events.drop('ID', 'Games', 'Event')

In [7]:
athlete_events.show(5) # uncomment before submission

+--------------------+---+---+------+------+--------------+---+----+------+---------+-------------+-----+
|                Name|Sex|Age|Height|Weight|          Team|NOC|Year|Season|     City|        Sport|Medal|
+--------------------+---+---+------+------+--------------+---+----+------+---------+-------------+-----+
|           A Dijiang|  M| 24|   180|    80|         China|CHN|1992|Summer|Barcelona|   Basketball| null|
|            A Lamusi|  M| 23|   170|    60|         China|CHN|2012|Summer|   London|         Judo| null|
| Gunnar Nielsen Aaby|  M| 24|  null|  null|       Denmark|DEN|1920|Summer|Antwerpen|     Football| null|
|Edgar Lindenau Aabye|  M| 34|  null|  null|Denmark/Sweden|DEN|1900|Summer|    Paris|   Tug-Of-War| Gold|
|Christine Jacoba ...|  F| 21|   185|    82|   Netherlands|NED|1988|Winter|  Calgary|Speed Skating| null|
+--------------------+---+---+------+------+--------------+---+----+------+---------+-------------+-----+
only showing top 5 rows



Now load noc_regions.csv. Call this DataFrame `noc_regions`:

In [8]:
mySchema2 = StructType([
  StructField("NOC", StringType(), True),
  StructField("Region", StringType(), True),
  StructField("Notes", StringType(), True),
  
])

noc_regions = spark.read.format("csv")\
  .schema(mySchema2)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load(data + "olympics-analysis/noc_regions.csv")

In [9]:
noc_regions.show(5) # uncomment before submission

+---+-----------+--------------------+
|NOC|     Region|               Notes|
+---+-----------+--------------------+
|AFG|Afghanistan|                null|
|AHO|    Curacao|Netherlands Antilles|
|ALB|    Albania|                null|
|ALG|    Algeria|                null|
|AND|    Andorra|                null|
+---+-----------+--------------------+
only showing top 5 rows



### Caching

Since we will use these two DataFrames a lot in this notebook, let's `cache()` them to speed up execution. Caching keeps the DataFrame in memory once it has been loaded. Without it, every action reloads the DataFrame from Cloud Storage, which adds to the execution time.

**Note:** Caching is lazy. The DataFrame is cached the first time you execute an action against it, not when you call `cache()`.

In [10]:
athlete_events.cache()  # uncomment

DataFrame[Name: string, Sex: string, Age: bigint, Height: bigint, Weight: bigint, Team: string, NOC: string, Year: bigint, Season: string, City: string, Sport: string, Medal: string]

In [11]:
noc_regions.cache()  # uncomment

DataFrame[NOC: string, Region: string, Notes: string]

## Question 1

What are the minimum and maximum values of `Year`?

**PSTAT 234**: use `agg` to show both minimum and maximum values in a single output.

In [12]:
import pyspark.sql.functions as F

athlete_events.select(F.min('year')).show()
athlete_events.select(F.max('year')).show()


                                                                                

+---------+
|min(year)|
+---------+
|     1896|
+---------+

+---------+
|max(year)|
+---------+
|     2016|
+---------+



## Question 2

Is the following statement True or False?

> The average age of female athletes who attended the Olympic Games after 1990 has risen compared to the era before then.

In [13]:
athlete_events.filter((athlete_events['Year'] < 1990) & (athlete_events['Sex'] == 'F')).agg({'age': 'avg'}).show()   # before 1990
athlete_events.filter((athlete_events['Year'] >= 1990) & (athlete_events['Sex'] == 'F')).agg({'age': 'avg'}).show()  # 1990 and later

+------------------+
|          avg(age)|
+------------------+
|22.034368070953438|
+------------------+

+------------------+
|          avg(age)|
+------------------+
|24.619499568593614|
+------------------+



In [14]:
# True: the average age rose from about 22.0 (before 1990) to about 24.6 (1990 and later).

## Question 3

How many Gold medals were given to men from 1970 to 2000 (including both years)?

In [15]:
athlete_events.filter((athlete_events['Year'] >=1970) & (athlete_events['Year'] <=2000) & (athlete_events['Sex'] == 'M') & (athlete_events['Medal'] == 'Gold')).count()

3186

## Question 4

How many NOCs attended the 2016 Summer Olympics in Rio de Janeiro?

NOC stands for National Olympic Committee, which is roughly equivalent to a country.

In [16]:
athlete_events.filter((athlete_events['Year'] == 2016)).select('NOC').distinct().count()

207

## Question 5

Create two DataFrames, one for the Winter Games and one for the Summer Games. Each DataFrame should list all NOCs that have won gold medals in the Olympics, together with their gold medal count. Sort these DataFrames by the count in descending order. Call them `winter_gold_count` and `summer_gold_count` respectively. Using these two, answer the following questions:

Which country has the highest gold medal count in the Winter Olympics? How about the Summer Olympics?

In [17]:
#Highest gold medal count in Winter: Canada
#Highest gold medal count in Summer: USA

from pyspark.sql.functions import desc, col

winter_gold_count = athlete_events.filter((athlete_events['Season'] == 'Winter') & (athlete_events['Medal'] == 'Gold')).select('NOC').groupby('NOC').count().sort(col('count').desc())
winter_gold_count.show(1)
summer_gold_count = athlete_events.filter((athlete_events['Season'] == 'Summer') & (athlete_events['Medal'] == 'Gold')).select('NOC').groupby('NOC').count().sort(col('count').desc())
summer_gold_count.show(1)

+---+-----+
|NOC|count|
+---+-----+
|CAN|  305|
+---+-----+
only showing top 1 row

+---+-----+
|NOC|count|
+---+-----+
|USA| 2376|
+---+-----+
only showing top 1 row



## Question 6

Using the common field `NOC`, merge the `summer_gold_count` and `noc_regions` DataFrames.

Which region takes 10th place, based on the number of gold medals across all of the Summer Olympics in our dataset?

**PSTAT 234**: repeat the same procedure using SQL.

In [18]:
# Your answer goes here
merged = summer_gold_count.join(noc_regions, summer_gold_count.NOC == noc_regions.NOC)
merged = merged.withColumnRenamed('count','gold_count')
merged = merged.groupBy("Region").sum("gold_count")
merged.sort(col('sum(gold_count)').desc()).show(10)


+---------+---------------+
|   Region|sum(gold_count)|
+---------+---------------+
|      USA|           2376|
|   Russia|           1220|
|  Germany|           1074|
|       UK|            635|
|    Italy|            518|
|   France|            465|
|  Hungary|            432|
|Australia|            362|
|   Sweden|            352|
|    China|            335|
+---------+---------------+
only showing top 10 rows



In [19]:
#China