In [0]:
# Service-principal credentials are read from Databricks secret scopes so that
# no secret value is ever written into the notebook itself.
client_id = dbutils.secrets.get(scope="scopeClientID", key="secretClientID")
secret_key = dbutils.secrets.get(scope="scopeSecretKey", key="secretSecretKey")
tenant_id = dbutils.secrets.get(scope="scopeTenantID", key="secretTenantID")

# OAuth (client-credentials) settings handed to the ABFS driver when mounting.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": secret_key,
    # strip() guards against stray whitespace/newlines stored with the secret.
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id.strip()}/oauth2/token"
}

source = "abfss://tokyo-olympic-data@tokyoolympicdata1sanjana.dfs.core.windows.net/"
mount_point = "/mnt/tokyoolympic"

# dbutils.fs.mount raises if the mount point is already in use, which made this
# cell fail on every re-run (e.g. "Run All" on a cluster where the mount
# already exists). Only mount when the mount point is not present yet.
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        source=source,
        mount_point=mount_point,
        extra_configs=configs
    )

In [0]:
%fs
ls mnt/tokyoolympic

In [0]:
%fs
ls mnt/tokyoolympic/raw_data

## Tokyo Olympics Transformation
**"Athletes Participation" Transformations**

In [0]:

from pyspark.sql.functions import countDistinct

# Read the raw athletes CSV (first row is the header) from the mounted container.
athletes_path = "/mnt/tokyoolympic/raw_data/athletes.csv"
athletes_df = (
    spark.read
    .format("csv")
    .option("Header", True)
    .load(athletes_path)
)

# Output locations for the two derived datasets.
athletes_summary_path = "/mnt/tokyoolympic/transformed_data/athletes_summary"
distinct_athletes_path = "/mnt/tokyoolympic/transformed_data/distinct_athletes"

# Transformation 1: number of distinct athlete names per (Country, Discipline).
athletes_summary = (
    athletes_df
    .groupBy("Country", "Discipline")
    .agg(countDistinct("PersonName").alias("TotalAthletes"))
)

# Transformation 2: de-duplicated (PersonName, Country, Discipline) rows.
distinct_athletes = (
    athletes_df
    .select("PersonName", "Country", "Discipline")
    .distinct()
)

