 Analysis of source data


In [5]:
!which spark-submit
!echo $SPARK_HOME

#verified if pysaprk is installed in juypter or not

/opt/spark


In [6]:
import findspark
findspark.init()
import pyspark
print(pyspark.__version__)

#check the availabilty for pysaprk

2.4.5


In [7]:
from pyspark.sql import SparkSession  # Entry point for DataFrame API and Spark SQL
from pyspark.sql.functions import *  # SQL-like functions: col(), lit(), when(), agg(), date functions
from pyspark.sql.types import *      # Spark SQL data types: StructType, IntegerType, StringType, etc.
from datetime import datetime        # Python standard datetime for Spark UDFs and filtering

In [69]:
# Initialize SparkSession - connects to cluster at spark://master:7077
# appName appears in Spark UI (localhost:4040) for monitoring
spark = SparkSession.builder\
        .appName("Newyork_jobs_analysis")\
        .getOrCreate()

# Load NYC Jobs CSV from mounted dataset volume (/dataset → ./dataset/)
# header=True: First row = column names
# inferSchema=True: Auto-detects data types (int, string, date)
df = spark.read.csv("/dataset/nyc-jobs.csv", header=True, inferSchema=True)

# Display DataFrame schema (column names + data types)
df.printSchema()

# Show first 5 rows - validates data loaded correctly
df.show(5)

# Statistical summary - count, mean, stddev, min, max for numeric columns
df.describe()
                

root
 |-- Job ID: integer (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Posting Type: string (nullable = true)
 |-- # Of Positions: integer (nullable = true)
 |-- Business Title: string (nullable = true)
 |-- Civil Service Title: string (nullable = true)
 |-- Title Code No: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Job Category: string (nullable = true)
 |-- Full-Time/Part-Time indicator: string (nullable = true)
 |-- Salary Range From: double (nullable = true)
 |-- Salary Range To: double (nullable = true)
 |-- Salary Frequency: string (nullable = true)
 |-- Work Location: string (nullable = true)
 |-- Division/Work Unit: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Minimum Qual Requirements: string (nullable = true)
 |-- Preferred Skills: string (nullable = true)
 |-- Additional Information: string (nullable = true)
 |-- To Apply: string (nullable = true)
 |-- Hours/Shift: string (nullable = true)
 |-- Work Locat

DataFrame[summary: string, Job ID: string, Agency: string, Posting Type: string, # Of Positions: string, Business Title: string, Civil Service Title: string, Title Code No: string, Level: string, Job Category: string, Full-Time/Part-Time indicator: string, Salary Range From: string, Salary Range To: string, Salary Frequency: string, Work Location: string, Division/Work Unit: string, Job Description: string, Minimum Qual Requirements: string, Preferred Skills: string, Additional Information: string, To Apply: string, Hours/Shift: string, Work Location 1: string, Recruitment Contact: string, Residency Requirement: string, Posting Date: string, Post Until: string, Posting Updated: string, Process Date: string]

DATA CLEANING FUNCTIONS

In [70]:
# Function to clean column names for programmatic SQL queries and downstream systems
# Converts: "Agency Name" → "agency_name", "Civil Service Title" → "civil_service_title"
def clean_columns(df):
    for col_name in df.columns:
        df = df.withColumnRenamed(col_name, col_name.replace(" ", "_"))
    return df

# Apply column cleaning - single optimized transformation pass
df = clean_columns(df)
column_names = df.columns
print(column_names)


['Job_ID', 'Agency', 'Posting_Type', '#_Of_Positions', 'Business_Title', 'Civil_Service_Title', 'Title_Code_No', 'Level', 'Job_Category', 'Full-Time/Part-Time_indicator', 'Salary_Range_From', 'Salary_Range_To', 'Salary_Frequency', 'Work_Location', 'Division/Work_Unit', 'Job_Description', 'Minimum_Qual_Requirements', 'Preferred_Skills', 'Additional_Information', 'To_Apply', 'Hours/Shift', 'Work_Location_1', 'Recruitment_Contact', 'Residency_Requirement', 'Posting_Date', 'Post_Until', 'Posting_Updated', 'Process_Date']


In [71]:
from pyspark.sql.functions import col, sum as spark_sum

null_counts = df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns
])

null_counts.count()

1

PROCESSING DATE FUNCTION:

