### <font color="#1F618D"> New York City Job Postings - Data Engineering Challenge </font>
### <font color="#F5B041"> Part-2: Feature Engineering </font>

In [28]:
import findspark

# Runs before any pyspark import in this notebook.
# NOTE(review): presumably this locates the local Spark install and makes
# pyspark importable - confirm against the findspark docs.
findspark.init()

import jupyter_black

# Activate the jupyter_black extension for this notebook session.
jupyter_black.load()

In [29]:
from pyspark.sql.functions import (
    col,
    concat_ws,
    regexp_replace,
    split,
    lit,
    lower,
    explode,
    count,
)
import re
from pyspark.sql.types import ArrayType

import nltk
from nltk.stem import WordNetLemmatizer

from pyspark.sql.functions import (
    col,
    regexp_replace,
    lower,
    split,
    udf,
    avg,
    explode,
    count,
)
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import NGram

import pyspark.sql.functions as f
from pyspark.sql.functions import col, when, udf
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.types import StringType
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession

In [33]:
from nltk.stem import WordNetLemmatizer

# Initialize NLTK lemmatizer
# NOTE(review): the recorded output of this cell shows the "wordnet" download
# failing; confirm the corpus is available before running the lemmatization
# cells further down.
nltk.download("wordnet")
# Module-level instance referenced by lemmatize_words().
lemmatizer = WordNetLemmatizer()

[nltk_data] Error loading wordnet: <urlopen error [Errno 0] Error>


In [34]:
# Build the Spark session (app name "NYCJobsFeatureEngineering") used by every cell below.
spark = SparkSession.builder.appName("NYCJobsFeatureEngineering").getOrCreate()

In [32]:
# Load the raw job postings: the first line is the header, column types are
# inferred, and a double quote is used as the escape character.
raw_data_path = "../../dataset/raw_data/nyc-jobs.csv"
df = spark.read.csv(raw_data_path, header=True, inferSchema=True, escape='"')

### <font color="#1F618D"> 1. Feature Extraction - </font> <font color="#F5B041"> Annual Salary </font>

In [35]:
def standardize_annual_salary(df: DataFrame) -> DataFrame:
    """
    Convert both salary range bounds to a yearly amount.

    Rows whose "Salary Frequency" is "Daily" are multiplied by the number of
    working days in a year (5 days * 52 weeks) and rows marked "Hourly" by the
    number of working hours in a year (8 hours * 5 days * 52 weeks). Any other
    frequency is copied through unchanged.

    Parameters:
        df (DataFrame): Input DataFrame with the "Salary Frequency",
            "Salary Range From" and "Salary Range To" columns.

    Returns:
        DataFrame: The input DataFrame with "AnnualSalaryFrom" and
            "AnnualSalaryTo" columns added.
    """
    days_per_year = 5 * 52
    hours_per_year = 8 * days_per_year
    frequency = col("Salary Frequency")

    def annualized(source_column):
        # Same conversion rule for both bounds of the salary range.
        amount = col(source_column)
        return (
            when(frequency == "Daily", amount * days_per_year)
            .when(frequency == "Hourly", amount * hours_per_year)
            .otherwise(amount)
        )

    with_lower_bound = df.withColumn(
        "AnnualSalaryFrom", annualized("Salary Range From")
    )
    return with_lower_bound.withColumn(
        "AnnualSalaryTo", annualized("Salary Range To")
    )

In [36]:
# Add the AnnualSalaryFrom / AnnualSalaryTo columns to the postings.
df = standardize_annual_salary(df)

In [37]:
## Average annual salary: midpoint of the annualized salary range

salary_midpoint = (col("AnnualSalaryFrom") + col("AnnualSalaryTo")) / 2
df = df.withColumn("AverageAnnualSalary", salary_midpoint)

### <font color="#1F618D"> 2. Feature Extraction - </font>  <font color="#F5B041"> Degree List & Highest Degree </font> 

In [38]:
import re
from typing import List, Optional

In [39]:
# Define keywords for degrees
# build_regex() turns these into a single word-boundary pattern that is
# searched for in the "Minimum Qual Requirements" text.
keywords = [
    "master",
    "phd",
    "pg",
    "post graduate",
    "baccalaureate",
    "diploma",
    "high school",
]

# Define the priority order for degrees
# Earlier entries rank higher: get_highest_degree() returns the first entry of
# this list that is present among a row's matched degrees.
degree_priority = [
    "phd",
    "master",
    "post graduate",
    "pg",
    "baccalaureate",
    "diploma",
    "high school",
]


