In [0]:
# Bronze ingest: load the raw employee CSV from the Unity Catalog volume.
# No schema is supplied or inferred, so every column arrives as a string;
# typed columns are produced later by the casts in the Silver-layer SQL.
df = spark.read.csv(
    "/Volumes/test/default/raw_data/Extended_Employee_Performance_and_Productivity_Data.csv",
    header=True,
)

# Persist as a Unity Catalog table. Overwrite mode keeps this cell re-runnable:
# the default save mode raises as soon as the table already exists.
df.write.mode("overwrite").saveAsTable("test.default.employee_data")

In [0]:
%sql
-- Volume that holds the raw CSV. The ingest cell reads from
-- /Volumes/test/default/raw_data, so the volume has to live in the `test`
-- catalog like every other object in this notebook.
-- IF NOT EXISTS makes the cell safe to re-run.
CREATE VOLUME IF NOT EXISTS test.default.raw_data;

In [0]:
%sql
-- Quick look at the bronze table (first 20 rows, no ordering applied)
SELECT *
FROM test.default.employee_data
LIMIT 20;

Employee_ID,Department,Gender,Age,Job_Title,Hire_Date,Years_At_Company,Education_Level,Performance_Score,Monthly_Salary,Work_Hours_Per_Week,Projects_Handled,Overtime_Hours,Sick_Days,Remote_Work_Frequency,Team_Size,Training_Hours,Promotions,Employee_Satisfaction_Score,Resigned
1,IT,Male,55,Specialist,2022-01-19 08:03:05.556036,2,High School,5,6750.0,33,32,22,2,0,14,66,0,2.63,False
2,Finance,Male,29,Developer,2024-04-18 08:03:05.556036,0,High School,5,7500.0,34,34,13,14,100,12,61,2,1.72,False
3,Finance,Male,55,Specialist,2015-10-26 08:03:05.556036,8,High School,3,5850.0,37,27,6,3,50,10,1,0,3.17,False
4,Customer Support,Female,48,Analyst,2016-10-22 08:03:05.556036,7,Bachelor,2,4800.0,52,10,28,12,100,10,0,1,1.86,False
5,Engineering,Female,36,Analyst,2021-07-23 08:03:05.556036,3,Bachelor,2,4800.0,38,11,29,13,100,15,9,1,1.25,False
6,IT,Male,43,Manager,2016-08-14 08:03:05.556036,8,High School,3,7800.0,46,31,8,0,100,15,95,0,2.77,False
7,IT,Male,37,Technician,2023-08-28 08:03:05.556036,1,Bachelor,5,5250.0,55,20,29,2,0,16,27,0,4.46,False
8,Engineering,Female,55,Engineer,2014-10-27 08:03:05.556036,9,Bachelor,2,7200.0,42,46,7,8,100,7,64,0,2.09,False
9,Marketing,Female,55,Technician,2023-06-29 08:03:05.556036,1,High School,2,4200.0,51,23,21,14,0,1,0,1,1.44,False
10,Engineering,Female,45,Consultant,2016-12-23 08:03:05.556036,7,Bachelor,1,6050.0,41,33,2,6,75,4,53,2,2.93,False


In [0]:
%sql
-- Silver layer: a typed, renamed projection of the bronze employee table.
-- CLUSTER BY enables liquid clustering, used here in place of partitioning.
-- Rows without an Employee_ID are dropped.
CREATE TABLE IF NOT EXISTS test.default.employee_performance_silver
CLUSTER BY (Department, Education_Level)
AS
SELECT
    -- Identifier and categorical attributes, kept as strings
    CAST(Employee_ID AS STRING)     AS employee_id,
    CAST(Department AS STRING)      AS department,
    CAST(Gender AS STRING)          AS gender,
    CAST(Education_Level AS STRING) AS education_level,
    -- Numeric measures; a missing score or overtime value defaults to 0
    COALESCE(CAST(Performance_Score AS FLOAT), 0) AS productivity_score,
    COALESCE(CAST(Overtime_Hours AS FLOAT), 0)    AS overtime_hours,
    CAST(Years_At_Company AS INT)                 AS tenure_years,
    CAST(Remote_Work_Frequency AS FLOAT)          AS remote_freq,
    -- Audit metadata
    CURRENT_TIMESTAMP() AS _ingested_at,
    'Kaggle_Source'     AS _source_sys