In [72]:
# Transform raw posting_date string → structured date components
# Enables time-based analytics: monthly trends, yearly aggregation, filtering
from pyspark.sql.functions import to_date, year, month, col, lower
def process_dates(df):
    # Convert string date → Spark DateType using format "MM/dd/yyyy"
    df = df.withColumn("process_date", trim(col("posting_updated")))
    df = df.withColumn("processed_date", to_date(col("process_date"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))

    
    # Extract year for yearly aggregation (groupBy("year").count())
    df = df.withColumn("year", year(col("processed_date")))
    
    # Extract month for monthly trends (groupBy("year", "month").sum())
    df = df.withColumn("month", month(col("processed_date")))
    
    return df

# Apply date transformations - creates temporal features for analysis
df = process_dates(df)
df_check = df.select('processed_date','year','month')
df_check.show(10)


+--------------+----+-----+
|processed_date|year|month|
+--------------+----+-----+
|          null|null| null|
|    2012-01-26|2012|    1|
|          null|null| null|
|          null|null| null|
|    2014-01-08|2014|    1|
|    2014-01-08|2014|    1|
|          null|null| null|
|    2014-07-25|2014|    7|
|          null|null| null|
|    2014-06-26|2014|    6|
+--------------+----+-----+
only showing top 10 rows



In [73]:

from pyspark.sql.functions import to_date, year, month, col, trim

df = df.withColumn(
    "process_date_clean", 
    to_date(col("process_date"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
)

df = df.withColumn(
    "posting_date_clean", 
    to_date(col("posting_date"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
)

df = df.withColumn(
    "posting_updated_clean", 
    to_date(col("posting_updated"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
)

# Extract year/month
df = df.withColumn("year", year(col("process_date_clean")))
df = df.withColumn("month", month(col("process_date_clean")))

print("ALL COLUMNS:", df.columns)
df.select(
    "process_date", 
    "process_date_clean", 
    "posting_date", 
    "posting_date_clean", 
    "posting_updated", 
    "posting_updated_clean",
    "year", 
    "month"
).show(10, truncate=False)

ALL COLUMNS: ['Job_ID', 'Agency', 'Posting_Type', '#_Of_Positions', 'Business_Title', 'Civil_Service_Title', 'Title_Code_No', 'Level', 'Job_Category', 'Full-Time/Part-Time_indicator', 'Salary_Range_From', 'Salary_Range_To', 'Salary_Frequency', 'Work_Location', 'Division/Work_Unit', 'Job_Description', 'Minimum_Qual_Requirements', 'Preferred_Skills', 'Additional_Information', 'To_Apply', 'Hours/Shift', 'Work_Location_1', 'Recruitment_Contact', 'Residency_Requirement', 'Posting_Date', 'Post_Until', 'Posting_Updated', 'process_date', 'processed_date', 'year', 'month', 'process_date_clean', 'posting_date_clean', 'posting_updated_clean']
+---------------------------------------+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Average salary calculation

In [77]:
def process_salary(df):
    """
    Compute avg_salary = (salary_range_from + salary_range_to) / 2
    and return the values as a dataframe
    """
    df = df.withColumn(
        "avg_salary",
        (col("salary_range_from") + col("salary_range_to")) / 2
    )
    return df

df = process_salary(df)
#df.show(5)
df_avg_salary = df.select('salary_range_from','salary_range_to','avg_salary')
print(df_avg_salary)
df_avg_salary.show(5)

DataFrame[salary_range_from: double, salary_range_to: double, avg_salary: double]
+-----------------+---------------+----------+
|salary_range_from|salary_range_to|avg_salary|
+-----------------+---------------+----------+
|          42405.0|        65485.0|   53945.0|
|          60740.0|       162014.0|  111377.0|
|         51907.68|       54580.32|   53244.0|
|         51907.68|       54580.32|   53244.0|
|             35.0|           35.0|      35.0|
+-----------------+---------------+----------+
only showing top 5 rows



KPI 1  -- NUMBER OF JOBS PER CATEGORY

In [17]:
top_jobs_category = df.groupBy("job_category")\
                    .count()\
                    .orderBy(desc("count"))\
                    .limit(10)

top_jobs_category.show(5)

+--------------------+-----+
|        job_category|count|
+--------------------+-----+
|Engineering, Arch...|  504|
|Technology, Data ...|  313|
|       Legal Affairs|  226|
|Public Safety, In...|  182|
|Building Operatio...|  181|
+--------------------+-----+
only showing top 5 rows



KPI 2 -- SALARY DISTRIBUTION PER JOB CATEGORY

In [19]:
salary_by_cateogry = df.groupBy("job_category")\
                        .agg(avg("avg_salary").alias("avg_salary"))\
                        .orderBy(desc("avg_salary"))
salary_by_cateogry.show(5)

+--------------------+----------+
|        job_category|avg_salary|
+--------------------+----------+
|Administration & ...|  218587.0|
|Engineering, Arch...|  198518.0|
|Engineering, Arch...|  196042.5|
|Health Policy, Re...|  128694.5|
|Engineering, Arch...|  128247.5|
+--------------------+----------+
only showing top 5 rows



KPI 3 -- Correlation Between Degree & Salary

In [28]:
def extract_degree():
    """
    Extract highest degree from minimum qualifications text using fuzzy matching
    Creates categorical + ordinal features for ML modeling
    """
    return (when(lower(col("minimum_qual_requirements")).contains("phd"), "PhD")
            .when(lower(col("minimum_qual_requirements")).contains("master") , "Masters")
            .when(lower(col("minimum_qual_requirements")).contains("bachelor"), "Bachelors")
            .otherwise("No Degree"))

# Extract degree level from job requirements text (NLP-like feature engineering)
df = df.withColumn("degree_level", extract_degree())

# Create ordinal encoding for ML (PhD=3 > Masters=2 > Bachelors=1 > No Degree=0)
df = df.withColumn(
    "degree_encoded",
    when(col("degree_level") == "PhD", 3)
    .when(col("degree_level") == "Masters", 2)
    .when(col("degree_level") == "Bachelors", 1)
    .otherwise(0)
)

# Preview transformation
df.select("degree_level", "degree_encoded").show()

+------------+--------------+
|degree_level|degree_encoded|
+------------+--------------+
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|     Masters|             2|
|   No Degree|             0|
|   No Degree|             0|
|     Masters|             2|
|     Masters|             2|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
|   No Degree|             0|
+------------+--------------+
only showing top 20 rows



AttributeError: 'NoneType' object has no attribute 'desc'

KPI 4 -- job posting having the highest salary per agency

In [33]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc

w = Window.partitionBy("agency").orderBy(desc("avg_salary"))

highest_salary_per_agency = (
    df.withColumn("rank", row_number().over(w))
      .filter(col("rank") == 1)
      .select("agency", "business_title", "avg_salary")
)

highest_salary_per_agency.show(5)

+--------------------+--------------------+----------+
|              agency|      business_title|avg_salary|
+--------------------+--------------------+----------+
|LANDMARKS PRESERV...|LANDMARKS PRESERV...|   60103.5|
|OFFICE OF COLLECT...|COLLEGE AIDE - CL...|     9.555|
|     FIRE DEPARTMENT|Senior Enterprise...|  120474.5|
|ADMIN FOR CHILDRE...|Assistant Commiss...|  117474.5|
|MANHATTAN COMMUNI...| Community Assistant|      19.0|
+--------------------+--------------------+----------+
only showing top 5 rows



KPI 5 --  job positings average salary per agency for the last 2 yearS

In [78]:
from datetime import datetime

two_years_ago = datetime.now().year - 2

df_recent = df.filter(col("year") >= two_years_ago)

avg_salary_last_2_years = (
    df_recent.groupBy("agency")\
             .agg(avg("avg_salary").alias("avg_salary"))\
             .orderBy(desc("avg_salary"))
)

avg_salary_last_2_years.show()

+------+----------+
|agency|avg_salary|
+------+----------+
+------+----------+



In [79]:
#to check why the above KPI is returning zero. we are not having data for previous years 

years_present_in_data = df.select('year')

years_present_in_data.show()

+----+
|year|
+----+
|null|
|2012|
|null|
|null|
|2014|
|2014|
|null|
|2014|
|null|
|2014|
|null|
|null|
|null|
|2014|
|2014|
|2014|
|null|
|null|
|2014|
|null|
+----+
only showing top 20 rows



KPI 6 -- highest paid skills in the US market

In [None]:
#for this KPI I understand this can be done with Regex. As data doesnt have 
#skills column we can do it with the help of 2 different columns which is skills preferred and job desciption by exploding the values of it 

FEATURE REMOVAL

In [80]:
columns_to_drop = [
    "processed_date", 
    "year",
    "month",
    "process_date_clean", 
    "posting_date_clean", 
    "posting_updated_clean"
]

df_final = df.drop(*columns_to_drop)

WRITE TO FINAL TABLE

In [81]:
df_final.write.mode("overwrite").parquet("nyc_jobs_processed")