In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Jupyter") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "4") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

df = spark.read.csv("/opt/spark-data/input/employees_raw.csv", header=True, inferSchema=True)
df.toPandas()



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/04 15:47:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/04 15:47:48 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

Unnamed: 0,employee_id,first_name,last_name,email,hire_date,job_title,department,salary,manager_id,address,city,state,zip_code,birth_date,status
0,1605,Robert,Robinson,steven.robinson@company.com,2019-04-21,Analyst,IT,75000,,7303 Maple Drive,Denver,,,1975-05-07,Active
1,1019,Robert,MILLER,mary@invalid,2021-09-25,Lead,Marketing,35000,2003.0,4101 Willow Drive,Los Angeles,IL,98914.0,1975-09-22,On Leave
2,1999,Barbara,Robinson,mark.anderson@company.com,2018-10-24,Consultant,Marketing,95000,2002.0,4755 Maple Rd,Boston,FL,34364.0,1986-10-26,On Leave
3,1662,Nancy,THOMAS,mark.sanchez@company.com,2017-08-11,Lead,HR,85000,2004.0,7164 Spruce Lane,Seattle,CO,22664.0,1978-05-31,On Leave
4,1112,Barbara,Harris,mary.martin@company.com,2024-02-07,Data Analyst,Finance,75000,,4721 Pine Court,Los Angeles,,54795.0,1963-02-26,On Leave
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1015,2014,anthony,Anderson,charles.johnson@company.com,2022-10-04,Lead,IT,55000,2009.0,3029 Birch Ave,Chicago,AZ,39811.0,1986-09-16,On Leave
1016,2016,Susan,TAYLOR,anthony.anderson@company.com,2021-03-23,Lead,Engineering,75000,2001.0,1302 Main Blvd,Dallas,VA,17328.0,1966-07-16,On Leave
1017,1139,Matthew,Taylor,alice.martin@company.com,2017-12-29,Lead,Analytics,45000,,2674 Spruce Blvd,Los Angeles,NC,61719.0,1973-05-30,Inactive
1018,1834,daniel,WILLIAMS,john.rodriguez@company.com,2023-09-23,Analyst,HR,105000,2004.0,9826 Elm Court,New York,AZ,60663.0,1993-02-20,Inactive


In [2]:
# Remove duplicate records based on employee_id
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
window = Window.partitionBy("employee_id").orderBy("employee_id")
df = df.withColumn("row_num", row_number().over(window)).filter(col("row_num") == 1).drop("row_num")

df.toPandas()

                                                                                

Unnamed: 0,employee_id,first_name,last_name,email,hire_date,job_title,department,salary,manager_id,address,city,state,zip_code,birth_date,status
0,1001,Lisa,Anderson,patricia.jones@company.com,2019-10-05,Analyst,Finance,65000,2005.0,,,PA,66105.0,2000-09-06,Inactive
1,1002,Alice,Taylor,daniel.moore@company.com,2020-12-29,Manager,Engineering,95000,2002.0,2366 Oak Rd,San Francisco,AZ,59965.0,1998-12-21,On Leave
2,1003,donald,Moore,james.martinez@company.com,2016-03-29,Data Analyst,Analytics,35000,2003.0,2423 Cedar Rd,San Diego,MA,14363.0,1991-05-06,Inactive
3,1004,steven,Thompson,david.anderson@company.com,2016-04-09,Manager,Marketing,"$95,000",2002.0,6437 Willow Lane,Charlotte,TX,10515.0,1987-07-08,On Leave
4,1005,Mark,Harris,JANE.MARTINEZ@COMPANY.COM,2020-06-30,Lead,Engineering,55000,2004.0,3098 Willow Rd,San Antonio,PA,63160.0,1992-09-12,On Leave
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1015,2016,Susan,TAYLOR,anthony.anderson@company.com,2021-03-23,Lead,Engineering,75000,2001.0,1302 Main Blvd,Dallas,VA,17328.0,1966-07-16,On Leave
1016,2017,Michael,Martinez,james.rodriguez@company.com,2024-03-28,Developer,Engineering,"$65,000",2007.0,2666 Elm Drive,Jacksonville,NJ,39583.0,1971-01-01,Active
1017,2018,Steven,TAYLOR,barbara.lee@company.com,2027-05-03,Lead,Analytics,85000,2005.0,525 Pine Ave,Houston,CO,10315.0,1968-10-14,Inactive
1018,2019,steven,Sanchez,richard.miller@company.com,2018-11-04,Analyst,Finance,"$65,000",2001.0,2151 Cedar Lane,Philadelphia,,69208.0,1971-12-22,Active


