In [None]:
# Locate the local Spark installation so that `pyspark` can be imported in the
# next cell (findspark patches sys.path at runtime, per its documentation).
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Start the notebook's Spark session (or reuse one that is already running).
spark = (
    SparkSession.builder
    .appName("pyspark-1")
    .getOrCreate()
)

### Read data

In [None]:
# First look at the raw postings. Without inferSchema, the CSV reader types every column as a string.
df = spark.read.option("header", True).csv("/dataset/nyc-jobs.csv")
df.printSchema()

### Sample function: distinct salary frequencies

In [None]:
def get_salary_frequency(df: "DataFrame") -> list:
    """Return the distinct values of the ``Salary Frequency`` column, sorted.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame; must contain a ``Salary Frequency`` column.

    Returns
    -------
    list
        Each distinct value exactly once, in ascending order, with a null
        (``None``) value, if present, placed last.
    """
    # The annotation is a string because `DataFrame` is not imported in this
    # notebook; a bare name would raise NameError when the cell is executed.
    rows = df.select("Salary Frequency").distinct().collect()
    values = [row["Salary Frequency"] for row in rows]
    # distinct() does not guarantee any order, so sort on the driver to give
    # callers (and tests) a stable result. The key keeps None out of direct
    # comparisons with strings.
    return sorted(values, key=lambda value: (value is None, "" if value is None else value))

### Example of a test function

In [None]:
# Two fake postings (id, salary frequency); the expected result is just the frequencies.
mock_data = list(zip("AB", ("Annual", "Daily")))
expected_result = [frequency for _, frequency in mock_data]

In [None]:
def test_get_salary_frequency(mock_data: list,
                              expected_result: list,
                              schema: "list | None" = None):
    """Check ``get_salary_frequency`` against a small in-memory DataFrame.

    Parameters
    ----------
    mock_data : list
        Rows (tuples) used to build the test DataFrame via the global ``spark``.
    expected_result : list
        Values ``get_salary_frequency`` should return, compared without
        regard to order.
    schema : list, optional
        Column names for ``mock_data``; defaults to ``['id', 'Salary Frequency']``.

    Raises
    ------
    AssertionError
        If the returned values differ from ``expected_result``.
    """
    from collections import Counter

    # A None default avoids sharing one mutable list object between calls.
    if schema is None:
        schema = ['id', 'Salary Frequency']
    mock_df = spark.createDataFrame(data=mock_data, schema=schema)
    actual = get_salary_frequency(mock_df)
    # distinct() gives no ordering guarantee, so compare as multisets.
    assert Counter(actual) == Counter(expected_result), (
        f"expected {expected_result!r}, got {actual!r}"
    )

In [None]:
from pyspark.sql import SparkSession

# NOTE(review): getOrCreate() hands back an already-running session if there is
# one (e.g. from the earlier setup cell), so this app name may not take effect.
spark = (
    SparkSession.builder
    .appName("NYC Jobs Analysis")
    .getOrCreate()
)
# Only show ERROR-level Spark logs in the notebook output.
spark.sparkContext.setLogLevel("ERROR")


In [None]:
import os
# Exploratory: list the files in the current working directory.
os.listdir(os.curdir)


In [None]:
# Shell listing of the working directory (IPython `!` escape, not plain Python).
!ls

In [None]:
# Re-read with inferSchema so numeric and date-like columns get real types.
df = spark.read.options(header=True, inferSchema=True).csv("/dataset/nyc-jobs.csv")
df.printSchema()
df.show(n=5)

In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Null count per column: cast each is-null flag to int and sum it, producing one
# output column per input column with the same name.
# `sum` is aliased so Python's builtin sum() is not shadowed in later cells.
null_counts = df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()


In [None]:
from pyspark.sql.functions import col

# Width of each posting's salary band (upper bound minus lower bound).
# There are no salary_from/salary_to columns at this point; the raw headers are
# "Salary Range From"/"Salary Range To" (normalized later to salary_range_from/_to).
df = df.withColumn("salary_diff", col("Salary Range To") - col("Salary Range From"))


In [None]:
from pyspark.sql.functions import col, to_date, to_timestamp, year

# The same column is parsed later in this notebook with the ISO-8601 pattern
# "yyyy-MM-dd'T'HH:mm:ss.SSS", so the values are not "MM/dd/yyyy" strings; with
# that pattern every posting_year came out null. Parse with the ISO pattern, then
# truncate to a date. NOTE(review): to_timestamp should also pass through a column
# that inferSchema already typed as timestamp — confirm on your Spark version.
df = df.withColumn(
    "posting_date",
    to_date(to_timestamp(col("Posting Date"), "yyyy-MM-dd'T'HH:mm:ss.SSS")),
)

