In [1]:
pip install pyspark




<h3>Step 1: Start SparkSession</h3>

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("HRAnalyticsPipline").getOrCreate()
spark

<h3>Step 2: Import Datasets for HR </h3>

In [4]:
hr_df = spark.read.csv("HR_dataset.csv", header=True, inferSchema=True)
hr_df.show(5)

+----------+---------+--------+---------+--------+--------------------+---------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+----------------+-------------------+-----------------------+-------------+
|Unnamed: 0|FirstName|LastName|StartDate|ExitDate|               Title|     Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Ra

<h3>Step 3: Perform cleaning for HR Dataset</h3>

In [5]:
#a. view missing values
from pyspark.sql.functions import col, sum, when
# Count nulls per column
def missing_report(df):
    return df.select([
        sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ])
missing_report(hr_df).show()

+----------+---------+--------+---------+--------+-----+----------+-------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+--------+-------+-----------------------+-------------+
|Unnamed: 0|FirstName|LastName|StartDate|ExitDate|Title|Supervisor|ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|DepartmentType|Division|DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|Employee ID|Survey Date|Engagement Score|Satisfaction Score|Work-Life Balance Score|Training Date|Training Program Name|Train

This generates a full list of missing-value table.

We decided to leave as it is. No need to fill ExitDate with NULL.

In [6]:
#b. create a clean copy
hr_clean = hr_df

<p>Reason:<br>
This will prevent overwriting raw data</p>

In [7]:
#c. handling the missing values
from pyspark.sql.functions import col
# Replace missing termination description with "Not Applicable"
hr_clean = hr_clean.fillna({"TerminationDescription": "Not Applicable"})

print("Missing Values After Cleaning:")
missing_report(hr_clean).show(truncate=False)

Missing Values After Cleaning:
+----------+---------+--------+---------+--------+-----+----------+-------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+--------+-------+-----------------------+-------------+
|Unnamed: 0|FirstName|LastName|StartDate|ExitDate|Title|Supervisor|ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|DepartmentType|Division|DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|Employee ID|Survey Date|Engagement Score|Satisfaction Score|Work-Life Balance Score|Training D

<p>We identified missing values in ExitDate and TerminationDescription.</p>
<p>- Missing ExitDate values indicate that an employee is still active, therefore these nulls were intentionally preserved and later used to compute the TurnoverFlag.</br>
- Missing TerminationDescription appears only for active employees because no termination reason exists. These were replaced with "Not Applicable" to prevent issues during reporting and segmentation..</p>

<h3>Step 4: Perform the Data Transformation for HR</h3>

In [8]:
#a. clean column names by removing spaces and special characters
import re
def clean_column_name(col):
    col = col.replace(" ", "_")
    col = col.replace("-", "_")
    col = re.sub(r"[()]", "", col)
    return col
for col_name in hr_clean.columns:
    hr_clean = hr_clean.withColumnRenamed(col_name, clean_column_name(col_name))
print("After cleaning column names:")
hr_clean.printSchema()

After cleaning column names:
root
 |-- Unnamed:_0: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- TerminationType: string (nullable = true)
 |-- TerminationDescription: string (nullable = false)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDe

<p>Reason:<br>
To improve pipeline transformations and avoid Spark conflicts when referencing column name, all column names were normalized into snake_case and cleaned of special characters</p>

In [9]:
#b. convert date columns to correct date fields
from pyspark.sql.functions import to_date, col
date_formats = {
    "StartDate": "dd-MMM-yy",
    "ExitDate": "dd-MMM-yy",
    "DOB": "dd-MM-yyyy",
    "Training_Date": "dd-MMM-yy",
    "Survey_Date": "dd-MM-yyyy"
}
for c, fmt in date_formats.items():
    if c in hr_clean.columns:
        hr_clean = hr_clean.withColumn(c, to_date(col(c), fmt))
print("After converting date columns:")
hr_clean.select("StartDate", "ExitDate", "DOB", "Training_Date", "Survey_Date").show(5)

After converting date columns:
+----------+--------+----------+-------------+-----------+
| StartDate|ExitDate|       DOB|Training_Date|Survey_Date|
+----------+--------+----------+-------------+-----------+
|2019-09-20|    NULL|1969-10-07|   2023-07-15| 2023-01-14|
|2023-02-11|    NULL|1965-08-30|   2022-09-12| 2022-09-09|
|2018-12-10|    NULL|1991-10-06|   2022-08-13| 2023-05-27|
|2021-06-21|    NULL|1998-04-04|   2022-12-15| 2023-06-16|
|2019-06-29|    NULL|1969-08-29|   2023-07-13| 2022-11-25|
+----------+--------+----------+-------------+-----------+
only showing top 5 rows


<p>Reason:<br>
Date fields were converted from string to date types to allow calculations for tenure, age, training timelines and other time-based analytics</p>

