
Employee Salary Imputation Analysis

Problem Statement:
This script addresses missing salary data in employee records by:
1. Calculating department-level average salaries
2. Imputing missing salaries using department averages
3. Summarizing the completed salaries per department, including how many values were imputed
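Steps 1 and 2 amount to group-mean imputation. The sketch below reproduces them in plain Python on the same sample rows, so the logic can be checked without a Spark cluster. It is not the script itself. The helper names `department_averages` and `impute` are hypothetical, and `Decimal` stands in for Spark's `DecimalType`.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP

# Same sample rows as the Spark script: (employee_id, name, salary, department_id)
employees = [
    (1, "Kingsley", 90000, 10),
    (2, "Richard", None, 20),
    (3, "Virginia", 80000, 10),
    (4, "Mercy", None, 30),
    (5, "Irene", 75000, 20),
    (6, "Nathaniel", None, 10),
    (7, "Vanessa", 65000, 30),
    (8, "Robert", 61000, 20),
    (9, "Birago", None, 30),
    (10, "Yasir", 71000, 20),
]

def department_averages(rows):
    """Step 1: mean of the known salaries per department (nulls skipped, like Spark's avg)."""
    totals = defaultdict(Decimal)
    counts = defaultdict(int)
    for _, _, salary, dept in rows:
        if salary is not None:
            totals[dept] += Decimal(salary)
            counts[dept] += 1
    return {
        dept: (totals[dept] / counts[dept]).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
        for dept in counts
    }

def impute(rows):
    """Step 2: replace each missing salary with its department average (like coalesce)."""
    averages = department_averages(rows)
    return [
        (emp_id, name, Decimal(salary) if salary is not None else averages.get(dept), dept)
        for emp_id, name, salary, dept in rows
    ]

filled = impute(employees)
print(department_averages(employees))
# {10: Decimal('85000.00'), 20: Decimal('69000.00'), 30: Decimal('65000.00')}
```

These averages match the `avg_salary` column in the Spark output further down.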

Data Sources:
- Employee Data: Contains salary information with missing values
- Department Data: Department names and IDs for categorization
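The two tables are combined with left joins on `department_id`, and then `coalesce` fills the gaps. That combination has two edge cases worth knowing. First, an employee whose `department_id` has no match in the department table is kept, but with a null department name. Second, a department with no known salaries has a null average, so `coalesce` leaves the salary null. The plain-Python sketch below mimics both, with dicts standing in for the DataFrames. The employee in department 40 is hypothetical and is not in the sample data.

```python
# Department lookup, mirroring department_data in the script
departments = {10: "Cloud Architects", 20: "Database Administrators", 30: "Networking Engineers"}

# Department averages as computed by the script (department_id -> avg_salary)
averages = {10: 85000, 20: 69000, 30: 65000}

# Two rows from the sample data plus one HYPOTHETICAL employee in department 40,
# which has no lookup entry and no known salaries
employees = [
    (1, "Kingsley", 90000, 10),
    (2, "Richard", None, 20),
    (11, "Hypothetical", None, 40),
]

def left_join_and_fill(rows, lookup, avgs):
    """Mimic the two left joins on department_id followed by coalesce(salary, avg_salary)."""
    result = []
    for emp_id, name, salary, dept in rows:
        avg_salary = avgs.get(dept)  # None if the department has no average
        result.append({
            "employee_id": emp_id,
            "name": name,
            "salary": salary if salary is not None else avg_salary,
            "department_name": lookup.get(dept),  # None, like a null from an unmatched left join
            "avg_salary": avg_salary,
        })
    return result

for row in left_join_and_fill(employees, departments, averages):
    print(row)
```

All three employees survive the join, but the hypothetical one still has no salary. The script does not handle that case, so such rows would need their own fallback or an explicit check.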

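Key Features:
- Missing salaries filled with coalesce, falling back to the department average
- Department-level salary benchmarks (average salary per department)
- Left joins that keep every employee record when adding department names and averages
- Salaries stored as DecimalType(10, 2) and averages rounded to two decimal places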
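Rounding money has one subtle point. Spark's `round()` rounds half-up. Python's built-in `round()` on a float can appear to round down instead, because most cent values have no exact binary representation. The sketch below uses the standard `decimal` module. `round_money` is a hypothetical helper and is not part of the script. The three salaries averaged at the end are illustrative, not sample data.

```python
from decimal import Decimal, ROUND_HALF_UP

CENT = Decimal("0.01")

def round_money(value):
    """Round a money amount to cents using half-up rounding (the mode Spark's round() uses)."""
    # str() first so a float such as 2.675 is read as the literal it was written as
    return Decimal(str(value)).quantize(CENT, rounding=ROUND_HALF_UP)

print(round(2.675, 2))       # 2.67: the float 2.675 is stored as 2.67499999...
print(round_money(2.675))    # 2.68

# Illustrative salaries (not from the sample data) whose mean is not a whole cent
mean = Decimal(70000 + 70000 + 70001) / 3
print(round_money(mean))     # 70000.33
```

This is the same reason the script casts `salary` to `DecimalType(10, 2)` instead of leaving it as a floating-point column.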

Business Applications:
- Payroll processing with incomplete data
- Departmental budget planning
- Compensation fairness analysis
- HR analytics for salary benchmarking


from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, coalesce, round, col, when, count
from pyspark.sql.types import DecimalType

# Create a Spark session
spark = SparkSession.builder \
    .appName("SalaryDataImputation") \
    .getOrCreate()

# Sample annual salaries for Azubi employees (None marks a missing salary)
employee_data = [
    (1, "Kingsley", 90000, 10),
    (2, "Richard", None, 20),  # Missing salary
    (3, "Virginia", 80000, 10),
    (4, "Mercy", None, 30),  # Missing salary
    (5, "Irene", 75000, 20),
    (6, "Nathaniel", None, 10), # Missing salary
    (7, "Vanessa", 65000, 30),
    (8, "Robert", 61000, 20),
    (9, "Birago", None, 30),  # Missing salary
    (10, "Yasir", 71000, 20)
]

employee_columns = ["employee_id", "name", "salary", "department_id"]

# Sample data for departments
department_data = [
    (10, "Cloud Architects"),
    (20, "Database Administrators"),
    (30, "Networking Engineers")  
]

department_columns = ["department_id", "department_name"]

# Create DataFrames with explicit decimal precision for financial data
employee_df = spark.createDataFrame(employee_data, employee_columns) \
    .withColumn("salary", col("salary").cast(DecimalType(10, 2)))

department_df = spark.createDataFrame(department_data, department_columns)

# 1. Calculate average salary by department and round it to 2 decimal places
avg_salary_df = employee_df.groupBy("department_id") \
    .agg(round(avg("salary"), 2).alias("avg_salary")) \
    .withColumn("avg_salary", col("avg_salary").cast(DecimalType(10, 2)))

# 2. Attach department names and department averages to each employee
#    (left joins keep every employee, even one without a matching department)
joined_df = employee_df.join(department_df, "department_id", "left") \
    .join(avg_salary_df, "department_id", "left")

# 3. Fill missing salaries with the department average (coalesce returns the first non-null value)
filled_df = joined_df.withColumn(
    "salary",
    coalesce(col("salary"), col("avg_salary"))
).select(
    "employee_id", 
    "name", 
    "salary", 
    "department_name",
    "avg_salary"
)
filled_df.show()

# 4. Flag which salaries were imputed. The flag comes from the original,
#    pre-fill salary: testing salary == avg_salary after the fill would also
#    mark original salaries that equal the department average (Vanessa, 65000).
final_df = filled_df.join(
    employee_df.select(
        "employee_id",
        when(col("salary").isNull(), "Imputed").otherwise("Original").alias("salary_status")
    ),
    "employee_id"
)

# Show the per-department summary
print("=== Complete Salary Analysis ===")
final_df.groupBy("department_name") \
    .agg(
        round(avg("salary"), 2).alias("avg_final_salary"),
        count(when(col("salary_status") == "Imputed", 1)).alias("imputed_count")
    ).show()

# Stop the Spark session
spark.stop()


+-----------+---------+--------+--------------------+----------+
|employee_id|     name|  salary|     department_name|avg_salary|
+-----------+---------+--------+--------------------+----------+
|          1| Kingsley|90000.00|    Cloud Architects|  85000.00|
|          2|  Richard|69000.00|Database Administ...|  69000.00|
|          3| Virginia|80000.00|    Cloud Architects|  85000.00|
|          4|    Mercy|65000.00|Networking Engineers|  65000.00|
|          5|    Irene|75000.00|Database Administ...|  69000.00|
|          6|Nathaniel|85000.00|    Cloud Architects|  85000.00|
|          7|  Vanessa|65000.00|Networking Engineers|  65000.00|
|          8|   Robert|61000.00|Database Administ...|  69000.00|
|          9|   Birago|65000.00|Networking Engineers|  65000.00|
|         10|    Yasir|71000.00|Database Administ...|  69000.00|
+-----------+---------+--------+--------------------+----------+

=== Complete Salary Analysis ===
+--------------------+----------------+-------------+
|     department_name|avg_final_salary|imputed_count|
+--------------------+----------------+-------------+
|    Cloud Architects|        85000.00|            1|
|Database Administ...|        69000.00|            1|
|Networking Engineers|        65000.00|            2|
+--------------------+----------------+-------------+
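The `imputed_count` column depends on how a row is flagged. One rule tests `salary == avg_salary` after the fill. That rule also counts any original salary that happens to equal its department average. In the sample data, Vanessa's 65000 equals the Networking Engineers average, so this rule reports 3 imputed rows for that department instead of 2.

The plain-Python check below compares that rule with flagging missing salaries before they are filled, using the sample data. Lists and dicts stand in for the DataFrames, and the two function names are hypothetical.

```python
# (salary, department_id) pairs from the sample data; None marks a missing salary
rows = [(90000, 10), (None, 20), (80000, 10), (None, 30), (75000, 20),
        (None, 10), (65000, 30), (61000, 20), (None, 30), (71000, 20)]
dept_avg = {10: 85000, 20: 69000, 30: 65000}  # department averages from the output above

def count_by_equality(rows, avgs):
    """Equality heuristic: after filling, call a row imputed if salary == avg_salary."""
    filled = [(s if s is not None else avgs[d], d) for s, d in rows]
    return sum(1 for s, d in filled if s == avgs[d])

def count_by_null(rows):
    """Flag taken before filling: a row is imputed only if its salary was missing."""
    return sum(1 for s, _ in rows if s is None)

print(count_by_equality(rows, dept_avg))  # 5: Vanessa's original 65000 is miscounted
print(count_by_null(rows))                # 4
```

Only the null-based flag matches the four salaries marked as missing in the sample data.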

