In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## SILVER LAYER SCRIPTS

#### DATA ACCESS USING APP

In [0]:
# spark.conf.set("fs.azure.account.auth.type.<storage-account>.dfs.core.windows.net", "OAuth")
# spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
# spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", "<application-id>")
# spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
# spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# <storage-account> --> Provide the storage account name.

# <application-id> --> from Microsoft Entra Application.

# service_credential --> from Microsoft Entra Application << certificates & secret << values.

In [0]:
# Configure OAuth (service principal / client credentials) access to the
# `nycjobsadls` ADLS Gen2 storage account for this Spark session.
#
# The client secret must never live in notebook source, so it is fetched at
# run time from a Databricks secret scope.
# NOTE(review): the scope and key names below are placeholders — point them at
# the real secret scope for this workspace, and rotate the secret that was
# previously committed in this cell, since it must be considered leaked.
service_credential = dbutils.secrets.get(scope="nycjobs-secrets", key="nycjobs-sp-client-secret")

spark.conf.set("fs.azure.account.auth.type.nycjobsadls.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.nycjobsadls.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.nycjobsadls.dfs.core.windows.net", "1e5d48c4-0712-45d9-a098-c8e082e6f53a")
spark.conf.set("fs.azure.account.oauth2.client.secret.nycjobsadls.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.nycjobsadls.dfs.core.windows.net", "https://login.microsoftonline.com/ec15726e-765c-4f10-9e63-9e1964529582/oauth2/token")

#### DATA LOADING

In [0]:
# Load the raw CSV data from the bronze container. The first row is treated as
# the header and Spark infers each column's type from the data.
df_nyc = (
    spark.read.format("csv")
    .options(header=True, inferSchema=True)
    .load('abfss://bronze@nycjobsadls.dfs.core.windows.net/Dataset')
)

In [0]:
# Preview the freshly loaded data.
display(df_nyc)

In [0]:
# Print the schema Spark inferred for the loaded CSV data.
df_nyc.printSchema()

In [0]:
# List the column names exactly as they were read from the CSV header.
df_nyc.columns

['Job ID',
 'Agency',
 'Posting Type',
 '# Of Positions',
 'Business Title',
 'Civil Service Title',
 'Title Classification',
 'Title Code No',
 'Level',
 'Job Category',
 'Full-Time/Part-Time indicator',
 'Career Level',
 'Salary Range From',
 'Salary Range To',
 'Salary Frequency',
 'Work Location',
 'Division/Work Unit',
 'Job Description',
 'Minimum Qual Requirements',
 'Preferred Skills',
 'Additional Information',
 'To Apply',
 'Hours/Shift',
 'Work Location 1',
 'Recruitment Contact',
 'Residency Requirement',
 'Posting Date',
 'Post Until',
 'Posting Updated',
 'Process Date']

In [0]:
# Row count of the loaded dataset, before any cleaning is applied.
df_nyc.count()

6743

## Data Cleaning Steps:

#### Handling Missing Values:

In [0]:
# Normalise the headers: every column name is lowercased and its spaces are
# replaced with underscores. Other characters ('#', '/', '-') are kept as-is.
df_nyc = df_nyc.toDF(
    *(header.lower().replace(" ", "_") for header in df_nyc.columns)
)
display(df_nyc)

In [0]:
# Build a one-row summary holding the number of NULLs in each column.
# when(...) without an otherwise() yields NULL for rows that do not match, and
# count() ignores NULLs, so only the missing values of each column are counted.
col_null_cnt_df = df_nyc.select(
    *[
        count(when(col(field).isNull(), field)).alias(field)
        for field in df_nyc.columns
    ]
)
display(col_null_cnt_df)