In [10]:
#c. create new column for age
from pyspark.sql.functions import months_between, current_date, floor
hr_clean = hr_clean.withColumn("Age", floor(months_between(current_date(), col("DOB")) / 12))
print("Age column sample:")
hr_clean.select("DOB", "Age").show(5)

Age column sample:
+----------+---+
|       DOB|Age|
+----------+---+
|1969-10-07| 56|
|1965-08-30| 60|
|1991-10-06| 34|
|1998-04-04| 27|
|1969-08-29| 56|
+----------+---+
only showing top 5 rows


In [11]:
#d. create new column for agegroup
from pyspark.sql.functions import when, col
hr_clean = hr_clean.withColumn(
    "AgeGroup",
    when(col("Age") < 30, "Under 30")
    .when((col("Age") >= 30) & (col("Age") <= 39), "30-39")
    .when((col("Age") >= 40) & (col("Age") <= 49), "40-49")
    .when((col("Age") >= 50) & (col("Age") <= 59), "50-59")
    .otherwise("60+")
)
hr_clean.groupBy("AgeGroup").count().show()

+--------+-----+
|AgeGroup|count|
+--------+-----+
|   30-39|  550|
|     60+| 1315|
|   40-49|  512|
|Under 30|  316|
|   50-59|  457|
+--------+-----+



<p>Reason:<br>
- DOB is not useful for analytics, however we can calculate the age of employees<br>
- AgeGroup is required for workforce segmentation</p>

In [12]:
#e. create new column for tenure(in years)
hr_clean = hr_clean.withColumn("Tenure", floor(months_between(current_date(), col("StartDate")) / 12))
print("Tenure column sample:")
hr_clean.select("StartDate", "Tenure").show(5)

Tenure column sample:
+----------+------+
| StartDate|Tenure|
+----------+------+
|2019-09-20|     6|
|2023-02-11|     2|
|2018-12-10|     6|
|2021-06-21|     4|
|2019-06-29|     6|
+----------+------+
only showing top 5 rows


<p>Reason:<br>
- The StartDate alone isn’t actionable. We have created a tenure column which is needed for workforce analysis<br>
- Helps evaluate employee stability and retention</p>

In [13]:
#f. turnover flag for active or left employees
from pyspark.sql.functions import when, col
hr_clean = hr_clean.withColumn(
    "TurnoverFlag",
    when(col("ExitDate").isNull(), 0).otherwise(1)
)
print("Turnover flag sample:")
hr_clean.select("ExitDate", "TurnoverFlag").show(5)

Turnover flag sample:
+--------+------------+
|ExitDate|TurnoverFlag|
+--------+------------+
|    NULL|           0|
|    NULL|           0|
|    NULL|           0|
|    NULL|           0|
|    NULL|           0|
+--------+------------+
only showing top 5 rows


<p>Reason:<br>
- The Turnover is currently 0 for active employees and 1 for left employees.<br>
- Required for turnover analysis by department, gender, race, etc</p>

In [14]:
#g. convert numeric columns
numeric_cols = [
    "Training_DurationDays",
    "Training_Cost",
    "Engagement_Score",
    "Satisfaction_Score",
    "Work_Life_Balance_Score",
    "Current_Employee_Rating"
]
for c in numeric_cols:
    if c in hr_clean.columns:
        hr_clean = hr_clean.withColumn(c, col(c).cast("double"))

<p>Reason:<br>
- Numeric attributes were cast to double precision to support analytic operations, aggregations and model-friendly formats<br>
- Required for calculations, averages, correlations and ML</p>

In [15]:
#h. drop useless columns
cols_to_drop = ["Unnamed:_0", "ADEmail", "Supervisor", "Trainer"]
hr_clean = hr_clean.drop(*cols_to_drop)

<p>Reason:<br>
We are deleting these columns because they do not contribute to the analytical goals of the project. Below is a breakdown:<br>
- Unnamed:_0 is an index generated during CSV export and has no semantic meaning<br>
- ADEmail contains personal identifiers that do not support analysis and should not be included for privacy reasons<br>
- Supervisor and Trainer contain high-cardinality values that are not required for the KPIs or transformations in this pipeline and retaining them introduces unnecessary noise</p>

In [16]:
print("Final cleaned data for HR data:")
hr_clean.show(5, truncate=False)

Final cleaned data for HR data:
+---------+--------+----------+--------+-----------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+----------------------+-------------+----------------+----------------+---------------------+-------------+---+--------+------+------------+
|FirstName|LastName|StartDate |ExitDate|Title                  |BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|DepartmentType   |Division            |DOB       |State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance_Score|Current_Employee_Rating|Employee_ID|Survey_Date|Engagement_Scor

<h3>Step 6: Import Datasets for HR</h3>

In [17]:
pay_df = spark.read.csv("payroll-2025.csv", header=True, inferSchema=True)
pay_df.show(5, truncate=False)