# Persist both results as Parquet (replacing any previous run) ...
for _frame, _target in (
    (athletes_summary, athletes_summary_path),
    (distinct_athletes, distinct_athletes_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... then preview them in the same order.
for _frame in (athletes_summary, distinct_athletes):
    _frame.show()

**"Medal Overview" Transformation**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, rank
from pyspark.sql.window import Window

# Create Spark session (if not already created); getOrCreate() returns the
# existing session when one is already running.
spark = SparkSession.builder.appName("TokyoOlympics").getOrCreate()

# Load the medals table from the mounted raw CSV file (first row is the header).
# No schema is supplied, so every column is read as a string.
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"
medals_df = spark.read.format("csv").option("header",True).load(medals_path)
medals_df.show()

# Transformation 1: Aggregate medal counts by TeamCountry.
# The medal columns are cast to int explicitly: summing the raw string columns
# relies on implicit coercion and produces floating-point totals (e.g. 39.0)
# instead of integer medal counts.
medals_summary = medals_df.groupBy("TeamCountry").agg(
    _sum(col("Gold").cast("int")).alias("TotalGold"),
    _sum(col("Silver").cast("int")).alias("TotalSilver"),
    _sum(col("Bronze").cast("int")).alias("TotalBronze"),
    _sum(col("Total").cast("int")).alias("TotalMedals")
)

# Transformation 2: Rank countries by Total Medals and Gold Medals.
# The windows have no partitionBy, so each rank is computed over the whole
# (small) summary table.
windowSpecTotal = Window.orderBy(col("TotalMedals").desc())
windowSpecGold = Window.orderBy(col("TotalGold").desc())

ranked_medals = medals_summary.withColumn("RankByTotalMedals", rank().over(windowSpecTotal)) \
                               .withColumn("RankByGoldMedals", rank().over(windowSpecGold))

# Save the transformed data back to a Parquet file in ADLS
output_path = "/mnt/tokyoolympic/transformed_data/medals_summary"
ranked_medals.write.format("parquet").mode("overwrite").save(output_path)

# Display the final DataFrame
ranked_medals.show()


**"Gender Distribution" Transformation**

In [0]:

from pyspark.sql.functions import col, expr

# Load the entriesgender table from the mounted raw CSV file (header row present)
entriesgender_path = "/mnt/tokyoolympic/raw_data/entriesgender.csv"
entriesgender_df = spark.read.format("csv").option("header",True).load(entriesgender_path)

# Transformation 1: Project gender-wise participation per Discipline, casting
# the string-typed CSV columns to integers
gender_summary = entriesgender_df.select("Discipline",
    col("Female").cast("int").alias("FemaleParticipants"),
    col("Male").cast("int").alias("MaleParticipants"),
    col("Total").cast("int").alias("TotalParticipants")
)

# Transformation 2: Calculate gender ratio (Female and Male percentage),
# rounded to two decimals
gender_ratio = gender_summary.withColumn(
    "FemalePercentage", expr("ROUND((FemaleParticipants / TotalParticipants) * 100, 2)")
).withColumn(
    "MalePercentage", expr("ROUND((MaleParticipants / TotalParticipants) * 100, 2)")
)

# Save the transformed data back to ADLS.
# The path must live under the /mnt/tokyoolympic mount used by the rest of the
# notebook; the previous "/mnt/tokyo-olympics-data/..." prefix is not a mount
# point, so the output never reached the mounted container.
gender_summary_path = "/mnt/tokyoolympic/transformed_data/gender_summary"

gender_ratio.write.format("parquet").mode("overwrite").save(gender_summary_path)

# Display the results
gender_ratio.show()

**"Event Insights" Transformation**

In [0]:

from pyspark.sql.functions import countDistinct

# Read the raw teams CSV (header row present) from the mounted container.
teams_path = "/mnt/tokyoolympic/raw_data/teams.csv"
teams_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(teams_path)
)

# Output locations for the two derived datasets.
events_summary_path = "/mnt/tokyoolympic/transformed_data/events_summary"
distinct_events_path = "/mnt/tokyoolympic/transformed_data/distinct_events"

# Transformation 1: number of distinct events per (Country, Discipline).
events_summary = (
    teams_df
    .groupBy("Country", "Discipline")
    .agg(countDistinct("Event").alias("UniqueEventCount"))
)

# Transformation 2: de-duplicated (Event, Country, Discipline) rows for
# detailed analysis.
distinct_events = (
    teams_df
    .select("Event", "Country", "Discipline")
    .distinct()
)

# Persist both results as Parquet, replacing any earlier output ...
for _frame, _target in (
    (events_summary, events_summary_path),
    (distinct_events, distinct_events_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... and preview them without truncating long cell values.
for _frame in (events_summary, distinct_events):
    _frame.show(truncate=False)

**"Top Countries and Rankings" Transformation**

In [0]:

from pyspark.sql.functions import col, expr, rank
from pyspark.sql.window import Window

# Load the medals table from the mounted raw CSV file (header row present).
# No schema is supplied, so every column is read as a string.
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"
medals_df = spark.read.format("csv").option("header",True).load(medals_path)

# Cast the medal counts to integers before ranking. Ordering the raw string
# columns sorts lexicographically ("9" ranks above "113"), which produced wrong
# ranks and therefore a wrong top-10 selection.
medals_numeric = medals_df.withColumn(
    "Gold", col("Gold").cast("int")
).withColumn(
    "Silver", col("Silver").cast("int")
).withColumn(
    "Bronze", col("Bronze").cast("int")
).withColumn(
    "Total", col("Total").cast("int")
)

# Transformation 1: Add percentages for Gold, Silver, and Bronze medals
# (share of the country's total, rounded to two decimals)
medals_with_percentages = medals_numeric.withColumn(
    "GoldPercentage", expr("ROUND((Gold / Total) * 100, 2)")
).withColumn(
    "SilverPercentage", expr("ROUND((Silver / Total) * 100, 2)")
).withColumn(
    "BronzePercentage", expr("ROUND((Bronze / Total) * 100, 2)")
)

# Transformation 2: Rank countries by Total Medals and Gold Medals
# (un-partitioned windows: ranks are computed across the whole table)
windowSpecTotal = Window.orderBy(col("Total").desc())
windowSpecGold = Window.orderBy(col("Gold").desc())

ranked_countries = medals_with_percentages.withColumn(
    "RankByTotal", rank().over(windowSpecTotal)
).withColumn(
    "RankByGold", rank().over(windowSpecGold)
)

# Transformation 3: Filter top 10 countries by total medals.
# rank() gives tied countries the same rank, so more than 10 rows may qualify.
top_10_countries = ranked_countries.filter(col("RankByTotal") <= 10)

# Save the transformed data back to ADLS
ranked_countries_path = "/mnt/tokyoolympic/transformed_data/ranked_countries"
top_10_countries_path = "/mnt/tokyoolympic/transformed_data/top_10_countries"

ranked_countries.write.format("parquet").mode("overwrite").save(ranked_countries_path)
top_10_countries.write.format("parquet").mode("overwrite").save(top_10_countries_path)

# Display the results
ranked_countries.show()
top_10_countries.show()

**"Athlete Highlights" Transformation**

In [0]:

from pyspark.sql.functions import col, count, sum as _sum, rank
from pyspark.sql.window import Window

# Read the raw athletes and medals CSV files (both have a header row).
athletes_path = "/mnt/tokyoolympic/raw_data/athletes.csv"
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"

athletes_df = spark.read.format("csv").option("header", True).load(athletes_path)
medals_df = spark.read.format("csv").option("header", True).load(medals_path)

# Output locations.
athletes_with_medals_path = "/mnt/tokyoolympic/transformed_data/athletes_with_medals"
ranked_athletes_path = "/mnt/tokyoolympic/transformed_data/ranked_athletes"

# Transformation 1: attach each athlete's country-level medal row. The left
# join keeps athletes whose country has no row in the medals table.
athletes_with_medals = (
    athletes_df
    .join(medals_df, athletes_df["Country"] == medals_df["TeamCountry"], "left")
    .select("PersonName", "Country", "Discipline", "Gold", "Silver", "Bronze", "Total")
)

# Transformation 2: per (Country, Discipline), count athletes and sum the
# medal columns carried over from the join.
# NOTE(review): the medal values come from the country-level medals row and are
# repeated once per athlete, so these sums look inflated by the athlete count.
# Confirm this is the intended metric.
top_athletes = (
    athletes_with_medals
    .groupBy("Country", "Discipline")
    .agg(
        count("PersonName").alias("TotalAthletes"),
        *[
            _sum(source_col).alias(total_col)
            for source_col, total_col in (
                ("Gold", "TotalGold"),
                ("Silver", "TotalSilver"),
                ("Bronze", "TotalBronze"),
                ("Total", "TotalMedals"),
            )
        ]
    )
)

# Transformation 3: rank disciplines inside each country by TotalMedals,
# highest first.
windowSpec = Window.partitionBy("Country").orderBy(col("TotalMedals").desc())
ranked_athletes = top_athletes.withColumn("RankByMedals", rank().over(windowSpec))

# Persist both results as Parquet, replacing earlier output ...
for _frame, _target in (
    (athletes_with_medals, athletes_with_medals_path),
    (ranked_athletes, ranked_athletes_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... then preview them.
for _frame in (athletes_with_medals, ranked_athletes):
    _frame.show()

**"Discipline Insights" Transformation**

In [0]:
from pyspark.sql.functions import col, sum as _sum, expr, rank
from pyspark.sql.window import Window

# Load the athletes and entriesgender tables (raw CSV files with a header row)
athletes_path = "/mnt/tokyoolympic/raw_data/athletes.csv"
entriesgender_path = "/mnt/tokyoolympic/raw_data/entriesgender.csv"

athletes_df = spark.read.format("csv").option("header",True).load(athletes_path)
entriesgender_df = spark.read.format("csv").option("header",True).load(entriesgender_path)

# Transformation 1: Calculate the total number of athletes per discipline
# (summing a literal 1 per row is a row count)
discipline_athletes = athletes_df.groupBy("Discipline").agg(
    _sum(expr("1")).alias("TotalAthletes")
)

# Transformation 2: Combine with gender distribution data; the inner join
# keeps only disciplines present in both tables
discipline_insights = discipline_athletes.join(
    entriesgender_df,
    "Discipline",
    how="inner"
).select(
    col("Discipline"),
    col("TotalAthletes"),
    col("Female").cast("int").alias("FemaleParticipants"),
    col("Male").cast("int").alias("MaleParticipants"),
    col("Total").cast("int").alias("TotalParticipants")
).withColumn(
    "FemalePercentage", expr("ROUND((FemaleParticipants / TotalParticipants) * 100, 2)")
).withColumn(
    "MalePercentage", expr("ROUND((MaleParticipants / TotalParticipants) * 100, 2)")
)

# Transformation 3: Rank disciplines by total participation and female percentage
windowSpecTotal = Window.orderBy(col("TotalParticipants").desc())
windowSpecFemale = Window.orderBy(col("FemalePercentage").desc())

ranked_disciplines = discipline_insights.withColumn(
    "RankByTotalParticipants", rank().over(windowSpecTotal)
).withColumn(
    "RankByFemalePercentage", rank().over(windowSpecFemale)
)

# Save the transformed data back to ADLS.
# The output paths were defined but never used, so neither DataFrame was ever
# persisted or displayed; write and show them like every other section does.
discipline_insights_path = "/mnt/tokyoolympic/transformed_data/discipline_insights"
ranked_disciplines_path = "/mnt/tokyoolympic/transformed_data/ranked_disciplines"

discipline_insights.write.format("parquet").mode("overwrite").save(discipline_insights_path)
ranked_disciplines.write.format("parquet").mode("overwrite").save(ranked_disciplines_path)

# Display the results
discipline_insights.show()
ranked_disciplines.show()



**"Country Level Performance Insights" Transformations**

In [0]:
from pyspark.sql.functions import col, count, sum as _sum, expr, rank
from pyspark.sql.window import Window

# Load the athletes and medals tables (raw CSV files with a header row).
# No schema is supplied, so every column is read as a string.
athletes_path = "/mnt/tokyoolympic/raw_data/athletes.csv"
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"

athletes_df = spark.read.format("csv").option("header",True).load(athletes_path)
medals_df = spark.read.format("csv").option("header",True).load(medals_path)

# Transformation 1: Calculate total number of athletes per country
country_athletes = athletes_df.groupBy("Country").agg(
    count("PersonName").alias("TotalAthletes")
)

# Transformation 2: Merge medals data with athlete counts.
# The medal columns are cast to int here: TotalMedals is ranked below, and
# ordering the raw string values sorts lexicographically ("9" above "113"),
# which produced a wrong RankByTotalMedals.
country_performance = medals_df.join(
    country_athletes,
    medals_df.TeamCountry == country_athletes.Country,
    how="inner"
).select(
    col("TeamCountry").alias("Country"),
    col("TotalAthletes"),
    col("Gold").cast("int").alias("Gold"),
    col("Silver").cast("int").alias("Silver"),
    col("Bronze").cast("int").alias("Bronze"),
    col("Total").cast("int").alias("TotalMedals")
)

# Transformation 3: Calculate average medals per athlete (two decimals)
country_performance = country_performance.withColumn(
    "AvgMedalsPerAthlete", expr("ROUND(TotalMedals / TotalAthletes, 2)")
)

# Transformation 4: Rank countries based on total medals and average medals per athlete
windowSpecTotal = Window.orderBy(col("TotalMedals").desc())
windowSpecAvg = Window.orderBy(col("AvgMedalsPerAthlete").desc())

ranked_countries = country_performance.withColumn(
    "RankByTotalMedals", rank().over(windowSpecTotal)
).withColumn(
    "RankByAvgMedalsPerAthlete", rank().over(windowSpecAvg)
)

# Save the transformed data back to ADLS
country_performance_path = "/mnt/tokyoolympic/transformed_data/country_performance"
ranked_countries_path = "/mnt/tokyoolympic/transformed_data/ranked_countries_performance"

country_performance.write.format("parquet").mode("overwrite").save(country_performance_path)
ranked_countries.write.format("parquet").mode("overwrite").save(ranked_countries_path)

# Display the results
country_performance.show()
ranked_countries.show()


**"Medal Trends by Discipline and Country" Transformations**

In [0]:
from pyspark.sql.functions import col, sum as _sum, rank
from pyspark.sql.window import Window

# Read the raw medals and teams CSV files (both have a header row).
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"
teams_path = "/mnt/tokyoolympic/raw_data/teams.csv"

medals_df = spark.read.format("csv").option("header", True).load(medals_path)
teams_df = spark.read.format("csv").option("header", True).load(teams_path)

# Output locations.
discipline_medals_path = "/mnt/tokyoolympic/transformed_data/discipline_medals"
ranked_disciplines_path = "/mnt/tokyoolympic/transformed_data/ranked_disciplines_by_country"

# Transformation 1: pair every team row with its country's medal row so that
# each medal row carries a Discipline.
# NOTE(review): the medal values come from the country-level medals row and are
# repeated for every matching team row, so the sums below are not per-discipline
# medal counts. Confirm this is the intended metric.
medals_by_discipline = (
    medals_df
    .join(teams_df, medals_df["TeamCountry"] == teams_df["Country"], "inner")
    .select(
        col("TeamCountry").alias("Country"),
        "Discipline",
        "Gold",
        "Silver",
        "Bronze",
        col("Total").alias("TotalMedals"),
    )
)

# Transformation 2: sum the medal columns per (Country, Discipline).
discipline_medals = (
    medals_by_discipline
    .groupBy("Country", "Discipline")
    .agg(
        *[
            _sum(source_col).alias(total_col)
            for source_col, total_col in (
                ("Gold", "TotalGold"),
                ("Silver", "TotalSilver"),
                ("Bronze", "TotalBronze"),
                ("TotalMedals", "TotalMedalsByDiscipline"),
            )
        ]
    )
)

# Transformation 3: rank disciplines inside each country, highest sum first.
windowSpec = Window.partitionBy("Country").orderBy(col("TotalMedalsByDiscipline").desc())

ranked_disciplines_by_country = discipline_medals.withColumn(
    "RankByDiscipline", rank().over(windowSpec)
)

# Persist both results as Parquet, replacing earlier output ...
for _frame, _target in (
    (discipline_medals, discipline_medals_path),
    (ranked_disciplines_by_country, ranked_disciplines_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... then preview them.
for _frame in (discipline_medals, ranked_disciplines_by_country):
    _frame.show()

**"Event Participation Insights" Transformations**

In [0]:
from pyspark.sql.functions import col, countDistinct, rank
from pyspark.sql.window import Window

# Read the raw teams and entriesgender CSV files (both have a header row).
teams_path = "/mnt/tokyoolympic/raw_data/teams.csv"
entriesgender_path = "/mnt/tokyoolympic/raw_data/entriesgender.csv"

teams_df = spark.read.format("csv").option("header", True).load(teams_path)
entriesgender_df = spark.read.format("csv").option("header", True).load(entriesgender_path)

# Output locations. Note that event_summary_path receives the joined
# event_participation DataFrame, not event_summary itself.
event_summary_path = "/mnt/tokyoolympic/transformed_data/event_summary"
ranked_event_participation_path = "/mnt/tokyoolympic/transformed_data/ranked_event_participation"

# Transformation 1: number of distinct events per (Country, Discipline).
event_summary = (
    teams_df
    .groupBy("Country", "Discipline")
    .agg(countDistinct("Event").alias("TotalEvents"))
)

# Transformation 2: add the discipline-level gender entry counts (cast from
# string to int); the inner join drops disciplines missing from either side.
event_participation = (
    event_summary
    .join(entriesgender_df, "Discipline", "inner")
    .select(
        "Country",
        "Discipline",
        "TotalEvents",
        col("Female").cast("int").alias("FemaleParticipants"),
        col("Male").cast("int").alias("MaleParticipants"),
        col("Total").cast("int").alias("TotalParticipants"),
    )
)

# Transformation 3: rank disciplines inside each country by event count,
# highest first.
windowSpec = Window.partitionBy("Country").orderBy(col("TotalEvents").desc())

ranked_event_participation = event_participation.withColumn(
    "RankByEvents", rank().over(windowSpec)
)

# Persist both results as Parquet, replacing earlier output ...
for _frame, _target in (
    (event_participation, event_summary_path),
    (ranked_event_participation, ranked_event_participation_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... then preview them.
for _frame in (event_participation, ranked_event_participation):
    _frame.show()


**"Comprehensive Medal and Athlete Analysis" Transformations**

In [0]:
from pyspark.sql.functions import col, count, sum as _sum, expr, rank
from pyspark.sql.window import Window

# Read the raw medals and athletes CSV files (both have a header row).
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"
athletes_path = "/mnt/tokyoolympic/raw_data/athletes.csv"

medals_df = spark.read.format("csv").option("header", True).load(medals_path)
athletes_df = spark.read.format("csv").option("header", True).load(athletes_path)

# Output locations.
medal_athlete_analysis_path = "/mnt/tokyoolympic/transformed_data/medal_athlete_analysis"
ranked_medal_efficiency_path = "/mnt/tokyoolympic/transformed_data/ranked_medal_efficiency"

# Transformation 1: number of athlete rows per country.
country_athletes = (
    athletes_df
    .groupBy("Country")
    .agg(count("PersonName").alias("TotalAthletes"))
)

# Transformation 2: keep countries present in both tables and derive the
# medals-per-athlete ratio, rounded to two decimals.
medal_athlete_analysis = (
    medals_df
    .join(country_athletes, medals_df["TeamCountry"] == country_athletes["Country"], "inner")
    .select(
        col("TeamCountry").alias("Country"),
        "TotalAthletes",
        "Gold",
        "Silver",
        "Bronze",
        col("Total").alias("TotalMedals"),
    )
    .withColumn("MedalsPerAthlete", expr("ROUND(TotalMedals / TotalAthletes, 2)"))
)

# Transformation 3: rank countries by that ratio, highest first.
windowSpecEfficiency = Window.orderBy(col("MedalsPerAthlete").desc())

ranked_medal_efficiency = medal_athlete_analysis.withColumn(
    "RankByEfficiency", rank().over(windowSpecEfficiency)
)

# Persist both results as Parquet, replacing earlier output ...
for _frame, _target in (
    (medal_athlete_analysis, medal_athlete_analysis_path),
    (ranked_medal_efficiency, ranked_medal_efficiency_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... then preview them.
for _frame in (medal_athlete_analysis, ranked_medal_efficiency):
    _frame.show()


**"Discipline-Level Gender and Medal Insights" Transformations**

In [0]:
from pyspark.sql.functions import col, sum as _sum, expr, rank
from pyspark.sql.window import Window

# Read the three raw CSV inputs (each has a header row).
entriesgender_path = "/mnt/tokyoolympic/raw_data/entriesgender.csv"
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"
teams_path = "/mnt/tokyoolympic/raw_data/teams.csv"

entriesgender_df = spark.read.format("csv").option("header", True).load(entriesgender_path)
medals_df = spark.read.format("csv").option("header", True).load(medals_path)
teams_df = spark.read.format("csv").option("header", True).load(teams_path)

# Output locations.
discipline_gender_medals_path = "/mnt/tokyoolympic/transformed_data/discipline_gender_medals"
ranked_discipline_gender_path = "/mnt/tokyoolympic/transformed_data/ranked_discipline_gender"

# Transformation 1: join medal rows to team rows by country, then sum the
# medal columns per Discipline.
# NOTE(review): medal values are country-level and are repeated for every
# matching team row, so these are not true per-discipline medal counts. Confirm
# the intended metric.
medals_by_discipline = (
    medals_df
    .join(teams_df, medals_df["TeamCountry"] == teams_df["Country"], "inner")
    .groupBy("Discipline")
    .agg(
        *[
            _sum(source_col).alias(total_col)
            for source_col, total_col in (
                ("Gold", "TotalGold"),
                ("Silver", "TotalSilver"),
                ("Bronze", "TotalBronze"),
                ("Total", "TotalMedals"),
            )
        ]
    )
)

# Transformation 2: add gender entry counts (cast to int) and the
# female/male percentage of total entries, rounded to two decimals.
discipline_gender_medals = (
    medals_by_discipline
    .join(entriesgender_df, "Discipline", "inner")
    .select(
        "Discipline",
        "TotalGold",
        "TotalSilver",
        "TotalBronze",
        "TotalMedals",
        col("Female").cast("int").alias("FemaleParticipants"),
        col("Male").cast("int").alias("MaleParticipants"),
        col("Total").cast("int").alias("TotalParticipants"),
    )
    .withColumn("FemalePercentage", expr("ROUND((FemaleParticipants / TotalParticipants) * 100, 2)"))
    .withColumn("MalePercentage", expr("ROUND((MaleParticipants / TotalParticipants) * 100, 2)"))
)

# Transformation 3: rank disciplines by summed medals and by female share,
# both highest first.
windowSpecMedals = Window.orderBy(col("TotalMedals").desc())
windowSpecFemale = Window.orderBy(col("FemalePercentage").desc())

ranked_discipline_gender = (
    discipline_gender_medals
    .withColumn("RankByMedals", rank().over(windowSpecMedals))
    .withColumn("RankByFemaleParticipation", rank().over(windowSpecFemale))
)

# Persist both results as Parquet, replacing earlier output ...
for _frame, _target in (
    (discipline_gender_medals, discipline_gender_medals_path),
    (ranked_discipline_gender, ranked_discipline_gender_path),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)

# ... then preview them.
for _frame in (discipline_gender_medals, ranked_discipline_gender):
    _frame.show()

**Continent Mapping DataFrame Creation**

In [0]:
from pyspark.sql import SparkSession

# Create Spark session if not already created
# (getOrCreate() returns the already-running session when there is one)
spark = SparkSession.builder.appName("ContinentMapping").getOrCreate()

# Define the continent mapping manually.
# Each entry is a (team country name, continent) pair. The names are joined
# against medals_df["TeamCountry"] and athletes_df.Country later in the
# notebook, so they must match the spelling used in the raw data.
# NOTE(review): transcontinental countries (e.g. Turkey, Georgia, Azerbaijan,
# Armenia) are listed under "Asia" and "ROC" under "Europe" - confirm this is
# the intended convention.
continent_data = [
    ("United States of America", "North America"),
    ("People's Republic of China", "Asia"),
    ("Japan", "Asia"),
    ("Great Britain", "Europe"),
    ("ROC", "Europe"),
    ("Australia", "Oceania"),
    ("Netherlands", "Europe"),
    ("France", "Europe"),
    ("Germany", "Europe"),
    ("Italy", "Europe"),
    ("Canada", "North America"),
    ("Brazil", "South America"),
    ("New Zealand", "Oceania"),
    ("Cuba", "North America"),
    ("Hungary", "Europe"),
    ("Republic of Korea", "Asia"),
    ("Poland", "Europe"),
    ("Czech Republic", "Europe"),
    ("Kenya", "Africa"),
    ("Norway", "Europe"),
    ("Jamaica", "North America"),
    ("Spain", "Europe"),
    ("Sweden", "Europe"),
    ("Switzerland", "Europe"),
    ("Denmark", "Europe"),
    ("Croatia", "Europe"),
    ("Islamic Republic of Iran", "Asia"),
    ("Serbia", "Europe"),
    ("Belgium", "Europe"),
    ("Bulgaria", "Europe"),
    ("Slovenia", "Europe"),
    ("Uzbekistan", "Asia"),
    ("Georgia", "Asia"),
    ("Chinese Taipei", "Asia"),
    ("Turkey", "Asia"),
    ("Greece", "Europe"),
    ("Uganda", "Africa"),
    ("Ecuador", "South America"),
    ("Ireland", "Europe"),
    ("Israel", "Asia"),
    ("Qatar", "Asia"),
    ("Bahamas", "North America"),
    ("Kosovo", "Europe"),
    ("Ukraine", "Europe"),
    ("Belarus", "Europe"),
    ("Romania", "Europe"),
    ("Venezuela", "South America"),
    ("India", "Asia"),
    ("Hong Kong, China", "Asia"),
    ("Philippines", "Asia"),
    ("Slovakia", "Europe"),
    ("South Africa", "Africa"),
    ("Austria", "Europe"),
    ("Egypt", "Africa"),
    ("Indonesia", "Asia"),
    ("Ethiopia", "Africa"),
    ("Portugal", "Europe"),
    ("Tunisia", "Africa"),
    ("Estonia", "Europe"),
    ("Fiji", "Oceania"),
    ("Latvia", "Europe"),
    ("Thailand", "Asia"),
    ("Bermuda", "North America"),
    ("Morocco", "Africa"),
    ("Puerto Rico", "North America"),
    ("Colombia", "South America"),
    ("Azerbaijan", "Asia"),
    ("Dominican Republic", "North America"),
    ("Armenia", "Asia"),
    ("Kyrgyzstan", "Asia"),
    ("Mongolia", "Asia"),
    ("Argentina", "South America"),
    ("San Marino", "Europe"),
    ("Jordan", "Asia"),
    ("Malaysia", "Asia"),
    ("Nigeria", "Africa"),
    ("Bahrain", "Asia"),
    ("Saudi Arabia", "Asia"),
    ("Lithuania", "Europe"),
    ("North Macedonia", "Europe"),
    ("Namibia", "Africa"),
    ("Turkmenistan", "Asia"),
    ("Kazakhstan", "Asia"),
    ("Mexico", "North America"),
    ("Finland", "Europe"),
    ("Botswana", "Africa"),
    ("Burkina Faso", "Africa"),
    ("Côte d'Ivoire", "Africa"),
    ("Ghana", "Africa"),
    ("Grenada", "North America"),
    ("Kuwait", "Asia"),
    ("Republic of Moldova", "Europe"),
    ("Syrian Arab Republic", "Asia"),
]

# Create a DataFrame for continent mapping with columns "Team Country" and
# "Continent"
continent_mapping_df = spark.createDataFrame(continent_data, ["Team Country", "Continent"])

# Save the Continent Mapping DataFrame to ADLS as CSV with a header row.
# The continent-performance section reads this path back with
# format("csv") + header=True, so writing Parquet here (as before) left that
# read unable to find the "Team Country" / "Continent" columns. Parquet can
# also reject column names containing spaces, such as "Team Country".
continent_mapping_path = "/mnt/tokyoolympic/raw_data/continent_mapping.csv"
continent_mapping_df.write.format("csv").option("header", True).mode("overwrite").save(continent_mapping_path)

# Display the Continent Mapping DataFrame
continent_mapping_df.show()

**"Overall Performance Analysis By Continent" Transformations**

In [0]:
from pyspark.sql.functions import col, count, sum as _sum, expr

# Read the three inputs as CSV with a header row: medals, athletes and the
# continent mapping saved by the previous section.
medals_path = "/mnt/tokyoolympic/raw_data/medals.csv"
athletes_path = "/mnt/tokyoolympic/raw_data/athletes.csv"
continent_mapping_path = "/mnt/tokyoolympic/raw_data/continent_mapping.csv"

medals_df = spark.read.format("csv").option("header", True).load(medals_path)
athletes_df = spark.read.format("csv").option("header", True).load(athletes_path)
continent_mapping_df = spark.read.format("csv").option("header", True).load(continent_mapping_path)

# Output location.
continent_performance_path = "/mnt/tokyoolympic/transformed_data/continent_performance"

# Step 1: tag every medal row with its continent; countries missing from the
# mapping are dropped by the inner join.
medals_with_continents = (
    medals_df
    .join(
        continent_mapping_df,
        medals_df["TeamCountry"] == continent_mapping_df["Team Country"],
        "inner",
    )
    .select("Continent", "Gold", "Silver", "Bronze", col("Total").alias("TotalMedals"))
)

# Step 2: sum the medal columns per continent.
continent_medals = (
    medals_with_continents
    .groupBy("Continent")
    .agg(
        *[
            _sum(source_col).alias(total_col)
            for source_col, total_col in (
                ("Gold", "TotalGold"),
                ("Silver", "TotalSilver"),
                ("Bronze", "TotalBronze"),
                ("TotalMedals", "TotalMedalsByContinent"),
            )
        ]
    )
)

# Step 3: tag every athlete row with its continent (inner join, as above).
athletes_with_continents = (
    athletes_df
    .join(
        continent_mapping_df,
        athletes_df["Country"] == continent_mapping_df["Team Country"],
        "inner",
    )
    .select("Continent", "PersonName")
)

# Step 4: number of athlete rows per continent.
continent_participation = (
    athletes_with_continents
    .groupBy("Continent")
    .agg(count("PersonName").alias("TotalAthletes"))
)

# Step 5: combine medals and participation and derive medals per athlete,
# rounded to two decimals.
continent_performance = (
    continent_medals
    .join(continent_participation, "Continent", "inner")
    .withColumn("MedalsPerAthlete", expr("ROUND(TotalMedalsByContinent / TotalAthletes, 2)"))
)

# Persist as Parquet (replacing any earlier output) and preview.
continent_performance.write.format("parquet").mode("overwrite").save(continent_performance_path)

continent_performance.show()
