#### Importing the neccessary libraries and dependencies

In [8]:
!pip install pandas

Collecting pandas
  Using cached pandas-2.2.3-cp39-cp39-win_amd64.whl.metadata (19 kB)
Collecting numpy>=1.22.4 (from pandas)
  Using cached numpy-2.0.2-cp39-cp39-win_amd64.whl.metadata (59 kB)
Collecting pytz>=2020.1 (from pandas)
  Using cached pytz-2025.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Using cached tzdata-2025.1-py2.py3-none-any.whl.metadata (1.4 kB)
Using cached pandas-2.2.3-cp39-cp39-win_amd64.whl (11.6 MB)
Using cached numpy-2.0.2-cp39-cp39-win_amd64.whl (15.9 MB)
Using cached pytz-2025.1-py2.py3-none-any.whl (507 kB)
Using cached tzdata-2025.1-py2.py3-none-any.whl (346 kB)
Installing collected packages: pytz, tzdata, numpy, pandas
Successfully installed numpy-2.0.2 pandas-2.2.3 pytz-2025.1 tzdata-2025.1


In [9]:
#Import libraries
import pandas as pd 
from sqlalchemy import create_engine

# Import PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, to_date

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("NYC Payroll ETL") \
    .getOrCreate()


In [10]:
spark

### Data Extraction

In [None]:
#extract and convert the cvs into dataframe using spark
AgencyMaster_df = spark.read.csv(r'C:\Users\abiol\OneDrive\Desktop\10ANALYTICS DATA ENGINEERING\NYC PAYROLL PROJECT\NYC-PAYROLL\datasets\AgencyMaster.csv', header=True, inferSchema=True)
EmpMaster_df = spark.read.csv(r'datasets\EmpMaster.csv', header=True, inferSchema=True)
TitleMaster_df = spark.read.csv(r'datasets\TitleMaster.csv', header=True, inferSchema=True)


In [None]:
AgencyMaster_df.show(5)
EmpMaster_df.show(5)
TitleMaster_df.show(5)


+--------+--------------------+
|AgencyID|          AgencyName|
+--------+--------------------+
|    2001|ADMIN FOR CHILDRE...|
|    2002|ADMIN TRIALS AND ...|
|    2003| BOARD OF CORRECTION|
|    2004|   BOARD OF ELECTION|
|    2005|BOARD OF ELECTION...|
+--------+--------------------+
only showing top 5 rows

+----------+--------+---------+
|EmployeeID|LastName|FirstName|
+----------+--------+---------+
|    100001|  AACHEN|    DAVID|
|    100002|  AACHEN|   MONICA|
|    100003|  AADAMS|  LAMMELL|
|    100004|   AADIL|     IRIS|
|    100005|  AALAAM|     AMIR|
+----------+--------+---------+
only showing top 5 rows

+---------+--------------------+
|TitleCode|    TitleDescription|
+---------+--------------------+
|    40001|*ADM SCHOOL SECUR...|
|    40002|*ADMIN SCHL SECUR...|
|    40003|    *AGENCY ATTORNEY|
|    40004|*ASSISTANT ADVOCA...|
|    40005|*ASSOCIATE EDUCAT...|
+---------+--------------------+
only showing top 5 rows



In [None]:
#checking for the right datatype for each column in various dataframes
AgencyMaster_df.printSchema()
EmpMaster_df.printSchema()
TitleMaster_df.printSchema()

root
 |-- AgencyID: integer (nullable = true)
 |-- AgencyName: string (nullable = true)

root
 |-- EmployeeID: integer (nullable = true)
 |-- LastName: string (nullable = true)
 |-- FirstName: string (nullable = true)

root
 |-- TitleCode: integer (nullable = true)
 |-- TitleDescription: string (nullable = true)



In [None]:
#Checking for missing values in the dataframe
from pyspark.sql.functions import col, count, when

AgencyMaster_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in AgencyMaster_df.columns]
).show()


+--------+----------+
|AgencyID|AgencyName|
+--------+----------+
|       0|         0|
+--------+----------+



In [None]:
#Checking for missing values in the dataframe
EmpMaster_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in EmpMaster_df.columns]
).show()

+----------+--------+---------+
|EmployeeID|LastName|FirstName|
+----------+--------+---------+
|         0|       0|        0|
+----------+--------+---------+



In [None]:
#Checking for missing values in the dataframe
TitleMaster_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in TitleMaster_df.columns]
).show()

+---------+----------------+
|TitleCode|TitleDescription|
+---------+----------------+
|        0|               1|
+---------+----------------+



In [None]:
#dropping the column with missing values
TitleMaster_df = TitleMaster_df.dropna(subset=["TitleDescription"])


