In [0]:
%run ../source_to_bronze/utils

In [0]:
# Gold layer build: join the silver employee table with the bronze
# department / country CSVs and persist four aggregate Delta tables under
# `gold_path`.
#
# `rename_columns_to_snake_case` and `add_at_load_date` are not defined in
# this cell; presumably they come from the `%run ../source_to_bronze/utils`
# cell above.

from pyspark.sql.functions import col, count, avg, sum as sum_, desc

# Define paths
volume_path = "/Volumes/my_catalog/my_schema/my_volume"
silver_path = f"{volume_path}/silver/employee_info/dim_employee"
bronze_path = f"{volume_path}/bronze"
gold_path = f"{volume_path}/gold/employee"


def write_gold_table(frame, table_name):
    """Overwrite the Delta table at ``<gold_path>/<table_name>`` with ``frame``.

    Args:
        frame: Spark DataFrame to persist.
        table_name: Directory name of the table under ``gold_path``.
    """
    frame.write.format("delta").mode("overwrite").save(f"{gold_path}/{table_name}")


# Step 1: Read silver data (employee delta table)
df = spark.read.format("delta").load(silver_path)

# Step 2: Read department and country CSVs from bronze.
# NOTE(review): no schema / inferSchema option is given, so every CSV column
# is read as a string; join keys and numeric fields rely on implicit casts.
department_df = spark.read.option("header", True).csv(f"{bronze_path}/department")
country_df = spark.read.option("header", True).csv(f"{bronze_path}/country")

# Step 3: Convert column names to snake_case
department_df = rename_columns_to_snake_case(department_df)
country_df = rename_columns_to_snake_case(country_df)

# Step 4: Left-join the reference data so that employees without a matching
# department / country are kept; for them department_name / country_name are
# null and they aggregate into a null group below.
# `department_i_d` is presumably how the snake_case helper renders the CSV's
# department-id header — confirm against the helper.
joined_df = (
    df.join(department_df, df["department"] == department_df["department_i_d"], "left")
      .join(country_df, df["country"] == country_df["country_code"], "left")
)

# Step 5: Add at_load_date column.
# NOTE(review): every gold table below is an aggregation or projection that
# drops this column, so none of the persisted tables carries a load date.
joined_df = add_at_load_date(joined_df)

# NOTE(review): joined_df is not cached, so each of the four writes below
# re-reads silver/bronze and redoes the joins. Consider joined_df.cache()
# if the cluster type in use supports it.

# GOLD Layer: Save various business metrics

# 1. Total salary per department.
# Delta tables do not preserve row order, so no sort is applied before the
# write; sort by total_salary descending when reading the table instead.
salary_df = joined_df.groupBy("department_name").agg(sum_("salary").alias("total_salary"))
write_gold_table(salary_df, "fact_employee_salary")

# 2. Employee count per department per country
emp_count_df = joined_df.groupBy("department_name", "country_name").agg(count("*").alias("employee_count"))
write_gold_table(emp_count_df, "fact_employee_count")

# 3. Distinct departments with country names
dept_country_df = joined_df.select("department_name", "country_name").distinct()
write_gold_table(dept_country_df, "fact_dept_country")

# 4. Average age per department
avg_age_df = joined_df.groupBy("department_name").agg(avg("age").alias("average_age"))
write_gold_table(avg_age_df, "fact_avg_age")


In [0]:
import os

from pyspark.sql.functions import desc

# Inspect the gold tables written by the previous cell.
# Reads from that cell's `gold_path` (and shows its `joined_df`) instead of
# re-declaring the path into `volume_path`, which there means the volume root.

# Delta does not preserve row order, so the descending salary sort is
# applied here, on read.
df1 = (
    spark.read.format("delta")
    .load(f"{gold_path}/fact_employee_salary")
    .orderBy(desc("total_salary"))
)
df1.display()

df2 = spark.read.format("delta").load(f"{gold_path}/fact_employee_count")
df2.display()

df3 = spark.read.format("delta").load(f"{gold_path}/fact_avg_age")
df3.display()

df4 = spark.read.format("delta").load(f"{gold_path}/fact_dept_country")
df4.display()

# NOTE(review): this renders row-level employee records; review the output
# for personal data before sharing or committing the notebook.
joined_df.display()
