In [0]:
%run "/Workspace/Users/edwin.a@diggibyte.com/databricks_assignment/src/question_1/source_to_bronze/util"

In [0]:
from pyspark.sql.functions import col, count, avg, sum as sum_, desc

# Define paths
volume_path = "/Volumes/workspace/default/assignment_practice"
silver_path = f"{volume_path}/dim_employee"
bronze_path = f"{volume_path}/df_write"
gold_path = f"{volume_path}/gold/employee"

# Step 1: Read silver data (employee delta table)
df = spark.read.format("delta").load(silver_path)

# Step 2: Read department and country CSVs from bronze.
# No schema is supplied or inferred, so every CSV column is loaded as a string.
department_df = spark.read.option("header", True).csv(f"{bronze_path}/Department.csv/")
country_df = spark.read.option("header", True).csv(f"{bronze_path}/Country.csv/")

# Step 3: Convert column names to snake_case
# (rename_column_to_snake_case is presumably provided by the util notebook loaded via %run — confirm)
department_df = rename_column_to_snake_case(department_df)
country_df = rename_column_to_snake_case(country_df)

# Step 4: Left-join employees to departments and countries.
# The department key has the same name on both sides, so it is joined by column
# name: this keeps a single `department_i_d` column in the result instead of two
# identically named ones (which would make later by-name references ambiguous).
# The country keys are named differently, so an explicit condition is required.
joined_df = (
    df.join(department_df, on="department_i_d", how="left")
      .join(country_df, df["country_i_d"] == country_df["country_code"], "left")
)

# Step 5: Add at_load_date column
# (add_at_load_date is presumably provided by the util notebook as well — confirm)
joined_df = add_at_load_date(joined_df)


def _save_gold_table(frame, table_name):
    """Overwrite the Delta table `table_name` under `gold_path` with `frame`."""
    target = f"{gold_path}/{table_name}"
    frame.write.format("delta").mode("overwrite").save(target)


# 1. Total salary per department, highest total first.
salary_df = (
    joined_df
    .groupBy("department_name")
    .agg(sum_("salary").alias("total_salary"))
    .orderBy(desc("total_salary"))
)
_save_gold_table(salary_df, "fact_employee_salary")

# 2. Number of employees per (department, country) pair.
emp_count_df = (
    joined_df
    .groupBy("department_name", "country_name")
    .agg(count("*").alias("employee_count"))
)
_save_gold_table(emp_count_df, "fact_employee_count")

# 3. Distinct department name / country name combinations.
dept_country_df = joined_df.select("department_name", "country_name").distinct()
_save_gold_table(dept_country_df, "fact_dept_country")

# 4. Average employee age per department.
avg_age_df = (
    joined_df
    .groupBy("department_name")
    .agg(avg("age").alias("average_age"))
)
_save_gold_table(avg_age_df, "fact_avg_age")

In [0]:
import os

# Root folder of the gold employee tables.
# NOTE: this rebinds `volume_path`, which the previous cell used for the volume root.
volume_path = "/Volumes/workspace/default/assignment_practice/gold/employee"

# Total salary per department. Row order is not guaranteed when a table is read
# back, so the descending order is applied here rather than relying on the sort
# that was done before the write.
df1 = (
    spark.read.format("delta")
    .load(f"{volume_path}/fact_employee_salary")
    .orderBy("total_salary", ascending=False)
)
df1.display()

# Employee count per department and country.
df2 = spark.read.format("delta").load(f"{volume_path}/fact_employee_count")
df2.display()

# Average age per department.
df3 = spark.read.format("delta").load(f"{volume_path}/fact_avg_age")
df3.display()

# Distinct department / country combinations.
df4 = spark.read.format("delta").load(f"{volume_path}/fact_dept_country")
df4.display()

# Joined employee / department / country frame built in the previous cell
# (requires that cell to have been run in this session).
joined_df.display()