In [34]:
TitleMaster_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in TitleMaster_df.columns]
).show()

+---------+----------------+
|TitleCode|TitleDescription|
+---------+----------------+
|        0|               0|
+---------+----------------+



In [21]:
#extract and convert the cvs into dataframe using spark
nycpayroll2020_df = spark.read.csv(r'datasets\nycpayroll_2020.csv', header=True, inferSchema=True)
nycpayroll2021_df = spark.read.csv(r'datasets\nycpayroll_2021.csv', header=True, inferSchema=True)

In [22]:
nycpayroll2020_df.show(5)
nycpayroll2021_df.show(5)

+----------+-------------+--------+--------------------+----------+----------+---------+---------------+-------------------+---------+--------------------+---------------------+----------+---------+------------+----------------+-------+-----------+-------------+
|FiscalYear|PayrollNumber|AgencyID|          AgencyName|EmployeeID|  LastName|FirstName|AgencyStartDate|WorkLocationBorough|TitleCode|    TitleDescription|LeaveStatusasofJune30|BaseSalary| PayBasis|RegularHours|RegularGrossPaid|OTHours|TotalOTPaid|TotalOtherPay|
+----------+-------------+--------+--------------------+----------+----------+---------+---------------+-------------------+---------+--------------------+---------------------+----------+---------+------------+----------------+-------+-----------+-------------+
|      2020|           17|    2120|OFFICE OF EMERGEN...|     10001|    GEAGER| VERONICA|      9/12/2016|           BROOKLYN|    40447|EMERGENCY PREPARE...|               ACTIVE|   86005.0|per Annum|      1820.0|

In [None]:
# Convert column names to lowercase and remove extra spaces
nycpayroll2020_df = nycpayroll2020_df.toDF(*[col.lower().strip() for col in nycpayroll2020_df.columns])
nycpayroll2021_df = nycpayroll2021_df.toDF(*[col.lower().strip() for col in nycpayroll2021_df.columns])

# Check if columns match
payroll2020 = set(nycpayroll2020_df.columns)
payroll2021 = set(nycpayroll2021_df.columns)

if payroll2020 != payroll2021:
    print("Columns do not match! Fixing column differences...")
    print("Extra columns in 2020:", payroll2020 - payroll2021)
    print("Extra columns in 2021:", payroll2021 - payroll2020)
else:
    print("Columns match, ready to merge.")


Columns do not match! Fixing column differences...
Extra columns in 2020: {'agencyid'}
Extra columns in 2021: {'agencycode'}


#### The above function shows there are missing columns in each dataset payroll2020 & payroll2021, We will proceed to merge both dataset into one

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Initialize Spark Session
spark = SparkSession.builder.appName("Merge DataFrames").getOrCreate()

# Standardizing column names (convert to lowercase and remove spaces)
nycpayroll2020_df = nycpayroll2020_df.toDF(*[col.lower().strip() for col in nycpayroll2020_df.columns])
nycpayroll2021_df = nycpayroll2021_df.toDF(*[col.lower().strip() for col in nycpayroll2021_df.columns])

# Get column sets for each DataFrame
payroll2020 = set(nycpayroll2020_df.columns)
payroll2021 = set(nycpayroll2021_df.columns)

# Identify extra columns in each dataset
# Columns in 2020 but not in 2021
extra_2020columns = list(payroll2020 - payroll2021) 
    
# Columns in 2021 but not in 2020
extra_2021columns = list(payroll2021 - payroll2020)  

# Add missing columns with NULL values
for col_name in extra_2020columns:
    nycpayroll2021_df = nycpayroll2021_df.withColumn(col_name, lit(None))  

for col_name in extra_2021columns:
    nycpayroll2020_df = nycpayroll2020_df.withColumn(col_name, lit(None))  

# Ensure both DataFrames have the same column order
common_columns = sorted(payroll2020.union(payroll2021))  
nycpayroll2020_df = nycpayroll2020_df.select(common_columns)  
nycpayroll2021_df = nycpayroll2021_df.select(common_columns)  

# Merge both DataFrames
mergedpayroll_df = nycpayroll2020_df.unionByName(nycpayroll2021_df)

# Show merged dataset
mergedpayroll_df.show()


