In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local[*]").appName("nyc").getOrCreate()

df = spark.read.option("header", True).csv("nyc-jobs.csv")


In [2]:
df = df.withColumn("salary_from", col("Salary Range From").cast("double")) \
       .withColumn("salary_to", col("Salary Range To").cast("double"))

df = df.withColumn("avg_salary", (col("salary_from") + col("salary_to")) / 2)


In [3]:
df = df.withColumn(
    "posting_year",
    regexp_extract(col("Posting Date"), r'(\d{4})', 1).cast("int")
)


In [4]:
df = df.withColumn(
    "has_degree",
    when(lower(col("Minimum Qual Requirements")).contains("degree"),1).otherwise(0)
)


In [5]:
df.select("Posting Date","posting_year").show(5)


26/02/17 21:22:11 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"cast" was called from
line 3 in cell [4]

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:147)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toIntExact(UTF8StringUtils.scala:34)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils.toIntExact(UTF8StringUtils.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expre

NumberFormatException: [CAST_INVALID_INPUT] The value '' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== DataFrame ==
"cast" was called from
line 3 in cell [4]


In [8]:
from pyspark.sql.functions import regexp_extract, when, length, col

df = df.withColumn(
    "posting_year_raw",
    regexp_extract(col("Posting Date"), r'(\d{4})', 1)
)

df = df.withColumn(
    "posting_year",
    when(length(col("posting_year_raw")) > 0, col("posting_year_raw").cast("int"))
    .otherwise(None)
)


In [9]:
df.show()

+------+--------------------+------------+--------------+--------------------+--------------------+-------------+-----+--------------------+-----------------------------+-----------------+---------------+----------------+--------------------+--------------------+--------------------+-------------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+-----------+---------+----------+------------+----------+----------------+
|Job ID|              Agency|Posting Type|# Of Positions|      Business Title| Civil Service Title|Title Code No|Level|        Job Category|Full-Time/Part-Time indicator|Salary Range From|Salary Range To|Salary Frequency|       Work Location|  Division/Work Unit|     Job Description|Minimum Qual Requirements|    Preferred Skills|Additional Information|            To Apply|      

In [10]:
recent = df.filter(col("posting_year") >= 2024)

recent.groupBy("Agency") \
      .agg(avg("avg_salary").alias("avg_salary_last_2yrs")) \
      .show()


+--------------------+--------------------+
|              Agency|avg_salary_last_2yrs|
+--------------------+--------------------+
|DEPT OF HEALTH/ME...|            74968.25|
|DEPT OF DESIGN & ...|             80526.5|
|CIVILIAN COMPLAIN...|             27.1355|
|DEPT OF ENVIRONME...|             89349.0|
|DEPT OF CITYWIDE ...|             78804.5|
|DEPARTMENT OF TRA...|            61358.25|
|DEPT OF INFO TECH...|            117470.0|
|DEPT OF YOUTH & C...|             54744.0|
|DEPARTMENT OF INV...|             63024.5|
|LANDMARKS PRESERV...|             60103.5|
|   POLICE DEPARTMENT|             54402.0|
+--------------------+--------------------+



In [12]:
skills_df = df.withColumn(
    "skill",
    explode(split(col("Preferred Skills"), ","))
)

skills_df = skills_df.withColumn("skill", trim(lower(col("skill"))))

top_skills = skills_df.groupBy("skill").agg(avg("avg_salary").alias("avg_salary")).orderBy(desc("avg_salary")).limit(10)

top_skills.show()

+--------------------+----------+
|               skill|avg_salary|
+--------------------+----------+
|he/she must be an...|  218587.0|
|and implement act...|  218587.0|
|   diagnose problems|  218587.0|
|and implementing ...|  218587.0|
|of which at least...|  218587.0|
|the following ski...|  218587.0|
|develop and retai...|  218587.0|
|city and state go...|  218587.0|
|continuous improv...|  218587.0|
|communication and...|  218587.0|
+--------------------+----------+



In [19]:
from pyspark.sql.functions import col, when, lower, regexp_extract

def clean_salary(df):
    df = df.withColumn("salary_from", col("Salary Range From").cast("double")) \
           .withColumn("salary_to", col("Salary Range To").cast("double"))
    df= df.withColumn("avg_salary", (col("salary_from") + col("salary_to")) / 2)
    
    return df

def extract_posting_year(df):
    df = df.withColumn("posting_year_raw",
                       regexp_extract(col("Posting Date"), r'(\d{4})', 1))
    df= df.withColumn(
        "posting_year",
        when(col("posting_year_raw") != "", col("posting_year_raw").cast("int"))
        .otherwise(None)
    )
    return df
    

def add_degree_flag(df):
    df=df.withColumn(
        "has_degree",
        when(lower(col("Minimum Qual Requirements")).contains("degree"),1).otherwise(0)
    )
    return df
    


In [20]:
df = df.withColumn(
    "salary_bucket",
    when(col("avg_salary") >= 100000, "High")
    .when(col("avg_salary") >= 50000, "Medium")
    .otherwise("Low")
).show()


{"ts": "2026-02-17 23:45:09.497", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `avg_salary` cannot be resolved. Did you mean one of the following? [`Agency`, `Level`, `Job ID`, `To Apply`, `Job Category`]. SQLSTATE: 42703", "context": {"file": "line 3 in cell [21]", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o226.withColumn.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `avg_salary` cannot be resolved. Did you mean one of the following? [`Agency`, `Level`, `Job ID`, `To Apply`, `Job Category`]. SQLSTATE: 42703;\n'Project [Job ID#649, Agency#650, Posting Type#651, # Of Positions#652, Business Title#653, Civil Service Title#654, Title Code No#655, Level#656, Job Category#657, Fu

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `avg_salary` cannot be resolved. Did you mean one of the following? [`Agency`, `Level`, `Job ID`, `To Apply`, `Job Category`]. SQLSTATE: 42703;
'Project [Job ID#649, Agency#650, Posting Type#651, # Of Positions#652, Business Title#653, Civil Service Title#654, Title Code No#655, Level#656, Job Category#657, Full-Time/Part-Time indicator#658, Salary Range From#659, Salary Range To#660, Salary Frequency#661, Work Location#662, Division/Work Unit#663, Job Description#664, Minimum Qual Requirements#665, Preferred Skills#666, Additional Information#667, To Apply#668, Hours/Shift#669, Work Location 1#670, Recruitment Contact#671, Residency Requirement#672, Posting Date#673, ... 4 more fields]
+- Relation [Job ID#649,Agency#650,Posting Type#651,# Of Positions#652,Business Title#653,Civil Service Title#654,Title Code No#655,Level#656,Job Category#657,Full-Time/Part-Time indicator#658,Salary Range From#659,Salary Range To#660,Salary Frequency#661,Work Location#662,Division/Work Unit#663,Job Description#664,Minimum Qual Requirements#665,Preferred Skills#666,Additional Information#667,To Apply#668,Hours/Shift#669,Work Location 1#670,Recruitment Contact#671,Residency Requirement#672,Posting Date#673,... 3 more fields] csv


In [22]:
df = df.drop("Job Description")


In [23]:
df.write.mode("overwrite").csv("processed_jobs")


In [None]:
def get_salary_frequency(df: DataFrame) -> list:
    row_list = df.select('Salary Frequency').distinct().collect()
    return [row['Salary Frequency'] for row in row_list]

In [None]:
def test_get_salary_frequency(mock_data: list, 
                              expected_result: list,
                              schema: list = ['id', 'Salary Frequency']):  
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    assert get_salary_frequency(mock_df) == expected_result

In [48]:
assert df.filter(col("avg_salary").isNull()).count() == 0


ConnectionRefusedError: [Errno 61] Connection refused