
# ETL Data Pipeline on Azure with PySpark (Tokyo Olympics)

**Author:** Sharvari Pataskar  
**Last updated:** 2025-08-19 17:50:24

This notebook demonstrates an end-to-end **ETL (Extract–Transform–Load)** flow on **Azure Data Lake Storage Gen2 (ADLS Gen2)** using **PySpark**.

**Datasets (CSV in ADLS raw zone):**
- `Athletes.csv`
- `Coaches.csv`
- `EntriesGender.csv`
- `Medals.csv`
- `Teams.csv`

**What this notebook covers**
1. Environment & dependency setup  
2. Secure Spark configuration for ADLS Gen2 (OAuth 2.0 Client Credentials)  
3. **Extract** raw CSVs from ADLS  
4. **Load** to a **curated zone** as **Parquet** (optimized columnar format)  
5. **Transform** selected tables and derive **analytics**  
6. **Load** transformed outputs to a dedicated **transformed zone**  
7. **Verify** schemas and records, and discuss **next steps** for BI/Synapse

> ⚠️ **Security Note:** Do **NOT** hardcode secrets or keys in source control. This notebook uses **placeholders** only. Load real credentials from a Databricks **secret scope** or Azure **Key Vault**, or avoid client secrets altogether by using a **managed identity**.



## 1) Prerequisites

- **Azure resources**: ADLS Gen2 storage account + container with the Tokyo Olympics CSVs under `raw-data/`  
- **Identity**: Microsoft Entra ID (formerly Azure AD) **App registration** (Client ID, Client Secret, Tenant ID) or Managed Identity  
- **Compute**: Databricks cluster (recommended) or local Spark with Hadoop Azure connectors  
- **Permissions**: The service principal or managed identity needs the **Storage Blob Data Contributor** role on the storage account

> Tip (Databricks): Store secrets in a **Secret Scope**, then read with `dbutils.secrets.get()`.


In [None]:

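# On Databricks, `%pip install` adds notebook-scoped libraries; use cluster libraries or an init script for cluster-wide installs.
# In plain Jupyter, uncomment the line below.
# These Azure SDK packages are optional: the Spark code in this notebook does not import them.
# !pip install azure-identity azure-storage-blob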



## 2) Configure Spark to access ADLS Gen2 (OAuth 2.0)

Replace the placeholders below.  
**Never commit real secrets** to GitHub.


In [None]:

# ==== EDIT THESE VALUES (do NOT commit real secrets) ====
STORAGE_ACCOUNT = "tokyoolympicdatasharvari"  # account name only, without ".dfs.core.windows.net"
CONTAINER       = "tokyoolympicdatasharvari"  # container (filesystem) name
TENANT_ID       = "<TENANT_ID>"
CLIENT_ID       = "<CLIENT_ID>"
CLIENT_SECRET   = "<CLIENT_SECRET>"  # Prefer getting from secret store e.g., dbutils.secrets.get()

# Build DFS endpoint once (ADLS Gen2)
DFS_HOST = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"

# Optional: If using Databricks secrets, do something like:
# CLIENT_SECRET = dbutils.secrets.get(scope="<scope-name>", key="<key-name>")


In [None]:

# Spark configs for OAuth2 Client Credentials flow
spark.conf.set(f"fs.azure.account.auth.type.{DFS_HOST}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{DFS_HOST}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{DFS_HOST}", CLIENT_ID)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{DFS_HOST}", CLIENT_SECRET)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{DFS_HOST}", f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token")

print("Spark configured for ADLS Gen2 (OAuth 2.0).")
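
If the same settings are needed for more than one storage account, or you want the cell to fail fast when a `<PLACEHOLDER>` was left unreplaced, the five settings can be built as a plain dictionary first. This is a minimal sketch: `adls_oauth_conf` is a hypothetical helper name, and the keys and endpoint are the same ones used in the cell above.

```python
import re

# Matches values that still look like "<TENANT_ID>"-style placeholders.
_PLACEHOLDER = re.compile(r"^<[^<>]+>$")


def adls_oauth_conf(storage_account, tenant_id, client_id, client_secret):
    """Build the ABFS OAuth settings for one storage account as a plain dict."""
    values = {"tenant_id": tenant_id, "client_id": client_id, "client_secret": client_secret}
    unfilled = [name for name, value in values.items() if not value or _PLACEHOLDER.match(value)]
    if unfilled:
        raise ValueError(f"Replace placeholder values before configuring Spark: {unfilled}")

    host = f"{storage_account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Usage in the notebook (requires an active `spark` session):
# for key, value in adls_oauth_conf(STORAGE_ACCOUNT, TENANT_ID, CLIENT_ID, CLIENT_SECRET).items():
#     spark.conf.set(key, value)
```

Keeping the builder free of Spark calls means it can be unit-tested without a cluster.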



## 3) Define Storage Paths

We organize data in zones: **raw-data** → **curated** → **transformed-data**.


In [None]:

raw_base         = f"abfss://{CONTAINER}@{DFS_HOST}/raw-data"
curated_base     = f"abfss://{CONTAINER}@{DFS_HOST}/curated"
transformed_base = f"abfss://{CONTAINER}@{DFS_HOST}/transformed-data"

paths = {
    "athletes_csv":        f"{raw_base}/Athletes.csv",
    "coaches_csv":         f"{raw_base}/Coaches.csv",
    "entries_gender_csv":  f"{raw_base}/EntriesGender.csv",
    "medals_csv":          f"{raw_base}/Medals.csv",
    "teams_csv":           f"{raw_base}/Teams.csv",
    "athletes_parquet":    f"{curated_base}/Athletes",
    "coaches_parquet":     f"{curated_base}/Coaches",
    "entries_parquet":     f"{curated_base}/EntriesGender",
    "medals_parquet":      f"{curated_base}/Medals",
    "teams_parquet":       f"{curated_base}/Teams",
    "athletes_tx":         f"{transformed_base}/Athletes",
    "coaches_tx":          f"{transformed_base}/Coaches",
    "entries_tx":          f"{transformed_base}/EntriesGender",
    "medals_tx":           f"{transformed_base}/Medals",
    "teams_tx":            f"{transformed_base}/Teams",
}
paths
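
The dictionary above follows a regular pattern (three zones per dataset), so it could also be generated rather than typed out, which makes it harder to mistype a path. The sketch below is one possible shape. `build_paths` and `DATASETS` are hypothetical names, and the generated key for EntriesGender is `entries_csv` rather than the hand-written `entries_gender_csv`, so later cells would need the same key if you adopt it.

```python
# Short key -> file/folder name, as listed in the raw zone.
DATASETS = {
    "athletes": "Athletes",
    "coaches": "Coaches",
    "entries": "EntriesGender",
    "medals": "Medals",
    "teams": "Teams",
}


def build_paths(container, dfs_host, datasets=DATASETS):
    """Return raw CSV, curated Parquet, and transformed Parquet paths for each dataset."""
    root = f"abfss://{container}@{dfs_host}"
    paths = {}
    for key, name in datasets.items():
        paths[f"{key}_csv"] = f"{root}/raw-data/{name}.csv"
        paths[f"{key}_parquet"] = f"{root}/curated/{name}"
        paths[f"{key}_tx"] = f"{root}/transformed-data/{name}"
    return paths


# Usage in the notebook:
# paths = build_paths(CONTAINER, DFS_HOST)
```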



## 4) Extract — Read raw CSVs from ADLS

The `header` option treats the first line of each file as column names, and `inferSchema` makes Spark take an extra pass over the data to guess column types.


In [None]:

def read_raw_csv(path):
    """Read a CSV with a header row and let Spark infer the column types."""
    return spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(path)

athletes       = read_raw_csv(paths["athletes_csv"])
coaches        = read_raw_csv(paths["coaches_csv"])
entries_gender = read_raw_csv(paths["entries_gender_csv"])
medals         = read_raw_csv(paths["medals_csv"])
teams          = read_raw_csv(paths["teams_csv"])

print("Sample preview of Athletes:")
athletes.show(5)
athletes.printSchema()

print("Row counts (raw):")
for name, df in [("Athletes", athletes), ("Coaches", coaches), ("EntriesGender", entries_gender), ("Medals", medals), ("Teams", teams)]:
    print(name, df.count())
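
Type inference is convenient for exploration, but an explicit schema skips the extra pass and pins the column types. `DataFrameReader.schema` accepts a DDL-formatted string, so a schema can be kept as a simple mapping. The sketch below assumes a column layout for `EntriesGender.csv`: `Female`, `Male`, and `Total` appear later in this notebook, while `Discipline` is an assumption to check against the file header. `to_ddl` is a hypothetical helper.

```python
def to_ddl(columns):
    """Turn {"name": "SPARK SQL TYPE"} into a DDL schema string, backtick-quoting each name."""
    return ", ".join(f"`{name.replace('`', '``')}` {dtype}" for name, dtype in columns.items())


# Assumed column layout for EntriesGender.csv; verify it against the real header.
ENTRIES_GENDER_COLUMNS = {
    "Discipline": "STRING",
    "Female": "INT",
    "Male": "INT",
    "Total": "INT",
}

entries_ddl = to_ddl(ENTRIES_GENDER_COLUMNS)

# Usage in the notebook (requires `spark` and `paths` from the cells above):
# entries_gender = (spark.read.option("header", "true")
#                   .schema(entries_ddl)
#                   .csv(paths["entries_gender_csv"]))
```

Quoting every name with backticks keeps the helper usable for headers that contain spaces or slashes.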



## 5) Load — Write curated Parquet datasets

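Parquet is a compressed, columnar format: analytical queries read only the columns they need, which usually makes scans faster and cheaper than with CSV. `mode("overwrite")` replaces any existing output at the target path, so this cell can be rerun safely.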


In [None]:

(athletes.write.mode("overwrite").parquet(paths["athletes_parquet"]))
(coaches.write.mode("overwrite").parquet(paths["coaches_parquet"]))
(entries_gender.write.mode("overwrite").parquet(paths["entries_parquet"]))
(medals.write.mode("overwrite").parquet(paths["medals_parquet"]))
(teams.write.mode("overwrite").parquet(paths["teams_parquet"]))

print("Curated Parquet writes completed.")



## 6) Quick Verification — Read curated Parquet


In [None]:

athletes_pq = spark.read.parquet(paths["athletes_parquet"])
coaches_pq = spark.read.parquet(paths["coaches_parquet"])
entries_pq = spark.read.parquet(paths["entries_parquet"])
medals_pq = spark.read.parquet(paths["medals_parquet"])
teams_pq = spark.read.parquet(paths["teams_parquet"])

for name, df in [("Athletes", athletes_pq), ("Coaches", coaches_pq), ("EntriesGender", entries_pq), ("Medals", medals_pq), ("Teams", teams_pq)]:
    print(f"\n{name} (curated) schema:")
    df.printSchema()
    print(f"{name} (curated) sample:")
    df.show(5)
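
Printing schemas and samples is a visual check. A row-count comparison between the raw and curated zones catches silent data loss more reliably. The helper below is a sketch that works on plain dictionaries of counts, so it does not depend on Spark itself. `count_mismatches` is a hypothetical name.

```python
def count_mismatches(raw_counts, curated_counts):
    """Return {name: (raw, curated)} for every dataset whose counts differ or are missing."""
    names = sorted(set(raw_counts) | set(curated_counts))
    return {
        name: (raw_counts.get(name), curated_counts.get(name))
        for name in names
        if raw_counts.get(name) != curated_counts.get(name)
    }


# Usage in the notebook (DataFrames come from the Extract and Verification cells):
# raw_counts = {"Athletes": athletes.count(), "Medals": medals.count()}
# curated_counts = {"Athletes": athletes_pq.count(), "Medals": medals_pq.count()}
# assert not count_mismatches(raw_counts, curated_counts), "row counts differ"
```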



## 7) Transform — Type casting and derived metrics

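We cast the `Female`, `Male`, and `Total` columns of **EntriesGender** to integers and derive each gender's share of total entries, rounded to four decimal places.  
We also sort countries by their **Gold** medal count.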


In [None]:

from pyspark.sql.functions import col, round as _round
from pyspark.sql.types import IntegerType

# Ensure numeric types
entries_tx = (entries_pq
              .withColumn("Female", col("Female").cast(IntegerType()))
              .withColumn("Male",   col("Male").cast(IntegerType()))
              .withColumn("Total",  col("Total").cast(IntegerType()))
              .withColumn("Share_Female", _round(col("Female")/col("Total"), 4))
              .withColumn("Share_Male",   _round(col("Male")/col("Total"), 4))
             )

print("Transformed EntriesGender schema:")
entries_tx.printSchema()
entries_tx.show(10)
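
To sanity-check the `Share_*` columns, it can help to have a plain-Python reference for the same arithmetic. The sketch below mirrors two behaviours worth knowing: Spark's `round` uses HALF_UP rounding, and a division by zero yields `NULL` when ANSI mode is off (it raises an error when ANSI mode is on). `expected_share` is a hypothetical helper, and because it computes in decimal rather than double precision, results could differ from Spark in rare edge cases.

```python
from decimal import Decimal, ROUND_HALF_UP


def expected_share(part, total, digits=4):
    """Reference value for round(part / total, digits); None stands in for Spark's NULL."""
    if part is None or total is None or total == 0:
        return None
    quantum = Decimal(10) ** -digits  # 0.0001 for digits=4
    share = (Decimal(part) / Decimal(total)).quantize(quantum, rounding=ROUND_HALF_UP)
    return float(share)


# Worked example: 1 entry out of 3 gives a share of 0.3333.
print(expected_share(1, 3))
```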


In [None]:

# Top countries by Gold medals (descending)
from pyspark.sql.functions import desc

# Some datasets use 'Team/NOC' or 'TeamCountry'. Adjust column name if needed.
gold_col = "Gold" if "Gold" in medals_pq.columns else None
team_col = None
for c in ["TeamCountry", "Team/NOC", "Team", "Country"]:
    if c in medals_pq.columns:
        team_col = c
        break

if gold_col and team_col:
    top_gold = medals_pq.select(team_col, gold_col).orderBy(desc(gold_col))
    print("Top 10 by Gold medals:")
    top_gold.show(10)
else:
    print("Expected columns for medals not found. Available columns:", medals_pq.columns)
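
The column lookup above can be factored into a small reusable helper, which also makes it easy to tolerate differences in capitalisation between dataset versions. This is a sketch: `first_present` is a hypothetical name, and the case-insensitive matching is an addition, not something the original cell does.

```python
def first_present(candidates, columns, ignore_case=True):
    """Return the first candidate found in `columns` (spelled as in `columns`), else None."""
    lookup = {c.lower(): c for c in columns} if ignore_case else {c: c for c in columns}
    for candidate in candidates:
        key = candidate.lower() if ignore_case else candidate
        if key in lookup:
            return lookup[key]
    return None


# Usage in the notebook:
# team_col = first_present(["TeamCountry", "Team/NOC", "Team", "Country"], medals_pq.columns)
# gold_col = first_present(["Gold"], medals_pq.columns)
```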



## 8) Load — Write transformed outputs


In [None]:

# Pass-through placeholders: these tables are copied unchanged. Add business rules here as needed.
athletes_tx = athletes_pq
coaches_tx  = coaches_pq
medals_tx   = medals_pq
teams_tx    = teams_pq

athletes_tx.write.mode("overwrite").parquet(paths["athletes_tx"])
coaches_tx.write.mode("overwrite").parquet(paths["coaches_tx"])
entries_tx.write.mode("overwrite").parquet(paths["entries_tx"])
medals_tx.write.mode("overwrite").parquet(paths["medals_tx"])
teams_tx.write.mode("overwrite").parquet(paths["teams_tx"])

print("Transformed Parquet writes completed.")



## 9) Validate Transformed Zone


In [None]:

athletes_check = spark.read.parquet(paths["athletes_tx"])
coaches_check = spark.read.parquet(paths["coaches_tx"])
entries_check = spark.read.parquet(paths["entries_tx"])
medals_check = spark.read.parquet(paths["medals_tx"])
teams_check = spark.read.parquet(paths["teams_tx"])

for name, df in [("Athletes_tx", athletes_check), ("Coaches_tx", coaches_check), ("EntriesGender_tx", entries_check), ("Medals_tx", medals_check), ("Teams_tx", teams_check)]:
    print(f"\n{name} schema:")
    df.printSchema()
    print(f"{name} sample:")
    df.show(5)
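
Beyond eyeballing the printed schemas, the expected column types can be asserted. `DataFrame.dtypes` returns a list of `(column, type)` string pairs, so the comparison itself can be plain Python. The sketch below uses the types produced by the transform cell (`int` for the cast columns, `double` for the shares). `schema_problems` is a hypothetical helper name.

```python
def schema_problems(actual_dtypes, expected):
    """Compare DataFrame.dtypes-style pairs against {"column": "type"}; return a list of messages."""
    actual = dict(actual_dtypes)
    problems = []
    for name, dtype in expected.items():
        if name not in actual:
            problems.append(f"missing column: {name}")
        elif actual[name] != dtype:
            problems.append(f"{name}: expected {dtype}, found {actual[name]}")
    return problems


# Types expected after the EntriesGender transform in this notebook.
EXPECTED_ENTRIES_TX = {
    "Female": "int",
    "Male": "int",
    "Total": "int",
    "Share_Female": "double",
    "Share_Male": "double",
}

# Usage in the notebook:
# problems = schema_problems(entries_check.dtypes, EXPECTED_ENTRIES_TX)
# assert not problems, problems
```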



## 10) (Optional) Useful Patterns & Hardening

- **Partitioning**: When data is large, write Parquet partitioned by a low-cardinality column that queries commonly filter on (e.g., `Discipline`).  
  ```python
  df.write.mode("overwrite").partitionBy("Discipline").parquet(".../path")
  ```

- **Explicit schemas**: For production, pass a defined schema to the reader instead of relying on `inferSchema`, so column types do not drift when the input data changes.  
- **Delta Lake**: Switch to Delta for ACID upserts & time travel.  
- **Secrets**: Use `dbutils.secrets.get(scope, key)` or Azure Key Vault.  
- **Monitoring**: Track job runs via **Databricks Jobs** or **ADF**.  
- **Downstream**: Connect **Power BI** or **Synapse Serverless** to curated/transformed zones.



## 11) Conclusion & Next Steps

✅ Built a clean ETL pipeline using PySpark on ADLS Gen2: **Extract → Curate (Parquet) → Transform → Validate**.  
➡️ Extend with **Delta Lake**, add **data quality checks** (Great Expectations), and publish to a BI layer.

**Repository Tips (for GitHub):**
- Keep this notebook under `notebooks/etl_pipeline_tokyo_olympics.ipynb`
- Add `docs/architecture.png` (simple diagram of Raw → Curated → Transformed)
- Include a `README.md` describing setup, flow, and sample outputs
- Add `.gitignore` to exclude checkpoints and local artifacts