+----------+--------+--------------------+---------------+----------+----------+---------+----------+-------------+---------------------+-------+---------+-------------+----------------+------------+---------+--------------------+-------------+-----------+-------------------+
|agencycode|agencyid|          agencyname|agencystartdate|basesalary|employeeid|firstname|fiscalyear|     lastname|leavestatusasofjune30|othours| paybasis|payrollnumber|regulargrosspaid|regularhours|titlecode|    titledescription|totalotherpay|totalotpaid|worklocationborough|
+----------+--------+--------------------+---------------+----------+----------+---------+----------+-------------+---------------------+-------+---------+-------------+----------------+------------+---------+--------------------+-------------+-----------+-------------------+
|      NULL|    2120|OFFICE OF EMERGEN...|      9/12/2016|   86005.0|     10001| VERONICA|      2020|       GEAGER|               ACTIVE|    0.0|per Annum|           17|

In [50]:
mergedpayroll_df.columns

#number of columns
num_cols = len(mergedpayroll_df.columns)
print('The number of columns are:', num_cols)

#number of rows
num_rows = mergedpayroll_df.count()

print('The number of rows are:', num_rows)

print(f'The shape of the dataframe mergedpayroll_df: ({num_rows}, {num_cols})')


The number of columns are: 20
The number of rows are: 201
The shape of the dataframe mergedpayroll_df: (201, 20)


In [39]:
mergedpayroll_df.printSchema()

root
 |-- agencycode: integer (nullable = true)
 |-- agencyid: integer (nullable = true)
 |-- agencyname: string (nullable = true)
 |-- agencystartdate: string (nullable = true)
 |-- basesalary: double (nullable = true)
 |-- employeeid: integer (nullable = true)
 |-- firstname: string (nullable = true)
 |-- fiscalyear: integer (nullable = true)
 |-- lastname: string (nullable = true)
 |-- leavestatusasofjune30: string (nullable = true)
 |-- othours: double (nullable = true)
 |-- paybasis: string (nullable = true)
 |-- payrollnumber: integer (nullable = true)
 |-- regulargrosspaid: double (nullable = true)
 |-- regularhours: double (nullable = true)
 |-- titlecode: integer (nullable = true)
 |-- titledescription: string (nullable = true)
 |-- totalotherpay: double (nullable = true)
 |-- totalotpaid: double (nullable = true)
 |-- worklocationborough: string (nullable = true)



In [None]:
#Check and count for null values
for column in mergedpayroll_df.columns:
    print(column, 'Nulls', mergedpayroll_df.filter(mergedpayroll_df[column].isNull()).count())

agencycode Nulls 100
agencyid Nulls 101
agencyname Nulls 0
agencystartdate Nulls 0
basesalary Nulls 0
employeeid Nulls 0
firstname Nulls 0
fiscalyear Nulls 0
lastname Nulls 0
leavestatusasofjune30 Nulls 0
othours Nulls 0
paybasis Nulls 0
payrollnumber Nulls 0
regulargrosspaid Nulls 0
regularhours Nulls 0
titlecode Nulls 0
titledescription Nulls 0
totalotherpay Nulls 0
totalotpaid Nulls 0
worklocationborough Nulls 0


In [57]:
#Filling the null values
mergedpayroll_df = mergedpayroll_df.fillna(
    {"agencycode": 0, 
     "agencyid": 0
    })

In [58]:
#confirming no null values
for column in mergedpayroll_df.columns:
    print(column, 'Nulls', mergedpayroll_df.filter(mergedpayroll_df[column].isNull()).count())

agencycode Nulls 0
agencyid Nulls 0
agencyname Nulls 0
agencystartdate Nulls 0
basesalary Nulls 0
employeeid Nulls 0
firstname Nulls 0
fiscalyear Nulls 0
lastname Nulls 0
leavestatusasofjune30 Nulls 0
othours Nulls 0
paybasis Nulls 0
payrollnumber Nulls 0
regulargrosspaid Nulls 0
regularhours Nulls 0
titlecode Nulls 0
titledescription Nulls 0
totalotherpay Nulls 0
totalotpaid Nulls 0
worklocationborough Nulls 0


In [61]:
#checking and removing duplicate values
mergedpayroll_df = mergedpayroll_df.dropDuplicates()


In [77]:
# Convert date columns to proper format (replace 'date_column' with actual column)
mergedpayroll_df = mergedpayroll_df.withColumn("agencystartdate", to_date(col("agencystartdate"), "MM/dd/yyyy"))

#convert columns to interger from strings
mergedpayroll_df = mergedpayroll_df.withColumn("basesalary", col("basesalary").cast("int"))
mergedpayroll_df = mergedpayroll_df.withColumn("agencycode", col("agencycode").cast("int"))
mergedpayroll_df = mergedpayroll_df.withColumn("agencyid", col("agencyid").cast("int"))
mergedpayroll_df = mergedpayroll_df.withColumn("employeeid", col("employeeid").cast("int"))

from pyspark.sql.functions import col

