## Project: Olympics Data Analysis

### Requirements

- python=3.10 # Specify Python version
- pyspark # PySpark for data processing
- jupyterlab # JupyterLab for interactive development
- ipykernel # Kernel for Jupyter
- pandas # Data manipulation
- numpy # Numerical operations
- matplotlib # Data visualization
- seaborn # Statistical data visualization
- oracledb # Oracle Database client
- sqlalchemy # For Oracle DB connectivity
- pip # Ensure pip is available

### Part 1: Parsing and inserting data

In [1]:
# Importing libraries
import pandas as pd
import cx_Oracle as oracledb

In [2]:
# Loading data
# The raw medal records are read with the Latin-1 codec, which accepts any
# byte sequence; text that was stored as UTF-8 therefore arrives mis-decoded.
source_csv = "./olympics_1896_2024.csv"
df = pd.read_csv(source_csv, encoding="latin1")

In [3]:
# Checking for incorrect encoding and fixing it.
#
# The CSV was read with encoding="latin1", so every character in the frame is
# <= U+00FF. UTF-8 text therefore shows up as mojibake ("TÃ¼rkiye"). Several
# keys in the replacement table below contain characters that only exist in
# Windows-1252 (€, Ÿ, ƒ, •, œ, ™, ˜, “); those keys cannot match
# Latin-1-decoded text. The round-trip step repairs every mis-decoded sequence
# generically; the table is kept afterwards as a fallback for cells that the
# round-trip has to leave untouched.
def _repair_mojibake(value):
    """Undo a UTF-8-read-as-Latin-1 mis-decoding of a single cell.

    Non-string values (numbers, NaN) are returned unchanged. Strings that are
    not valid UTF-8 once re-encoded as Latin-1 (i.e. text that was not
    mis-decoded, or only partly so) are also returned unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return value.encode("latin1").decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        return value


for _column in df.columns:
    # Numeric columns cannot contain mojibake; skip them to keep their dtype.
    if not pd.api.types.is_numeric_dtype(df[_column]):
        df[_column] = df[_column].map(_repair_mojibake)

# Fallback: replace common misencoded sequences with their correct
# representations in any cell the round-trip above could not repair.
df.replace({
    r'TÃ¼rkiye': 'Türkiye',
    r'Ã©': 'é',
    r'Ã¶': 'ö',
    r'Ã¼': 'ü',
    r'Ã§': 'ç',
    r'ÃŸ': 'ß',
    r'Ã¡': 'á',
    # 'í' mis-decodes to 'Ã' + SOFT HYPHEN (U+00AD); the invisible character
    # is written as a regex escape so it cannot be lost when editing.
    r'Ã\xad': 'í',
    r'Ã³': 'ó',
    r'Ãº': 'ú',
    r'Ã±': 'ñ',
    r'Ã€': 'À',
    r'Ãƒ': 'Ã',
    r'Ã•': 'Õ',
    r'â€“': '–',
    r'â€œ': '“',
    r'â€\x9d': '”',
    r'â€\x9c': '“',
    r'â€™': '’',
    r'â€˜': '‘',
}, regex=True, inplace=True)

In [4]:
# Drop rows with a missing value in any column
df = df.dropna()

# Remove rows where 'name' column contains '#NAME?'.
# An explicit copy is taken because columns are assigned on this frame
# afterwards; assigning into a boolean-mask slice triggers pandas'
# SettingWithCopyWarning.
df = df.loc[df['name'] != '#NAME?'].copy()

In [5]:
# Ensure data types match your database schema
# Every descriptive column is stored as text (VARCHAR2 in the table).
text_columns = (
    'discipline',
    'event',
    'event_gender',
    'event_type',
    'medal_type',
    'name',
    'country_code',
    'country',
    'city',
)
for text_column in text_columns:
    df[text_column] = df[text_column].astype(str)

# The year is the only numeric column (NUMBER in the table).
df['event_year'] = df['event_year'].astype(int)

In [6]:
# Oracle DB Connection
import os

# Connection settings can be overridden with the ORACLE_USER, ORACLE_PASSWORD
# and ORACLE_DSN environment variables so credentials do not have to be edited
# into the notebook.
# NOTE(review): the fallback values below still embed a password in source;
# remove them once the environment variables are set wherever this runs.
connection = oracledb.connect(
    user=os.environ.get("ORACLE_USER", "system"),
    password=os.environ.get("ORACLE_PASSWORD", "sam22"),
    dsn=os.environ.get("ORACLE_DSN", "localhost:1521/FREE"),
    encoding="UTF-8",
    nencoding="UTF-8"
    # mode=oracledb.SYSDBA
)

In [7]:
cursor = connection.cursor()

# Statements that rebuild the target table from scratch, in execution order.
drop_table_sql = "DROP TABLE IF EXISTS OLYMPICS_1896_2024"

# Create Table
create_table_sql = """CREATE TABLE OLYMPICS_1896_2024 (
    discipline VARCHAR2(255),
    event VARCHAR2(200),
    event_gender VARCHAR2(50),
    event_type VARCHAR2(50),
    medal_type VARCHAR2(50),
    name VARCHAR2(200),
    country_code VARCHAR2(10),
    country VARCHAR2(100),
    city VARCHAR2(100),
    event_year NUMBER
)"""

truncate_table_sql = "TRUNCATE TABLE OLYMPICS_1896_2024"

for ddl_statement in (drop_table_sql, create_table_sql, truncate_table_sql):
    cursor.execute(ddl_statement)

In [8]:
# Insert Data into the Table
insert_sql = """
        INSERT INTO OLYMPICS_1896_2024 (
            discipline, event, event_gender, event_type, medal_type,
            name, country_code, country, city, event_year
        ) VALUES (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10)
    """

# Column order must match the positional bind variables :1 .. :10 above.
insert_columns = [
    'discipline', 'event', 'event_gender', 'event_type', 'medal_type',
    'name', 'country_code', 'country', 'city', 'event_year',
]

# itertuples yields plain tuples of Python scalars, ready to be bound.
insert_rows = list(df[insert_columns].itertuples(index=False, name=None))

try:
    # One batched call instead of one execute() per row.
    if insert_rows:
        cursor.executemany(insert_sql, insert_rows)

    # Commit the transaction
    connection.commit()
except Exception:
    # Leave the table untouched if any row fails, then surface the error.
    connection.rollback()
    raise
else:
    print("Data inserted successfully!")
finally:
    # Close cursor and connection even when the insert fails.
    cursor.close()
    connection.close()

Data inserted successfully!


### Part 2: Retrieving data from the database and creating visualizations

In [None]:
# Importing libraries
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
import os

In [3]:
# Initializing the pyspark session
# The jar at ./ojdbc17.jar is put on Spark's classpath (presumably the Oracle
# JDBC driver used by the JDBC reads and writes below).
session_builder = SparkSession.builder.appName("OlympicsData")
session_builder = session_builder.config("spark.jars", "./ojdbc17.jar")
spark = session_builder.getOrCreate()

In [4]:
# Creating the connection to the Oracle database and reading data from the OLYMPICS_1896_2024 table
# Connection settings can be overridden with the ORACLE_USER, ORACLE_PASSWORD
# and ORACLE_DSN environment variables.
# NOTE(review): the fallback values still embed a password in source; remove
# them once the environment variables are set wherever this runs.
oracle_dsn = os.environ.get("ORACLE_DSN", "localhost:1521/FREE")
url = f"jdbc:oracle:thin:@{oracle_dsn}"
properties = {
    "user": os.environ.get("ORACLE_USER", "system"),
    "password": os.environ.get("ORACLE_PASSWORD", "sam22"),
    "driver": "oracle.jdbc.driver.OracleDriver"
}

# Reading data from the OLYMPICS_1896_2024 table
df_db = spark.read.jdbc(url=url, table="OLYMPICS_1896_2024",
                        properties=properties)

# The table declares event_year as NUMBER; cast it to an integer column.
df_db = df_db.withColumn("EVENT_YEAR", df_db["EVENT_YEAR"].cast("integer"))
df_db.printSchema()

root
 |-- DISCIPLINE: string (nullable = true)
 |-- EVENT: string (nullable = true)
 |-- EVENT_GENDER: string (nullable = true)
 |-- EVENT_TYPE: string (nullable = true)
 |-- MEDAL_TYPE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- COUNTRY_CODE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- EVENT_YEAR: integer (nullable = true)



## Functions to prepare visualizations

In [None]:
def medal_distribution_by_country():
    """
    Return the 20 countries with the most medal records.

    Rows of ``df_db`` are grouped by ``country`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDAL_DISTRIBUTION_BY_COUNTRY`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``country`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("country")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy(F.desc("total_medals"))
        .limit(20)
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medal_distribution_by_top20_country.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDAL_DISTRIBUTION_BY_COUNTRY")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [19]:
medal_distribution_by_country()

Unnamed: 0,country,total_medals
0,United States of America,2946
1,Soviet Union,1077
2,Germany,1036
3,Great Britain,974
4,France,933
5,People's Republic of China,902
6,Italy,706
7,Australia,618
8,Japan,590
9,Sweden,571


In [None]:
def yearly_medal_trends():
    """
    Return the number of medal records per Olympic year, oldest year first.

    Rows of ``df_db`` are grouped by ``event_year`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``YEARLY_MEDAL_TRENDS`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Returns:
        pandas.DataFrame with columns ``event_year`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("event_year")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy("event_year")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/yearly_medal_trends.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "YEARLY_MEDAL_TRENDS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def top_medalists(top_n=10):
    """
    Return the ``top_n`` names with the most medal records.

    Rows of ``df_db`` whose ``name`` is null are ignored; the remaining rows
    are grouped by ``name`` and the non-null ``medal_type`` values of each
    group are counted as ``total_medals``.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``TOP_MEDALISTS`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Args:
        top_n: maximum number of names to return (default 10).

    Returns:
        pandas.DataFrame with columns ``name`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        # Null names are dropped first so they cannot form a group of their
        # own, as the function's contract promises.
        df_db.filter(F.col("name").isNotNull())
        .groupBy("name")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy(F.desc("total_medals"))
        .limit(top_n)
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/top_medalists.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "TOP_MEDALISTS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def medal_distribution_by_gender():
    """
    Return the number of medal records per event gender.

    Rows of ``df_db`` are grouped by ``event_gender`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.
    The result holds absolute counts; proportions are left to the caller.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDAL_DISTRIBUTION_BY_GENDER`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``event_gender`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = df_db.groupBy("event_gender").agg(
        F.count("medal_type").alias("total_medals")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medal_distribution_by_gender.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDAL_DISTRIBUTION_BY_GENDER")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def medals_by_host_city():
    """
    Return the 20 cities with the most medal records.

    Rows of ``df_db`` are grouped by ``city`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDALS_BY_HOST_CITY`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Returns:
        pandas.DataFrame with columns ``city`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("city")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy(F.desc("total_medals"))
        .limit(20)
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medals_by_host_city.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDALS_BY_HOST_CITY")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [1]:
def medals_by_country_map():
    """
    Return the number of medal records per country code.

    Rows of ``df_db`` are grouped by ``country_code`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.
    The result is neither sorted nor truncated.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDALS_BY_COUNTRY_MAP`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``country_code`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = df_db.groupBy("country_code").agg(
        F.count("medal_type").alias("total_medals")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medals_by_country_map.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDALS_BY_COUNTRY_MAP")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [5]:
medals_by_country_map()

CSV saved at: olympics_csv_output/medals_by_country_map.csv


Unnamed: 0,country_code,total_medals
0,POL,318
1,JAM,72
2,BUR,1
3,BRA,202
4,ARM,22
...,...,...
153,INA,48
154,KGZ,13
155,CZE,112
156,EGY,39


In [None]:
def medals_by_discipline():
    """
    Return the 20 disciplines with the most medal records.

    Rows of ``df_db`` are grouped by ``discipline`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDALS_BY_DISCIPLINE`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Returns:
        pandas.DataFrame with columns ``discipline`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("discipline")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy(F.desc("total_medals"))
        .limit(20)
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medals_by_discipline.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDALS_BY_DISCIPLINE")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def event_popularity():
    """
    Return the 20 events with the most medal records.

    Rows of ``df_db`` are grouped by ``event`` and the non-null
    ``medal_type`` values of each group are counted. The count column is
    named ``total_participants``, but it counts medal rows; the data read
    here holds no information about non-medalist participants.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``EVENT_POPULARITY`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Returns:
        pandas.DataFrame with columns ``event`` and ``total_participants``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("event")
        .agg(F.count("medal_type").alias("total_participants"))
        .orderBy(F.desc("total_participants"))
        .limit(20)
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/event_popularity.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "EVENT_POPULARITY")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def medal_type_proportions():
    """
    Return the number of medal records per medal type.

    Rows of ``df_db`` are grouped by ``medal_type`` and counted as
    ``total_medals``. The result holds absolute counts; proportions are
    left to the caller.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDAL_TYPE_PROPORTIONS`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``medal_type`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = df_db.groupBy("medal_type").agg(
        F.count("medal_type").alias("total_medals")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medal_type_proportions.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDAL_TYPE_PROPORTIONS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def medal_type_trends():
    """
    Return the number of medal records per year and medal type.

    Rows of ``df_db`` are grouped by ``event_year`` and ``medal_type``,
    counted as ``total_medals`` and ordered by year, then medal type.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``MEDAL_TYPE_TRENDS`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Returns:
        pandas.DataFrame with columns ``event_year``, ``medal_type`` and
        ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("event_year", "medal_type")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy("event_year", "medal_type")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/medal_type_trends.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "MEDAL_TYPE_TRENDS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def top_countries_by_athlete_medals():
    """
    Return the 20 countries with the most distinct medalist names.

    Rows of ``df_db`` are grouped by ``country`` and the distinct ``name``
    values of each group are counted as ``unique_athletes``.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``TOP_COUNTRIES_BY_ATHLETE_MEDALS`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``country`` and ``unique_athletes``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("country")
        .agg(F.countDistinct("name").alias("unique_athletes"))
        .orderBy(F.desc("unique_athletes"))
        .limit(20)
    )

    output_folder = "olympics_csv_output"
    # NOTE(review): "athelete" is misspelled, but the path is kept unchanged
    # because other code may read this exact location.
    csv_path = f"{output_folder}/top_countries_by_athelete_medals.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "TOP_COUNTRIES_BY_ATHLETE_MEDALS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def repeat_medalists():
    """
    Return up to 10 names that have more than one medal record.

    Rows of ``df_db`` are grouped by ``name`` and the non-null
    ``medal_type`` values of each group are counted as ``total_medals``.
    Only names with a count above 1 are kept, ordered by descending count.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``REPEAT_MEDALISTS`` and as CSV under ``olympics_csv_output/``.
    Export errors are printed, not raised, so the aggregated data is still
    returned.

    Returns:
        pandas.DataFrame with columns ``name`` and ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("name")
        .agg(F.count("medal_type").alias("total_medals"))
        .filter("total_medals > 1")
        .orderBy(F.desc("total_medals"))
        .limit(10)
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/repeat_medalists.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "REPEAT_MEDALISTS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def country_performance_over_time():
    """
    Return per-year medal counts for the 10 countries with the most rows.

    The 10 ``country`` values with the most rows in ``df_db`` are collected
    first. Rows for those countries are then grouped by ``event_year`` and
    ``country``, the non-null ``medal_type`` values are counted as
    ``total_medals``, and the result is ordered by year.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``COUNTRY_PERFORMANCE_OVER_TIME`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``event_year``, ``country`` and
        ``total_medals``.
    """
    # Both steps run outside the ``try``: ``collect()`` triggers a real query,
    # and a failure here must surface as the original error instead of an
    # UnboundLocalError at the ``return`` below.
    top_country_rows = (
        df_db.groupBy("country")
        .count()
        .orderBy(F.desc("count"))
        .limit(10)
        .collect()
    )
    top_countries = [row["country"] for row in top_country_rows]

    result = (
        df_db.filter(F.col("country").isin(top_countries))
        .groupBy("event_year", "country")
        .agg(F.count("medal_type").alias("total_medals"))
        .orderBy("event_year")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/country_performance_over_time.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "COUNTRY_PERFORMANCE_OVER_TIME")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def gender_based_event_trends():
    """
    Return the number of rows per year and event gender, oldest year first.

    Rows of ``df_db`` are grouped by ``event_year`` and ``event_gender`` and
    the non-null ``event`` values of each group are counted as
    ``total_events``. This counts rows, not distinct events, so an event
    contributes once per medal record.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``GENDER_BASED_EVENT_TRENDS`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``event_year``, ``event_gender`` and
        ``total_events``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    result = (
        df_db.groupBy("event_year", "event_gender")
        .agg(F.count("event").alias("total_events"))
        .orderBy("event_year")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/gender_based_event_trends.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "GENDER_BASED_EVENT_TRENDS")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        result.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return result.toPandas()

