In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as F_sum

In [4]:
# Build (or reuse) the Spark session for this notebook. The driver host is
# pinned to the loopback address via the "spark.driver.host" setting.
spark = (
    SparkSession.builder
    .appName("Tokyo Olympic Data Transformation")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)


In [12]:
# Create the gold-layer output directory (including missing parents).
# The path contains spaces, so it must reach the OS as ONE argument: the
# previous unquoted `mkdir -p /Users/.../Final Project ADW/output/gold/` was
# word-split by the shell into three separate paths and never created the
# intended directory. os.makedirs takes the path as a single string and,
# with exist_ok=True, is a no-op when the directory is already present.
import os

os.makedirs("/Users/deepasangoll/Final Project ADW/output/gold/", exist_ok=True)


In [13]:
# Persist the gold-layer dimension and fact tables as Parquet, one directory
# per table underneath a shared base path.
#
# NOTE(review): dim_athletes, dim_coaches, dim_teams, fact_medal_wins and
# fact_entries_gender are not defined in any cell above this one in the
# notebook as saved; they are only built in the later In [17] cell. On a
# fresh kernel run top-to-bottom this cell would raise NameError.
output_base_path = "/Users/deepasangoll/Final Project ADW/output/gold/"


def _save_as_parquet(table, folder_name):
    """Overwrite `output_base_path + folder_name` with `table` in Parquet format."""
    parquet_writer = table.write.format("parquet").mode("overwrite")
    parquet_writer.save(output_base_path + folder_name)


# Dimensions first, then facts.
_save_as_parquet(dim_athletes, "dim_athletes")
_save_as_parquet(dim_coaches, "dim_coaches")
_save_as_parquet(dim_teams, "dim_teams")
_save_as_parquet(fact_medal_wins, "fact_medal_wins")
_save_as_parquet(fact_entries_gender, "fact_entries_gender")

# Shut the Spark session down once everything has been written.
spark.stop()


In [17]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, sum as F_sum
import os

# ---------------------------------------------------------------------------
# Spark session
# ---------------------------------------------------------------------------
# Build (or reuse) the session; the driver host is pinned to the loopback
# address via the "spark.driver.host" setting.
spark = (
    SparkSession.builder
    .appName("Tokyo Olympic Data Transformation")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

# Only ERROR-level messages are logged from here on, to keep cell output short.
spark.sparkContext.setLogLevel("ERROR")

print("Spark Session initialized.")

# ---------------------------------------------------------------------------
# Bronze layer: read the raw CSV files
# ---------------------------------------------------------------------------
raw_data_dir = "/Users/deepasangoll/Final Project ADW/raw-data/"


def _load_raw_csv(file_name):
    """Read `raw_data_dir + file_name` as CSV with a header row and inferred column types."""
    csv_reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    return csv_reader.load(raw_data_dir + file_name)


athletes = _load_raw_csv("athletes.csv")
coaches = _load_raw_csv("coaches.csv")
entriesgender = _load_raw_csv("entriesgender.csv")
medals = _load_raw_csv("medals.csv")
teams = _load_raw_csv("teams.csv")

print("Data loaded successfully.")

# ---------------------------------------------------------------------------
# Silver layer: de-duplicate rows; coaches additionally get a placeholder for
# missing values in the "Name" column.
# ---------------------------------------------------------------------------
athletes = athletes.dropDuplicates()
coaches = (
    coaches
    .dropDuplicates()
    .fillna({"Name": "Unknown"})
)
entriesgender = entriesgender.dropDuplicates()
medals = medals.dropDuplicates()
teams = teams.dropDuplicates()

print("Data cleaned and transformed.")

# ---------------------------------------------------------------------------
# Gold layer: dimension tables (name column renamed, then de-duplicated again
# because the projection can collapse previously distinct rows)
# ---------------------------------------------------------------------------
athlete_name = col("PersonName").alias("athlete_name")
dim_athletes = athletes.select(athlete_name, "Country", "Discipline").dropDuplicates()

coach_name = col("Name").alias("coach_name")
dim_coaches = coaches.select(coach_name, "Country", "Discipline").dropDuplicates()

team_name = col("TeamName").alias("team_name")
dim_teams = teams.select(team_name, "Discipline", "Country").dropDuplicates()

print("Dimension tables created.")

# ---------------------------------------------------------------------------
# Gold layer: fact tables
# ---------------------------------------------------------------------------
# Medal totals, one row per "Team_Country" value.
medals_by_country = medals.groupBy("Team_Country")
fact_medal_wins = medals_by_country.agg(
    F_sum("Gold").alias("total_gold"),
    F_sum("Silver").alias("total_silver"),
    F_sum("Bronze").alias("total_bronze"),
)

# Male/female entry totals, one row per "Discipline" value.
entries_by_discipline = entriesgender.groupBy("Discipline")
fact_entries_gender = entries_by_discipline.agg(
    F_sum("Male").alias("total_male_entries"),
    F_sum("Female").alias("total_female_entries"),
)

print("Fact tables created.")

# ---------------------------------------------------------------------------
# Export: write every gold table as CSV (with header) under one base path
# ---------------------------------------------------------------------------
# NOTE(review): the project folder name appears twice in this path
# ("Final Project ADW/Final Project ADW"), unlike the ".../output/gold/" path
# used by the earlier Parquet cell — confirm this location is intended.
output_base_path = "/Users/deepasangoll/Final Project ADW/Final Project ADW/gold/"

# Create the directory up front; exist_ok makes a re-run harmless.
os.makedirs(output_base_path, exist_ok=True)
print(f"Output directory verified: {output_base_path}")

# (table, sub-directory name, message printed after a successful write)
gold_exports = [
    (dim_athletes, "dim_athletes", "Dimension table for athletes saved as CSV."),
    (dim_coaches, "dim_coaches", "Dimension table for coaches saved as CSV."),
    (dim_teams, "dim_teams", "Dimension table for teams saved as CSV."),
    (fact_medal_wins, "fact_medal_wins", "Fact table for medal wins saved as CSV."),
    (fact_entries_gender, "fact_entries_gender", "Fact table for entries by gender saved as CSV."),
]

for gold_table, folder_name, saved_message in gold_exports:
    csv_writer = gold_table.write.format("csv").mode("overwrite").option("header", "true")
    csv_writer.save(output_base_path + folder_name)
    print(saved_message)

# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
spark.stop()
print("Spark Session terminated. Processing complete.")


Spark Session initialized.
Data loaded successfully.
Data cleaned and transformed.
Dimension tables created.
Fact tables created.
Output directory verified: /Users/deepasangoll/Final Project ADW/Final Project ADW/gold/
Dimension table for athletes saved as CSV.
Dimension table for coaches saved as CSV.
Dimension table for teams saved as CSV.
Fact table for medal wins saved as CSV.
Fact table for entries by gender saved as CSV.
Spark Session terminated. Processing complete.
