### _Bronze layer: moving file-system data into Hive tables in a queryable format_

### Access Key (one-time config)

In [0]:
# Set a Spark session configuration entry. The argument is masked in this export;
# presumably it is the storage-account access key named in the heading above — TODO confirm.
spark.conf.set(
  "*************************************************************************"
)


_Read Parquet file as a DataFrame_

In [0]:
# Load the raw athletes Parquet data from the bronze container and preview 5 rows.
df_athletes = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@olympicprjdl.dfs.core.windows.net/athletes/")
)
df_athletes.show(5)

In [0]:
# Create the bronze database only if it does not already exist, so the cell can be re-run.
spark.sql("CREATE DATABASE IF NOT EXISTS bronze_db")

_Save tables from DataFrame_

In [0]:
# Persist the athletes DataFrame as table bronze_db.athletes, replacing any existing contents.
df_athletes.write.mode("overwrite").saveAsTable("bronze_db.athletes")

_Validate Table Output_

In [0]:
%sql
-- Preview the first 10 rows of the bronze athletes table.
SELECT * FROM bronze_db.athletes LIMIT 10

In [0]:
# Load the raw coaches Parquet data and store it as table bronze_db.coaches.
df_coaches = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@olympicprjdl.dfs.core.windows.net/coaches/")
)
(
    df_coaches.write
    .mode("overwrite")
    .saveAsTable("bronze_db.coaches")
)

In [0]:
# Load the raw events Parquet data and store it as table bronze_db.events.
df_events = (
    spark.read
    .format("parquet")
    .load("abfss://bronze@olympicprjdl.dfs.core.windows.net/events/")
)
(
    df_events.write
    .mode("overwrite")
    .saveAsTable("bronze_db.events")
)


In [0]:
%sql
-- Preview the first 10 rows of the bronze events table.
SELECT * FROM bronze_db.events LIMIT 10;

In [0]:
%sql
-- List every table registered in the bronze database.
SHOW TABLES IN bronze_db;

_SILVER LAYER GOAL:
Bronze → Raw Data
Silver → Cleaned + Filtered + Useful for business logic_

_Create Silver Database_

In [0]:
# Create the silver database only if it does not already exist, so the cell can be re-run.
spark.sql("CREATE DATABASE IF NOT EXISTS silver_db")

In [0]:
from pyspark.sql.functions import to_date, months_between, floor, current_date, col

# 1. Load the bronze athletes table
df = spark.table("bronze_db.athletes")

# 2. Parse birth_date (read with the yyyy-MM-dd pattern) and derive age in completed years:
#    months elapsed since birth, divided by 12 and rounded down.
df = df.withColumn("birth_date_parsed", to_date(col("birth_date"), "yyyy-MM-dd"))
df = df.withColumn("age", floor(months_between(current_date(), col("birth_date_parsed")) / 12))

# 3. Select only the columns needed downstream
df_silver = df.select(
    "code", "name", "gender", "age", "country", "events", "coach"
)

# 4. Save to the silver layer table.
#    silver_db.athletes is also written elsewhere in this notebook with a different
#    column set, so on a re-run the existing table schema may not match this DataFrame.
#    overwriteSchema lets the overwrite replace the table schema instead of failing
#    with a schema-mismatch error (applies to Delta tables).
df_silver.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver_db.athletes")


In [0]:
# Rebuild the silver athletes selection with physical and origin attributes
# (height, weight, birth_place, residence_country) and without the "events" column.
df_silver = df.select(
    "code", "name", "gender", "age", "country", "coach",
    "height", "weight", "birth_place", "residence_country"
)

# Save to table. This column set differs from the one silver_db.athletes was
# previously written with, so the overwrite must also replace the table schema;
# without overwriteSchema a Delta table rejects the write with a schema-mismatch error.
df_silver.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver_db.athletes")

In [0]:
# Print the row count explicitly: a notebook cell only echoes its last expression,
# so a bare count() here would run a Spark job and discard the result.
print(df_silver.count())
df_silver.show(5)

In [0]:
%sql
-- Preview the first 5 rows of the silver athletes table.
SELECT * FROM silver_db.athletes LIMIT 5;

_countries by number of athletes_


In [0]:
# Count athletes per country and display the 10 countries with the most rows.
athletes_per_country = df_silver.groupBy("country").count()
athletes_per_country.orderBy("count", ascending=False).show(10)

In [0]:
# Mean athlete age per country, showing the 10 countries with the highest average.
# The aggregate column keeps Spark's default name "avg(age)", which the sort refers to.
avg_age_by_country = df_silver.groupBy("country").avg("age")
avg_age_by_country.orderBy("avg(age)", ascending=False).show(10)


## _Steps_

In [0]:
# Load the coaches and events tables from the bronze database as DataFrames
# for the silver-layer transformations in the following cells.
df_bronze_coaches = spark.table("bronze_db.coaches")
df_bronze_events = spark.table("bronze_db.events")


In [0]:
# Print the column names and types of the bronze coaches table before selecting columns.
df_bronze_coaches.printSchema()


In [0]:
# Project the bronze coaches table down to the columns kept in the silver layer.
# No row filtering or value cleaning is applied here.
coach_columns = ["code", "name", "gender", "disciplines", "country", "birth_date"]
df_silver_coaches = df_bronze_coaches.select(*coach_columns)

# Store the result as table silver_db.coaches, replacing any existing contents.
(
    df_silver_coaches.write
    .mode("overwrite")
    .saveAsTable("silver_db.coaches")
)



## _Check schema_

In [0]:
# Print the column names and types of the bronze events table before selecting columns.
df_bronze_events.printSchema()


_Cleaned schema_