In [None]:
def host_country_advantage():
    """
    Return the number of medal records per (city, country) pair.

    Rows of ``df_db`` are grouped by ``city`` and ``country`` and the
    non-null ``medal_type`` values of each group are counted as
    ``total_medals``. Every country is included for every city; picking out
    the host country's row is left to the caller.

    Side effects: the result is written in overwrite mode to the Oracle
    table ``HOST_COUNTRY_ADVANTAGE`` and as CSV under
    ``olympics_csv_output/``. Export errors are printed, not raised, so the
    aggregated data is still returned.

    Returns:
        pandas.DataFrame with columns ``city``, ``country`` and
        ``total_medals``.
    """
    # Built outside the ``try`` so that a failure here surfaces as the real
    # error instead of an UnboundLocalError at the ``return`` below.
    host_medals = df_db.groupBy("city", "country").agg(
        F.count("medal_type").alias("total_medals")
    )

    output_folder = "olympics_csv_output"
    csv_path = f"{output_folder}/host_country_advantage.csv"

    try:
        # Write the DataFrame to Oracle database
        (
            host_medals.write.format("jdbc")
            .option("url", url)
            .option("dbtable", "HOST_COUNTRY_ADVANTAGE")
            .options(**properties)
            .mode("overwrite")
            .save()
        )

        os.makedirs(output_folder, exist_ok=True)
        host_medals.write.mode("overwrite").option("header", True).csv(csv_path)
        print(f"CSV saved at: {csv_path}")
    except Exception as e:
        # Best-effort export: report the problem but still return the data.
        print(f"An error occurred: {e}")

    return host_medals.toPandas()