In [3]:
# Fill missing values
df = df.fillna({"address": "", "city": "", "state": "", "zip_code": "", "status": "Active"})

df.filter(col("address") == "").count()
df.toPandas()


                                                                                

Unnamed: 0,employee_id,first_name,last_name,email,hire_date,job_title,department,salary,manager_id,address,city,state,zip_code,birth_date,status
0,1001,Lisa,Anderson,patricia.jones@company.com,2019-10-05,Analyst,Finance,65000,2005.0,,,PA,66105.0,2000-09-06,Inactive
1,1002,Alice,Taylor,daniel.moore@company.com,2020-12-29,Manager,Engineering,95000,2002.0,2366 Oak Rd,San Francisco,AZ,59965.0,1998-12-21,On Leave
2,1003,donald,Moore,james.martinez@company.com,2016-03-29,Data Analyst,Analytics,35000,2003.0,2423 Cedar Rd,San Diego,MA,14363.0,1991-05-06,Inactive
3,1004,steven,Thompson,david.anderson@company.com,2016-04-09,Manager,Marketing,"$95,000",2002.0,6437 Willow Lane,Charlotte,TX,10515.0,1987-07-08,On Leave
4,1005,Mark,Harris,JANE.MARTINEZ@COMPANY.COM,2020-06-30,Lead,Engineering,55000,2004.0,3098 Willow Rd,San Antonio,PA,63160.0,1992-09-12,On Leave
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1015,2016,Susan,TAYLOR,anthony.anderson@company.com,2021-03-23,Lead,Engineering,75000,2001.0,1302 Main Blvd,Dallas,VA,17328.0,1966-07-16,On Leave
1016,2017,Michael,Martinez,james.rodriguez@company.com,2024-03-28,Developer,Engineering,"$65,000",2007.0,2666 Elm Drive,Jacksonville,NJ,39583.0,1971-01-01,Active
1017,2018,Steven,TAYLOR,barbara.lee@company.com,2027-05-03,Lead,Analytics,85000,2005.0,525 Pine Ave,Houston,CO,10315.0,1968-10-14,Inactive
1018,2019,steven,Sanchez,richard.miller@company.com,2018-11-04,Analyst,Finance,"$65,000",2001.0,2151 Cedar Lane,Philadelphia,,69208.0,1971-12-22,Active


In [4]:
# Validate email format
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"

# Check invalid emails
df.filter(~col("email").rlike(email_pattern)).select("email").show()

# Convert to lowercase
df = df.withColumn("email", lower(col("email")))

df.select("email").toPandas()

+----------------+
|           email|
+----------------+
|   nancy@invalid|
|patricia@invalid|
|    mary@invalid|
|    john@invalid|
| anthony@invalid|
|    jane@invalid|
|    john@invalid|
|patricia@invalid|
|  robert@invalid|
|   alice@invalid|
| richard@invalid|
|  steven@invalid|
| barbara@invalid|
|    john@invalid|
|  robert@invalid|
|    jane@invalid|
|jennifer@invalid|
| michael@invalid|
|   alice@invalid|
| barbara@invalid|
+----------------+
only showing top 20 rows