def build_regex(keywords: List[str]) -> str:
    """
    Build a regular expression pattern to match keywords in a line.

    Each keyword is wrapped in word boundaries and the alternatives are joined
    inside one capturing group, e.g. ["phd", "master"] becomes
    "(\\bphd\\b|\\bmaster\\b)". Keywords are inserted verbatim (not escaped),
    so regex metacharacters in a keyword keep their regex meaning.

    Parameters:
        keywords (list of str): List of keywords to build the regex pattern for.

    Returns:
        str: The regex pattern to match the keywords. For an empty list a
            pattern that never matches is returned.
    """
    if not keywords:
        # The loop-and-slice version produced ")" here, which is not a valid
        # pattern. An empty negative lookahead is valid and matches nothing.
        return "(?!)"
    alternatives = "|".join("\\b" + key + "\\b" for key in keywords)
    return "(" + alternatives + ")"


def get_matching_string(line: str, regex: str) -> List[str]:
    """
    Collect every match of a regex pattern found in a line.

    Parameters:
        line (str): The text to search.
        regex (str): The pattern to look for.

    Returns:
        list of str: The matches in the order they occur; an empty list when
            nothing matches.
    """
    # re.findall already yields an empty list when there is no match.
    return re.findall(regex, line)


def get_highest_degree(degrees: List[str]) -> Optional[str]:
    """
    Pick the top-ranked degree out of a list of degrees.

    Ranking follows the module-level degree_priority list: the first entry of
    that list that also appears in degrees is returned.

    Parameters:
        degrees (list of str): Degrees found for one posting; may be empty or None.

    Returns:
        str or None: The highest priority degree, or None when degrees is
            empty/None or contains no entry of degree_priority.
    """
    if not degrees:
        return None
    return next((ranked for ranked in degree_priority if ranked in degrees), None)


def safe_get_matching_string(line, regex):
    """
    Find all regex matches in a line, tolerating missing or non-text input.

    Gives the same result as get_matching_string() for text input, but returns
    an empty list instead of raising when line is None or any other non-text
    value (e.g. a null cell passed to the UDF).

    Parameters:
        line (str, bytes or any): The value to search; non-text values yield [].
        regex (str): The regex pattern to search for.

    Returns:
        list: Matches found in the line, or [] for non-text input.
    """
    if isinstance(line, bytes) and isinstance(regex, str):
        # A str pattern cannot be applied to bytes (re raises TypeError), so
        # decode first rather than crash inside the "safe" wrapper.
        line = line.decode("utf-8", errors="replace")
    if not isinstance(line, (str, bytes)):
        return []
    return re.findall(regex, line)


# Spark UDF around the null-safe matcher; it returns an array of strings.
get_degree_list_udf = udf(safe_get_matching_string, ArrayType(StringType()))

# The degree pattern is passed to the UDF as a literal column.
degree_pattern = lit(build_regex(keywords))
qualification_text = df["Minimum Qual Requirements"]
df = df.withColumn(
    "degrees", get_degree_list_udf(qualification_text, degree_pattern)
)

# Reduce each row's matched degrees to the single highest-ranked one.
get_highest_degree_udf = udf(get_highest_degree, StringType())
df = df.withColumn("HighestDegree", get_highest_degree_udf(col("degrees")))

In [40]:
# Preview the first 4 rows of the two degree features without truncating cell values.
df.select("degrees", "HighestDegree").show(4, truncate=False)

+-------------------------------------+-------------+
|degrees                              |HighestDegree|
+-------------------------------------+-------------+
|[baccalaureate]                      |baccalaureate|
|[baccalaureate, high school, diploma]|baccalaureate|
|[high school]                        |high school  |
|[high school]                        |high school  |
+-------------------------------------+-------------+
only showing top 4 rows



### <font color="#1F618D"> 3. Feature Extraction - </font>  <font color="#F5B041"> Skills as ngrams</font> 

In [41]:
# Define a UDF for lemmatization using NLTK
def lemmatize_words(words):
    """
    Lemmatize every token in a list using the module-level NLTK lemmatizer.

    Parameters:
        words (list of str or None): Tokens to lemmatize.

    Returns:
        list of str: The lemmatized tokens, or an empty list when words is None.
    """
    if words is None:
        return []
    return [lemmatizer.lemmatize(token) for token in words]

In [42]:
# Strip everything except letters and whitespace from the preferred-skills
# text, then lowercase it. Raw strings keep "\s" from being read as an
# (invalid) Python escape sequence.
cleaned_df = df.withColumn(
    "cleaned_text", lower(regexp_replace(col("Preferred Skills"), r"[^a-zA-Z\s]", ""))
)
# Tokenize the cleaned text on runs of whitespace
tokenized_df = cleaned_df.withColumn("words", split(col("cleaned_text"), r"\s+"))

# Remove stop words using StopWordsRemover (rows with no tokens are dropped first)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_tokenized_df = tokenized_df.filter(col("words").isNotNull())
filtered_df = remover.transform(filtered_tokenized_df)

