In [1]:
import os

import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, current_timestamp, when

In [2]:
# Initialise the Spark session: standalone cluster master, S3A pointed at MinIO,
# and the Hive metastore as catalog (the Hudi hive-sync further down relies on it).
# Credentials are read from the environment; the MinIO defaults are only a
# fallback for the local docker stack and should not be relied on elsewhere.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

spark = (
    SparkSession.builder
    .appName("HRIS_ETL_Pipeline_to_Lakehouse")
    .master("spark://spark-master:7077")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # MinIO serves buckets by path (http://host/bucket/key), not virtual-host style.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.hadoop.hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)

# Quieten Spark logging and display the context as the cell output.
sc = spark.sparkContext
sc.setLogLevel("ERROR")
sc

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/03 16:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [39]:
# Create the silver database that the Hudi hive-sync below writes into.
# Hive stores database names in lower case (the listing shows `hr_silver`), so
# spell it the same way as `hoodie.datasource.hive_sync.database` for clarity.
spark.sql("CREATE DATABASE IF NOT EXISTS hr_silver")
spark.sql("SHOW DATABASES").show()

+---------------+
|      namespace|
+---------------+
|bronze_raw_data|
|        default|
|      hr_bronze|
|      hr_silver|
+---------------+



In [4]:
# Preview the local HRIS extract (header row used for column names, column
# types inferred by Spark) before it is pushed to the landing zone.
df_spark = spark.read.csv(
    "./data/HRIS_EMPLOYEE_20250810.csv",
    header=True,
    inferSchema=True,
)
df_spark.show(5)

                                                                                

+----------+---------+--------+---------+--------+--------------------+---------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+----------------+-------------------+-----------------------+-------------+
|Unnamed: 0|FirstName|LastName|StartDate|ExitDate|               Title|     Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Ra

In [21]:
# Upload the raw HRIS CSV extract into the MinIO `landing-zone` bucket.
# Credentials/endpoint come from the environment, falling back to the local
# docker-stack defaults.
AWS_S3_BUCKET_NAME = 'landing-zone'

AWS_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
AWS_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
S3_ENDPOINT_URL = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")

raw_hris = './data/HRIS_EMPLOYEE_20250810.csv'
# The object key in the bucket is simply the local file name.
landing_data_name = os.path.basename(raw_hris)

s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

try:
    s3.upload_file(raw_hris, AWS_S3_BUCKET_NAME, landing_data_name)
    print(f"Upload '{landing_data_name}' to '{AWS_S3_BUCKET_NAME}' bucket successful!")
except Exception as e:
    print("ERROR is: ",e)
    # Re-raise: later cells read this object from S3, so continuing would only
    # fail further down with a less helpful error.
    raise

Upload 'HRIS_EMPLOYEE_20250810.csv' to 'landing-zone' bucket successful!


In [25]:
# Read the landed CSV back from MinIO through the s3a connector and persist it
# as Parquet under the warehouse bucket's HR_bronze prefix.
landing_data_name = 'HRIS_EMPLOYEE_20250810.csv'
s3_bucket_path = f's3a://landing-zone/{landing_data_name}'
bronze_parquet_path = "s3a://warehouse/HR_bronze/HRIS_EMPLOYEE_20250810.parquet"

# header=True -> first row gives column names; inferSchema=True -> Spark scans the data to pick types.
hr_df = spark.read.csv(s3_bucket_path, header=True, inferSchema=True)

try:
    # mode("overwrite") replaces any previous bronze snapshot at this path.
    hr_df.write.mode("overwrite").parquet(bronze_parquet_path)
    print("Upload 'HRIS_EMPLOYEE_20250810.parquet' to 'HR_bronze' bucket successful!")
except Exception as e:
    print("ERROR is: ",e)
    # Re-raise so the pipeline stops instead of transforming a missing/stale snapshot.
    raise

                                                                                

Upload 'HRIS_EMPLOYEE_20250810.parquet' to 'HR_bronze' bucket successful!


In [17]:
# Load the bronze Parquet snapshot back from MinIO and preview the first rows;
# this frame is the input to the transformation cells that follow.
s3_hr_parquet = "s3a://warehouse/HR_bronze/HRIS_EMPLOYEE_20250810.parquet"
raw_hr_df = spark.read.format("parquet").load(s3_hr_parquet)
raw_hr_df.show(5)

+----------+---------+--------+---------+--------+--------------------+---------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-----------+-----------+----------------+------------------+-----------------------+-------------+---------------------+-------------+----------------+----------------+-------------------+-----------------------+-------------+
|Unnamed: 0|FirstName|LastName|StartDate|ExitDate|               Title|     Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Ra

In [18]:
# Null count per column, computed in ONE Spark job: for each column, `when(...)`
# yields a non-null value only on null rows, and `count` counts those values.
# (The previous version ran a separate filter+count job for every column.)
null_counts = (
    raw_hr_df
    .select([count(when(col(c).isNull(), 1)).alias(c) for c in raw_hr_df.columns])
    .first()
    .asDict()
)
print(null_counts)

{'Unnamed: 0': 0, 'FirstName': 0, 'LastName': 0, 'StartDate': 0, 'ExitDate': 1544, 'Title': 0, 'Supervisor': 0, 'ADEmail': 0, 'BusinessUnit': 0, 'EmployeeStatus': 0, 'EmployeeType': 0, 'PayZone': 0, 'EmployeeClassificationType': 0, 'TerminationType': 0, 'TerminationDescription': 1544, 'DepartmentType': 0, 'Division': 0, 'DOB': 0, 'State': 0, 'JobFunctionDescription': 0, 'GenderCode': 0, 'LocationCode': 0, 'RaceDesc': 0, 'MaritalDesc': 0, 'Performance Score': 0, 'Current Employee Rating': 0, 'Employee ID': 0, 'Survey Date': 0, 'Engagement Score': 0, 'Satisfaction Score': 0, 'Work-Life Balance Score': 0, 'Training Date': 0, 'Training Program Name': 0, 'Training Type': 0, 'Training Outcome': 0, 'Location': 0, 'Trainer': 0, 'Training Duration(Days)': 0, 'Training Cost': 0}


In [14]:
# Replace nulls in these text columns with an explicit "UNKNOWN" marker
# (the null counts above show TerminationDescription has nulls).
# Spark DataFrames are immutable, so the result must be reassigned. It is applied
# to raw_hr_df, which the renaming cell below derives transformed_hr_df from, so
# the fill reaches the silver table. Re-running this cell is a no-op once filled.
raw_hr_df = raw_hr_df.fillna("UNKNOWN", subset=["TerminationDescription", "Division"])

In [5]:
# Show Schema
# Inspect the column types carried through the bronze Parquet snapshot. Although
# the CSV was read with inferSchema=True, date-like columns (StartDate, ExitDate,
# DOB) still come through as strings in the printed schema, so they would need an
# explicit cast if they are to be treated as dates.
raw_hr_df.printSchema()

root
 |-- Unnamed: 0: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- TerminationType: string (nullable = true)
 |-- TerminationDescription: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 

In [6]:
# Rename source columns whose names contain spaces/punctuation to CamelCase
# (presumably to keep the Hudi/Hive-synced schema free of awkward identifiers),
# then stamp each row with the load time, which the Hudi write below uses as its
# precombine field. "Unnamed: 0" looks like a pandas export index; it becomes the
# record key "ID".
COLUMN_RENAMES = {
    "Unnamed: 0": "ID",
    "Performance Score": "PerformanceScore",
    "Employee ID": "EmployeeID",
    "Current Employee Rating": "CurrentEmployeeRating",
    "Survey Date": "SurveyDate",
    "Engagement Score": "EngagementScore",
    "Satisfaction Score": "SatisfactionScore",
    "Work-Life Balance Score": "WorkLifeBalanceScore",
    "Training Date": "TrainingDate",
    "Training Program Name": "TrainingProgramName",
    "Training Type": "TrainingType",
    "Training Outcome": "TrainingOutcome",
    "Training Duration(Days)": "TrainingDurationDays",
    "Training Cost": "TrainingCost",
}

transformed_hr_df = raw_hr_df
for source_name, target_name in COLUMN_RENAMES.items():
    # withColumnRenamed keeps column position; unknown source names are ignored.
    transformed_hr_df = transformed_hr_df.withColumnRenamed(source_name, target_name)
transformed_hr_df = transformed_hr_df.withColumn("CurrentTimeStamp", current_timestamp())

transformed_hr_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- TerminationType: string (nullable = true)
 |-- TerminationDescription: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- Mari

In [8]:
# Write the transformed frame to MinIO as a Copy-on-Write Hudi table and register
# it in the hr_silver Hive database through the metastore (hive-sync, HMS mode).
SILVER_TABLE = "silver_hris_employee"
SILVER_BASE_PATH = f"s3a://warehouse/hr_silver/{SILVER_TABLE}"

hudi_options = {
    "hoodie.table.name": SILVER_TABLE,
    # Record key: the renamed source index column.
    "hoodie.datasource.write.recordkey.field": "ID",
    # When two incoming records share a key, the one with the larger value here wins.
    "hoodie.datasource.write.precombine.field": "CurrentTimeStamp",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.database": "hr_silver",
    "hoodie.datasource.hive_sync.table": SILVER_TABLE,
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083"
}

try:
    # Load data into Hudi Table - S3 MinIO. mode("overwrite") rewrites the table on every run.
    transformed_hr_df.write.format("hudi") \
        .options(**hudi_options) \
        .mode("overwrite") \
        .save(SILVER_BASE_PATH)
    print("Upload to S3-Hudi successful!")
except Exception as e:
    print("ERROR is: ",e)
    # Re-raise so a failed silver load is not mistaken for success by later cells.
    raise

                                                                                

Upload to S3-Hudi successful!


In [9]:
# Confirm the Hudi hive-sync registered the table in the hr_silver database.
silver_tables = spark.sql("SHOW TABLES IN hr_silver;")
silver_tables.show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|hr_silver|silver_hris_employee|      false|
+---------+--------------------+-----------+



In [26]:
# Query the synced table through the Hive metastore and preview a handful of rows.
preview_query = "SELECT * FROM hr_silver.silver_hris_employee LIMIT 5;"
spark.sql(preview_query).show()

+-------------------+--------------------+------------------+----------------------+-----------------+---+---------+--------+---------+--------+-----+----------+-------+--------------+------------+-------+--------------------------+---------------+----------------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+----------------+---------------------+----------+----------+---------------+-----------------+--------------------+------------+-------------------+------------+---------------+--------+-------+--------------------+------------+----------------+------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| ID|FirstName|LastName|StartDate|ExitDate|Title|Supervisor|ADEmail|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|DepartmentType|Division|DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|Marita

In [12]:
# Read the same table straight from its Hudi base path (no metastore lookup) and preview it.
silver_snapshot = spark.read.format("hudi").load("s3a://warehouse/hr_silver/silver_hris_employee")
silver_snapshot.show(5)

                                                                                

+-------------------+--------------------+------------------+----------------------+--------------------+----+---------+---------+---------+---------+--------------------+----------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+----------------------+-----------------+----------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+---------------------+----------+----------+---------------+-----------------+--------------------+------------+--------------------+------------+---------------+---------------+----------------+--------------------+------------+--------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|  ID|FirstName| LastName|StartDate| ExitDate|               Title|      Supervisor|             ADEmail|BusinessUnit|      EmployeeStatus|EmployeeType|PayZone|EmployeeClassific

In [28]:
# End of pipeline: stop the SparkSession (and its underlying SparkContext).
spark.stop()