+----------------------------------------+----------------------------------------+----------+------+-------------------------+---------+-----------+----------------------+---------------+-------------+----------------------+---------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+
|Hash1                                   |Hash2                                   |Birth Year|Gender|Race                     |Pay Grade|FLSA Status|EEOJ Code             |Employment Type|Department ID|Department Name       |Status   |Year|Date Position Began|Date Position Ended|Annual Base Salary|Gross Pay|Base Pay|Overtime Pay|Other Pay|
+----------------------------------------+----------------------------------------+----------+------+-------------------------+---------+-----------+----------------------+---------------+-------------+----------------------+---------+----+-------------------+-------------------+------------------+---------+-------

<h3>Step 7: Perform Cleaning for Payroll dataset</h3>

In [18]:
#a. create clean copy
pay_clean = pay_df

<p>Reason:<br>
This is to keep pay_df, which is the original dataset and avoid overwriting it during cleaning</p>

In [19]:
#b. view missing values for payroll
from pyspark.sql.functions import col, sum, when
# Count nulls per column
def missing_report(df):
    return df.select([
        sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
        for c in df.columns
    ])
missing_report(pay_clean).show()

+-----+-----+----------+------+----+---------+-----------+---------+---------------+-------------+---------------+------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+
|Hash1|Hash2|Birth Year|Gender|Race|Pay Grade|FLSA Status|EEOJ Code|Employment Type|Department ID|Department Name|Status|Year|Date Position Began|Date Position Ended|Annual Base Salary|Gross Pay|Base Pay|Overtime Pay|Other Pay|
+-----+-----+----------+------+----+---------+-----------+---------+---------------+-------------+---------------+------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+
|    0|    0|         0|     0|   0|        0|          0|        0|              0|            0|              0|     0|   0|                  0|              22462|                 0|        0|       0|           0|        0|
+-----+-----+----------+------+----+---------+-----------+---------+---------------+----

In [20]:
from pyspark.sql.functions import when, col
pay_clean = pay_clean.withColumn(
    "PositionEndFlag",
    when(col("Date Position Ended").isNull(), "Not Ended").otherwise("Ended")
)
missing_report(pay_clean).show(truncate=False)

+-----+-----+----------+------+----+---------+-----------+---------+---------------+-------------+---------------+------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+---------------+
|Hash1|Hash2|Birth Year|Gender|Race|Pay Grade|FLSA Status|EEOJ Code|Employment Type|Department ID|Department Name|Status|Year|Date Position Began|Date Position Ended|Annual Base Salary|Gross Pay|Base Pay|Overtime Pay|Other Pay|PositionEndFlag|
+-----+-----+----------+------+----+---------+-----------+---------+---------------+-------------+---------------+------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+---------------+
|0    |0    |0         |0     |0   |0        |0          |0        |0              |0            |0              |0     |0   |0                  |22462              |0                 |0        |0       |0           |0        |0              |
+-----+-----+----------+

<p>As you can see, there is Date Position Ended column with missing values. These null values are intentional because they represent employees whose job positions were still active at the time of data extraction.</br>
Since this information is meaningful, the null values were preserved and used to derive an additional flag to support retention and workforce stability analysis.</p>

<h3>Step 8: Perform the Data Transformation for Payroll Dataset</h3>

In [21]:
#a. clean the column names 
import re
def clean_column_name(col):
    col = col.replace(" ", "_")
    col = col.replace("-", "_")
    col = re.sub(r"[()]", "", col)
    return col
for col_name in pay_clean.columns:
    pay_clean = pay_clean.withColumnRenamed(col_name, clean_column_name(col_name))
print("After cleaning column names:")
pay_clean.printSchema()

After cleaning column names:
root
 |-- Hash1: string (nullable = true)
 |-- Hash2: string (nullable = true)
 |-- Birth_Year: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Pay_Grade: string (nullable = true)
 |-- FLSA_Status: string (nullable = true)
 |-- EEOJ_Code: string (nullable = true)
 |-- Employment_Type: string (nullable = true)
 |-- Department_ID: integer (nullable = true)
 |-- Department_Name: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Date_Position_Began: string (nullable = true)
 |-- Date_Position_Ended: string (nullable = true)
 |-- Annual_Base_Salary: double (nullable = true)
 |-- Gross_Pay: double (nullable = true)
 |-- Base_Pay: double (nullable = true)
 |-- Overtime_Pay: double (nullable = true)
 |-- Other_Pay: double (nullable = true)
 |-- PositionEndFlag: string (nullable = false)



<p>Reason:<br>
To avoid Spark errors and ensure consistent naming for transformations and joins</p>

In [22]:
#b. convert date fields
from pyspark.sql.functions import to_date, col
date_formats = {
    "Date_Position_Began": "dd/MM/yyyy",
    "Date_Position_Ended": "dd/MM/yyyy"
}
for c, fmt in date_formats.items():
    if c in pay_clean.columns:
        pay_clean = pay_clean.withColumn(c, to_date(col(c), fmt))
