In [0]:
%run "/Workspace/Users/sheeba.banu@diggibyte.com/databricks_assignment/src/question1/source_to_bronze/utils"

In [0]:
from pyspark.sql import functions as F

# Silver-layer employee dimension; every gold aggregate in this cell derives from it.
employee_df = spark.table("workspace.employee_info.dim_employee")

# 1️⃣ Total salary paid per department, largest total first.
# NOTE(review): the ordering only affects this DataFrame / its display; row order
# is not something a Delta table preserves once written.
dept_salary_df = (
    employee_df
    .groupBy("department_id")
    .agg(F.sum("salary").alias("total_salary"))
    .orderBy(F.col("total_salary").desc())
)

# 2️⃣ Head count for every (department, country) combination.
# F.count on a column counts its non-null values.
dept_country_count_df = employee_df.groupBy(
    "department_id", "country_id"
).agg(F.count("employee_id").alias("employee_count"))

# 3️⃣ Department / country pairs that actually occur among employees.
# Reference data comes from CSV extracts. With header=True and no inferSchema,
# Spark reads every column as a string.
DEPARTMENT_CSV_PATH = "/Volumes/assignment/default/write_path/department/"
COUNTRY_CSV_PATH = "/Volumes/assignment/default/write_path/country/"

department_df = spark.read.csv(DEPARTMENT_CSV_PATH, header=True)
country_df = spark.read.csv(COUNTRY_CSV_PATH, header=True)

# Inner joins keep only employees whose department AND country both resolve in
# the reference data. The join yields one row per employee, so distinct()
# collapses it to one row per (department, country) pair instead of writing
# per-employee duplicates.
# TODO(review): the section title promises department *names* and country
# *names*, but only the IDs are selected. Add the name columns from
# department_df / country_df once their column names are confirmed. Doing so
# changes the schema of gold.fact_department_country.
dept_country_df = (
    employee_df
    .join(department_df, employee_df["department_id"] == department_df["DepartmentID"], "inner")
    .join(country_df, employee_df["country_id"] == country_df["CountryCode"], "inner")
    # Reference the employee-side columns explicitly so the select stays
    # unambiguous whatever columns the reference CSVs carry.
    .select(employee_df["department_id"], employee_df["country_id"])
    .distinct()
    .withColumn("at_load_date", F.current_date())
)

# 4️⃣ Mean employee age within each department.
avg_age_df = employee_df.groupBy("department_id").agg(
    F.avg("age").alias("avg_age")
)

# Stamp each aggregate with an "at_load_date" column via the add_load_date helper
# pulled in by the %run utils cell. dept_country_df is excluded because it
# already received its date column where it was built.
dept_salary_df, dept_country_count_df, avg_age_df = (
    add_load_date(aggregate_df, "at_load_date")
    for aggregate_df in (dept_salary_df, dept_country_count_df, avg_age_df)
)

# Write the gold outputs as Delta tables in a single schema. The schema name is
# defined once and reused for both creation and the writes, so they cannot drift.
gold_schema = "assignment.gold"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {gold_schema}")


def _write_gold_table(df, table_name, schema=gold_schema):
    """Overwrite today's rows of ``{schema}.{table_name}`` with ``df``.

    Uses Delta's ``replaceWhere`` so that only rows matching
    ``at_load_date = current_date()`` are replaced. Rows from other load dates
    are left in place, and a same-day re-run replaces today's rows instead of
    appending duplicates.

    Args:
        df: Spark DataFrame to persist. It is expected to carry an
            ``at_load_date`` column set to the current date. Delta's
            replaceWhere constraint check (on by default) rejects rows
            outside the predicate.
        table_name: Unqualified table name inside ``schema``.
        schema: Catalog-qualified schema to write into.
    """
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", "at_load_date = current_date()")
        .saveAsTable(f"{schema}.{table_name}")
    )


# Target table -> DataFrame. Dicts keep insertion order, so tables are written
# in this order.
gold_outputs = {
    "fact_dept_salary": dept_salary_df,
    "fact_dept_country_count": dept_country_count_df,
    "fact_department_country": dept_country_df,
    "fact_avg_age": avg_age_df,
}

for gold_table_name, gold_df in gold_outputs.items():
    _write_gold_table(gold_df, gold_table_name)

print("✅ Gold tables written successfully.")


In [0]:
# Preview each gold DataFrame. These are the in-memory DataFrames, not the
# tables read back, so displaying them re-executes their lazy plans.
for preview_df in (dept_salary_df, dept_country_count_df, dept_country_df, avg_age_df):
    display(preview_df)