In [None]:
import os
from dotenv import load_dotenv
# Load settings from a .env file into the process environment so the
# os.getenv(...) lookups in the mount cell can read the Azure credentials.
load_dotenv()

In [0]:
# Resolve every Azure setting up front so that a missing variable fails here
# with a clear message instead of being interpolated as the text "None" into
# the OAuth endpoint, the abfss:// source, or the mount path.
# The tenant id is read from AZURE_TERNANT_ID first (the name this notebook has
# always used) and falls back to the correctly spelt AZURE_TENANT_ID.
tenant_id = os.getenv("AZURE_TERNANT_ID") or os.getenv("AZURE_TENANT_ID")
client_id = os.getenv("AZURE_CLIENT_ID")
client_secret = os.getenv("AZURE_CLIENT_SECRET")
container = os.getenv("AZURE_CONTAINER_NAME")
storage_account = os.getenv("AZURE_STORAGE_ACCOUNT")

required_settings = {
    "AZURE_TENANT_ID (or AZURE_TERNANT_ID)": tenant_id,
    "AZURE_CLIENT_ID": client_id,
    "AZURE_CLIENT_SECRET": client_secret,
    "AZURE_CONTAINER_NAME": container,
    "AZURE_STORAGE_ACCOUNT": storage_account,
}
missing_settings = [name for name, value in required_settings.items() if not value]
if missing_settings:
    raise EnvironmentError(
        "Missing required environment variable(s): " + ", ".join(missing_settings)
    )

# OAuth client-credentials configuration for the ABFS driver.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
}
mount_point = f"/mnt/{container}"

# Drop any existing mount at this path first so the mount below always picks up
# the current credentials.
existing_mounts = [mnt.mountPoint for mnt in dbutils.fs.mounts()]
if mount_point in existing_mounts:
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source=f"abfss://{container}@{storage_account}.dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs
)

print(f"Mounted {mount_point} successfully")


/mnt/testsynapsehieum has been unmounted.
Mounted /mnt/testsynapsehieum successfully


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, row_number, max, to_date
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Spark session used by the dimension load
spark = SparkSession.builder.appName("ETL_Fact_Dim_Tables").getOrCreate()

# Lake locations for the silver (input) and gold (output) layers
mount_point = "/mnt/testsynapsehieum"
silver_path = f"{mount_point}/silver"
gold_path = f"{mount_point}/gold"

# Silver-layer sources: one parquet folder per HR table, read in this order
silver_tables = {
    name: spark.read.format("parquet").load(f"{silver_path}/HR.{name}")
    for name in (
        "Employee",
        "Job",
        "Department",
        "Shift",
        "BusinessTravel",
        "Location",
        "Training",
    )
}

# Columns kept for each dimension; the first entry of every list is the
# business key that is passed to update_scd_type_2
dim_columns = dict(
    BusinessTravel=["BusinessTravelID", "BusinessTravelOption"],
    Employee=["EmployeeID", "FirstName", "LastName", "FullName", "Age", "Gender", "MaritalStatus", "OverTime", "HireDate"],
    Location=["LocationID", "State", "PostalCode", "Country"],
    Job=["JobID", "JobRole"],
    Training=["TrainingID", "TrainingName", "TrainingCostPerHour"],
    Department=["DepartmentID", "DepartmentName"],
    Shift=["ShiftID", "ShiftName", "Start", "End"],
)