df = df.withColumn("posting_year", year(col("posting_date")))


In [None]:
from pyspark.sql.functions import col, lower, when

# 1 if the minimum-qualification text mentions "master" (case-insensitive), else 0.
# A null requirement text makes the condition null, which falls through to 0.
# NOTE(review): plain substring match, so words like "mastery" also count.
df = df.withColumn(
    "requires_masters",
    when(lower(col("Minimum Qual Requirements")).contains("master"), 1).otherwise(0),
)


In [None]:
# Ten most frequent job categories by number of postings.
top_categories = (
    df.groupBy("Job Category")
    .count()
    .sort("count", ascending=False)
    .limit(10)
)
top_categories.show()


In [None]:
from pyspark.sql.functions import avg, col

# Mean salary per job category, highest first.
# df has no "avg_salary" column yet at this point (it is only added after the
# later re-read), so average the salary-range midpoint directly.
salary_by_category = (
    df.groupBy("Job Category")
    .agg(avg((col("Salary Range From") + col("Salary Range To")) / 2).alias("avg_salary"))
    .orderBy(col("avg_salary").desc())
)

salary_by_category.show()


In [None]:
from pyspark.sql.functions import col, max as spark_max

# Highest salary-range midpoint posted by each agency, highest first.
# df has no "avg_salary" column yet at this point, so compute the midpoint inline.
# `max` is aliased so Python's builtin max() is not shadowed.
highest_salary_agency = (
    df.groupBy("Agency")
    .agg(spark_max((col("Salary Range From") + col("Salary Range To")) / 2).alias("max_salary"))
    .orderBy(col("max_salary").desc())
)

highest_salary_agency.show()


In [None]:
from pyspark.sql.functions import avg, col, max as spark_max

# Average salary per agency over the two most recent posting years in the data
# (the latest posting_year and the one before it). This replaces a hard-coded
# 2024 cut-off, which silently goes stale.
latest_year = df.agg(spark_max("posting_year")).first()[0]
if latest_year is None:
    # No parseable posting years: a year filter would match no rows either.
    recent_postings = df.limit(0)
else:
    recent_postings = df.filter(col("posting_year") >= latest_year - 1)

# df has no "avg_salary" column yet at this point, so average the salary-range
# midpoint directly.
recent_salary = (
    recent_postings.groupBy("Agency")
    .agg(
        avg((col("Salary Range From") + col("Salary Range To")) / 2)
        .alias("avg_salary_last_2_years")
    )
)

recent_salary.show()


In [None]:
# NOTE(review): exploratory cleanup. Spark resolves column names
# case-insensitively by default, so "posting date" presumably matches the raw
# "Posting Date" column. If "posting_date" already exists, this may leave two
# columns with that name — TODO confirm. The CSV is re-read further down, which
# discards the result of this cell.
df = df.withColumnRenamed("posting date", "posting_date")

In [None]:
# NOTE(review): exploratory cleanup. DataFrame.drop(name) is a no-op when no
# column with that name exists, which is likely the case after the rename above.
df = df.drop("posting date")


In [None]:
# NOTE(review): exploratory cleanup. Removes the "posting_date" column, i.e. the
# parsed posting date added earlier.
df = df.drop("posting_date")

In [None]:
# Inspect the current column names.
df.columns


In [None]:
# Start over from the file: everything derived on the previous df is discarded.
df = spark.read.options(header=True, inferSchema=True).csv("/dataset/nyc-jobs.csv")


In [None]:
# Normalize every header: trim, lowercase, spaces -> underscores
# (e.g. "Posting Date" -> "posting_date").
df = df.toDF(*(name.strip().lower().replace(" ", "_") for name in df.columns))


In [None]:
from pyspark.sql.functions import to_timestamp, year, col

# Parse posting_date with an explicit ISO-8601 pattern, then derive the year from it.
df = (
    df.withColumn(
        "posting_timestamp",
        to_timestamp(col("posting_date"), "yyyy-MM-dd'T'HH:mm:ss.SSS"),
    )
    .withColumn("posting_year", year(col("posting_timestamp")))
)


In [None]:
# Sanity check on the parsed years. A null row means dates that were missing
# or did not match the pattern.
(
    df.select("posting_year")
    .distinct()
    .sort("posting_year")
    .show()
)


In [None]:
# Ten most common job categories by number of postings.
top_categories = (
    df.groupBy("Job_Category")
    .count()
    .sort(col("count").desc())
    .limit(10)
)

top_categories.show()


In [None]:
from pyspark.sql.functions import col