# Create the UDF
lemmatize_udf = udf(lemmatize_words, ArrayType(StringType()))

# Apply lemmatization to the stop-word-free tokens. Reading "words" here would
# bypass the StopWordsRemover output and put the stop words back into the n-grams.
lemmatized_df = filtered_df.withColumn(
    "lemmatized_words", lemmatize_udf(col("filtered_words"))
)

# Extract n-grams (n=1, i.e. single tokens)
ngram = NGram(n=1, inputCol="lemmatized_words", outputCol="ngrams")
ngram_df = ngram.transform(lemmatized_df)

In [43]:
# Columns that are not required for the KPI analysis are removed here.

# Columns coming from the raw postings file.
unused_source_columns = [
    "Civil Service Title",
    "Title Code No",
    "Level",
    "Full-Time/Part-Time indicator",
    "Salary Range From",
    "Salary Range To",
    "Salary Frequency",
    "Work Location",
    "Division/Work Unit",
    "Job Description",
    "Minimum Qual Requirements",
    "Preferred Skills",
    "Additional Information",
    "To Apply",
    "Hours/Shift",
    "Work Location 1",
    "Recruitment Contact",
    "Residency Requirement",
    "Post Until",
    "Posting Updated",
    "Process Date",
]

# Intermediate columns created while building the n-grams.
intermediate_text_columns = [
    "cleaned_text",
    "words",
    "filtered_words",
    "lemmatized_words",
]

cols_to_drop = unused_source_columns + intermediate_text_columns

ngram_df = ngram_df.drop(*cols_to_drop)

In [44]:
import os

# Define the target directory path
target_directory = "../../dataset/processed/"

# Create the directory (and any missing parents) if it doesn't exist.
# exist_ok=True replaces the separate existence check, so there is no window
# between checking and creating in which the call could fail.
os.makedirs(target_directory, exist_ok=True)

In [45]:
# Normalize column names: spaces and hyphens become underscores, then lowercase.
# The loop variable is deliberately not called "col", which is also the name of
# the pyspark column function imported above.
columns_with_valid_names = []
for original_name in ngram_df.columns:
    underscored = original_name.replace(" ", "_").replace("-", "_")
    columns_with_valid_names.append(underscored.lower())

ngram_df_renamed = ngram_df.toDF(*columns_with_valid_names)

In [46]:
# Flatten each array column into a single comma-separated string
for array_column in ("ngrams", "degrees"):
    ngram_df_renamed = ngram_df_renamed.withColumn(
        array_column, concat_ws(",", col(array_column))
    )

In [47]:
# Convert the posting date to date format.
# The rename step lowercased every column, so the column is "posting_date".
# Using that exact name keeps the output schema consistently lowercase and
# does not depend on Spark resolving column names case-insensitively.
ngram_df_renamed = ngram_df_renamed.withColumn(
    "posting_date", col("posting_date").cast("date")
)

In [48]:
# Export the processed data as Parquet from a single partition, replacing any
# previous export at the same path.
parquet_output_path = "../../dataset/processed/nyc_job_postings_processed_data.parquet"
single_partition_df = ngram_df_renamed.coalesce(1)
single_partition_df.write.mode("overwrite").format("parquet").save(parquet_output_path)

In [22]:
# Export the same data as a tab-separated file with a header row, from a single
# partition, replacing any previous export at the same path.
# NOTE(review): this path is relative to the working directory, whereas the
# Parquet export goes to ../../dataset/processed/ - confirm which location is intended.
csv_output_path = "nyc_job_postings_processed_data.csv"
csv_writer = ngram_df_renamed.coalesce(1).write.mode("overwrite").format("csv")
csv_writer.option("sep", "\t").save(csv_output_path, header=True)

In [23]:
# Print the column names and types of the exported DataFrame.
ngram_df_renamed.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- agency: string (nullable = true)
 |-- posting_type: string (nullable = true)
 |-- #_of_positions: integer (nullable = true)
 |-- business_title: string (nullable = true)
 |-- job_category: string (nullable = true)
 |-- posting_date: timestamp (nullable = true)
 |-- annualsalaryfrom: double (nullable = true)
 |-- annualsalaryto: double (nullable = true)
 |-- averageannualsalary: double (nullable = true)
 |-- degrees: string (nullable = false)
 |-- highestdegree: string (nullable = true)
 |-- ngrams: string (nullable = false)



In [24]:
# List the contents of the processed-data directory.
!ls -l ../../dataset/processed/

total 0
drwxr-xr-x 1 root root 512 Aug 30 22:59 nyc_job_postings_processed_data.csv
drwxr-xr-x 1 root root 512 Aug 30 22:35 nyc_job_postings_processed_data.parquet