# Load rows with previously unseen business keys into a Dim_<table> Delta table
def update_scd_type_2(table_name, new_df, key_column):
    """Append rows with previously unseen keys to ``{gold_path}/Dim_{table_name}``.

    Args:
        table_name: Dimension name. Determines the target path and the name of
            the ``{table_name}Key`` column added to the written rows.
        new_df: Incoming DataFrame containing the dimension's columns.
        key_column: Name of the business-key column in ``new_df``.

    The incoming rows are de-duplicated on ``key_column``; rows whose key is
    already present in the target are discarded; the remainder is stamped with
    ``StartDate`` (current timestamp), ``EndDate`` (9999-12-31) and
    ``{table_name}Key`` (the business key cast to int) and written to the
    target, which is created on the first run.

    NOTE(review): despite the name, rows whose key already exists are never
    compared attribute by attribute, so changed attributes are not versioned.
    Adding real Type 2 history would create several rows per business key and
    needs the fact joins to filter on the current row - confirm before changing.
    """
    table_path = f"{gold_path}/Dim_{table_name}"

    # Probe the target once and reuse the answer for both the read and the write.
    table_exists = DeltaTable.isDeltaTable(spark, table_path)

    if table_exists:
        existing_df = spark.read.format("delta").load(table_path)
    else:
        # No target yet: compare against an empty frame with the incoming schema.
        existing_df = spark.createDataFrame([], new_df.schema)

    # Compare keys as strings on both sides so the anti join is type-consistent.
    new_df = new_df.withColumn(key_column, col(key_column).cast("string"))
    existing_df = existing_df.withColumn(key_column, col(key_column).cast("string"))

    # Debugging Schema
    print(f"Processing {table_name} - Schema:")
    new_df.printSchema()

    # Keep a single row per business key.
    new_df = new_df.dropDuplicates([key_column])

    # Left anti join keeps only the keys that the target does not contain yet.
    # NOTE(review): the surrogate key is the business key cast to int; this
    # presumably yields NULL for non-numeric ids - verify against the data.
    changed_records = new_df.join(existing_df, on=[key_column], how="left_anti") \
                        .withColumn("StartDate", current_timestamp()) \
                        .withColumn("EndDate", to_date(lit("9999-12-31"))) \
                        .withColumn(f"{table_name}Key", col(key_column).cast("int"))

    if table_exists:
        delta_table = DeltaTable.forPath(spark, table_path)
        # The source only holds keys absent from the snapshot read above, so the
        # matched branch is not expected to fire; it is kept to leave the merge
        # behaviour unchanged.
        delta_table.alias("old").merge(
            changed_records.alias("new"),
            f"old.{key_column} = new.{key_column}"
        ).whenMatchedUpdate(set={
            "EndDate": current_timestamp()
        }).whenNotMatchedInsertAll().execute()
    else:
        changed_records.write.format("delta").mode("overwrite").save(table_path)

# Run the dimension load for every silver table that has a column definition;
# the first configured column is used as the business key
for table, df in silver_tables.items():
    if table not in dim_columns:
        continue
    selected_columns = dim_columns[table]
    update_scd_type_2(table, df.select(*selected_columns), selected_columns[0])



Processing Employee - Schema:
root
 |-- EmployeeID: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- FullName: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- HireDate: date (nullable = true)

Processing Job - Schema:
root
 |-- JobID: string (nullable = true)
 |-- JobRole: string (nullable = true)

Processing Department - Schema:
root
 |-- DepartmentID: string (nullable = true)
 |-- DepartmentName: string (nullable = true)

Processing Shift - Schema:
root
 |-- ShiftID: string (nullable = true)
 |-- ShiftName: string (nullable = true)
 |-- Start: integer (nullable = true)
 |-- End: integer (nullable = true)

Processing BusinessTravel - Schema:
root
 |-- BusinessTravelID: string (nullable = true)
 |-- BusinessTravelOption: string (nullable = true)

Processing Location - Schema:
root
 |-- 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, date_format, year, month, dayofmonth, quarter, lit, dayofweek, hour, minute, second, current_timestamp
from datetime import datetime, timedelta
from delta.tables import DeltaTable

# Spark session for the date-dimension build
spark = SparkSession.builder.appName("DimTime").getOrCreate()

# Gold-layer location of the date dimension
gold_path = "/mnt/testsynapsehieum/gold"
dim_time_path = f"{gold_path}/Dim_Time"

# Calendar span covered by the dimension (both ends inclusive)
start_date = datetime(1988, 1, 1)
end_date = datetime(2030, 12, 31)

# One 'YYYY-MM-DD' string per calendar day in the span
total_days = (end_date - start_date).days + 1
date_list = [
    (start_date + timedelta(days=offset)).strftime('%Y-%m-%d')
    for offset in range(total_days)
]
df = spark.createDataFrame([(day,) for day in date_list], ["Date"])

# Turn the string column into a proper date column
df = df.withColumn("Date", col("Date").cast("date"))