job_id,agency,posting_type,#_of_positions,business_title,civil_service_title,title_classification,title_code_no,level,job_category,full-time/part-time_indicator,career_level,salary_range_from,salary_range_to,salary_frequency,work_location,division/work_unit,job_description,minimum_qual_requirements,preferred_skills,additional_information,to_apply,hours/shift,work_location_1,recruitment_contact,residency_requirement,posting_date,post_until,posting_updated,process_date
0,0,0,0,0,0,0,0,0,0,270,0,0,0,0,0,0,0,50,1212,1726,0,4377,3985,6743,0,0,4390,0,0


In [0]:
# `full-time/part-time_indicator` has 270 missing values; replace them with "Unknown".
df_nyc = df_nyc.na.fill({'full-time/part-time_indicator': 'Unknown'})

In [0]:
# Verify that no missing values remain in the "full-time/part-time_indicator" column.
null_count = df_nyc.where(
    col("full-time/part-time_indicator").isNull()
).count()
display(null_count)

0

In [0]:
# Inspect the data after filling the full-time/part-time indicator.
display(df_nyc)

In [0]:
# `minimum_qual_requirements` has 50 missing values; replace them with "Not specified."
df_nyc = df_nyc.na.fill({'minimum_qual_requirements': 'Not specified.'})

In [0]:
# `preferred_skills` has 1212 missing values; replace them with "Not specified."
df_nyc = df_nyc.na.fill({'preferred_skills': 'Not specified.'})

In [0]:
# `additional_information` has 1726 missing values; replace them with "Not specified."
df_nyc = df_nyc.na.fill({'additional_information': 'Not specified.'})

In [0]:
# `hours/shift` has 4377 missing values; replace them with "Not specified."
df_nyc = df_nyc.na.fill({'hours/shift': 'Not specified.'})

In [0]:
# `work_location_1` has 3985 missing values; where it is NULL, fall back to the
# value of `work_location` from the same row.
df_nyc = df_nyc.withColumn(
    "work_location_1",
    coalesce(df_nyc["work_location_1"], df_nyc["work_location"]),
)

In [0]:
# `post_until` has 4390 missing values; replace them with "Open until filled."
df_nyc = df_nyc.na.fill({'post_until': 'Open until filled.'})

In [0]:
# `recruitment_contact` is NULL in all 6743 rows, so the column carries no
# information and is removed.
df_nyc = df_nyc.drop('recruitment_contact')
display(df_nyc)

#### Remove Duplicates:

In [0]:
# Number of distinct rows, i.e. the row count once exact duplicates are collapsed.
unique_count = df_nyc.dropDuplicates().count()
display(unique_count)

6664

In [0]:
# Total row count (duplicates included), for comparison with the distinct-row count.
total_count = df_nyc.count()
display(total_count)

6743

In [0]:
# Number of surplus duplicate rows: total rows minus distinct rows.
duplicate_rows = total_count - unique_count
display(duplicate_rows)

79

In [0]:
# Remove exact duplicate rows (rows equal across every column) and report how
# many rows remain.
df_nyc = df_nyc.dropDuplicates()

# The result is deliberately NOT stored in a variable named `count`: that would
# rebind the name and shadow pyspark.sql.functions.count (brought in by the
# wildcard import), breaking any cell that calls count(...) afterwards or is
# re-run, such as the per-column null-count cell.
dedup_row_count = df_nyc.count()
display(dedup_row_count)

6664

#### Fix the Data Types

In [0]:
# Convert posting_date, posting_updated and process_date to DateType, parsing
# all three with the same "MM/dd/yyyy" pattern.
# NOTE(review): assumes these columns hold MM/dd/yyyy values at this point — confirm
# against the inferred schema.
date_columns = ['posting_date', 'posting_updated', 'process_date']
for date_field in date_columns:
    parsed_date = to_date(col(date_field), "MM/dd/yyyy")
    df_nyc = df_nyc.withColumn(date_field, parsed_date)

display(df_nyc)