print("After converting date columns:")
pay_clean.select("Date_Position_Began", "Date_Position_Ended").show(5)


After converting date columns:
+-------------------+-------------------+
|Date_Position_Began|Date_Position_Ended|
+-------------------+-------------------+
|         2025-01-01|         2025-02-01|
|         2025-01-01|         2025-02-01|
|         2025-01-01|         2025-02-01|
|         2025-01-01|         2025-02-01|
|         2025-01-01|         2025-02-01|
+-------------------+-------------------+
only showing top 5 rows


<p>Reason:<br>
Convert date strings into valid date types to calculate tenure and length of service</p>

In [23]:
#c. clean the race column
from pyspark.sql.functions import when, col
pay_clean = pay_clean.withColumn(
    "Race",
    when(col("Race") == "Asian", "Asian")
    .when(col("Race") == "Black or African American", "Black")
    .when(col("Race") == "Hispanic/Latino", "Hispanic")
    .when(col("Race") == "White", "White")
    .otherwise("Other")
)
print("After cleaning the race column:")
pay_clean.select("Race").show(10, truncate=False)

After cleaning the race column:
+--------+
|Race    |
+--------+
|Black   |
|White   |
|Black   |
|Black   |
|Black   |
|Asian   |
|Black   |
|Black   |
|Hispanic|
|White   |
+--------+
only showing top 10 rows


<p>Reason:<br>
Race column was standardised into simplified, analysis-friendly categories. This is necessary because the original dataset included long and inconsistent labels such as “Black or African American” and “Hispanic/Latino”. A new field Race_Clean was created to group race values into consistent categories including “Black”, “Hispanic”, “Asian”, “White”, and “Other”.</p>

In [24]:
#d. convert numeric columns
numeric_cols = [
    "Birth_Year",
    "Pay_Grade",
    "Annual_Base_Salary",
    "Gross_Pay",
    "Base_Pay",
    "Overtime_Pay",
    "Other_Pay"
]
for c in numeric_cols:
    if c in pay_clean.columns:
        pay_clean = pay_clean.withColumn(c, col(c).cast("double"))

<p>Reason:<br>
These columns must be numeric for aggregations, analytics and creating KPIs like total compensation</p>

In [25]:
#e. create a new column for years of service
from pyspark.sql.functions import current_date, months_between, floor
pay_clean = pay_clean.withColumn(
    "YearsOfService",
    floor(months_between(current_date(), col("Date_Position_Began")) / 12)
)
print("Years of Service sample:")
pay_clean.select("Date_Position_Began", "YearsOfService").show(5)

Years of Service sample:
+-------------------+--------------+
|Date_Position_Began|YearsOfService|
+-------------------+--------------+
|         2025-01-01|             0|
|         2025-01-01|             0|
|         2025-01-01|             0|
|         2025-01-01|             0|
|         2025-01-01|             0|
+-------------------+--------------+
only showing top 5 rows


<p>Reason:<br>
This supports workforce finance analytics such as salary growth vs. years of service</p>

In [26]:
#f. create a new column for total compensation
pay_clean = pay_clean.withColumn(
    "TotalCompensation",
    col("Base_Pay") + col("Overtime_Pay") + col("Other_Pay")
)
print("Total compensation sample:")
pay_clean.select("Base_Pay", "Overtime_Pay", "Other_Pay", "TotalCompensation").show(5)

Total compensation sample:
+--------+------------+---------+------------------+
|Base_Pay|Overtime_Pay|Other_Pay| TotalCompensation|
+--------+------------+---------+------------------+
| 1192.32|         0.0|    94.29|           1286.61|
| 1812.91|         0.0|     77.7|1890.6100000000001|
| 3166.68|         0.0| 94842.41|          98009.09|
|  1090.2|       290.3|   797.88|           2178.38|
| 1571.92|         0.0| 19156.29|          20728.21|
+--------+------------+---------+------------------+
only showing top 5 rows


<p>Reason:<br>
This gives a realistic financial metric of earnings and not just base salary</p>

In [27]:
#g. create a new column for pay grade category
from pyspark.sql.functions import when
pay_clean = pay_clean.withColumn(
    "PayGradeCategory",
    when(col("Pay_Grade") <= 10, "Low")
    .when((col("Pay_Grade") > 10) & (col("Pay_Grade") <= 20), "Mid")
    .otherwise("High")
)
pay_clean.select("Pay_Grade", "PayGradeCategory").show(5)

+---------+----------------+
|Pay_Grade|PayGradeCategory|
+---------+----------------+
|     15.0|             Mid|
|     20.0|             Mid|
|     27.0|            High|
|      6.0|             Low|
|     13.0|             Mid|
+---------+----------------+
only showing top 5 rows


<p>Reason:<br>
Payment grade helps compare compensation across departments and demographics</p>

