*Initialize spark and define a config*

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder \
    .appName("spark") \
    .getOrCreate()

config = {
    "datasets_path": "/content/drive/MyDrive/Colab Notebooks/data-engineer-test/datasets/",
    "olympics_output_path": "/content/drive/MyDrive/Colab Notebooks/data-engineer-test/datasets/output/olympics",
    "countries_output_path": "/content/drive/MyDrive/Colab Notebooks/data-engineer-test/datasets/output/countries",
    "region_codes_path": "/content/drive/MyDrive/Colab Notebooks/data-engineer-test/datasets/country_codes",
    "olympics_w_country_data_output_path": "/content/drive/MyDrive/Colab Notebooks/data-engineer-test/datasets/output/olympics_w_country_data"
}

*Start to "Olympics" task*

In [2]:
# define some helper functions for below

def get_location_and_year(file_name: str):
  """
  Returns the location & year parsed out of the received file name
  Expects the first 2 values to be location and year separated by either a space or _
  """
  file_name = file_name.replace('_', ' ')
  split_name = file_name.strip().split(' ')
  return split_name[0], int(split_name[1])

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import os

files = os.listdir(config['datasets_path'] + 'olympics')

# Define the schema
output_schema = StructType([
    StructField("Location", StringType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Gold", IntegerType(), True),
    StructField("Silver", IntegerType(), True),
    StructField("Bronze", IntegerType(), True),
    StructField("Total", IntegerType(), True),
    StructField("Nation", StringType(), True)
])

final_df = spark.createDataFrame([], schema=output_schema)

In [4]:
for file_name in files:
  print(f'Processing {file_name}')
  location, year = get_location_and_year(file_name)

  df = spark.read.csv(config['datasets_path'] + 'olympics/' + file_name, header=True)
  df = (df
        .withColumn("Location", f.lit(location))
        .withColumn("Year", f.lit(year))
        .withColumn("Gold", f.col("Gold").cast(IntegerType()))
        .withColumn("Silver", f.col("Silver").cast(IntegerType()))
        .withColumn("Bronze", f.col("Bronze").cast(IntegerType()))
        .withColumn("Total", f.col("Total").cast(IntegerType()))
        .selectExpr("Location", "Year", "Gold", "Silver", "Bronze", "Total", "NOC as Nation")
        )
  final_df = final_df.unionByName(df)

Processing Tokyo 2020 Olympics Nations Medals.csv
Processing Torino 2006 Olympics Nations Medals.csv
Processing Athens 2004 Olympics Nations Medals.csv
Processing Vancouver 2010 Olympics Nations Medals.csv
Processing PyeongChang 2018 Olympics Nations Medals.csv
Processing Rio 2016 Olympics Nations Medals.csv
Processing London 2012 Olympics Nations Medals.csv
Processing Lillehammer 1994 Olympics Nations Medals.csv
Processing Paris 2024 Olympics_Nations Medals.csv
Processing Nagano 1998 Olympics Nations Medals.csv
Processing beijing_2022_Olympics_Nations_Medals.csv
Processing Sydney 2000 Olympics Nations Medals.csv
Processing Sochi 2014 Olympics Nations Medals.csv
Processing SaltLakeCity 2002 Olympics Nations Medals.csv
Processing Atlanta 1996 Olympics Nations Medals.csv


In [5]:
final_df.write.parquet(config['olympics_output_path'], mode='overwrite')

In [6]:
# TESTS
# Would typically have this in a separate file broken out into a python package, but for the sake of not setting up infrastruture for a quick coding test, just throwing some simple tests here

def test_get_location_and_year():
  location, year = get_location_and_year('Kentucky 2024 more words_897')
  assert location == 'Kentucky'
  assert year == 2024

  location, year = get_location_and_year('NewMexico_2025_moreasdlkjfwords_897')
  assert location == 'NewMexico'
  assert year == 2025


test_get_location_and_year()

*End to "Olympics" task*

*Start to "Countries" task*

In [7]:
countries_schema = StructType([
    StructField("Country", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("Population", StringType(), True),
    StructField("Area (sq. mi.)", StringType(), True),
    StructField("Pop. Density (per sq. mi.)", StringType(), True),
    StructField("Coastline (coast/area ratio)", StringType(), True),
    StructField("Net migration", StringType(), True),
    StructField("Infant mortality (per 1000 births)", StringType(), True),
    StructField("GDP ($ per capita)", StringType(), True),
    StructField("Literacy (%)", StringType(), True),
    StructField("Phones (per 1000)", StringType(), True),
    StructField("Arable (%)", StringType(), True),
    StructField("Crops (%)", StringType(), True),
    StructField("Other (%)", StringType(), True),
    StructField("Climate", StringType(), True),
    StructField("Birthrate", StringType(), True),
    StructField("Deathrate", StringType(), True),
    StructField("Agriculture", StringType(), True),
    StructField("Industry", StringType(), True),
    StructField("Service", StringType(), True)
])


countries_df = spark.read.csv(config['datasets_path'] + 'countries/countries of the world.csv', header=True, schema=countries_schema);
countries_df.write.parquet(config['countries_output_path'], mode='overwrite')

*End to "Countries" task*

*Start to "Combing Olympics and Countries" task*

In [8]:
olympics_df = spark.read.parquet(config["olympics_output_path"])
countries_df = spark.read.parquet(config["countries_output_path"])

# The only relationship that can be made to olympics would have to do with the "Nation" column created which appears to be abbreviated countries
# To combat this I made an intermediary table that can associate the two tables together better - see "generate_country_codes.py"
region_codes_df = spark.read.parquet(config["region_codes_path"])

# See "Look for missing matches" cell below how I found these values
region_code_replacements = {
    "North Korea": "Korea, North",
    "South Korea": "Korea, South",
    "Great Britain": "United Kingdom",
    "Côte d'Ivoire": "Cote d'Ivoire",
    "Bahamas": "Bahamas, The",
    "Serbia and Montenegro": "Serbia",
    "Trinidad and Tobago": "Trinidad & Tobago",
    "North Macedonia": "Macedonia"

}
region_codes_df = region_codes_df.replace(region_code_replacements, subset=["Country"])

# outliers to be aware of:
# - ROC - Russian Olympic Committee
#     - Depending on usage we may want to make this Russia but I'm going to leave it out for now
# - AIN - Individual Neutral Athletes
#     - Another 'Russia' one, but will be left out for now
# - ROT - I think this is Refugee Olympic Team which won't associate with a country
# - IOA - Independent Olympic Athletes
# - OAR - Another Russia one that won't match up unless specified
# - TPE - Chinese Taipei - excluded for now because I'm not sure if that is considered China or not

# Left join to see data that doesn't connect to a country
olympics_w_country = olympics_df.join(region_codes_df, on="Nation", how="left")

# Countries missing from countries_df thus being ignored (See "Look for missing matches" below):
missing_countries = ["Montenegro", "Yugoslavia", "Kosovo"]
olympics_w_country = olympics_w_country.filter(~f.col("Country").isin(missing_countries))

olympics_w_country.filter(f.col("Country").isNull()).show()

# Verify every nation is found
print(f'{olympics_w_country.filter(f.col("Country").isNull()).select("Nation").distinct().count()} nations who medaled could not be mapped to a country')

+------+--------+----+----+------+------+-----+-------+
|Nation|Location|Year|Gold|Silver|Bronze|Total|Country|
+------+--------+----+----+------+------+-----+-------+
+------+--------+----+----+------+------+-----+-------+

0 nations who medaled could not be mapped to a country


In [9]:
# join it with countries
ignore_nations = ["ROC", "AIN", "OAR", "ROT", "IOA", "TPE"]

# left join to only keep countries with olympic data
olympics_w_country_data = (olympics_w_country
                           .filter(~f.col("Nation").isin(ignore_nations))
                           .withColumn("Country", f.trim(f.col("Country")))
  .join(countries_df.withColumn("Country", f.trim(f.col("Country"))), on="Country", how="left")
)

In [10]:
# Look for missing matches
olympics_w_country_data.filter(f.col("Population").isNull()).show()

missing_nations = [row.Country for row in olympics_w_country_data.filter(f.col("Population").isNull()).select("Country").distinct().collect()]

print(f'{len(missing_nations)} nations  could not map back to a country: {missing_nations}')

# Used this line to search for countries that appeared missing at first but just had a different name in countries_df
countries_df.filter(f.col("Country").like("%Bah%")).show()

+-------+------+--------+----+----+------+------+-----+------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
|Country|Nation|Location|Year|Gold|Silver|Bronze|Total|Region|Population|Area (sq. mi.)|Pop. Density (per sq. mi.)|Coastline (coast/area ratio)|Net migration|Infant mortality (per 1000 births)|GDP ($ per capita)|Literacy (%)|Phones (per 1000)|Arable (%)|Crops (%)|Other (%)|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+-------+------+--------+----+----+------+------+-----+------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
+---

In [11]:
# Now we're in a state with as many matches as possible, so write out the data asset
olympics_w_country_data.write.mode('overwrite').parquet(config['olympics_w_country_data_output_path'])

*End combing olympics and countries task*

In [12]:
# just some messing around with the final output for potential use cases
df = spark.read.parquet(config['olympics_w_country_data_output_path'])
df.show()

print("Most medals won in single year by a nation")
df.orderBy(f.col("Total").desc()).show()

print("\nMost medals won by a single nation in 2024")
df.filter(f.col("Year") == "2024").orderBy(f.col("Total").desc()).show()

print("\nMost medals won all time")
df.groupBy("Country").agg(f.sum(f.col("Total")).alias("Total")).orderBy(f.col("Total").desc()).show()

print("\nMost medals won all time based on most recent population")
(df.groupBy("Country").agg(
    f.sum(f.col("Total")).alias("Total"),
    f.max("Population").alias("Max Population"))
.withColumn("Medals / population", f.col("Total") / f.col("Max Population"))
.orderBy(f.col("Medals / population").desc()).show()
)

+--------------+------+--------+----+----+------+------+-----+--------------------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
|       Country|Nation|Location|Year|Gold|Silver|Bronze|Total|              Region|Population|Area (sq. mi.)|Pop. Density (per sq. mi.)|Coastline (coast/area ratio)|Net migration|Infant mortality (per 1000 births)|GDP ($ per capita)|Literacy (%)|Phones (per 1000)|Arable (%)|Crops (%)|Other (%)|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+--------------+------+--------+----+----+------+------+-----+--------------------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------