# Assignment 2

The variable `data` holds the location of the data. Modify it as needed.

In [1]:
data = "gs://is-843-avp-01/notebooks/jupyter/data/olympics-analysis/"

## Data

This is a historical dataset on the modern Olympic Games, covering all the Games from Athens 1896 to Rio 2016. The data was taken from Kaggle. The `athlete_events` dataset contains $271,116$ rows and $15$ columns, and the `noc_regions` dataset contains $230$ rows and $3$ columns. The two will be joined on the National Olympic Committee (NOC) code. Both files are comma-separated.

**Source:**

Griffin, R, H (2018) 120 years of Olympic history: athletes and results, athlete_events, Found at: https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results#athlete_events.csv

Griffin, R, H (2018) 120 years of Olympic history: athletes and results, noc_regions, Found at: https://www.kaggle.com/heesoo37/120-years-of-olympic-history-athletes-and-results#noc_regions.csv

**ATTRIBUTES:**

**athlete_events.csv**

| Column Name | Data Type | Description/Notes |
|:----:|:----:|:----|
| ID |  integer | Unique number for each athlete |
| Name | string | Athlete’s name |
| Sex | string | M or F |
| Age | integer |  |
| Height | integer | In centimeters |
| Weight | integer | In kilograms |
| Team | string | Team name |
| NOC | string | National Olympic Committee, 3 letter code (Matches with `NOC` from noc_regions.csv) |
| Games | string | Year and season |
| Year | integer |  |
| Season | string | Summer or Winter |
| City | string | Host city |
| Sport | string |  |
| Event | string |  |
| Medal | string | Gold, Silver, Bronze, or NA |

**noc_regions.csv**

| Column Name | Data Type | Description/Notes |
|:--|--|:--|
| NOC | string | National Olympic Committee, 3 letter code (Matches with `NOC` from athlete_events.csv) |
| Region | string |  |
| notes | string |  |

## Upload the data into Google Cloud Storage

Use the source links above to download the two files and upload them to your Google Cloud Storage bucket. For consistency, use the following path:

`gs://is-843-avp-01/notebooks/jupyter/data/olympics-analysis`

and upload the files into the *olympics-analysis* directory.

Confirm that the files were uploaded successfully and are accessible from the notebook with the following gsutil command:

In [2]:
!gsutil ls {data}

gs://is-843-avp-01/notebooks/jupyter/data/olympics-analysis/
gs://is-843-avp-01/notebooks/jupyter/data/olympics-analysis/athlete_events_new.csv
gs://is-843-avp-01/notebooks/jupyter/data/olympics-analysis/noc_regions.csv


## Load the data into Spark

As seen in the [class notes](https://github.com/soltaniehha/Big-Data-Analytics-for-Business/blob/master/05-Basic-DF-Operations/01-Basic-Structured-Operations.ipynb), we can either ask Spark to infer the schema or specify it explicitly ourselves. For this example we need to specify the schema explicitly, since schema inference would not convert every column to the type we want.

As a reminder, here is how we can define a schema consisting of two columns, one integer and one string:

```python
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
  StructField("ID", LongType(), True),
  StructField("name", StringType(), True)
])

df = spark.read.format("csv")\
  .schema(myManualSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load("gs/path/to/file")
```

Modify this code to load athlete_events.csv. Call this DataFrame `athlete_events`:

**Note:** Our data contains "NA" values, which can cause issues when loading the data. To handle this, we need to tell Spark which string represents `null` in the data. We can use the option `nullValue` (as in the sample code above) and set it to "NA".
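To see what `nullValue` does conceptually, here is a minimal plain-Python sketch using only the standard library (this is not Spark code). It treats the string "NA" as a missing value while parsing. The sample rows and the helper name `parse_with_null` are made up for illustration.

```python
import csv
import io

# Hypothetical sample with the same shape as a few athlete_events columns.
SAMPLE = "Name,Age,Medal\nA Athlete,24,Gold\nB Athlete,NA,NA\n"

def parse_with_null(text, null_value="NA"):
    """Parse CSV text, replacing cells equal to null_value with None."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        rows.append({k: (None if v == null_value else v) for k, v in record.items()})
    return rows

rows = parse_with_null(SAMPLE)
print(rows[1])
```

Without this step, a value such as "NA" in the `Age` column could not be read as a number, which is why the option matters when the schema declares numeric types.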

In [3]:
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType

myManualSchema = StructType([
  StructField("ID", LongType(), True),
  StructField("Name", StringType(), True),
  StructField("Sex", StringType(), True),
  StructField("Age", LongType(), True),
  StructField("Height", LongType(), True),
  StructField("Weight", LongType(), True),
  StructField("Team", StringType(), True),
  StructField("NOC", StringType(), True),
  StructField("Games", StringType(), True),
  StructField("Year", LongType(), True),
  StructField("Season", StringType(), True),
  StructField("City", StringType(), True),
  StructField("Sport", StringType(), True),
  StructField("Event", StringType(), True),
  StructField("Medal", StringType(), True)
])

Print the schema of this DataFrame:

In [4]:


from pyspark.sql.functions import avg, col, sum, count

# The explicit schema is used, so the inferSchema option is not needed here.
athlete_events = spark.read.format("csv")\
  .schema(myManualSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load(data + "athlete_events_new.csv")

athlete_events.count()

                                                                                

271116

In [5]:
# Drop exact duplicate rows, if any are present.
# This prints the number of rows that remain.

athlete_events = athlete_events.drop_duplicates()
athlete_events.count()

                                                                                

269729

In [6]:
# Your answer goes here
athlete_events.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Height: long (nullable = true)
 |-- Weight: long (nullable = true)
 |-- Team: string (nullable = true)
 |-- NOC: string (nullable = true)
 |-- Games: string (nullable = true)
 |-- Year: long (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Event: string (nullable = true)
 |-- Medal: string (nullable = true)



Print the first 5 rows:

In [7]:
athlete_events.head(5)

                                                                                

[Row(ID=167, Name='Ould Lamine Abdallah', Sex='M', Age=None, Height=None, Weight=None, Team='France', NOC='FRA', Games='1952 Summer', Year=1952, Season='Summer', City='Helsinki', Sport='Athletics', Event="Athletics Men's 10,000 metres", Medal=None),
 Row(ID=181, Name='Sara Abdel Gawad', Sex='F', Age=18, Height=153, Weight=46, Team='Egypt', NOC='EGY', Games='2000 Summer', Year=2000, Season='Summer', City='Sydney', Sport='Synchronized Swimming', Event="Synchronized Swimming Women's Duet", Medal=None),
 Row(ID=317, Name='Roosevelt M. Abdulgafur', Sex='M', Age=24, Height=181, Weight=73, Team='Philippines', NOC='PHI', Games='1968 Summer', Year=1968, Season='Summer', City='Mexico City', Sport='Swimming', Event="Swimming Men's 200 metres Freestyle", Medal=None),
 Row(ID=519, Name='Harold Maurice Abrahams', Sex='M', Age=24, Height=183, Weight=75, Team='Great Britain', NOC='GBR', Games='1924 Summer', Year=1924, Season='Summer', City='Paris', Sport='Athletics', Event="Athletics Men's 200 metres"

In [8]:
athlete_events.columns

['ID',
 'Name',
 'Sex',
 'Age',
 'Height',
 'Weight',
 'Team',
 'NOC',
 'Games',
 'Year',
 'Season',
 'City',
 'Sport',
 'Event',
 'Medal']

We won't use the following columns, so let's drop them from the DataFrame persistently:

* ID
* Games
* Event

In [23]:
# Keep a reference to the full DataFrame (with ID, Games and Event) as new_athlete_events

new_athlete_events = athlete_events.select('*')
athlete_events = athlete_events.drop("ID","Games","Event")

In [11]:
athlete_events.show(5)



+---+--------------------+---+----+------+------+-------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
| ID|                Name|Sex| Age|Height|Weight|         Team|NOC|      Games|Year|Season|       City|               Sport|               Event|Medal|
+---+--------------------+---+----+------+------+-------------+---+-----------+----+------+-----------+--------------------+--------------------+-----+
|167|Ould Lamine Abdallah|  M|null|  null|  null|       France|FRA|1952 Summer|1952|Summer|   Helsinki|           Athletics|Athletics Men's 1...| null|
|181|    Sara Abdel Gawad|  F|  18|   153|    46|        Egypt|EGY|2000 Summer|2000|Summer|     Sydney|Synchronized Swim...|Synchronized Swim...| null|
|317|Roosevelt M. Abdu...|  M|  24|   181|    73|  Philippines|PHI|1968 Summer|1968|Summer|Mexico City|            Swimming|Swimming Men's 20...| null|
|519|Harold Maurice Ab...|  M|  24|   183|    75|Great Britain|GBR|1924 Summer|1924|Summ

                                                                                

Now load noc_regions.csv. Call this DataFrame `noc_regions`:

In [12]:
regionSchema = StructType([
  StructField("NOC", StringType(), True),
  StructField("Region", StringType(), True),
  StructField("Notes", StringType(), True)
])

noc_regions = spark.read.format("csv")\
  .schema(regionSchema)\
  .option("header", "true")\
  .option("nullValue", "NA")\
  .load(data + "noc_regions.csv")


In [13]:
noc_regions.show(8)

+---+-----------+--------------------+
|NOC|     Region|               Notes|
+---+-----------+--------------------+
|AFG|Afghanistan|                null|
|AHO|    Curacao|Netherlands Antilles|
|ALB|    Albania|                null|
|ALG|    Algeria|                null|
|AND|    Andorra|                null|
|ANG|     Angola|                null|
|ANT|    Antigua| Antigua and Barbuda|
|ANZ|  Australia|         Australasia|
+---+-----------+--------------------+
only showing top 8 rows



### Caching

Since we will be using these two DataFrames a lot in this notebook, let's `cache()` them to speed up execution. Caching lets the DataFrame be loaded once and persisted in memory. Without it, every time we execute an action the DataFrame is reloaded from Cloud Storage, which adds to our execution time:

**Note:** Caching is lazy. The DataFrame is cached the first time you execute an action against it, not when you call `cache()`.
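The lazy behaviour can be illustrated with a toy plain-Python analogy. This is not Spark code: the class `LazyCachedTable` and its `loads` counter are hypothetical names invented for this sketch. Calling `cache()` only marks the table; the data is loaded and kept on the first action.

```python
class LazyCachedTable:
    """Toy stand-in for a cached DataFrame (an analogy, not Spark)."""

    def __init__(self, loader):
        self._loader = loader          # function that "reads from storage"
        self._cache_requested = False
        self._cached = None
        self.loads = 0                 # how many times the loader has run

    def cache(self):
        # Only marks the table for caching; nothing is loaded yet.
        self._cache_requested = True
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached
        self.loads += 1
        rows = self._loader()
        if self._cache_requested:
            self._cached = rows        # keep the rows for later actions
        return rows

    def count(self):
        # An "action": forces the rows to be materialised.
        return len(self._rows())


table = LazyCachedTable(lambda: [1, 2, 3]).cache()
loads_after_cache = table.loads        # still 0: cache() loaded nothing
table.count()
table.count()
loads_after_actions = table.loads      # 1: loaded once, then reused
print(loads_after_cache, loads_after_actions)
```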

In [14]:
athlete_events.cache()

DataFrame[ID: bigint, Name: string, Sex: string, Age: bigint, Height: bigint, Weight: bigint, Team: string, NOC: string, Games: string, Year: bigint, Season: string, City: string, Sport: string, Event: string, Medal: string]

In [15]:
noc_regions.cache()

DataFrame[NOC: string, Region: string, Notes: string]

## Question 1

What are the minimum and maximum values of `Year`?

In [14]:
# Compute the min and max of the Year column.

from pyspark.sql.functions import min, max
athlete_events.select(min("Year"), max("Year")).show()



+---------+---------+
|min(Year)|max(Year)|
+---------+---------+
|     1896|     2016|
+---------+---------+



                                                                                

## Question 2

Is the following statement True or False?

> The average age of female athletes who attended the Olympic Games after 1990 has risen compared to the era before then.

In [16]:
from pyspark.sql.functions import avg, col

athlete_events.where(athlete_events["Sex"] == "F").where("Year < 1990")\
  .select(avg(col("Age")).alias("avg_female_age_before_1990")).show()

athlete_events.where(athlete_events["Sex"] == "F").where("Year >= 1990")\
  .select(avg(col("Age")).alias("avg_female_age_after_1990")).show()

                                                                                

+--------------------------+
|avg_female_age_before_1990|
+--------------------------+
|         21.92261478475372|
+--------------------------+





+-------------------------+
|avg_female_age_after_1990|
+-------------------------+
|       24.619499568593614|
+-------------------------+



                                                                                

In [3]:
True


## Question 3

How many Gold medals were given to men from 1970 to 2000 (including both years)?

In [18]:
from pyspark.sql.functions import sum, count


#athlete_events\
#    .where(athlete_events["sex"] == "M")\
#    .where(col("year") >= 1970)\
#    .where(col("year") <= 2000)\
#    .where(col("medal") == "Gold")\
#    .count()

athlete_events\
    .where(athlete_events["Year"].between(1970,2000))\
    .where(col("Medal") == "Gold")\
    .where(athlete_events["Sex"] == "M")\
    .count()

                                                                                

3186

## Question 4

How many NOCs attended Summer Olympics 2016 in Rio de Janeiro?

NOC stands for National Olympic Committee, which is roughly equivalent to a country.

In [19]:
# Your answer goes here

# The Games column was dropped above, so filter on Year and Season instead.
athlete_events\
    .where(col("Year") == 2016)\
    .where(col("Season") == "Summer")\
    .select("NOC").distinct().count()

                                                                                

207

## Question 5

Create two DataFrames, one for the Winter games and one for the Summer games; these DataFrames should include a list of all NOCs that have won gold medals in the Olympics, and their count. Sort these DataFrames by the count in descending order. Call these DataFrames `winter_gold_count` and `summer_gold_count` respectively. Using these two, answer the following questions:

Which country has the highest gold medal count in the Winter Olympics? How about the Summer Olympics?

In [None]:
# A bit of preprocessing is required before we can count medals per country.
# In team events, when a country wins a gold medal, ALL athletes on the team get a gold medal.
# The dataset therefore has one gold row per athlete, for example in the 4x100m relay.
# The country they represent, however, wins a SINGLE gold. We need to remove duplicate
# rows where the team, NOC, year, season, city, sport, event and medal are the same,
# so that a single row represents the country's gold.
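The de-duplication idea described in the comments above can be sketched in plain Python on a few made-up rows (not Spark, and not the real data). The rows and the helper name `count_golds` are assumptions for illustration only. It compares counting one gold per athlete row with counting one gold per (NOC, year, season, event).

```python
# Hypothetical rows: (NOC, Year, Season, Event, Athlete, Medal)
rows = [
    ("CAN", 2010, "Winter", "Ice Hockey Men's Ice Hockey", "Player 1", "Gold"),
    ("CAN", 2010, "Winter", "Ice Hockey Men's Ice Hockey", "Player 2", "Gold"),
    ("CAN", 2010, "Winter", "Ice Hockey Men's Ice Hockey", "Player 3", "Gold"),
    ("NOR", 2010, "Winter", "Cross Country Skiing Men's 50 kilometres", "Skier 1", "Gold"),
    ("NOR", 2010, "Winter", "Biathlon Men's 20 kilometres", "Skier 2", "Silver"),
]

def count_golds(rows, per_event):
    """Count gold medals per NOC, optionally once per (NOC, year, season, event)."""
    seen = set()
    counts = {}
    for noc, year, season, event, _athlete, medal in rows:
        if medal != "Gold":
            continue
        if per_event:
            key = (noc, year, season, event)
            if key in seen:
                continue  # another athlete from the same winning team
            seen.add(key)
        counts[noc] = counts.get(noc, 0) + 1
    return counts

per_athlete = count_golds(rows, per_event=False)
per_event = count_golds(rows, per_event=True)
print(per_athlete)
print(per_event)
```

In this toy example the team event inflates the per-athlete count for one NOC, while the per-event count gives each NOC a single gold.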

In [24]:
from pyspark.sql.functions import regexp_replace

country_medals = new_athlete_events\
    .drop_duplicates(subset= ['Team','NOC','Year','Season','City','Sport','Event','Medal'])

In [25]:
print(country_medals.count())

125152


                                                                                

In [27]:
# Your answer goes here
from pyspark.sql.functions import col, desc

# One row per gold-medal athlete entry, keeping only the NOC column.
summer_gold_count = athlete_events.select("NOC")\
    .where(col("Season") == "Summer")\
    .where(col("Medal") == "Gold")

winter_gold_count = athlete_events.select("NOC")\
    .where(col("Season") == "Winter")\
    .where(col("Medal") == "Gold")

# groupby().count() produces a column named "count"; sort on it directly.
print("Count of Gold Medals for top 5 NOC's in SUMMER Olympics:")
summer_gold_count\
    .groupby("NOC")\
    .count()\
    .sort(col("count").desc())\
    .show(5)

print("Count of Gold Medals for top 5 NOC's in WINTER Olympics:")
winter_gold_count\
    .groupby("NOC")\
    .count()\
    .sort(col("count").desc())\
    .show(5)

Count of Gold Medals for top 5 NOC's in SUMMER Olympics:


                                                                                

+---+-----+
|NOC|count|
+---+-----+
|USA| 2376|
|URS|  832|
|GBR|  634|
|GER|  591|
|ITA|  518|
+---+-----+
only showing top 5 rows

Count of Gold Medals for top 5 NOC's in WINTER Olympics:




+---+-----+
|NOC|count|
+---+-----+
|CAN|  305|
|URS|  249|
|USA|  159|
|GER|  153|
|NOR|  151|
+---+-----+
only showing top 5 rows



                                                                                

In [28]:
# ALTERNATE CODE

# Strictly speaking, Canada is NOT the country with the most gold medals in the Winter Olympics.
# An ice hockey team has on average 12-16 players, and Canada has won the event almost 12 times.
# Every player's medal is counted, so Canada's count swells when it should be only about 12.
# Wikipedia also lists NORWAY as having won the most gold medals in the Winter Olympics.
# https://en.wikipedia.org/wiki/Winter_Olympic_Games

new_summer_gold_count = country_medals.select("NOC").where(col("Season") == "Summer").where(col("Medal") == "Gold")

new_winter_gold_count = country_medals.select("NOC").where(col("Season") == "Winter").where(col("Medal") == "Gold")

# groupby().count() produces a column named "count"; sort on it directly.
new_summer_gold_count\
    .groupby("NOC")\
    .count()\
    .sort(col("count").desc())\
    .show(5)

new_winter_gold_count\
    .groupby("NOC")\
    .count()\
    .sort(col("count").desc())\
    .show(5)



+---+-----+
|NOC|count|
+---+-----+
|USA| 1001|
|URS|  394|
|GBR|  277|
|GER|  234|
|FRA|  234|
+---+-----+
only showing top 5 rows





+---+-----+
|NOC|count|
+---+-----+
|NOR|  111|
|USA|   95|
|GER|   86|
|URS|   77|
|CAN|   62|
+---+-----+
only showing top 5 rows



                                                                                

## Question 6

Using the common field `NOC`, merge `summer_gold_count` and `noc_regions` DataFrames.

Which region takes the 10th place? This is based on the number of gold medals in all of the Summer Olympics in our dataset.

In [29]:
# Your answer goes here

print("Based on NOCs:")
summer_gold_count\
    .join(noc_regions, on="NOC")\
    .groupby("Region")\
    .count().alias("Count")\
    .sort(col("Count").desc())\
    .show(10)

Based on NOCs:




+---------+-----+
|   Region|count|
+---------+-----+
|      USA| 2376|
|   Russia| 1220|
|  Germany| 1074|
|       UK|  634|
|    Italy|  518|
|   France|  463|
|  Hungary|  432|
|Australia|  362|
|   Sweden|  352|
|    China|  335|
+---------+-----+
only showing top 10 rows



                                                                                

In [31]:

print("Based on unique game EVENTs and NOCs:")
new_summer_gold_count\
    .join(noc_regions, on="NOC")\
    .groupby("Region")\
    .count().alias("Count")\
    .sort(col("Count").desc())\
    .show(20)

Based on unique game EVENTs and NOCs:




+--------------+-----+
|        Region|count|
+--------------+-----+
|           USA| 1001|
|        Russia|  592|
|       Germany|  443|
|            UK|  277|
|        France|  234|
|         China|  228|
|         Italy|  219|
|       Hungary|  178|
|     Australia|  150|
|        Sweden|  149|
|         Japan|  142|
|       Finland|  104|
|   South Korea|   90|
|       Romania|   88|
|   Netherlands|   85|
|          Cuba|   77|
|        Poland|   69|
|        Canada|   64|
|Czech Republic|   64|
|        Norway|   59|
+--------------+-----+
only showing top 20 rows



                                                                                

In [32]:
# NOTE TO PROFESSOR:

# The data has entries whose Name contains double quotes and commas, which shifts
# the remaining values of those rows into the wrong columns.
# This may skew the gold medal counts beyond the top 5.
athlete_events\
    .select("Medal").distinct().show(20)

print("Incorrect entries:",athlete_events\
    .select("Medal").distinct().count())

                                                                                

+--------------------+
|               Medal|
+--------------------+
|Figure Skating Mi...|
|                null|
|Swimming Men's 4 ...|
|Athletics Men's 8...|
|              Silver|
|Swimming Women's ...|
|                Gold|
|Gymnastics Men's ...|
|Fencing Men's Foi...|
|              Bronze|
|Cross Country Ski...|
|Wrestling Men's H...|
|Sailing Mixed 6 m...|
|Athletics Women's...|
|Fencing Men's Sab...|
|Sailing Mixed Thr...|
|   Rugby Men's Rugby|
|Athletics Women's...|
|Water Polo Men's ...|
|Boxing Men's Welt...|
+--------------------+
only showing top 20 rows





Incorrect entries: 220
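A likely explanation for the shifted columns is how quoted fields are parsed. The plain-Python sketch below (standard library only, with a made-up line rather than a real row from the dataset) shows how a name containing a comma and doubled quotes splits into too many fields when the quoting rules are not applied, and into the expected fields when they are.

```python
import csv
import io

# Hypothetical line: the name contains a comma and a doubled quote ("" inside quotes).
line = '1,"Smith, John ""Johnny""",M,Gold\n'

# Naive split ignores quoting, so the name is broken into two pieces.
naive = line.strip().split(",")

# csv.reader applies the quoting rules and keeps the name as one field.
parsed = next(csv.reader(io.StringIO(line)))

print(len(naive), naive)
print(len(parsed), parsed)
```

Spark's CSV reader has `quote` and `escape` options, and its default escape character is a backslash rather than a double quote. Setting `.option("escape", "\"")` when loading may therefore help with rows like this, but that is an assumption that has not been verified against this dataset.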


                                                                                