Unnamed: 0,email
0,patricia.jones@company.com
1,daniel.moore@company.com
2,james.martinez@company.com
3,david.anderson@company.com
4,jane.martinez@company.com
...,...
1015,anthony.anderson@company.com
1016,james.rodriguez@company.com
1017,barbara.lee@company.com
1018,richard.miller@company.com


In [5]:
# Remove $ and commas, convert to decimal
df = df.withColumn("salary", regexp_replace(col("salary"), r"[\$,]", "").cast("decimal(10,2)"))

df.select("salary").toPandas()

Unnamed: 0,salary
0,65000.00
1,95000.00
2,35000.00
3,95000.00
4,55000.00
...,...
1015,75000.00
1016,65000.00
1017,85000.00
1018,65000.00


In [6]:
# Remove records with future hire dates
df.filter(col("hire_date") > current_date()).select("employee_id", "hire_date").show()

df = df.filter(col("hire_date") <= current_date())

df.count()

+-----------+----------+
|employee_id| hire_date|
+-----------+----------+
|       1006|2027-08-16|
|       1052|2026-08-16|
|       1085|2026-08-04|
|       1193|2026-10-10|
|       1196|2027-06-29|
|       1197|2027-10-28|
|       1208|2027-12-28|
|       1228|2026-12-29|
|       1256|2027-03-24|
|       1274|2027-01-25|
|       1276|2026-06-20|
|       1308|2027-10-14|
|       1309|2028-01-01|
|       1344|2027-06-01|
|       1357|2026-10-14|
|       1449|2026-03-24|
|       1451|2026-07-06|
|       1454|2026-11-30|
|       1476|2027-08-21|
|       1490|2026-05-05|
+-----------+----------+
only showing top 20 rows



970

In [7]:
# Convert names to proper case
df = df.withColumn("first_name", initcap(col("first_name"))) \
        .withColumn("last_name", initcap(col("last_name")))

df.select("first_name", "last_name").toPandas()

Unnamed: 0,first_name,last_name
0,Lisa,Anderson
1,Alice,Taylor
2,Donald,Moore
3,Steven,Thompson
4,Mark,Harris
...,...,...
965,Jennifer,Johnson
966,Susan,Taylor
967,Michael,Martinez
968,Steven,Sanchez


In [8]:
# Calculate age from birth_date
df = df.withColumn("age", year(current_date()) - year(col("birth_date")) - 
                   when(concat_ws("-", month(col("birth_date")), dayofmonth(col("birth_date"))) >
                        concat_ws("-", month(current_date()), dayofmonth(current_date())), 1).otherwise(0))

df.select("age").describe().toPandas()

Unnamed: 0,summary,age
0,count,970.0
1,mean,45.236082474226805
2,stddev,11.961364408257229
3,min,25.0
4,max,66.0


In [9]:
# Calculate years of service
df = df.withColumn("tenure_years", (datediff(current_date(), col("hire_date")) / 365.25).cast("decimal(3,1)"))

df.select("tenure_years").describe().toPandas()

Unnamed: 0,summary,tenure_years
0,count,970.0
1,mean,5.66619
2,stddev,2.533782254762425
3,min,1.0
4,max,10.0


In [10]:
# Create salary bands: Junior (<50k), Mid (50k-80k), Senior (>80k)
df = df.withColumn("salary_band", 
                   when(col("salary").isNull(), "Unknown")
                   .when(col("salary") < 50000, "Junior")
                   .when(col("salary") < 80000, "Mid")
                   .otherwise("Senior"))

df.groupBy("salary_band").count().toPandas()

Unnamed: 0,salary_band,count
0,Senior,358
1,Mid,365
2,Junior,247


In [11]:
# Concatenate first and last name
df = df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))

df.select("first_name", "last_name", "full_name").toPandas()

Unnamed: 0,first_name,last_name,full_name
0,Lisa,Anderson,Lisa Anderson
1,Alice,Taylor,Alice Taylor
2,Donald,Moore,Donald Moore
3,Steven,Thompson,Steven Thompson
4,Mark,Harris,Mark Harris
...,...,...,...
965,Jennifer,Johnson,Jennifer Johnson
966,Susan,Taylor,Susan Taylor
967,Michael,Martinez,Michael Martinez
968,Steven,Sanchez,Steven Sanchez