# Derived attributes, applied strictly in this order because some expressions
# read columns added by an earlier entry (Year, StartQuarter)
derived_columns = [
    ("TimeKey", date_format("Date", "yyyyMMdd").cast("int")),
    ("Year", year("Date")),
    ("Quarter", quarter("Date")),
    ("QuarterName", expr("concat('Q', quarter(Date))")),
    ("Month", month("Date")),
    ("MonthName", date_format("Date", "MMMM")),
    ("MonthNameShort", date_format("Date", "MMM")),
    ("Day", dayofmonth("Date")),
    ("DayOfWeek", dayofweek("Date")),
    ("StartYear", expr("make_date(Year, 1, 1)")),
    ("EndYear", expr("make_date(Year, 12, 31)")),
    ("StartQuarter", expr("date_trunc('QUARTER', Date)")),
    ("EndQuarter", expr("last_day(date_add(StartQuarter, 89))")),
    ("StartMonth", expr("date_trunc('MONTH', Date)")),
    ("EndMonth", expr("last_day(Date)")),
    ("IsHoliday", lit(0)),  # Default holiday flag
]
df_dim_time = df
for column_name, column_expr in derived_columns:
    df_dim_time = df_dim_time.withColumn(column_name, column_expr)

if not DeltaTable.isDeltaTable(spark, dim_time_path):
    # First run: write the full calendar
    df_dim_time.write.format("delta").mode("overwrite").save(dim_time_path)
else:
    # Later runs: only add the days whose TimeKey is not stored yet
    delta_table = DeltaTable.forPath(spark, dim_time_path)
    delta_table.alias("old").merge(
        df_dim_time.alias("new"),
        "old.TimeKey = new.TimeKey"
    ).whenNotMatchedInsertAll().execute()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, row_number, current_timestamp
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Build the Fact_Performance_Employee rows
def create_fact_performance_employee():
    """Join the silver Employee table to its dimensions and shape the fact rows.

    Reads the six dimension Delta tables under ``gold_path``, left-joins each
    one to ``silver_tables["Employee"]`` on its business id, projects the
    dimension keys and employee measures, and adds
    ``FactPerformanceEmployeeKey`` as a row number over the whole result.

    Returns:
        The DataFrame of fact rows (nothing is written here).

    NOTE(review): ``Time_Key`` carries ``emp.HireDate`` as-is, whereas
    ``Dim_Time.TimeKey`` is built as an int ``yyyyMMdd`` - confirm how the two
    are meant to join.
    """
    # (alias, gold folder, join column) in the order the joins are applied
    dim_specs = [
        ("dim_employee", "Dim_Employee", "EmployeeID"),
        ("dim_job", "Dim_Job", "JobID"),
        ("dim_department", "Dim_Department", "DepartmentID"),
        ("dim_shift", "Dim_Shift", "ShiftID"),
        ("dim_business_travel", "Dim_BusinessTravel", "BusinessTravelID"),
        ("dim_location", "Dim_Location", "LocationID"),
    ]
    loaded_dims = [
        (alias, key, spark.read.format("delta").load(f"{gold_path}/{folder}").alias(alias))
        for alias, folder, key in dim_specs
    ]

    joined = silver_tables["Employee"].alias("emp")
    for alias, key, dim_df in loaded_dims:
        joined = joined.join(dim_df, col(f"emp.{key}") == col(f"{alias}.{key}"), "left")

    # Columns that are renamed on the way out: (source, output name)
    renamed = [
        ("dim_employee.EmployeeKey", "Employee_Key"),
        ("dim_job.JobKey", "Job_Key"),
        ("dim_department.DepartmentKey", "Department_Key"),
        ("dim_shift.ShiftKey", "Shift_Key"),
        ("dim_business_travel.BusinessTravelKey", "BusinessTravel_Key"),
        ("dim_location.LocationKey", "Location_Key"),
        ("emp.HireDate", "Time_Key"),
        ("emp.MonthlyIncome", "Monthly_Salary"),
        ("emp.PerformanceRating", "Performance_Score"),
        ("emp.JobInvolvement", "Projects_Handled"),
        ("emp.OverTime", "Overtime_Hours"),
        ("emp.SickDays", "Sick_Days"),
    ]
    # Employee columns that keep their own name
    passthrough = [
        "JobSatisfaction",
        "YearsAtCompany",
        "CurrentWorking",
        "JobLevel",
        "modified_date",
        "is_deleted",
    ]
    projection = [col(source).alias(target) for source, target in renamed]
    projection += [col(f"emp.{name}") for name in passthrough]

    surrogate_window = Window.orderBy(lit(1))
    return joined.select(*projection).withColumn(
        "FactPerformanceEmployeeKey", row_number().over(surrogate_window)
    )