In [28]:
#h. create a new column for employment status flag
pay_clean = pay_clean.withColumn(
    "IsActive",
    when(col("Status") == "Active", 1).otherwise(0)
)
print("Employment active status sample:")
pay_clean.select("Status", "IsActive").show(5)

Employment active status sample:
+---------+--------+
|   Status|IsActive|
+---------+--------+
|Withdrawn|       0|
|Withdrawn|       0|
|Withdrawn|       0|
|Withdrawn|       0|
|Withdrawn|       0|
+---------+--------+
only showing top 5 rows


<p>Reason:<br>
Required for turnover and workforce stability analytics</p>

In [29]:
#i. create Age in Payroll dataset
from pyspark.sql.functions import col
current_year = 2025
pay_clean = pay_clean.withColumn(
    "Age",
    current_year - col("Birth_Year")
)
pay_clean.select("Birth_Year", "Age").show(10)

+----------+----+
|Birth_Year| Age|
+----------+----+
|    1955.0|70.0|
|    1955.0|70.0|
|    1963.0|62.0|
|    1973.0|52.0|
|    1959.0|66.0|
|    1958.0|67.0|
|    1959.0|66.0|
|    1948.0|77.0|
|    1960.0|65.0|
|    1954.0|71.0|
+----------+----+
only showing top 10 rows


In [30]:
#j. create AgeGroup
from pyspark.sql.functions import when
pay_clean = pay_clean.withColumn(
    "AgeGroup",
    when(col("Age") < 30, "Under 30")
    .when((col("Age") >= 30) & (col("Age") <= 39), "30-39")
    .when((col("Age") >= 40) & (col("Age") <= 49), "40-49")
    .when((col("Age") >= 50) & (col("Age") <= 59), "50-59")
    .otherwise("60+")
)
pay_clean.groupBy("AgeGroup").count().show()

+--------+-----+
|AgeGroup|count|
+--------+-----+
|   30-39| 6401|
|     60+| 3569|
|   40-49| 6437|
|Under 30| 4572|
|   50-59| 6230|
+--------+-----+



In [31]:
#k. drop useless columns
cols_to_drop = ["Hash1", "Hash2", "Department_ID"]
pay_clean = pay_clean.drop(*cols_to_drop)

<p>Reason:<br>
- Hash1 and Hash2 are hashed IDs and do not match the HR dataset</br>
- Department ID are internal codes used only inside the Houston payroll system</br>
- They cannot be used for merging</br>
- They contribute no analytical value</p>

In [32]:
print("Final cleaned payroll data:")
pay_clean.show(5, truncate=False)

Final cleaned payroll data:
+----------+------+-----+---------+-----------+----------------------+---------------+----------------------+---------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+---------------+--------------+------------------+----------------+--------+----+--------+
|Birth_Year|Gender|Race |Pay_Grade|FLSA_Status|EEOJ_Code             |Employment_Type|Department_Name       |Status   |Year|Date_Position_Began|Date_Position_Ended|Annual_Base_Salary|Gross_Pay|Base_Pay|Overtime_Pay|Other_Pay|PositionEndFlag|YearsOfService|TotalCompensation |PayGradeCategory|IsActive|Age |AgeGroup|
+----------+------+-----+---------+-----------+----------------------+---------------+----------------------+---------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+---------------+--------------+------------------+----------------+--------+----+--------+
|1955.0    |Female|Black