In [0]:
# Keep the four event columns used downstream and drop rows whose event name
# is NULL or an empty string.
event_columns = ["tag", "sport", "event", "sport_code"]
df_silver_events = (
    df_bronze_events
    .select(*event_columns)
    .filter("event IS NOT NULL AND event != ''")
)

# Store the result as table silver_db.events, replacing any existing contents.
(
    df_silver_events.write
    .mode("overwrite")
    .saveAsTable("silver_db.events")
)

In [0]:
%sql
-- Preview the first 5 rows of the silver coaches table.
-- The events preview below is kept commented out; uncomment it to inspect silver_db.events instead.
SELECT * FROM silver_db.coaches LIMIT 5;
-- SELECT * FROM silver_db.events LIMIT 5;


## _Gold Layer_

In [0]:
# Create the gold database only if it does not already exist, so the cell can be re-run.
spark.sql("CREATE DATABASE IF NOT EXISTS gold_db")


In [0]:
# 1. Gold table: number of athletes per country, largest first
from pyspark.sql.functions import count

silver_athletes = spark.table("silver_db.athletes")

# count("code") counts the rows in each country group whose code is not NULL.
athletes_by_country = silver_athletes.groupBy("country").agg(
    count("code").alias("total_athletes")
)
gold_country_athletes = athletes_by_country.orderBy("total_athletes", ascending=False)

(
    gold_country_athletes.write
    .mode("overwrite")
    .saveAsTable("gold_db.top_countries_by_athletes")
)


In [0]:
# Gold table: row count per coach name, largest first.
# NOTE(review): this counts rows of silver_db.coaches per coach name (non-NULL code);
# nothing here joins coaches to athletes, so the "athletes_trained" column may not
# reflect athletes actually coached — confirm the intended metric.
silver_coaches = spark.table("silver_db.coaches")
rows_per_coach = silver_coaches.groupBy("name").agg(
    count("code").alias("athletes_trained")
)
gold_top_coaches = rows_per_coach.orderBy("athletes_trained", ascending=False)

(
    gold_top_coaches.write
    .mode("overwrite")
    .saveAsTable("gold_db.top_coaches")
)


In [0]:
# Distinct event count per sport, largest first.
# The grouping key is "sport"; the target table is nevertheless named
# gold_db.events_per_country, which other cells in this notebook read by that name.
from pyspark.sql.functions import countDistinct

silver_events = spark.table("silver_db.events")
events_by_sport = silver_events.groupBy("sport").agg(
    countDistinct("event").alias("total_events")
)
gold_event_count = events_by_sport.orderBy("total_events", ascending=False)

(
    gold_event_count.write
    .mode("overwrite")
    .saveAsTable("gold_db.events_per_country")
)


In [0]:
%sql
-- Preview one gold table at a time; uncomment the query you want to inspect.
-- SELECT * FROM gold_db.top_countries_by_athletes LIMIT 5;
-- SELECT * FROM gold_db.top_coaches LIMIT 5;
SELECT * FROM gold_db.events_per_country LIMIT 5;


Code for Exporting to Lake


In [0]:
# 1. Bronze layer export to ADLS
# Each bronze table is written in Delta format to its folder in the bronze container,
# replacing existing data and schema at the target path.
bronze_exports = [
    ("bronze_db.athletes", "abfss://bronze@olympicprjdl.dfs.core.windows.net/bronze_db/athletes"),
    ("bronze_db.coaches", "abfss://bronze@olympicprjdl.dfs.core.windows.net/bronze_db/coaches"),
    ("bronze_db.events", "abfss://bronze@olympicprjdl.dfs.core.windows.net/bronze_db/events"),
]

for bronze_table, bronze_target in bronze_exports:
    df_bronze = spark.table(bronze_table)
    (
        df_bronze.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .save(bronze_target)
    )


In [0]:
# 2. Silver layer export to ADLS
# Each silver table is written in Delta format to its folder in the silver container.
# overwriteSchema matches the bronze export: silver_db.athletes is written with more
# than one column set in this notebook, so a previously exported Delta folder may hold
# a different schema and a plain overwrite would fail on the mismatch.
silver_exports = [
    ("silver_db.athletes", "abfss://silver@olympicprjdl.dfs.core.windows.net/silver_db/athletes"),
    ("silver_db.coaches", "abfss://silver@olympicprjdl.dfs.core.windows.net/silver_db/coaches"),
    ("silver_db.events", "abfss://silver@olympicprjdl.dfs.core.windows.net/silver_db/events"),
]

for silver_table, silver_target in silver_exports:
    df_silver = spark.table(silver_table)
    (
        df_silver.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .save(silver_target)
    )


_Gold layer export to ADLS_

In [0]:
# Export each gold table as Parquet files to its folder in the gold container,
# replacing whatever is already at the target path.

# Top countries by athletes
df_top_countries = spark.table("gold_db.top_countries_by_athletes")
(
    df_top_countries.write
    .mode("overwrite")
    .format("parquet")
    .save("abfss://gold@olympicprjdl.dfs.core.windows.net/gold_db/top_countries_by_athletes")
)

# Top coaches
df_top_coaches = spark.table("gold_db.top_coaches")
(
    df_top_coaches.write
    .mode("overwrite")
    .format("parquet")
    .save("abfss://gold@olympicprjdl.dfs.core.windows.net/gold_db/top_coaches")
)

# Table gold_db.events_per_country
df_events_per_country = spark.table("gold_db.events_per_country")
(
    df_events_per_country.write
    .mode("overwrite")
    .format("parquet")
    .save("abfss://gold@olympicprjdl.dfs.core.windows.net/gold_db/events_per_country")
)