# Incremental Load for Fact_Performance_Employee
fact_performance_employee_table_path = f"{gold_path}/Fact_Performance_Employee"
fact_performance_employee = create_fact_performance_employee()

if DeltaTable.isDeltaTable(spark, fact_performance_employee_table_path):
    delta_table = DeltaTable.forPath(spark, fact_performance_employee_table_path)

    # create_fact_performance_employee() numbers its rows from 1 on every run,
    # so inserting them unchanged would reuse surrogate keys that are already
    # stored. Shift the incoming keys past the highest stored key so inserted
    # rows stay unique. Matched rows keep their stored key because the update
    # clause below does not touch it.
    highest_existing_key = (
        delta_table.toDF()
        .agg({"FactPerformanceEmployeeKey": "max"})
        .collect()[0][0]
        or 0  # empty table, or only NULL keys stored
    )
    incoming_rows = fact_performance_employee.withColumn(
        "FactPerformanceEmployeeKey",
        col("FactPerformanceEmployeeKey") + lit(highest_existing_key),
    )

    # NOTE(review): the match uses plain '=' on the dimension keys, so a row
    # with a NULL key never matches and would be inserted again on each run.
    delta_table.alias("old").merge(
        incoming_rows.alias("new"),
        "old.Employee_Key = new.Employee_Key AND old.Job_Key = new.Job_Key AND old.Department_Key = new.Department_Key"
    ).whenMatchedUpdate(set={
        "Monthly_Salary": col("new.Monthly_Salary"),
        "Performance_Score": col("new.Performance_Score"),
        "Projects_Handled": col("new.Projects_Handled"),
        "Overtime_Hours": col("new.Overtime_Hours"),
        "Sick_Days": col("new.Sick_Days"),
        "JobSatisfaction": col("new.JobSatisfaction"),
        "YearsAtCompany": col("new.YearsAtCompany"),
        "CurrentWorking": col("new.CurrentWorking"),
        "JobLevel": col("new.JobLevel"),
        "modified_date": col("new.modified_date"),
        "is_deleted": col("new.is_deleted")
    }).whenNotMatchedInsertAll().execute()
else:
    # First run: the row numbers 1..N are used directly as surrogate keys.
    fact_performance_employee.write.format("delta").mode("overwrite").save(fact_performance_employee_table_path)

print("Fact_Performance_Employee processed successfully.")

# Build the Fact_HR_Training rows
def create_fact_hr_training():
    """Join the silver Employee table to its dimensions and shape the training fact rows.

    Reads five dimension Delta tables under ``gold_path``, left-joins each one
    to ``silver_tables["Employee"]`` on its business id, projects the dimension
    keys and training measures, and adds ``FactHRTrainingKey`` as a row number
    over the whole result.

    Returns:
        The DataFrame of fact rows (nothing is written here).

    NOTE(review): ``Time_Key`` carries ``emp.TraingingTime`` as-is (the column
    name is spelt that way in the source data), whereas ``Dim_Time.TimeKey`` is
    built as an int ``yyyyMMdd`` - confirm how the two are meant to join.
    """
    # (alias, gold folder, join column) in the order the joins are applied
    dim_specs = [
        ("dim_employee", "Dim_Employee", "EmployeeID"),
        ("dim_training", "Dim_Training", "TrainingID"),
        ("dim_department", "Dim_Department", "DepartmentID"),
        ("dim_location", "Dim_Location", "LocationID"),
        ("dim_job", "Dim_Job", "JobID"),
    ]
    loaded_dims = [
        (alias, key, spark.read.format("delta").load(f"{gold_path}/{folder}").alias(alias))
        for alias, folder, key in dim_specs
    ]

    joined = silver_tables["Employee"].alias("emp")
    for alias, key, dim_df in loaded_dims:
        joined = joined.join(dim_df, col(f"emp.{key}") == col(f"{alias}.{key}"), "left")

    # Columns that are renamed on the way out: (source, output name)
    renamed = [
        ("dim_employee.EmployeeKey", "Employee_Key"),
        ("dim_training.TrainingKey", "Training_Key"),
        ("dim_department.DepartmentKey", "Department_Key"),
        ("dim_location.LocationKey", "Location_Key"),
        ("dim_job.JobKey", "Job_Key"),
        ("emp.TraingingTime", "Time_Key"),
        ("dim_training.TrainingCostPerHour", "Training_Cost"),
    ]
    # Employee columns that keep their own name
    passthrough = [
        "TrainingHours",
        "JobLevel",
        "TrainingOutcome",
        "JobSatisfaction",
        "modified_date",
        "is_deleted",
    ]
    projection = [col(source).alias(target) for source, target in renamed]
    projection += [col(f"emp.{name}") for name in passthrough]

    surrogate_window = Window.orderBy(lit(1))
    return joined.select(*projection).withColumn(
        "FactHRTrainingKey", row_number().over(surrogate_window)
    )