<h3>Step 9: Perform Data Transformation (Cont'd) for Departments</h3>

In [33]:
hr_clean.select("DepartmentType").distinct().show(100, truncate=False)

+--------------------+
|DepartmentType      |
+--------------------+
|Sales               |
|Production          |
|Admin Offices       |
|Executive Office    |
|Software Engineering|
|IT/IS               |
+--------------------+



In [34]:
pay_clean.select("Department_Name").distinct().show(200, truncate=False)

+-----------------------------+
|Department_Name              |
+-----------------------------+
|Houston Emergency Center     |
|Library                      |
|Municipal Courts             |
|Health & Human Services      |
|Houston Airport System       |
|Information Technology       |
|Finance                      |
|Mayor's Office               |
|Solid Waste Management       |
|Houston Public Works         |
|Business Opportunity         |
|Police                       |
|Parks & Recreation           |
|Fleet Management             |
|Legal                        |
|General Services             |
|Admin. & Regulatory Affairs  |
|City Secretary               |
|Housing & Community Devlpmnt.|
|City Council                 |
|Fire                         |
|Planning & Development       |
|Department of Neighborhoods  |
|Controllers                  |
|Human Resources              |
+-----------------------------+



In [35]:
#Create the mapping table for HR Department Type
from pyspark.sql.functions import when, col
hr_with_cat = hr_clean.withColumn(
    "DeptCategory",
    when(col("DepartmentType") == "Sales", "Operations")
    .when(col("DepartmentType") == "Production", "Operations")
    .when(col("DepartmentType") == "Admin Offices", "Corporate")
    .when(col("DepartmentType") == "Executive Office", "Corporate")
    .when(col("DepartmentType") == "Software Engineering", "Technology")
    .when(col("DepartmentType") == "IT/IS", "Technology")
    .otherwise("Other")
)
# Optional: quick sanity check
hr_with_cat.groupBy("DepartmentType", "DeptCategory").count().show()

+--------------------+------------+-----+
|      DepartmentType|DeptCategory|count|
+--------------------+------------+-----+
|Software Engineering|  Technology|  121|
|       Admin Offices|   Corporate|   85|
|               IT/IS|  Technology|  459|
|               Sales|  Operations|  345|
|    Executive Office|   Corporate|   25|
|   Production       |       Other| 2115|
+--------------------+------------+-----+



In [36]:
#Create the mapping table for Payroll Department
pay_with_cat = pay_clean.withColumn(
    "DeptCategory",
    when(col("Department_Name") == "Information Technology", "Technology")
    # Corporate
    .when(col("Department_Name").isin(
        "Finance",
        "Mayor's Office",
        "Admin. & Regulatory Affairs",
        "City Secretary",
        "Controllers",
        "Human Resources",
        "Legal"
    ), "Corporate")
    # Operations
    .when(col("Department_Name").isin(
        "Fleet Management",
        "General Services",
        "Solid Waste Management",
        "Houston Public Works",
        "Houston Airport System",
        "Business Opportunity"
    ), "Operations")
    # Public service
    .when(col("Department_Name").isin(
        "Houston Emergency Center",
        "Library",
        "Municipal Courts",
        "Health & Human Services",
        "Parks & Recreation",
        "Police",
        "Fire",
        "Planning & Development",
        "Department of Neighborhoods",
        "Housing & Community Devlpmnt.",
        "City Council"
    ), "Public Service")
    .otherwise("Other")
)
# Optional: sanity check
pay_with_cat.groupBy("Department_Name", "DeptCategory").count().show()

+--------------------+--------------+-----+
|     Department_Name|  DeptCategory|count|
+--------------------+--------------+-----+
|Houston Emergency...|Public Service|  268|
|Admin. & Regulato...|     Corporate|  550|
|  Parks & Recreation|Public Service| 1150|
|Solid Waste Manag...|    Operations|  503|
|        City Council|Public Service|  124|
|Planning & Develo...|Public Service|   99|
|Houston Public Works|    Operations| 4462|
|         Controllers|     Corporate|   82|
|             Finance|     Corporate|  294|
|    Municipal Courts|Public Service|  292|
|      Mayor's Office|     Corporate|  124|
|    Fleet Management|    Operations|  410|
|Health & Human Se...|Public Service| 1325|
|Houston Airport S...|    Operations| 1631|
|Information Techn...|    Technology|  311|
|    General Services|    Operations|  273|
|Business Opportunity|    Operations|   46|
|      City Secretary|     Corporate|   11|
|     Human Resources|     Corporate|  735|
|             Library|Public Ser

In [37]:
print("HR Categories:")
hr_with_cat.select("DeptCategory").distinct().show()
print("Payroll Categories:")
pay_with_cat.select("DeptCategory").distinct().show()

HR Categories:
+------------+
|DeptCategory|
+------------+
|       Other|
|  Technology|
|   Corporate|
|  Operations|
+------------+

Payroll Categories:
+--------------+
|  DeptCategory|
+--------------+
|Public Service|
|    Technology|
|     Corporate|
|    Operations|
+--------------+



<h3>Step 10: Perform Merge</h3>

In [38]:
final_df = hr_with_cat.join(
    pay_with_cat,
    on="DeptCategory",
    how="inner"
)

In [39]:
final_df.show(5, truncate=False)

+------------+---------+--------+----------+----------+------------------------+------------+--------------+------------+-------+--------------------------+---------------+---------------------------------------------+--------------+------------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+-----------------+---------------------+-------------+---+--------+------+------------+----------+------+-----+---------+-----------+----------------------+---------------+--------------------+---------+----+-------------------+-------------------+------------------+---------+--------+------------+---------+---------------+--------------+-----------------+----------------+--------+----+--------+
|DeptCategory|FirstName|LastName|StartDate |ExitDate  |Title     

In [40]:
print("Merged row count:", final_df.count())

Merged row count: 2927395


In [41]:
final_df.columns

['DeptCategory',
 'FirstName',
 'LastName',
 'StartDate',
 'ExitDate',
 'Title',
 'BusinessUnit',
 'EmployeeStatus',
 'EmployeeType',
 'PayZone',
 'EmployeeClassificationType',
 'TerminationType',
 'TerminationDescription',
 'DepartmentType',
 'Division',
 'DOB',
 'State',
 'JobFunctionDescription',
 'GenderCode',
 'LocationCode',
 'RaceDesc',
 'MaritalDesc',
 'Performance_Score',
 'Current_Employee_Rating',
 'Employee_ID',
 'Survey_Date',
 'Engagement_Score',
 'Satisfaction_Score',
 'Work_Life_Balance_Score',
 'Training_Date',
 'Training_Program_Name',
 'Training_Type',
 'Training_Outcome',
 'Location',
 'Training_DurationDays',
 'Training_Cost',
 'Age',
 'AgeGroup',
 'Tenure',
 'TurnoverFlag',
 'Birth_Year',
 'Gender',
 'Race',
 'Pay_Grade',
 'FLSA_Status',
 'EEOJ_Code',
 'Employment_Type',
 'Department_Name',
 'Status',
 'Year',
 'Date_Position_Began',
 'Date_Position_Ended',
 'Annual_Base_Salary',
 'Gross_Pay',
 'Base_Pay',
 'Overtime_Pay',
 'Other_Pay',
 'PositionEndFlag',
 'Years

In [42]:
final_df = final_df.drop("Age", "AgeGroup")

from pyspark.sql.functions import col
final_df = final_df.withColumn("Age", 2025 - col("Birth_Year"))

In [43]:
from pyspark.sql.functions import col
final_df = final_df.withColumn("Age", 2025 - col("Birth_Year"))

In [44]:
from pyspark.sql.functions import when
final_df = final_df.withColumn(
    "AgeGroup",
    when(col("Age") < 20, "Under 20")
    .when((col("Age") >= 20) & (col("Age") < 30), "20-29")
    .when((col("Age") >= 30) & (col("Age") < 40), "30-39")
    .when((col("Age") >= 40) & (col("Age") < 50), "40-49")
    .when((col("Age") >= 50) & (col("Age") < 60), "50-59")
    .otherwise("60+")
)

In [45]:
from collections import Counter
[col for col, count in Counter(final_df.columns).items() if count > 1]

[]

In [46]:
from pyspark.sql.functions import count
table1_demo = final_df.groupBy("AgeGroup", "Gender", "Race").agg(count("*").alias("EmployeeCount"))
table1_demo.show()

+--------+------+--------+-------------+
|AgeGroup|Gender|    Race|EmployeeCount|
+--------+------+--------+-------------+
|   40-49|Female|   White|        27920|
|   40-49|Female|   Black|       137120|
|   40-49|Female|   Asian|        25550|
|   40-49|  Male|Hispanic|       107865|
|   50-59|Female|Hispanic|        55545|
|     60+|Female|   Asian|        19260|
|Under 20|Female|   Black|         7150|
|Under 20|  Male|Hispanic|         3705|
|   20-29|Female|   Other|          330|
|   30-39|Female|   Black|       101530|
|   50-59|  Male|   Other|         5520|
|   20-29|  Male|   White|        14275|
|Under 20|  Male|   Black|         4555|
|   50-59|  Male|Hispanic|       106310|
|Under 20|  Male|   White|          770|
|     60+|  Male|   Black|       169150|
|     60+|  Male|   White|       103880|
|   20-29|  Male|   Asian|        15370|
|   40-49|  Male|   Other|         5865|
|   20-29|  Male|   Other|         1380|
+--------+------+--------+-------------+
only showing top

In [47]:
from pyspark.sql.functions import count, avg, col
table2_engagement = final_df.groupBy(
    "DeptCategory", "AgeGroup", "Gender", "Race").agg(
    avg("Engagement_Score").alias("Avg_Engagement"),
    avg("Satisfaction_Score").alias("Avg_Satisfaction"),
    avg("Work_Life_Balance_Score").alias("Avg_WorkLifeBalance"),
    count("*").alias("EmployeeCount")
)
table2_engagement.show(20, truncate=False)

+------------+--------+------+--------+------------------+------------------+-------------------+-------------+
|DeptCategory|AgeGroup|Gender|Race    |Avg_Engagement    |Avg_Satisfaction  |Avg_WorkLifeBalance|EmployeeCount|
+------------+--------+------+--------+------------------+------------------+-------------------+-------------+
|Technology  |40-49   |Female|Hispanic|3.010344827586207 |3.0620689655172413|3.0086206896551726 |2320         |
|Corporate   |Under 20|Female|Other   |3.0272727272727273|2.6545454545454548|3.190909090909091  |330          |
|Technology  |60+     |Male  |Hispanic|3.010344827586207 |3.0620689655172413|3.0086206896551726 |6380         |
|Operations  |60+     |Female|White   |3.0               |3.1478260869565218|3.072463768115942  |16905        |
|Corporate   |Under 20|Female|Asian   |3.0272727272727273|2.6545454545454548|3.190909090909091  |1320         |
|Corporate   |60+     |Male  |White   |3.0272727272727273|2.6545454545454548|3.190909090909091  |3410   

In [48]:
from pyspark.sql.functions import avg, count
table3_compensation = final_df.groupBy(
    "DeptCategory", "AgeGroup", "Gender", "Race").agg(
    avg("Annual_Base_Salary").alias("Avg_Salary"),
    avg("Base_Pay").alias("Avg_BasePay"),
    avg("Overtime_Pay").alias("Avg_OvertimePay"),
    avg("TotalCompensation").alias("Avg_TotalCompensation"),
    count("*").alias("Payroll_Count")
)
table3_compensation.show(20, truncate=False)

+------------+--------+------+--------+------------------+------------------+------------------+---------------------+-------------+
|DeptCategory|AgeGroup|Gender|Race    |Avg_Salary        |Avg_BasePay       |Avg_OvertimePay   |Avg_TotalCompensation|Payroll_Count|
+------------+--------+------+--------+------------------+------------------+------------------+---------------------+-------------+
|Technology  |40-49   |Female|Hispanic|74320.25          |44631.5275000002  |0.0               |45463.00000000034    |2320         |
|Corporate   |Under 20|Female|Other   |29120.0           |2518.8333333333335|0.0               |2518.8333333333335   |330          |
|Technology  |60+     |Male  |Hispanic|94340.09090909091 |32295.591818180535|90.98181818181793 |61022.24636363305    |6380         |
|Operations  |60+     |Female|White   |90946.48979591837 |41959.45081632884 |401.70265306122405|57751.68244898189    |16905        |
|Corporate   |Under 20|Female|Asian   |29120.0           |2250.791666

In [49]:
from pyspark.sql.functions import sum as _sum, avg, count
table4_turnover = final_df.groupBy(
    "DeptCategory", "AgeGroup", "Gender", "Race"
).agg(
    _sum("TurnoverFlag").alias("Total_Leavers"),
    count("*").alias("Total_Employees"),
    avg("Tenure").alias("Avg_Tenure")
).withColumn(
    "TurnoverRate", col("Total_Leavers") / col("Total_Employees")
)
table4_turnover.show(20, truncate=False)

+------------+--------+------+--------+-------------+---------------+-----------------+------------------+
|DeptCategory|AgeGroup|Gender|Race    |Total_Leavers|Total_Employees|Avg_Tenure       |TurnoverRate      |
+------------+--------+------+--------+-------------+---------------+-----------------+------------------+
|Technology  |40-49   |Female|Hispanic|1240         |2320           |4.346551724137931|0.5344827586206896|
|Corporate   |Under 20|Female|Other   |213          |330            |4.627272727272727|0.6454545454545455|
|Technology  |60+     |Male  |Hispanic|3410         |6380           |4.346551724137931|0.5344827586206896|
|Operations  |60+     |Female|White   |8379         |16905          |4.350724637681159|0.4956521739130435|
|Corporate   |Under 20|Female|Asian   |852          |1320           |4.627272727272727|0.6454545454545455|
|Corporate   |60+     |Male  |White   |2201         |3410           |4.627272727272727|0.6454545454545455|
|Operations  |40-49   |Male  |Other  

In [50]:
from pyspark.sql.functions import avg, sum as _sum, count
table5_training = final_df.groupBy(
    "DeptCategory", "AgeGroup", "Gender", "Race"
).agg(
    avg("Training_DurationDays").alias("Avg_TrainingDays"),
    _sum("Training_Cost").alias("Total_TrainingCost"),
    count("Training_Program_Name").alias("Training_Count")
)
table5_training.show(20, truncate=False)

+------------+--------+------+--------+------------------+--------------------+--------------+
|DeptCategory|AgeGroup|Gender|Race    |Avg_TrainingDays  |Total_TrainingCost  |Training_Count|
+------------+--------+------+--------+------------------+--------------------+--------------+
|Technology  |40-49   |Female|Hispanic|3.0120689655172415|1329740.92          |2320          |
|Corporate   |Under 20|Female|Other   |2.963636363636364 |181869.48000000019  |330           |
|Technology  |60+     |Male  |Hispanic|3.0120689655172415|3656787.5300000035  |6380          |
|Operations  |60+     |Female|White   |2.907246376811594 |9072758.660000052   |16905         |
|Corporate   |Under 20|Female|Asian   |2.963636363636364 |727477.9199999997   |1320          |
|Corporate   |60+     |Male  |White   |2.963636363636364 |1879317.959999996   |3410          |
|Operations  |40-49   |Male  |Other   |2.907246376811594 |2777375.099999987   |5175          |
|Operations  |50-59   |Male  |Black   |2.907246376

In [51]:
table1_demo.coalesce(1).write.option("header", True).mode("overwrite").csv("analytics_output/demo")
table2_engagement.coalesce(1).write.option("header", True).mode("overwrite").csv("analytics_output/engagement")
table3_compensation.coalesce(1).write.option("header", True).mode("overwrite").csv("analytics_output/compensation")
table4_turnover.coalesce(1).write.option("header", True).mode("overwrite").csv("analytics_output/turnover")
table5_training.coalesce(1).write.option("header", True).mode("overwrite").csv("analytics_output/training")