FROM test.default.employee_data
WHERE Employee_ID IS NOT NULL;

-- Data-quality guard on the score column.
-- A constraint name can only be added once, so any earlier copy is dropped
-- first; without this the cell fails the second time it is run.
-- NOTE(review): the 0-100 range is kept as originally written. The rows shown
-- in this notebook's outputs only carry scores of 1-5 -- confirm the intended range.
ALTER TABLE test.default.employee_performance_silver
DROP CONSTRAINT IF EXISTS productivity_range;

ALTER TABLE test.default.employee_performance_silver
ADD CONSTRAINT productivity_range CHECK (productivity_score >= 0 AND productivity_score <= 100);

In [0]:
%sql
-- VIEW 1: Executive Summary -- one row of aggregated KPIs per department
CREATE OR REPLACE VIEW test.default.gold_executive_summary AS
SELECT
    s.department,
    COUNT(s.employee_id)                AS head_count,
    ROUND(AVG(s.productivity_score), 2) AS avg_dept_productivity,
    ROUND(AVG(s.remote_freq), 2)        AS avg_remote_adoption,
    SUM(s.overtime_hours)               AS total_ot_hours
FROM test.default.employee_performance_silver AS s
GROUP BY s.department;

-- VIEW 2: Burnout Risk Analysis (window function over department)
-- Flags employees whose overtime is more than 20% above their department's
-- average overtime.
-- productivity_score is derived from the source Performance_Score column, whose
-- values are 1-5 in the rows shown in this notebook. The previous `> 85` cut-off
-- could therefore never match, leaving 'High Performer / High Risk' unreachable.
-- A score of 4 or more is treated as a high performer here.
-- NOTE(review): confirm the threshold with the business definition.
CREATE OR REPLACE VIEW test.default.gold_burnout_risk AS
WITH DeptAvg AS (
    SELECT *,
           avg(overtime_hours) OVER (PARTITION BY department) AS dept_avg_ot
    FROM test.default.employee_performance_silver
)
SELECT
    employee_id,
    department,
    productivity_score,
    overtime_hours,
    dept_avg_ot,
    CASE
        WHEN overtime_hours > (dept_avg_ot * 1.2) AND productivity_score >= 4 THEN 'High Performer / High Risk'
        WHEN overtime_hours > (dept_avg_ot * 1.2) THEN 'High Risk'
        ELSE 'Stable'
    END AS risk_status
FROM DeptAvg;

-- VIEW 3: Efficiency & Prediction Gap
-- predicted_score in this view is a placeholder (95% of the actual score), not
-- a model output. Swap it for a join against real ML predictions once they
-- are available; prediction_variance is actual minus that placeholder.
CREATE OR REPLACE VIEW test.default.gold_performance_gap AS
SELECT
    s.employee_id,
    s.department,
    s.tenure_years,
    s.productivity_score                                   AS actual_score,
    (s.productivity_score * 0.95)                          AS predicted_score,
    (s.productivity_score - (s.productivity_score * 0.95)) AS prediction_variance
FROM test.default.employee_performance_silver AS s;

In [0]:
%sql
-- Scratch volume passed to MLflow as the DFS temp dir when saving and loading
-- the SparkML model. IF NOT EXISTS keeps the cell safe to re-run.
CREATE VOLUME IF NOT EXISTS test.default.mlflow_tmp;

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from mlflow.models import infer_signature

# 1. Configure MLflow
mlflow.set_registry_uri("databricks-uc")  # Use Unity Catalog for model governance
model_name = "test.default.productivity_prediction_model"

# Raw Silver columns the pipeline consumes, the label it predicts, and the
# knobs a reader is most likely to tune.
INPUT_COLS = ["department", "overtime_hours", "tenure_years", "remote_freq"]
LABEL_COL = "productivity_score"
NUM_TREES = 10
SEED = 42