# Convert string columns to numeric types
mergedpayroll_df = (
    mergedpayroll_df
    .withColumn("fiscalyear", col("fiscalyear").cast("int"))
    .withColumn("othours", col("othours").cast("double"))
    .withColumn("payrollnumber", col("payrollnumber").cast("int"))
    .withColumn("regulargrosspaid", col("regulargrosspaid").cast("double"))
    .withColumn("regularhours", col("regularhours").cast("double"))
    .withColumn("titlecode", col("titlecode").cast("int"))
    .withColumn("totalotherpay", col("totalotherpay").cast("double"))
    .withColumn("totalotpaid", col("totalotpaid").cast("double"))
)

# Show schema after changes
mergedpayroll_df.printSchema()

root
 |-- agencycode: integer (nullable = true)
 |-- agencyid: integer (nullable = true)
 |-- agencyname: string (nullable = true)
 |-- agencystartdate: date (nullable = true)
 |-- basesalary: integer (nullable = true)
 |-- employeeid: integer (nullable = true)
 |-- firstname: string (nullable = true)
 |-- fiscalyear: integer (nullable = true)
 |-- lastname: string (nullable = true)
 |-- leavestatusasofjune30: string (nullable = true)
 |-- othours: double (nullable = true)
 |-- paybasis: string (nullable = true)
 |-- payrollnumber: integer (nullable = true)
 |-- regulargrosspaid: double (nullable = true)
 |-- regularhours: double (nullable = true)
 |-- titlecode: integer (nullable = true)
 |-- titledescription: string (nullable = true)
 |-- totalotherpay: double (nullable = true)
 |-- totalotpaid: double (nullable = true)
 |-- worklocationborough: string (nullable = true)



In [80]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


mergedpayroll_df.show(5)

+----------+--------+--------------------+---------------+----------+----------+---------+----------+---------+---------------------+-------+---------+-------------+----------------+------------+---------+--------------------+-------------+-----------+-------------------+
|agencycode|agencyid|          agencyname|agencystartdate|basesalary|employeeid|firstname|fiscalyear| lastname|leavestatusasofjune30|othours| paybasis|payrollnumber|regulargrosspaid|regularhours|titlecode|    titledescription|totalotherpay|totalotpaid|worklocationborough|
+----------+--------+--------------------+---------------+----------+----------+---------+----------+---------+---------------------+-------+---------+-------------+----------------+------------+---------+--------------------+-------------+-----------+-------------------+
|         0|    2120|OFFICE OF EMERGEN...|           NULL|     86005|    149612| JONATHAN|      2020|    ROTTA|               ACTIVE|    0.0|per Annum|           17|        84698.21

In [79]:
mergedpayroll_df.printSchema()

root
 |-- agencycode: integer (nullable = true)
 |-- agencyid: integer (nullable = true)
 |-- agencyname: string (nullable = true)
 |-- agencystartdate: date (nullable = true)
 |-- basesalary: integer (nullable = true)
 |-- employeeid: integer (nullable = true)
 |-- firstname: string (nullable = true)
 |-- fiscalyear: integer (nullable = true)
 |-- lastname: string (nullable = true)
 |-- leavestatusasofjune30: string (nullable = true)
 |-- othours: double (nullable = true)
 |-- paybasis: string (nullable = true)
 |-- payrollnumber: integer (nullable = true)
 |-- regulargrosspaid: double (nullable = true)
 |-- regularhours: double (nullable = true)
 |-- titlecode: integer (nullable = true)
 |-- titledescription: string (nullable = true)
 |-- totalotherpay: double (nullable = true)
 |-- totalotpaid: double (nullable = true)
 |-- worklocationborough: string (nullable = true)



In [None]:
#datetime parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

mergedpayroll_df.show(5)

+----------+--------+--------------------+---------------+----------+----------+---------+----------+---------+---------------------+-------+---------+-------------+----------------+------------+---------+--------------------+-------------+-----------+-------------------+
|agencycode|agencyid|          agencyname|agencystartdate|basesalary|employeeid|firstname|fiscalyear| lastname|leavestatusasofjune30|othours| paybasis|payrollnumber|regulargrosspaid|regularhours|titlecode|    titledescription|totalotherpay|totalotpaid|worklocationborough|
+----------+--------+--------------------+---------------+----------+----------+---------+----------+---------+---------------------+-------+---------+-------------+----------------+------------+---------+--------------------+-------------+-----------+-------------------+
|         0|    2120|OFFICE OF EMERGEN...|           NULL|     86005|    149612| JONATHAN|      2020|    ROTTA|               ACTIVE|    0.0|per Annum|           17|        84698.21