# Midpoint of each posting's advertised salary range; used as "avg_salary" below.
df = df.withColumn("avg_salary", (col("salary_range_from") + col("salary_range_to")) / 2)


In [None]:
# Mean salary midpoint per job category, highest first.
salary_by_category = (
    df.groupBy("Job_Category")
    .avg("avg_salary")
    .withColumnRenamed("avg(avg_salary)", "avg_salary")
    .orderBy(col("avg_salary").desc())
)
salary_by_category.show()

In [None]:
# Ten most common job categories (this result feeds the bar chart below).
top_categories = (
    df.groupBy("Job_Category")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

top_categories.show()


In [None]:
# Mean salary midpoint per job category, highest first.
salary_by_category = (
    df.groupBy("Job_Category")
    .agg(dict(avg_salary="avg"))
    .withColumnRenamed("avg(avg_salary)", "avg_salary")
    .sort(col("avg_salary").desc())
)

salary_by_category.show()

In [None]:
# Highest salary midpoint posted by each agency, highest first.
highest_salary_agency = (
    df.groupBy("Agency")
    .max("avg_salary")
    .withColumnRenamed("max(avg_salary)", "max_salary")
    .sort(col("max_salary").desc())
)

highest_salary_agency.show()


In [None]:
from pyspark.sql.functions import col, max as spark_max

# Average salary per agency over the two most recent posting years present in
# the data (the latest posting_year and the one before it). This replaces a
# hard-coded 2024 cut-off, which silently goes stale.
latest_year = df.agg(spark_max("posting_year")).first()[0]
if latest_year is None:
    # No parseable posting years: a year filter would match no rows either.
    recent_postings = df.limit(0)
else:
    recent_postings = df.filter(col("posting_year") >= latest_year - 1)

recent_salary = (
    recent_postings.groupBy("Agency")
    .agg({"avg_salary": "avg"})
    .withColumnRenamed("avg(avg_salary)", "avg_salary_last_2_years")
)

recent_salary.show()


In [None]:
from pyspark.sql.functions import col, max as spark_max

# Latest posting year in the data; the window is [max_year - 1, max_year].
# `max` is aliased so Python's builtin max() is not shadowed in later cells.
max_year = df.select(spark_max("posting_year")).collect()[0][0]

if max_year is None:
    # Every posting_year is null (unparseable dates), so `None - 1` would raise;
    # fall back to an empty frame with the same schema.
    recent_postings = df.limit(0)
else:
    recent_postings = df.filter(col("posting_year") >= (max_year - 1))

recent_salary = (
    recent_postings.groupBy("Agency")
    .agg({"avg_salary": "avg"})
    .withColumnRenamed("avg(avg_salary)", "avg_salary_last_2_years")
)

recent_salary.show()

In [None]:
from pyspark.sql.functions import col, explode, split, trim

# One row per (posting, skill). The free-text Preferred Skills field is split
# on commas. Each fragment is trimmed so " Excel" and "Excel" group together,
# and empty fragments (e.g. from trailing commas) are dropped. Postings with
# null skills yield no rows because explode skips null arrays.
skills_df = (
    df.withColumn("skill", explode(split(col("Preferred_Skills"), ",")))
    .withColumn("skill", trim(col("skill")))
    .filter(col("skill") != "")
)

# Mean salary midpoint of the postings that list each skill, highest first.
highest_skills = (
    skills_df.groupBy("skill")
    .agg({"avg_salary": "avg"})
    .withColumnRenamed("avg(avg_salary)", "avg_salary")
    .orderBy(col("avg_salary").desc())
)

highest_skills.show()


In [None]:
import matplotlib.pyplot as plt

top_cat_pd = top_categories.toPandas()

# Spark matches column names case-insensitively, but pandas does not. After the
# header normalization the grouped column may come back as "job_category"
# rather than "Job_Category", so look it up case-insensitively.
category_col = next(
    (c for c in top_cat_pd.columns if c.lower() == "job_category"), None
)
if category_col is None:
    raise KeyError(f"no job-category column in {list(top_cat_pd.columns)}")

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(top_cat_pd[category_col], top_cat_pd["count"])
ax.set_title("Top 10 Job Categories")
ax.set_xlabel("Job category")
ax.set_ylabel("Number of postings")
ax.tick_params(axis="x", labelrotation=90)
# Keep the long rotated category labels inside the figure.
fig.tight_layout()
plt.show()


In [None]:
# Spark writes a directory of part files at this path, not a single CSV file.
df.write.csv("output/processed_nyc_jobs.csv", mode="overwrite", header=True)


In [None]:
# List the output directory to confirm the write (IPython `!` shell escape).
!ls output