with mlflow.start_run(run_name="Employee_Productivity_RF"):
    # 2. Prepare data: seeded 80/20 split of the Silver table
    df = spark.read.table("test.default.employee_performance_silver")
    train_df, test_df = df.randomSplit([0.8, 0.2], seed=SEED)

    # 3. Feature engineering stages
    # Convert Department (string) to a numerical index; unseen labels are kept
    # in an extra bucket instead of raising at transform time.
    indexer = StringIndexer(inputCol="department", outputCol="dept_index", handleInvalid="keep")

    # Combine all features into a single vector
    feature_cols = ["dept_index", "overtime_hours", "tenure_years", "remote_freq"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="unscaled_features")

    # Scale the assembled vector
    scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

    # 4. Define model (Random Forest Regressor). The seed makes the fitted
    # forest reproducible across runs.
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol=LABEL_COL,
        numTrees=NUM_TREES,
        seed=SEED,
    )

    # 5. Build and train pipeline
    pipeline = Pipeline(stages=[indexer, assembler, scaler, rf])
    model = pipeline.fit(train_df)

    # 6. Evaluate on the held-out split
    predictions = model.transform(test_df)
    evaluator = RegressionEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)

    # 7. Log params, metric and model.
    # The signature is inferred from the model's real inputs only. Inferring it
    # from the whole training frame would declare the label and the audit
    # columns as required inputs at inference time. A small sample is enough
    # to capture column names and types, so the full split is not collected.
    signature = infer_signature(
        train_df.select(*INPUT_COLS).limit(100).toPandas(),
        predictions.select("prediction").limit(100).toPandas(),
    )
    mlflow.log_params({"num_trees": NUM_TREES, "seed": SEED, "features": ",".join(INPUT_COLS)})
    mlflow.log_metric("rmse", rmse)
    mlflow.spark.log_model(
        model,
        "model",
        registered_model_name=model_name,
        dfs_tmpdir="/Volumes/test/default/mlflow_tmp/",
        signature=signature,
    )

    print(f"Model trained with RMSE: {rmse}")



Uploading artifacts:   0%|          | 0/44 [00:00<?, ?it/s]

Successfully registered model 'test.default.productivity_prediction_model'.


Uploading artifacts:   0%|          | 0/44 [00:00<?, ?it/s]

Created version '1' of model 'test.default.productivity_prediction_model'.


Model trained with RMSE: 1.4200575830613325


In [0]:
import os
import mlflow

# Set UC volume path for MLflow SparkML model loading
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/test/default/mlflow_tmp/"

# Load the registered model through its alias.
# NOTE(review): the "champion" alias must already point at a model version.
# The training cell registers a version but does not assign an alias, so on a
# fresh workspace it has to be set in the Model Registry before this cell runs.
model_name = "test.default.productivity_prediction_model"
model_uri = f"models:/{model_name}@champion"
model = mlflow.pyfunc.load_model(model_uri=model_uri)

# Collect the Silver table to pandas exactly once. Collecting it twice and
# copying predictions across by positional index can attach a prediction to
# the wrong employee, because Spark does not guarantee the same row order
# across two separate reads.
silver_df = spark.read.table("test.default.employee_performance_silver")
pdf = silver_df.toPandas()

# Score every row; predictions are attached to the same frame they were
# computed from, so each row keeps its own employee_id and attributes.
pdf["predicted_score"] = model.predict(pdf)

# Save results back as a Spark table
result_df = spark.createDataFrame(pdf)
result_df.write.mode("overwrite").saveAsTable("test.default.gold_ml_productivity_insights")



Downloading artifacts:   0%|          | 0/44 [00:00<?, ?it/s]

In [0]:
%sql
-- Inspect the scored output table written by the inference cell
SELECT *
FROM test.default.gold_ml_productivity_insights

