### 🥉 Bronze Layer – Raw Data Ingestion

The Bronze layer stores **raw, immutable data** ingested directly from source systems.  
Data is stored **as-is** with minimal transformation to preserve source integrity and enable reprocessing.


### 🥉 Athletes Raw Data

Contains raw athlete information ingested from the source CSV file.

**Ingestion Characteristics:**
- Loaded without schema modification
- Header-based CSV ingestion
- Source data preserved for traceability

**Source:** `raw-data/Athletes.csv`  
**Target:** `bronze.athletes`


### 🥉 Coaches Raw Data

Stores raw coach information as received from the source system.

**Ingestion Characteristics:**
- No data cleansing or deduplication
- Schema inferred from source
- Used as input for Silver transformations

**Source:** `raw-data/Coaches.csv`  
**Target:** `bronze.coaches`


### 🥉 Teams Raw Data

Captures raw team participation data across disciplines and events.

**Ingestion Characteristics:**
- Direct CSV ingestion
- No business logic applied
- Acts as a replayable data source

**Source:** `raw-data/Teams.csv`  
**Target:** `bronze.teams`


### 🥉 Medals Raw Data

Stores raw medal counts by country.

**Ingestion Characteristics:**
- Loaded as-is from source
- No data validation at ingestion time
- Used for downstream fact table creation

**Source:** `raw-data/Medals.csv`  
**Target:** `bronze.medals`


### 🥉 Gender Entries Raw Data

Contains raw gender participation counts by discipline.

**Ingestion Characteristics:**
- Preserves original numeric values
- No calculated fields
- Serves as the base for gender analytics

**Source:** `raw-data/EntriesGender.csv`  
**Target:** `bronze.entries_gender`
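
The source and target pairs listed above all follow one pattern: a CSV file under `raw-data/` maps to a folder under a Bronze base path. A minimal sketch of that mapping in plain Python follows. The helper names `abfss_uri` and `bronze_plan`, and the `<container>` and `<storage-account>` placeholders, are illustrative assumptions and not part of the original notebook.

```python
# Hypothetical helpers: build ADLS Gen2 URIs for the raw -> Bronze mapping.

# Source CSV file name -> Bronze table (folder) name, as listed above.
BRONZE_FILES = {
    "Athletes.csv": "athletes",
    "Coaches.csv": "coaches",
    "Teams.csv": "teams",
    "Medals.csv": "medals",
    "EntriesGender.csv": "entries_gender",
}


def abfss_uri(container: str, account: str, *parts: str) -> str:
    """Join path parts onto an abfss:// root for the given container and account."""
    root = f"abfss://{container}@{account}.dfs.core.windows.net"
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return "/".join([root, *cleaned])


def bronze_plan(container: str, account: str) -> list:
    """Return (source_uri, target_uri) pairs for every Bronze table."""
    return [
        (
            abfss_uri(container, account, "raw-data", file_name),
            abfss_uri(container, account, "bronze", table_name),
        )
        for file_name, table_name in BRONZE_FILES.items()
    ]


# Placeholder names; substitute the real container and storage account.
for source, target in bronze_plan("<container>", "<storage-account>"):
    print(source, "->", target)
```

Keeping the mapping in one place means a new source file only needs one new dictionary entry.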


In [0]:
# Fetch the service principal secret from a Databricks secret scope.
# "<scope>" and "<service-credential-key>" are placeholders for your own names.
service_credential = dbutils.secrets.get(scope="<scope>", key="<service-credential-key>")

# Configure OAuth (service principal) access to the ADLS Gen2 storage account.
spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
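
The five settings above differ only in the storage account, application ID, and directory ID they embed. As a sketch, the keys could be generated by a small helper and applied in a loop. `oauth_settings` is a hypothetical name, and the placeholder arguments stand in for values the notebook does not show.

```python
def oauth_settings(storage_account, application_id, directory_id, client_secret):
    """Return Spark conf entries for OAuth (service principal) access to one ADLS Gen2 account."""
    suffix = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": application_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
    }


# Placeholder values only; never hard-code a real secret in a notebook.
settings = oauth_settings("<storage-account>", "<application-id>", "<directory-id>", "<secret>")

# In the notebook, each entry would then be applied with:
#     for key, value in settings.items():
#         spark.conf.set(key, value)
for key in settings:
    print(key)
```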

In [0]:
dbutils.fs.ls("abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net")

[FileInfo(path='abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data/', name='raw-data/', size=0, modificationTime=1768741575000),
 FileInfo(path='abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/transformed-data/', name='transformed-data/', size=0, modificationTime=1768741587000)]

In [0]:
df_athletes=spark.read.format('csv')\
            .option('header',True)\
            .option('inferSchema',True)\
            .load('abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data/Athletes.csv')

In [0]:
df_coaches=spark.read.format('csv')\
            .option('header',True)\
            .option('inferSchema',True)\
            .load('abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data/Coaches.csv')

In [0]:
df_teams=spark.read.format('csv')\
            .option('header',True)\
            .option('inferSchema',True)\
            .load('abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data/Teams.csv')

In [0]:
df_gender=spark.read.format('csv')\
            .option('header',True)\
            .option('inferSchema',True)\
            .load('abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data/EntriesGender.csv')

In [0]:
df_medals=spark.read.format('csv')\
            .option('header',True)\
            .option('inferSchema',True)\
            .load('abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data/Medals.csv')

In [0]:
files = {
    "Athletes.csv": "athletes",
    "Coaches.csv": "coaches",
    "Teams.csv": "teams",
    "Medals.csv": "medals",
    "EntriesGender.csv": "entries_gender"
}

raw_base = "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/raw-data"
bronze_base = "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/bronze"

for file_name, table_name in files.items():
    (
        spark.read
        .format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(f"{raw_base}/{file_name}")
        .write
        .mode("overwrite")
        .parquet(f"{bronze_base}/{table_name}")
    )


# Silver Layer

### 🥈 Silver Layer – Cleaned & Conformed Data

The Silver layer contains **cleansed, standardized, and enriched datasets** derived from raw Bronze data.  
These datasets serve as **trusted sources** for analytics and Gold-layer aggregations.


In [0]:
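# Note: `round` here is Spark's column function; it shadows Python's built-in round() in this notebook.
from pyspark.sql.functions import (
    col, trim, upper, lower, split, expr,
    monotonically_increasing_id, round, when
)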


### 🥈 Athlete Dimension (dim_athletes)

Stores cleaned and enriched athlete information with surrogate keys.

**Transformations Applied:**
- Trimmed and standardized text fields
- Removed duplicate athlete records
- Filtered invalid and null records
- Split athlete name into first and last names
- Generated surrogate keys for dimensional modeling

**Source:** `bronze.athletes`  
**Target:** `silver.dim_athletes`
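
To make the name-splitting rule concrete, here is a plain-Python mirror of it, assuming the intent is "first token" and "last token" of the trimmed name. `split_name` is a hypothetical helper for reasoning about edge cases; it is not the Spark code itself, and the sample names are made up.

```python
def split_name(name):
    """Return (first, last) tokens of a space-separated name, or (None, None) for null input."""
    if name is None:
        return (None, None)
    # Strip spaces only and split on single spaces, like trim() followed by split(col, " ").
    tokens = name.strip(" ").split(" ")
    return (tokens[0], tokens[-1])


print(split_name("  Jane Ann Doe "))  # ('Jane', 'Doe')
print(split_name("Cher"))             # ('Cher', 'Cher')
print(split_name(None))               # (None, None)
```

Two consequences are worth keeping in mind: a single-word name yields the same value in both columns, and if the source lists family names first, the "first" token would hold the family name.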


In [0]:
df_dim_athletes = (
    df_athletes
    # Standardize text fields
    .withColumn("Name", trim(col("Name")))
    .withColumn("NOC", upper(trim(col("NOC"))))
    .withColumn("Discipline", trim(col("Discipline")))
    # Drop rows missing any identifying field
    .filter(
        col("Name").isNotNull() &
        col("NOC").isNotNull() &
        col("Discipline").isNotNull()
    )
    .dropDuplicates(["Name", "NOC", "Discipline"])
    .withColumn("First_Name", split(col("Name"), " ").getItem(0))
    # element_at accepts negative indices; getItem(-1) does not return the last element
    .withColumn("Last_Name", expr("element_at(split(Name, ' '), -1)"))
    # IDs are unique but not consecutive, and can change between runs
    .withColumn("athlete_id", monotonically_increasing_id())
)

df_dim_athletes.write.mode("overwrite").parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/dim_athletes"
)


### 🥈 Coach Dimension (dim_coaches)

Contains curated coach information associated with disciplines and events.

**Transformations Applied:**
- Standardized text fields
- Handled missing event values
- Removed duplicate coach records
- Generated surrogate keys

**Source:** `bronze.coaches`  
**Target:** `silver.dim_coaches`
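
One subtlety in handling missing event values: a null fill replaces only nulls, so a value that becomes an empty string after trimming is kept as-is. The sketch below mirrors that behavior in plain Python and offers an optional stricter variant. `fill_event` and its `blank_is_missing` flag are hypothetical names introduced for illustration.

```python
def fill_event(value, default="General", blank_is_missing=False):
    """Trim an event label and substitute a default when it is missing."""
    if value is None:
        # Mirrors trim(null) -> null, followed by the null fill.
        return default
    trimmed = value.strip(" ")
    if blank_is_missing and trimmed == "":
        # Optional stricter rule: treat whitespace-only values as missing too.
        return default
    return trimmed


print(fill_event(None))                         # General
print(repr(fill_event("   ")))                  # ''
print(fill_event("   ", blank_is_missing=True)) # General
print(fill_event(" Men "))                      # Men
```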


In [0]:
df_dim_coaches = (
    df_coaches
    .withColumn("Name", trim(col("Name")))
    .withColumn("NOC", upper(trim(col("NOC"))))
    .withColumn("Discipline", trim(col("Discipline")))
    .withColumn("Event", trim(col("Event")))
    .fillna({"Event": "General"})
    .dropDuplicates(["Name", "NOC", "Discipline", "Event"])
    .withColumn("coach_id", monotonically_increasing_id())
)

df_dim_coaches.write.mode("overwrite").parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/dim_coaches"
)


### 🥈 Team Dimension (dim_teams)

Represents teams participating across disciplines and events.

**Transformations Applied:**
- Cleaned and standardized team names
- Removed duplicate team records
- Generated surrogate keys

**Source:** `bronze.teams`  
**Target:** `silver.dim_teams`


In [0]:
df_dim_teams = (
    df_teams
    .withColumn("TeamName", trim(col("TeamName")))
    .withColumn("Discipline", trim(col("Discipline")))
    .withColumn("Event", trim(col("Event")))
    .dropDuplicates(["TeamName", "Discipline", "Event"])
    .withColumn("team_id", monotonically_increasing_id())
)

df_dim_teams.write.mode("overwrite").parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/dim_teams"
)


### 🥈 Medal Fact Table (fact_medals)

Stores medal counts by country with built-in data quality validation.

**Transformations Applied:**
- Enforced numeric data types
- Recalculated medal totals
- Flagged inconsistent medal records
- Standardized country codes

**Source:** `bronze.medals`  
**Target:** `silver.fact_medals`
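
The total check can be mirrored in plain Python to show how nulls behave. In Spark SQL, a comparison involving a null yields null, so a `when(...).otherwise(0)` flag treats such rows as consistent. The sketch below reproduces that and adds an optional stricter mode; `total_mismatch_flag` and `flag_nulls` are hypothetical names, and the sample counts are made up.

```python
def total_mismatch_flag(gold, silver, bronze, total, flag_nulls=False):
    """Return 1 when the reported total disagrees with gold + silver + bronze, else 0."""
    values = (gold, silver, bronze, total)
    if any(v is None for v in values):
        # Default mirrors the null comparison falling through to otherwise(0).
        return 1 if flag_nulls else 0
    return 1 if total != gold + silver + bronze else 0


print(total_mismatch_flag(1, 2, 3, 6))                        # 0
print(total_mismatch_flag(1, 2, 3, 7))                        # 1
print(total_mismatch_flag(1, None, 3, 4))                     # 0
print(total_mismatch_flag(1, None, 3, 4, flag_nulls=True))    # 1
```

Whether missing counts should be flagged is a data quality decision; the stricter mode is shown only as one option.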


In [0]:
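df_fact_medals = (
    df_medals
    # Standardize the country field (column name as used elsewhere in this notebook)
    .withColumn("Team/NOC", trim(col("Team/NOC")))
    # Enforce integer types on the medal counts
    .withColumn("Gold", col("Gold").cast("int"))
    .withColumn("Silver", col("Silver").cast("int"))
    .withColumn("Bronze", col("Bronze").cast("int"))
    .withColumn("Total", col("Total").cast("int"))
    # Recompute the total and flag rows where it disagrees with the source value.
    # A null on either side makes the comparison null, so such rows get flag 0.
    .withColumn("calculated_total", expr("Gold + Silver + Bronze"))
    .withColumn(
        "total_mismatch_flag",
        when(col("Total") != col("calculated_total"), 1).otherwise(0)
    )
)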

df_fact_medals.write.mode("overwrite").parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/fact_medals"
)


### 🥈 Gender Participation (gender_participation)

Captures gender-wise athlete participation by discipline.

**Transformations Applied:**
- Cast participation counts to numeric types
- Revalidated total counts
- Flagged data quality issues
- Calculated gender participation ratios

**Source:** `bronze.entries_gender`  
**Target:** `silver.gender_participation`
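
The ratio calculation has two edge cases worth spelling out: a missing count and a total of zero. The plain-Python sketch below approximates the intended behavior, assuming a ratio should be left empty when it cannot be computed. `participation_ratio` is a hypothetical helper; it uses HALF_UP rounding because that is the mode Spark's `round` documents, while Python's built-in `round` rounds ties differently.

```python
from decimal import Decimal, ROUND_HALF_UP


def participation_ratio(part, total):
    """Share of `part` in `total`, rounded to 2 places; None when it cannot be computed."""
    if part is None or total is None or total == 0:
        return None
    ratio = Decimal(part) / Decimal(total)
    return float(ratio.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))


print(participation_ratio(1, 3))     # 0.33
print(participation_ratio(1, 8))     # 0.13
print(participation_ratio(5, 0))     # None
print(participation_ratio(None, 10)) # None
```

In Spark itself, division by zero yields null when ANSI mode is off and raises an error when it is on, so an explicit guard may be preferable if the cluster setting is not known.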


In [0]:
df_gender_participation = (
    df_gender
    .withColumn("Discipline", trim(col("Discipline")))
    .withColumn("Female", col("Female").cast("int"))
    .withColumn("Male", col("Male").cast("int"))
    .withColumn("Total", col("Total").cast("int"))
    .withColumn("calculated_total", expr("Female + Male"))
    .withColumn(
        "total_mismatch_flag",
        when(col("Total") != col("calculated_total"), 1).otherwise(0)
    )
    .withColumn("female_ratio", round(col("Female") / col("Total"), 2))
    .withColumn("male_ratio", round(col("Male") / col("Total"), 2))
)

df_gender_participation.write.mode("overwrite").parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/gender_participation"
)


# Gold Layer

### 🥇 Gold Layer – Business Aggregations

The Gold layer contains **business-ready, aggregated datasets** derived from Silver tables.  
These datasets are optimized for **analytics, BI dashboards, and executive reporting**.


In [0]:
df_dim_athletes = spark.read.parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/dim_athletes"
)

df_dim_coaches = spark.read.parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/dim_coaches"
)

df_dim_teams = spark.read.parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/dim_teams"
)

df_fact_medals = spark.read.parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/fact_medals"
)

df_gender = spark.read.parquet(
    "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/silver/gender_participation"
)


### 🥇 Medals by Country

Aggregates total gold, silver, bronze, and overall medals won by each country.

**Business Use:**
- Identify top-performing countries
- Compare medal distribution across nations

**Source:** `silver.fact_medals`  
**Target:** `gold.medals_by_country`


### 🥇 Medal Rankings (Leaderboard)

Ranks countries based on total medals won using window functions.

**Business Use:**
- Generate Olympic leaderboards
- Compare country performance rankings

**Source:** `gold.medals_by_country`  
**Target:** `gold.medals_ranked`


In [0]:
df_gold_medals_by_country = (
    df_fact_medals
    .groupBy("Team/NOC")
    .agg(
        expr("sum(Gold) as gold_medals"),
        expr("sum(Silver) as silver_medals"),
        expr("sum(Bronze) as bronze_medals"),
        expr("sum(Total) as total_medals")
    )
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

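# No partitionBy: all rows are ranked in a single partition, which is acceptable for a small per-country table.
window_spec = Window.orderBy(col("total_medals").desc())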

df_gold_medals_ranked = (
    df_gold_medals_by_country
    .withColumn("rank", dense_rank().over(window_spec))
)


### 🥇 Athletes by Discipline

Counts the number of athletes participating in each discipline.

**Business Use:**
- Identify high-participation sports
- Analyze popularity of disciplines

**Source:** `silver.dim_athletes`  
**Target:** `gold.athletes_by_discipline`


In [0]:
df_gold_athletes_by_discipline = (
    df_dim_athletes
    .groupBy("Discipline")
    .count()
    .withColumnRenamed("count", "athlete_count")
)


### 🥇 Athletes by Country

Calculates total athletes representing each country.

**Business Use:**
- Measure country-level participation
- Compare athlete representation

**Source:** `silver.dim_athletes`  
**Target:** `gold.athletes_by_country`


In [0]:
df_gold_athletes_by_country = (
    df_dim_athletes
    .groupBy("NOC")
    .count()
    .withColumnRenamed("count", "athlete_count")
)


### 🥇 Teams by Country

Counts the number of teams per country across all disciplines.

**Business Use:**
- Analyze team-based participation
- Compare team involvement by country

**Source:** `silver.dim_teams`  
**Target:** `gold.teams_by_country`


In [0]:
df_gold_teams_by_country = (
    df_dim_teams
    .groupBy("Country")
    .count()
    .withColumnRenamed("count", "team_count")
)


### 🥇 Coaches by Country

Counts the number of coaches associated with each country.

**Business Use:**
- Evaluate coaching investment
- Compare coaching scale across countries

**Source:** `silver.dim_coaches`  
**Target:** `gold.coaches_by_country`


In [0]:
df_gold_coaches_by_country = (
    df_dim_coaches
    .groupBy("NOC")
    .count()
    .withColumnRenamed("count", "coach_count")
)


### 🥇 Gender Ratio by Discipline

Analyzes male and female participation ratios across disciplines.

**Business Use:**
- Evaluate gender diversity
- Identify balanced vs skewed disciplines

**Source:** `silver.gender_participation`  
**Target:** `gold.gender_ratio_by_discipline`


In [0]:
df_gold_gender_ratio = (
    df_gender
    .select(
        "Discipline",
        "Female",
        "Male",
        "Total",
        "female_ratio",
        "male_ratio"
    )
)


### 🥇 Overall Olympic KPIs

Provides a high-level executive summary of Olympic participation and outcomes.

**KPIs Included:**
- Total Athletes
- Total Teams
- Total Coaches
- Total Medals Awarded

**Source:** Multiple Silver tables  
**Target:** `gold.overall_kpis`


In [0]:
df_gold_overall_kpis = df_dim_athletes.select(
    expr("count(distinct athlete_id) as total_athletes")
).crossJoin(
    df_dim_teams.select(
        expr("count(distinct team_id) as total_teams")
    )
).crossJoin(
    df_dim_coaches.select(
        expr("count(distinct coach_id) as total_coaches")
    )
).crossJoin(
    df_fact_medals.select(
        expr("sum(Total) as total_medals_awarded")
    )
)


In [0]:
base_gold_path = "abfss://datapipelinecontainer@dataenginerringproject.dfs.core.windows.net/gold"

df_gold_medals_by_country.write.mode("overwrite").parquet(f"{base_gold_path}/medals_by_country")
df_gold_medals_ranked.write.mode("overwrite").parquet(f"{base_gold_path}/medals_ranked")
df_gold_athletes_by_discipline.write.mode("overwrite").parquet(f"{base_gold_path}/athletes_by_discipline")
df_gold_athletes_by_country.write.mode("overwrite").parquet(f"{base_gold_path}/athletes_by_country")
df_gold_teams_by_country.write.mode("overwrite").parquet(f"{base_gold_path}/teams_by_country")
df_gold_coaches_by_country.write.mode("overwrite").parquet(f"{base_gold_path}/coaches_by_country")
df_gold_gender_ratio.write.mode("overwrite").parquet(f"{base_gold_path}/gender_ratio_by_discipline")
df_gold_overall_kpis.write.mode("overwrite").parquet(f"{base_gold_path}/overall_kpis")