In [12]:
# Extract domain from email
df = df.withColumn("email_domain", regexp_extract(col("email"), r"@(.+)$", 1))

df.groupBy("email_domain").count().toPandas()

Unnamed: 0,email_domain,count
0,invalid,99
1,company.com,871


In [13]:
# Add timestamps
from datetime import datetime
current_ts = lit(datetime.now().isoformat())
df = df.withColumn("created_at", current_ts).withColumn("updated_at", current_ts)

df.select("created_at", "updated_at").toPandas()

Unnamed: 0,created_at,updated_at
0,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
1,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
2,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
3,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
4,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
...,...,...
965,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
966,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
967,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698
968,2026-02-04T15:48:24.345698,2026-02-04T15:48:24.345698


In [14]:
# Select and reorder columns
final_cols = ["employee_id", "first_name", "last_name", "full_name", "email", "email_domain", 
              "hire_date", "job_title", "department", "salary", "salary_band", "manager_id", 
              "address", "city", "state", "zip_code", "birth_date", "age", "tenure_years", 
              "status", "created_at", "updated_at"]

df = df.select([col(c) for c in final_cols])

df.printSchema()
df.show()

root
 |-- employee_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- full_name: string (nullable = false)
 |-- email: string (nullable = true)
 |-- email_domain: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- job_title: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: decimal(10,2) (nullable = true)
 |-- salary_band: string (nullable = false)
 |-- manager_id: integer (nullable = true)
 |-- address: string (nullable = false)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- zip_code: integer (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- age: integer (nullable = true)
 |-- tenure_years: decimal(3,1) (nullable = true)
 |-- status: string (nullable = false)
 |-- created_at: string (nullable = false)
 |-- updated_at: string (nullable = false)

+-----------+----------+---------+------------------+--------------------+---------

In [15]:
# 1. Top earners
# 1. Top earners
df.orderBy(col("salary").desc()).select("full_name", "department", "salary").show(10)

# 2. Employees by department
df.groupBy("department").agg(count("employee_id").alias("count"), 
                              avg("salary").alias("avg_salary")).show()

# 3. Salary distribution
df.groupBy("salary_band").count().show()

# 4. Salary stats
df.select("salary").describe().show()

# 5. Age distribution
df.select("age").describe().show()

# 6. Tenure analysis
df.groupBy(when(col("tenure_years") < 1, "< 1 year")
           .when(col("tenure_years") < 3, "1-3 years")
           .when(col("tenure_years") < 5, "3-5 years")
           .otherwise("5+ years").alias("tenure_group")).count().toPandas()

+----------------+-----------+---------+
|       full_name| department|   salary|
+----------------+-----------+---------+
|    Robert Davis|         IT|105000.00|
|  Mary Hernandez|         IT|105000.00|
| Jessica Sanchez|  Marketing|105000.00|
|     Michael Lee|Engineering|105000.00|
|Jennifer Johnson|  Marketing|105000.00|
|   David Johnson|  Analytics|105000.00|
|   Barbara Lopez|      Sales|105000.00|
|     Karen Lopez|  Analytics|105000.00|
|   John Anderson|      Sales|105000.00|
| Donald Anderson|Engineering|105000.00|
+----------------+-----------+---------+
only showing top 10 rows

+-----------+-----+------------+
| department|count|  avg_salary|
+-----------+-----+------------+
|      Sales|  133|72218.045113|
|Engineering|  134|71567.164179|
|         HR|  127|69330.708661|
|    Finance|  113|68716.814159|
|  Analytics|  115|67173.913043|
|  Marketing|   98|72346.938776|
|         IT|  124|70322.580645|
| Operations|  126|66825.396825|
+-----------+-----+------------+

+--

Unnamed: 0,tenure_group,count
0,5+ years,575
1,3-5 years,210
2,1-3 years,185


In [16]:
# Show cleaned data


# Display key columns

# Convert to Pandas and display
pdf = df.select("employee_id", "full_name", "email", "salary", "salary_band", "age", "tenure_years", "department").limit(10).toPandas()
print(pdf)



   employee_id        full_name                        email    salary  \
0         1001    Lisa Anderson   patricia.jones@company.com  65000.00   
1         1002     Alice Taylor     daniel.moore@company.com  95000.00   
2         1003     Donald Moore   james.martinez@company.com  35000.00   
3         1004  Steven Thompson   david.anderson@company.com  95000.00   
4         1005      Mark Harris    jane.martinez@company.com  55000.00   
5         1007      John Wilson  richard.jackson@company.com  85000.00   
6         1008  Patricia Miller      james.jones@company.com  35000.00   
7         1009     Steven Clark  daniel.martinez@company.com  35000.00   
8         1010      Alice Smith                nancy@invalid  65000.00   
9         1011     Daniel Brown   patricia.lewis@company.com  35000.00   

  salary_band  age tenure_years   department  
0         Mid   25          6.3      Finance  
1      Senior   28          5.1  Engineering  
2      Junior   34          9.9    Analytics

In [18]:
pdf = df.select("employee_id", "full_name", "email","department").limit(10).toPandas()
print(pdf)

   employee_id        full_name                        email   department
0         1001    Lisa Anderson   patricia.jones@company.com      Finance
1         1002     Alice Taylor     daniel.moore@company.com  Engineering
2         1003     Donald Moore   james.martinez@company.com    Analytics
3         1004  Steven Thompson   david.anderson@company.com    Marketing
4         1005      Mark Harris    jane.martinez@company.com  Engineering
5         1007      John Wilson  richard.jackson@company.com           IT
6         1008  Patricia Miller      james.jones@company.com    Analytics
7         1009     Steven Clark  daniel.martinez@company.com    Marketing
8         1010      Alice Smith                nancy@invalid      Finance
9         1011     Daniel Brown   patricia.lewis@company.com    Marketing


In [20]:
pdf = df.select("employee_id", "full_name", "salary_band",  "tenure_years", "department").limit(10).toPandas()
print(pdf)

   employee_id        full_name salary_band tenure_years   department
0         1001    Lisa Anderson         Mid          6.3      Finance
1         1002     Alice Taylor      Senior          5.1  Engineering
2         1003     Donald Moore      Junior          9.9    Analytics
3         1004  Steven Thompson      Senior          9.8    Marketing
4         1005      Mark Harris         Mid          5.6  Engineering
5         1007      John Wilson      Senior          8.3           IT
6         1008  Patricia Miller      Junior          6.5    Analytics
7         1009     Steven Clark      Junior          2.9    Marketing
8         1010      Alice Smith         Mid          7.9      Finance
9         1011     Daniel Brown      Junior          4.0    Marketing


In [17]:
# Write cleaned dataframe to Postgres using SQLAlchemy and psycopg2
from sqlalchemy import create_engine
import os

pg_user = os.environ.get('POSTGRES_USER', 'spark')
pg_password = os.environ.get('POSTGRES_PASSWORD', 'sparkpass')
pg_host = os.environ.get('POSTGRES_HOST', 'postgres')
pg_port = os.environ.get('POSTGRES_PORT', '5432')
pg_db = os.environ.get('POSTGRES_DB', 'sparkdb')

engine = create_engine(f'postgresql+psycopg2://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_db}')

# Convert Spark DataFrame to pandas (note: ok for moderate-sized data)
pdf = df.toPandas()

# Write to table 'employees_clean' (replace if exists)
pdf.to_sql('employees_clean', engine, if_exists='replace', index=False)

print('Wrote', len(pdf), 'rows to employees_clean in Postgres')

Wrote 970 rows to employees_clean in Postgres