employee_id,department,gender,education_level,productivity_score,overtime_hours,tenure_years,remote_freq,_ingested_at,_source_sys,predicted_score
65554,Engineering,Male,High School,5.0,9.0,0,25.0,2026-01-14T05:57:54.535Z,Kaggle_Source,3.00247821061241
65612,Engineering,Male,High School,4.0,20.0,1,100.0,2026-01-14T05:57:54.535Z,Kaggle_Source,2.972402161183457
65663,Engineering,Female,High School,3.0,8.0,6,50.0,2026-01-14T05:57:54.535Z,Kaggle_Source,3.009027827064463
65710,Engineering,Female,High School,4.0,21.0,2,75.0,2026-01-14T05:57:54.535Z,Kaggle_Source,3.0080140929649604
65727,Engineering,Male,High School,3.0,4.0,1,100.0,2026-01-14T05:57:54.535Z,Kaggle_Source,2.994980619230786
65744,Engineering,Male,High School,1.0,15.0,8,75.0,2026-01-14T05:57:54.535Z,Kaggle_Source,3.0043686317462037
65769,HR,Male,PhD,2.0,23.0,2,25.0,2026-01-14T05:57:54.535Z,Kaggle_Source,2.9999065464022445
65821,HR,Male,PhD,5.0,14.0,3,0.0,2026-01-14T05:57:54.535Z,Kaggle_Source,3.005108411460529
65828,Engineering,Male,High School,5.0,3.0,8,100.0,2026-01-14T05:57:54.535Z,Kaggle_Source,2.985101056888783
65841,Engineering,Female,High School,2.0,25.0,4,75.0,2026-01-14T05:57:54.535Z,Kaggle_Source,3.0180831432011144


In [0]:
%sql
-- Unified BI view: actual scores from Silver joined to the ML predictions.
-- The gap (actual minus predicted) is computed once in a CTE and then reused
-- for the status label.
CREATE OR REPLACE VIEW test.default.gold_performance_insights AS
WITH scored AS (
    SELECT
        e.employee_id,
        e.department,
        e.tenure_years,
        e.remote_freq,
        e.overtime_hours,
        e.productivity_score AS actual_productivity,
        m.predicted_score,
        (e.productivity_score - m.predicted_score) AS performance_gap
    FROM test.default.employee_performance_silver AS e
    INNER JOIN test.default.gold_ml_productivity_insights AS m
        ON e.employee_id = m.employee_id
)
SELECT
    employee_id,
    department,
    tenure_years,
    remote_freq,
    overtime_hours,
    actual_productivity,
    predicted_score,
    performance_gap,
    -- Label each employee relative to the model's expectation
    CASE
        WHEN performance_gap > 2 THEN 'Over-performer'
        WHEN performance_gap < -2 THEN 'Under-performer'
        ELSE 'Meeting Expectations'
    END AS ml_performance_status,
    -- Rank by actual score within each department (ties share a rank)
    RANK() OVER (PARTITION BY department ORDER BY actual_productivity DESC) AS dept_rank
FROM scored;

COMMENT ON TABLE test.default.gold_performance_insights IS 'Unified BI layer combining actual performance data with ML-predicted benchmarks.';

In [0]:
%sql
-- Spot-check the BI view for one department (up to 60 rows)
SELECT *
FROM test.default.gold_performance_insights
WHERE department = 'HR'
LIMIT 60

employee_id,department,tenure_years,remote_freq,overtime_hours,actual_productivity,predicted_score,performance_gap,ml_performance_status,dept_rank
40,HR,1,100.0,2.0,5.0,2.996400890750029,2.003599109249971,Meeting Expectations,1
132,HR,0,25.0,28.0,5.0,3.025912701731231,1.9740872982687687,Meeting Expectations,1
162,HR,4,0.0,20.0,5.0,2.9813237120413723,2.018676287958628,Meeting Expectations,1
198,HR,3,75.0,22.0,5.0,3.0064540528748216,1.9935459471251784,Meeting Expectations,1
227,HR,9,25.0,18.0,5.0,3.047828726680035,1.9521712733199648,Meeting Expectations,1
245,HR,4,100.0,7.0,5.0,3.026387592044235,1.9736124079557649,Meeting Expectations,1
441,HR,7,100.0,18.0,5.0,3.024246779812416,1.975753220187584,Meeting Expectations,1
506,HR,8,75.0,2.0,5.0,3.008266493368164,1.991733506631836,Meeting Expectations,1
583,HR,9,75.0,25.0,5.0,3.0404364345425456,1.9595635654574544,Meeting Expectations,1
612,HR,1,100.0,14.0,5.0,3.00324878278371,1.99675121721629,Meeting Expectations,1