In [0]:
# Print the schema again to check the column types after the date conversion.
df_nyc.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- #_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- civil_service_title: string (nullable = true)
 |-- title_classification: string (nullable = true)
 |-- title_code_no: string (nullable = true)
 |-- level: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- full-time/part-time_indicator: string (nullable = false)
 |-- career_level: string (nullable = true)
 |-- salary_range_from: double (nullable = true)
 |-- salary_range_to: double (nullable = true)
 |-- salary_frequency: string (nullable = true)
 |-- work_location: string (nullable = true)
 |-- division/work_unit: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- minimum_qual_requirements: string (nullable = false)
 |-- preferred_skills: string (nullable = false)
 |-- additional_information: string (nullable = false

In [0]:
# Cast `salary_range_from` and `salary_range_to` to float. Anomaly checks
# (e.g. negative values) are done separately on the resulting columns.
# NOTE(review): the columns were inferred as double; casting to 32-bit float
# can lose precision — confirm that float is really the intended type.
salaries = ['salary_range_from', 'salary_range_to']
for salary_field in salaries:
    df_nyc = df_nyc.withColumn(salary_field, col(salary_field).cast(FloatType()))

df_nyc.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- #_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- civil_service_title: string (nullable = true)
 |-- title_classification: string (nullable = true)
 |-- title_code_no: string (nullable = true)
 |-- level: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- full-time/part-time_indicator: string (nullable = false)
 |-- career_level: string (nullable = true)
 |-- salary_range_from: float (nullable = true)
 |-- salary_range_to: float (nullable = true)
 |-- salary_frequency: string (nullable = true)
 |-- work_location: string (nullable = true)
 |-- division/work_unit: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- minimum_qual_requirements: string (nullable = false)
 |-- preferred_skills: string (nullable = false)
 |-- additional_information: string (nullable = false)


#### Data Quality Check:

In [0]:
# Sanity check: show any rows whose lower salary bound is negative.
display(df_nyc.where(col("salary_range_from") < 0))

job_id,agency,posting_type,#_of_positions,business_title,civil_service_title,title_classification,title_code_no,level,job_category,full-time/part-time_indicator,career_level,salary_range_from,salary_range_to,salary_frequency,work_location,division/work_unit,job_description,minimum_qual_requirements,preferred_skills,additional_information,to_apply,hours/shift,work_location_1,residency_requirement,posting_date,post_until,posting_updated,process_date


In [0]:
# Sanity check: show any rows whose upper salary bound is negative.
display(df_nyc.where(col("salary_range_to") < 0))

job_id,agency,posting_type,#_of_positions,business_title,civil_service_title,title_classification,title_code_no,level,job_category,full-time/part-time_indicator,career_level,salary_range_from,salary_range_to,salary_frequency,work_location,division/work_unit,job_description,minimum_qual_requirements,preferred_skills,additional_information,to_apply,hours/shift,work_location_1,residency_requirement,posting_date,post_until,posting_updated,process_date


In [0]:
# Sanity check: show any rows where the lower salary bound exceeds the upper
# bound. This cell only displays such rows; it does not swap the values.
display(df_nyc.where(col("salary_range_from") > col("salary_range_to")))

job_id,agency,posting_type,#_of_positions,business_title,civil_service_title,title_classification,title_code_no,level,job_category,full-time/part-time_indicator,career_level,salary_range_from,salary_range_to,salary_frequency,work_location,division/work_unit,job_description,minimum_qual_requirements,preferred_skills,additional_information,to_apply,hours/shift,work_location_1,residency_requirement,posting_date,post_until,posting_updated,process_date


In [0]:
# List the distinct values present in the `salary_frequency` column.
df_nyc.select('salary_frequency').distinct().show()

+----------------+
|salary_frequency|
+----------------+
|          Annual|
|           Daily|
|          Hourly|
+----------------+



In [0]:
# Inspect the cleaned data before deriving the salary columns.
display(df_nyc)

In [0]:
# Derive `salary_difference` = salary_range_to - salary_range_from, the width
# of each posting's pay band.
pay_band_width = df_nyc["salary_range_to"] - df_nyc["salary_range_from"]
df_nyc = df_nyc.withColumn("salary_difference", pay_band_width)
display(df_nyc)

In [0]:
# Derive `median_salary` as the midpoint of the salary range:
# (salary_range_from + salary_range_to) / 2.
# The sum must be parenthesised: `/` binds tighter than `+`, so without the
# parentheses the expression evaluates to salary_range_from + salary_range_to / 2,
# which overstates the value.
df_nyc = df_nyc.withColumn(
    "median_salary",
    (col("salary_range_from") + col("salary_range_to")) / 2,
)
df_nyc.display()

In [0]:
# List the distinct values present in the `career_level` column.
df_nyc.select('career_level').distinct().show()

+--------------------+
|        career_level|
+--------------------+
|             Student|
|           Executive|
|         Entry-Level|
|             Manager|
|Experienced (non-...|
+--------------------+



In [0]:
# Number of rows (postings) per job category, largest first.
# Note: this counts rows; it does not sum `#_of_positions`.
display(
    df_nyc.groupBy("job_category")
    .count()
    .withColumnRenamed("count", "vacent_position")
    .orderBy(col("vacent_position").desc())
)

job_category,vacent_position
"Engineering, Architecture, & Planning",729
"Technology, Data & Innovation",546
Health,477
"Finance, Accounting, & Procurement",355
Legal Affairs,353
Social Services,349
Administration & Human Resources,314
Building Operations & Maintenance,307
"Constituent Services & Community Programs Health Policy, Research & Analysis",291
Constituent Services & Community Programs,269


In [0]:
# Group by agency to see which agency has the most postings, largest first.
# Note: this counts rows; it does not sum `#_of_positions`.
display(
    df_nyc.groupBy("agency")
    .count()
    .withColumnRenamed("count", "vacent_position")
    .orderBy(col("vacent_position").desc())
)

agency,vacent_position
DEPT OF HEALTH/MENTAL HYGIENE,1214
DEPT OF ENVIRONMENT PROTECTION,917
HRA/DEPT OF SOCIAL SERVICES,645
DEPARTMENT OF TRANSPORTATION,492
DEPT OF DESIGN & CONSTRUCTION,372
NYC HOUSING AUTHORITY,363
ADMIN FOR CHILDREN'S SVCS,240
DEPARTMENT OF CORRECTION,194
LAW DEPARTMENT,164
TAXI & LIMOUSINE COMMISSION,155


In [0]:
# Print how many partitions back the DataFrame's underlying RDD.
print(df_nyc.rdd.getNumPartitions())

4


In [0]:
# Remove the free-text `minimum_qual_requirements` column from the dataset.
df_nyc = df_nyc.drop('minimum_qual_requirements')
display(df_nyc)

In [0]:
# Remove the remaining long free-text columns from the dataset.
df_nyc = df_nyc.drop(
    *['job_description', 'preferred_skills', 'additional_information', 'to_apply']
)
display(df_nyc)

In [0]:
# List the columns that remain after the drops and the derived salary columns.
df_nyc.columns

['job_id',
 'agency',
 'posting_type',
 '#_of_positions',
 'business_title',
 'civil_service_title',
 'title_classification',
 'title_code_no',
 'level',
 'job_category',
 'full-time/part-time_indicator',
 'career_level',
 'salary_range_from',
 'salary_range_to',
 'salary_frequency',
 'work_location',
 'division/work_unit',
 'hours/shift',
 'work_location_1',
 'residency_requirement',
 'posting_date',
 'post_until',
 'posting_updated',
 'process_date',
 'salary_difference',
 'median_salary']

In [0]:
# Persist the cleaned dataset to the silver container as Parquet.
# repartition(1) collapses the data into a single partition before writing, and
# mode 'overwrite' replaces whatever already exists at the target path.
(
    df_nyc.repartition(1)
    .write.format('parquet')
    .mode('overwrite')
    .save('abfss://silver@nycjobsadls.dfs.core.windows.net/Dataset')
)