# Incremental Load for Fact_HR_Training
fact_hr_training_table_path = f"{gold_path}/Fact_HR_Training"
fact_hr_training = create_fact_hr_training()

if DeltaTable.isDeltaTable(spark, fact_hr_training_table_path):
    delta_table = DeltaTable.forPath(spark, fact_hr_training_table_path)

    # create_fact_hr_training() numbers its rows from 1 on every run. Shift the
    # incoming keys past the highest stored key so that rows inserted below get
    # a surrogate key that is both populated and unique. Matched rows keep
    # their stored key because the update clause does not touch it.
    highest_existing_key = (
        delta_table.toDF()
        .agg({"FactHRTrainingKey": "max"})
        .collect()[0][0]
        or 0  # empty table, or only NULL keys stored
    )
    incoming_rows = fact_hr_training.withColumn(
        "FactHRTrainingKey",
        col("FactHRTrainingKey") + lit(highest_existing_key),
    )

    # NOTE(review): the match uses plain '=' on the dimension keys, so a row
    # with a NULL key never matches and would be inserted again on each run.
    delta_table.alias("old").merge(
        incoming_rows.alias("new"),
        "old.Employee_Key = new.Employee_Key AND old.Training_Key = new.Training_Key AND old.Job_Key = new.Job_Key"
    ).whenMatchedUpdate(set={
        "Training_Cost": col("new.Training_Cost"),
        "TrainingHours": col("new.TrainingHours"),
        "JobLevel": col("new.JobLevel"),
        "TrainingOutcome": col("new.TrainingOutcome"),
        "JobSatisfaction": col("new.JobSatisfaction"),
        "modified_date": col("new.modified_date"),
        "is_deleted": col("new.is_deleted")
    }).whenNotMatchedInsert(values={
        "Employee_Key": col("new.Employee_Key"),
        "Training_Key": col("new.Training_Key"),
        "Department_Key": col("new.Department_Key"),
        "Location_Key": col("new.Location_Key"),
        "Job_Key": col("new.Job_Key"),
        "Time_Key": col("new.Time_Key"),
        "Training_Cost": col("new.Training_Cost"),
        "TrainingHours": col("new.TrainingHours"),
        "JobLevel": col("new.JobLevel"),
        "TrainingOutcome": col("new.TrainingOutcome"),
        "JobSatisfaction": col("new.JobSatisfaction"),
        "modified_date": col("new.modified_date"),
        "is_deleted": col("new.is_deleted"),
        # Previously omitted, which left the surrogate key NULL on inserted rows.
        "FactHRTrainingKey": col("new.FactHRTrainingKey")
    }).execute()
else:
    # First run: the row numbers 1..N are used directly as surrogate keys.
    fact_hr_training.write.format("delta").mode("overwrite").save(fact_hr_training_table_path)

print("Fact_HR_Training processed successfully.")


Fact_Performance_Employee processed successfully.
Fact_HR_Training processed successfully.
