In [None]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

# Show every column and allow wide lines when pandas frames are rendered as text.
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [None]:
pip install spark-nlp
pip install xgboost

In [None]:
# Load the companies table from parquet; its `industries` column is used further down
# to fit the Word2Vec embedding and the KMeans industry clusters.
# NOTE(review): the path has no URI scheme ('/dbfs/...') while the jobs table uses 'dbfs:/...' —
# confirm it resolves to the intended location for spark.read.
companies = spark.read.parquet('/dbfs/linkedin_train_data')
display(companies)

In [None]:
# Location of the Delta table that holds the scraped job listings.
file_location = "dbfs:/user/hive/warehouse/syb2"
# Reading our Linkedin job listing scraping data
jobs = spark.read.format("delta").load(file_location)
display(jobs)

### EDA and organization of the data


In [None]:
from pyspark.sql.functions import regexp_extract, col, trim

# Drop every row containing a null in any column, then derive an integer
# `applicants_count` from the first run of digits in the free-text `applicants` column.
jobs_cleaned = jobs.dropna().withColumn(
    "applicants_count",
    regexp_extract(col("applicants"), r"(\d+)", 1).cast("int"),
)

# Keep only rows in which none of these text fields is blank after trimming.
fields_to_check = ["applicants", "company", "industries", "time_posted", "job_function"]
all_fields_present = None
for field in fields_to_check:
    field_present = trim(col(field)) != ""
    all_fields_present = (
        field_present if all_fields_present is None else all_fields_present & field_present
    )
jobs_cleaned = jobs_cleaned.filter(all_fields_present)

# Show the cleaned table and how many rows survived
display(jobs_cleaned)
print(jobs_cleaned.count())

### Cluster the Industries field

Find the best k for K-means using Elbow Method

In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import Word2Vec, Normalizer, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import Tokenizer
import matplotlib.pyplot as plt

# Report how many rows lack an `industries` value in each table
for table_name, table in (("companies", companies), ("jobs_cleaned", jobs_cleaned)):
    print(f"Null values in {table_name} industries column:", table.filter(col("industries").isNull()).count())

# Rows without an `industries` value cannot be tokenized, so remove them from both tables
companies = companies.dropna(subset=["industries"])
jobs_cleaned = jobs_cleaned.dropna(subset=["industries"])

# Stage 1: split the industries text of each company into words
tokenizer_industries = Tokenizer(inputCol="industries", outputCol="industries_words")
companies_tokenized = tokenizer_industries.transform(companies)

# Stage 2: learn a 25-dimensional Word2Vec embedding on the companies table
word2vec_industries = Word2Vec(
    vectorSize=25,
    minCount=1,
    inputCol="industries_words",
    outputCol="industries_embedding",
)
model_industries = word2vec_industries.fit(companies_tokenized)
companies_embedded = model_industries.transform(companies_tokenized)

# Stage 3: the embedding is the only clustering feature
assembler = VectorAssembler(inputCols=["industries_embedding"], outputCol="raw_features")
companies_features = assembler.transform(companies_embedded)

# Stage 4: L2-normalize so that Euclidean K-means approximates cosine similarity
normalizer = Normalizer(inputCol="raw_features", outputCol="features", p=2.0)
companies_features_normalized = normalizer.transform(companies_features)

# Elbow method: record the training cost for every candidate k
k_values = range(2, 15)
cost = []
for k in k_values:
    kmeans = KMeans(featuresCol="features", k=k, seed=1)
    model = kmeans.fit(companies_features_normalized)
    cost.append(model.summary.trainingCost)

# Plot cost against k to locate the elbow
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(k_values, cost, marker='o')
ax.set_title("Elbow Method for Optimal k", fontsize=16)
ax.set_xlabel("Number of Clusters (k)", fontsize=14)
ax.set_ylabel("Cost (Inertia)", fontsize=14)
ax.grid(True)
plt.show()

Apply KMeans with chosen k

In [None]:
# Number of clusters read off the elbow plot
optimal_k = 7

# Final clustering model, trained on the companies table only
kmeans_final = KMeans(featuresCol="features", seed=1, k=optimal_k)
final_model = kmeans_final.fit(companies_features_normalized)

# Push jobs_cleaned through the same fitted stages the companies went through:
# tokenizer -> Word2Vec model -> assembler -> normalizer
jobs_cleaned_tokenized = tokenizer_industries.transform(jobs_cleaned)
jobs_cleaned_embedded = model_industries.transform(jobs_cleaned_tokenized)
jobs_features = assembler.transform(jobs_cleaned_embedded)
jobs_features_normalized = normalizer.transform(jobs_features)

# Assign each job to a cluster and expose the result as `industry_cluster`
predictions = (
    final_model
    .transform(jobs_features_normalized)
    .withColumnRenamed("prediction", "industry_cluster")
)

display(predictions)

Assign meaningful names to clusters

In [None]:
from pyspark.sql.functions import collect_list, col, explode, split, count, concat_ws, slice, sort_array, struct

# How many of the most frequent terms are joined to form a cluster's name.
TOP_TERMS_PER_CLUSTER = 1

# Group by cluster and collect industries
cluster_summary = predictions.groupBy("industry_cluster").agg(collect_list("industries").alias("industries_list")).orderBy("industry_cluster")
# Explode industries_list to individual industries
exploded_industries = cluster_summary.withColumn("industry", explode(col("industries_list")))
# Split industries into individual terms (if comma-separated)
exploded_terms = exploded_industries.withColumn("term", explode(split(col("industry"), ",\\s*")))
# Count the frequency of each term for each cluster
industry_counts = exploded_terms.groupBy("industry_cluster", "term").count().orderBy("industry_cluster", "count", ascending=[True, False])

# Extract the top term(s) for each cluster.
# collect_list gives no ordering guarantee after the groupBy shuffle, so the earlier
# orderBy cannot be relied on. Collect (count, term) pairs and sort them explicitly in
# descending order; ties on count are broken by term, which keeps the result deterministic.
top_keywords = (
    industry_counts
    .groupBy("industry_cluster")
    .agg(
        sort_array(collect_list(struct(col("count"), col("term"))), asc=False).alias("ranked_terms")
    )
    # Selecting a struct field from an array of structs yields the array of that field.
    .withColumn("top_terms", slice(col("ranked_terms.term"), 1, TOP_TERMS_PER_CLUSTER))
    .drop("ranked_terms")
)
# Concatenate the top terms into a single cluster name
cluster_names_automated = top_keywords.withColumn("cluster_name", concat_ws(", ", col("top_terms")))
display(cluster_names_automated)
# Join cluster names back to predictions
predictions_with_names = predictions.join(cluster_names_automated.select("industry_cluster", "cluster_name"), on="industry_cluster", how="left")

Dimensionality reduction for cluster plotting

In [None]:
from pyspark.ml.feature import PCA
import pandas as pd
import matplotlib.pyplot as plt

# Project the normalized embedding onto its first two principal components
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(predictions_with_names)
pca_result = pca_model.transform(predictions_with_names)

# Bring the 2-D coordinates plus the cluster id and name to the driver
pca_rows = pca_result.select("pca_features", "industry_cluster", "cluster_name").collect()
pca_data = [
    (row["pca_features"][0], row["pca_features"][1], row["industry_cluster"], row["cluster_name"])
    for row in pca_rows
]
pca_df = pd.DataFrame(pca_data, columns=["pca_x", "pca_y", "cluster", "cluster_name"])

# Scatter plot coloured by cluster id
fig, ax = plt.subplots(figsize=(8, 6))
scatter = ax.scatter(pca_df["pca_x"], pca_df["pca_y"], c=pca_df["cluster"], cmap="tab10", s=30)
ax.set_title("PCA Visualization of Clusters", fontsize=16)
ax.set_xlabel("PCA Dimension 1", fontsize=14)
ax.set_ylabel("PCA Dimension 2", fontsize=14)
ax.grid(True)

# Write each cluster's name at the mean position of its points
unique_clusters = pca_df[["cluster", "cluster_name"]].drop_duplicates().sort_values("cluster")
for cluster, name in unique_clusters.itertuples(index=False):
    cluster_points = pca_df[pca_df["cluster"] == cluster]
    center_x = cluster_points["pca_x"].mean()
    center_y = cluster_points["pca_y"].mean()
    ax.text(center_x, center_y, name, fontsize=12, ha="center", bbox=dict(facecolor="white", alpha=0.8))
plt.tight_layout()
plt.show()

predictions_with_names.display()

### **Preparing the features for the model**

In [None]:
from pyspark.sql.functions import col, regexp_extract, when, max
def convert_to_minutes(df):
    """Add `time_number`, `time_unit` and `minutes_ago` columns parsed from `time_posted`.

    `time_posted` is expected to look like "<number> <unit> ago" (for example "3 days ago").
    The first run of digits becomes `time_number`, the word directly before " ago" becomes
    `time_unit`, and `minutes_ago` is the age converted to minutes. Singular and plural
    unit names are both accepted; rows with an unrecognized unit get a null `minutes_ago`.

    Args:
        df: Spark DataFrame with a string `time_posted` column.

    Returns:
        The DataFrame with the three extra columns.
    """
    # Minutes in one unit. A month counts as 30 days and a year as 365 days.
    # The previous factors multiplied months and years through 7 as well, which
    # turned a month into 210 days.
    minutes_per_unit = {
        'minute': 1,
        'hour': 60,
        'day': 60 * 24,
        'week': 60 * 24 * 7,
        'month': 60 * 24 * 30,
        'year': 60 * 24 * 365,
    }

    # Extract number and unit using regexp
    df = df.withColumn('time_number',
                      regexp_extract(col('time_posted'), r'(\d+)', 1).cast('int'))
    df = df.withColumn('time_unit',
                      regexp_extract(col('time_posted'), r'(\w+) ago', 1))

    # Build one chained CASE WHEN that covers the singular and plural form of every unit.
    minutes_ago = None
    for unit, factor in minutes_per_unit.items():
        is_unit = col('time_unit').isin(unit, unit + 's')
        minutes = col('time_number') * factor
        minutes_ago = when(is_unit, minutes) if minutes_ago is None else minutes_ago.when(is_unit, minutes)

    return df.withColumn('minutes_ago', minutes_ago)

records_with_minutes = convert_to_minutes(predictions_with_names)

# Every column except `minutes_ago` acts as a grouping key
group_columns = list(filter(lambda name: name != "minutes_ago", records_with_minutes.columns))

# Collapse rows that share all grouping keys, keeping the largest `minutes_ago` of each group
records_with_minutes = records_with_minutes.groupBy(*group_columns).agg(
    max("minutes_ago").alias("minutes_ago")
)
display(records_with_minutes)

### Normalization of applicants with important parameters


The number of applicants has a few problems:
1. Correlation with company status (size / market cap / followers): bigger companies with more followers tend to have more exposure.
2. The number of applicants is Missing Not At Random: the reported value ranges from 25 to 200, with anything under 25 reported as 25 and anything over 200 reported as 200.
3. The number of applicants is Missing At Random, correlated with the time posted.
4. The time posted is Missing Not At Random: we only get the time in the largest available unit (minutes, hours, days, weeks, ...). Once a post is older than an hour only hours are reported, and so on.


Train the first model to predict number of applicants with normalization

In [None]:
# Define the fraction of records for each model

from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import IntegerType

# Keep only rows whose applicants_count is present and castable to an integer,
# and report how many rows that removes.
applicants_col = col("applicants_count")
has_integer_applicants = applicants_col.isNotNull() & applicants_col.cast(IntegerType()).isNotNull()

original_count = records_with_minutes.count()
records_with_minutes = records_with_minutes.filter(has_integer_applicants)
new_count = records_with_minutes.count()
print(f"Rows dropped: {original_count - new_count}")

# One third of the records trains the normalization model, two thirds go to the prediction stage
applicants_normalization_fraction = 1 / 3
applicants_prediction_fraction = 2/3
split_weights = [applicants_normalization_fraction, applicants_prediction_fraction]

# Seeded random split into the two subsets
applicants_normalization_data, applicants_prediction_data = records_with_minutes.randomSplit(
    split_weights, seed=42
)

# Show each subset followed by its size
for subset in (applicants_normalization_data, applicants_prediction_data):
    display(subset)
    print(subset.count())


Count the record count of each company to use it as a "company size" normalization

In [None]:
from pyspark.sql.functions import col, count

# Number of job records per company, largest first; used later as the `record_count` feature
jobs_per_company = records_with_minutes.groupBy("company").agg(count("*").alias("record_count"))
amount_of_jobs = jobs_per_company.orderBy(col("record_count").desc())
amount_of_jobs.display()

Finding best parameters for the **Random Forest** model using the elbow method

In [None]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, Bucketizer, OneHotEncoder, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Create bins for the "applicants_count" column
def create_binned_features(df):
    """Bucket `applicants_count` into bins and store the bin index in `applicants_count_bin`.

    Bins are used instead of the raw number because of the MAR/MNAR mechanism in the
    applicants count column.
    """
    inner_edges = [26, 50, 75, 100, 125, 150, 175, 199]
    splits = [float('-inf')] + inner_edges + [float('inf')]
    return Bucketizer(
        inputCol="applicants_count",
        outputCol="applicants_count_bin",
        splits=splits,
    ).transform(df)

# Prepare features for the model
def prepare_features(df, categorical_cols=["seniority_level", "employment_type"],
                    numeric_cols=["record_count", "minutes_ago", "industry_cluster"]):
    """Assemble the model input vector `bin_prediction` from numeric and categorical columns.

    Nulls in the listed columns are filled with 0, each categorical column is string-indexed
    (unseen or invalid values are kept in an extra bucket) and one-hot encoded, and the numeric
    columns followed by the encoded columns are assembled into `bin_prediction`.

    NOTE(review): the indexers and the encoder are fit on whatever `df` is passed in, so
    calling this on two different datasets can yield different category-to-slot mappings —
    confirm this is acceptable where a trained model is applied to separately prepared data.
    """
    # One fill call covers every listed column
    df = df.na.fill(0, subset=categorical_cols + numeric_cols)

    # Drop a stale vector column so the assembler can write its output
    if "bin_prediction" in df.columns:
        df = df.drop("bin_prediction")

    indexed_names = [f"{name}_indexed" for name in categorical_cols]
    encoded_names = [f"{name}_encoded" for name in categorical_cols]

    # Index each categorical column in turn
    for source_name, indexed_name in zip(categorical_cols, indexed_names):
        indexer = StringIndexer(
            inputCol=source_name,
            outputCol=indexed_name,
            handleInvalid="keep",
        )
        df = indexer.fit(df).transform(df)

    # One-hot encode all indexed columns together
    encoder = OneHotEncoder(inputCols=indexed_names, outputCols=encoded_names)
    df = encoder.fit(df).transform(df)

    # Numeric columns first, encoded categorical columns after
    assembler = VectorAssembler(
        inputCols=numeric_cols + encoded_names,
        outputCol="bin_prediction",
    )
    return assembler.transform(df)

def plot_elbow_curves(param_metrics):
    """Plot elbow curves for different parameters.

    Args:
        param_metrics: pandas DataFrame with one row per (numTrees, maxDepth)
            combination and columns 'numTrees', 'maxDepth' and 'metric'.

    Returns:
        The matplotlib Figure with two line plots on the top row (metric against
        numTrees and against maxDepth, each at the middle value of the other
        parameter) and a 3D surface of the full grid on the bottom row.
    """
    def _middle_value(values):
        # Pick a value that actually exists in the grid; a plain median falls
        # between two grid points when the grid has an even number of values.
        ordered = sorted(set(values))
        return ordered[(len(ordered) - 1) // 2]

    # Create only the axes that are drawn on, so the 3D plot does not sit on
    # top of two empty 2D axes.
    fig = plt.figure(figsize=(15, 12))
    fig.suptitle('Parameter Tuning Elbow Curves')
    ax_trees = fig.add_subplot(2, 2, 1)
    ax_depth = fig.add_subplot(2, 2, 2)

    # Plot numTrees (sorted so the line is drawn left to right)
    mid_depth = _middle_value(param_metrics['maxDepth'])
    num_trees_df = param_metrics[param_metrics['maxDepth'] == mid_depth].sort_values('numTrees')
    ax_trees.plot(num_trees_df['numTrees'], num_trees_df['metric'], marker='o')
    ax_trees.set_xlabel('Number of Trees')
    ax_trees.set_ylabel('F1 Score')
    ax_trees.set_title('Effect of Number of Trees')

    # Plot maxDepth
    mid_trees = _middle_value(param_metrics['numTrees'])
    max_depth_df = param_metrics[param_metrics['numTrees'] == mid_trees].sort_values('maxDepth')
    ax_depth.plot(max_depth_df['maxDepth'], max_depth_df['metric'], marker='o')
    ax_depth.set_xlabel('Max Depth')
    ax_depth.set_ylabel('F1 Score')
    ax_depth.set_title('Effect of Max Depth')

    # Plot 3D surface. The mesh coordinates come from the pivot table's own
    # axes, so X/Y always line up with the Z values.
    ax = fig.add_subplot(2, 1, 2, projection='3d')
    grid = param_metrics.pivot(index='maxDepth', columns='numTrees', values='metric')
    X, Y = np.meshgrid(grid.columns.values, grid.index.values)
    Z = grid.values
    surf = ax.plot_surface(X, Y, Z, cmap='viridis')
    ax.set_xlabel('Number of Trees')
    ax.set_ylabel('Max Depth')
    ax.set_zlabel('F1 Score')
    ax.set_title('Parameter Interaction Effect on F1 Score')
    fig.colorbar(surf, ax=ax)

    plt.tight_layout()
    return fig


def evaluate_model(model, data, evaluator):
    """Score `data` with `model` and compute several classification metrics.

    The evaluator's metric name is switched for each metric in turn, so it is left
    set to the last one ("f1") when the function returns.

    Returns:
        (metrics, predictions): a dict mapping metric name to its value, and the
        output of `model.transform(data)`.
    """
    scored = model.transform(data)

    def _score(metric_name):
        evaluator.setMetricName(metric_name)
        return evaluator.evaluate(scored)

    metric_names = ("accuracy", "weightedPrecision", "weightedRecall", "f1")
    return {metric_name: _score(metric_name) for metric_name in metric_names}, scored


def create_applicant_classifier(amount_of_jobs, applicants_normalization_data):
    """Grid-search a Random Forest that predicts the applicants-count bin.

    The per-company record counts are joined onto the normalization data, the target
    is binned, features are assembled, and the data is split 60/20/20 into train,
    evaluation and test sets. Every (numTrees, maxDepth) combination is trained on the
    train set and scored by F1 on the evaluation set; the best model is then evaluated
    on the test set.

    Args:
        amount_of_jobs: DataFrame with `company` and `record_count`.
        applicants_normalization_data: DataFrame with the job records used for training.

    Returns:
        dict with keys 'model', 'test_metrics', 'test_predictions',
        'feature_importance' (list of (input column, importance) pairs),
        'bin_distribution' (pandas DataFrame of predicted-bin counts on the test set),
        'param_metrics' (pandas DataFrame of the grid results), 'elbow_plot' and
        'best_params'.
    """
    # Join and prepare initial dataset
    joined_df = amount_of_jobs.join(
        applicants_normalization_data,
        on="company",
        how="inner"
    )
    df_with_bins = create_binned_features(joined_df).na.drop(subset=["applicants_count_bin"])
    vector_df = prepare_features(df_with_bins)
    # Split into train, eval, and test sets
    train_data, eval_data, test_data = vector_df.randomSplit([0.6, 0.2, 0.2], seed=42)
    # Create evaluator
    evaluator = MulticlassClassificationEvaluator(
        labelCol="applicants_count_bin",
        predictionCol="prediction",
        metricName="f1"
    )
    # Define parameter grid
    param_grid = {
        'numTrees': [50, 100, 150],
        'maxDepth': [5, 10, 15]
    }
    # Train and evaluate models with different parameters
    param_metrics = []
    best_metric = -1
    best_model = None
    best_params = None

    for num_trees in param_grid['numTrees']:
        for max_depth in param_grid['maxDepth']:
            # Train model with current parameters
            rf = RandomForestClassifier(
                labelCol="applicants_count_bin",
                featuresCol="bin_prediction",
                numTrees=num_trees,
                maxDepth=max_depth,
                seed=42
            )
            model = rf.fit(train_data)
            # Evaluate on eval set
            eval_metrics, _ = evaluate_model(model, eval_data, evaluator)
            # Store metrics
            param_metrics.append({
                'numTrees': num_trees,
                'maxDepth': max_depth,
                'metric': eval_metrics['f1']
            })
            # Update best model if necessary
            if eval_metrics['f1'] > best_metric:
                best_metric = eval_metrics['f1']
                best_model = model
                best_params = {'numTrees': num_trees, 'maxDepth': max_depth}

    param_metrics_df = pd.DataFrame(param_metrics)

    # Generate elbow plots
    elbow_plot = plot_elbow_curves(param_metrics_df)
    # Evaluate best model on test set
    test_metrics, test_predictions = evaluate_model(best_model, test_data, evaluator)
    # Get feature importance
    numeric_cols = ["record_count", "minutes_ago", "industry_cluster"]
    categorical_cols = ["seniority_level", "employment_type"]
    feature_cols = numeric_cols + [f"{col}_encoded" for col in categorical_cols]

    # featureImportances has one entry per slot of the assembled vector, and a one-hot
    # encoded column occupies several slots. Zipping column names with the raw vector
    # would pair names with the wrong slots, so sum the slots that belong to each
    # input column. Slot widths are read from one assembled row; the assembler
    # concatenates its inputs in `feature_cols` order.
    importances = best_model.featureImportances.toArray()
    sample_row = vector_df.select(*feature_cols).first()
    feature_importance_list = []
    offset = 0
    for feature in feature_cols:
        value = sample_row[feature]
        width = len(value) if hasattr(value, "__len__") else 1
        feature_importance_list.append(
            (feature, float(importances[offset:offset + width].sum()))
        )
        offset += width

    bin_distribution = test_predictions.groupBy("prediction") \
        .agg(F.count("*").alias("count")) \
        .orderBy("prediction") \
        .toPandas()

    return {
        'model': best_model,
        'test_metrics': test_metrics,
        'test_predictions': test_predictions,
        'feature_importance': feature_importance_list,
        'bin_distribution': bin_distribution,
        'param_metrics': param_metrics_df,
        'elbow_plot': elbow_plot,
        'best_params': best_params
    }
result = create_applicant_classifier(amount_of_jobs, applicants_normalization_data)

# Winning hyper-parameters
best_params = result['best_params']
print("\nBest Parameters:")
print(f"Number of Trees: {best_params['numTrees']}")
print(f"Max Depth: {best_params['maxDepth']}")

# Metrics of the best model on the held-out test set
print("\nTest Set Metrics:")
for metric in result['test_metrics']:
    print(f"{metric}: {result['test_metrics'][metric]:.4f}")

# Importance reported for each input feature
print("\nFeature Importance:")
for feature_entry in result['feature_importance']:
    feature, importance = feature_entry
    print(f"{feature}: {importance:.4f}")

# Top rows of the tuning grid, best metric first
print("\nParameter Tuning Results:")
tuning_table = result['param_metrics'].sort_values('metric', ascending=False)
print(tuning_table.head())

# Render the elbow figure created during tuning
plt.show()

In [None]:
norm_model = result['model']

# Bin the held-out records, attach the per-company record count and assemble the feature vector
applicants_prediction_data_for_model = prepare_features(
    amount_of_jobs.join(
        create_binned_features(applicants_prediction_data),
        on="company",
        how="inner",
    )
)

# Columns carried forward to the text-based models
prediction_output_columns = [
    "company", "applicants_count", "description", "employment_type", "industries",
    "job_function", "location", "seniority_level", "industry_cluster", "minutes_ago",
    "title", "applicants_count_bin", "prediction",
]

# Predict the bin, then compute predicted bin minus actual bin
applicants_prediction_data_predictions = (
    norm_model.transform(applicants_prediction_data_for_model)
    .select(prediction_output_columns)
    .withColumnRenamed("prediction", "prediction_applicants_count_bin")
    .withColumn(
        "applicant_bin_difference",
        col("prediction_applicants_count_bin") - col("applicants_count_bin"),
    )
)

display(applicants_prediction_data_predictions)

In [None]:
# Show distribution of bin difference
import matplotlib.pyplot as plt

# Bring the single difference column to pandas and count how often each value occurs
bin_diff_df = applicants_prediction_data_predictions.select("applicant_bin_difference").toPandas()
difference_counts = bin_diff_df['applicant_bin_difference'].value_counts().sort_index()

fig, ax = plt.subplots(figsize=(10, 6))
difference_counts.plot(kind='bar', ax=ax)
ax.set_xlabel('Applicant Bin Difference')
ax.set_ylabel('Frequency')
ax.set_title('Distribution of Applicant Bin Difference')
fig.text(0.5, -0.05, 'Note: Lower values indicate better job ads', ha='center', fontsize=10)
plt.show()

## Predicting difference based on description and title

First, we try the **Random Forest** model; let's investigate its performance

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf, abs, avg, length
from pyspark.sql.types import DoubleType

def clean_data(data):
    """Keep rows with a non-null, non-empty title and description and a non-null label."""
    title, description = col("title"), col("description")
    has_title = title.isNotNull() & (length(title) > 0)
    has_description = description.isNotNull() & (length(description) > 0)
    has_label = col("applicant_bin_difference").isNotNull()
    return data.filter(has_title & has_description & has_label)

def shift_labels(data):
    """Add `shifted_label`: `applicant_bin_difference` moved up by 8 so labels are non-negative."""
    shifted = col("applicant_bin_difference") + 8
    return data.withColumn("shifted_label", shifted)

def shift_predictions_back(predictions):
    """Add `adjusted_prediction`: `prediction` moved down by 8, undoing the label shift."""
    unshifted = col("prediction") - 8
    return predictions.withColumn("adjusted_prediction", unshifted)

# Create tokenizers for both text columns (split on any non-word character)
title_tokenizer = RegexTokenizer(
    inputCol="title",
    outputCol="title_words",
    pattern="\\W"
)

description_tokenizer = RegexTokenizer(
    inputCol="description",
    outputCol="description_words",
    pattern="\\W"
)

# Remove stop words from both columns
title_remover = StopWordsRemover(
    inputCol="title_words",
    outputCol="title_filtered"
)

description_remover = StopWordsRemover(
    inputCol="description_words",
    outputCol="description_filtered"
)

# Convert text to term frequency vectors (terms must occur in at least 5 documents)
title_vectorizer = CountVectorizer(
    inputCol="title_filtered",
    outputCol="title_tf",
    minDF=5.0
)

description_vectorizer = CountVectorizer(
    inputCol="description_filtered",
    outputCol="description_tf",
    minDF=5.0
)

# Apply IDF to both columns
title_idf = IDF(
    inputCol="title_tf",
    outputCol="title_features"
)

description_idf = IDF(
    inputCol="description_tf",
    outputCol="description_features"
)

# Combine features from both columns
assembler = VectorAssembler(
    inputCols=["title_features", "description_features"],
    outputCol="raw_features"
)

# Scale features.
# withMean stays False: centering turns every sparse TF-IDF vector into a dense
# vector over the whole vocabulary, which is very costly in memory. Tree splits
# do not depend on a per-feature shift, so the forest loses nothing.
scaler = StandardScaler(
    inputCol="raw_features",
    outputCol="features",
    withStd=True,
    withMean=False
)

# Create the classifier (seeded so repeated runs build the same forest)
rf = RandomForestClassifier(
    labelCol="shifted_label",
    featuresCol="features",
    numTrees=100,
    seed=42
)

# Create the pipeline
pipeline = Pipeline(stages=[
    title_tokenizer,
    description_tokenizer,
    title_remover,
    description_remover,
    title_vectorizer,
    description_vectorizer,
    title_idf,
    description_idf,
    assembler,
    scaler,
    rf
])

def train_and_evaluate_model(data):
    """Clean `data`, fit the module-level `pipeline` on an 80/20 split and evaluate it.

    Returns:
        (model, accuracy, mae, feature_importance, final_predictions) where
        `final_predictions` are the test-set predictions with `adjusted_prediction` added.
    """
    # Report how many records the cleaning step removes
    initial_count = data.count()
    print(f"Initial number of records: {initial_count}")

    cleaned_data = clean_data(data)
    cleaned_count = cleaned_data.count()
    print(f"Number of records after cleaning: {cleaned_count}")
    print(f"Removed {initial_count - cleaned_count} records with null or empty values")

    # Non-negative labels, then a seeded 80/20 train/test split
    shifted_data = shift_labels(cleaned_data)
    train_data, test_data = shifted_data.randomSplit([0.8, 0.2], seed=42)
    print(f"Training set size: {train_data.count()}")
    print(f"Test set size: {test_data.count()}")

    # Fit on train, predict on test, and map predictions back to the original label range
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    final_predictions = shift_predictions_back(predictions)

    # Accuracy is computed on the shifted labels
    accuracy = MulticlassClassificationEvaluator(
        labelCol="shifted_label",
        predictionCol="prediction",
        metricName="accuracy",
    ).evaluate(predictions)

    # Mean absolute error in the original (unshifted) range
    absolute_error = abs(col("applicant_bin_difference") - col("adjusted_prediction"))
    mae = final_predictions.select(avg(absolute_error)).collect()[0][0]

    # The classifier is the last pipeline stage
    feature_importance = model.stages[-1].featureImportances

    print("\nPrediction Distribution:")
    prediction_counts = final_predictions.groupBy("adjusted_prediction").count()
    prediction_counts.orderBy("adjusted_prediction").show()

    return model, accuracy, mae, feature_importance, final_predictions

In [None]:
# Fit the Random Forest text model and unpack everything it returns
(
    model,
    accuracy,
    mae,
    feature_importance,
    predictions,
) = train_and_evaluate_model(applicants_prediction_data_predictions)

# Headline metrics
print(f"\nModel Accuracy: {accuracy}")
print(f"Mean Absolute Error: {mae}")

# A few test rows showing the true and the predicted difference side by side
sample_columns = ["title", "applicant_bin_difference", "adjusted_prediction"]
predictions.select(*sample_columns).show(5)

predictions.display()

As we can see, the results of the random forest are not very good, so we investigate another model: **XGBoost**

In [None]:
from pyspark.ml import Pipeline
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, udf, abs, avg, length, count
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf, abs, avg, length
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler

def clean_data(data):
    """Drop rows whose title, description or label is null, or whose title or description is empty."""
    required_text_columns = ("title", "description")
    keep_row = col("applicant_bin_difference").isNotNull()
    for text_column in required_text_columns:
        keep_row = keep_row & col(text_column).isNotNull() & (length(col(text_column)) > 0)
    return data.filter(keep_row)

def shift_labels(data):
    """Return `data` with a `shifted_label` column equal to `applicant_bin_difference` + 8."""
    label_expression = col("applicant_bin_difference") + 8
    return data.withColumn("shifted_label", label_expression)

def shift_predictions_back(predictions):
    """Return `predictions` with an `adjusted_prediction` column equal to `prediction` - 8."""
    prediction_expression = col("prediction") - 8
    return predictions.withColumn("adjusted_prediction", prediction_expression)

# Create tokenizers for both text columns (split on any non-word character)
title_tokenizer = RegexTokenizer(
    inputCol="title",
    outputCol="title_words",
    pattern="\\W"
)

description_tokenizer = RegexTokenizer(
    inputCol="description",
    outputCol="description_words",
    pattern="\\W"
)

# Remove stop words from both columns
title_remover = StopWordsRemover(
    inputCol="title_words",
    outputCol="title_filtered"
)

description_remover = StopWordsRemover(
    inputCol="description_words",
    outputCol="description_filtered"
)

# Apply Word2Vec to both columns (100-dimensional, words seen at least 5 times)
title_word2vec = Word2Vec(
    inputCol="title_filtered",
    outputCol="title_features",
    vectorSize=100,
    minCount=5
)

description_word2vec = Word2Vec(
    inputCol="description_filtered",
    outputCol="description_features",
    vectorSize=100,
    minCount=5
)

# Combine features from both columns
assembler = VectorAssembler(
    inputCols=["title_features", "description_features"],
    outputCol="raw_features"
)

# The bucketizer produces 9 bins (indices 0..8), so predicted bin minus actual bin
# lies in -8..8 and the +8 shift maps it to labels 0..16: that is 17 classes, not 16.
NUM_SHIFTED_LABEL_CLASSES = 17

# Create the XGBoost classifier.
# The number of boosting rounds is set through `n_estimators`, the estimator's own
# parameter name; `num_round` is not one of its parameters.
xgb = SparkXGBClassifier(
    label_col="shifted_label",
    features_col="raw_features",
    # objective="multi:softmax",
    num_class=NUM_SHIFTED_LABEL_CLASSES,
    max_depth=6,
    eta=0.1,
    n_estimators=100
)


# Create the pipeline
pipeline = Pipeline(stages=[
    title_tokenizer,
    description_tokenizer,
    title_remover,
    description_remover,
    title_word2vec,
    description_word2vec,
    assembler,
    xgb
])

def tune_max_depth(train_data, eval_data, max_depth_values):
    """
    Perform hyperparameter tuning for max_depth.

    For every depth a full text pipeline (tokenize, remove stop words, Word2Vec,
    assemble, XGBoost) is fit on `train_data` and scored on `eval_data`. Up to
    three depths are evaluated concurrently in threads.

    Args:
        train_data: DataFrame with `title`, `description`, `applicant_bin_difference`
            and `shifted_label`.
        eval_data: DataFrame with the same columns, used for scoring.
        max_depth_values: iterable of tree depths to try.

    Returns:
        Tuple (depth, accuracy, mae, model) of the candidate with the highest
        evaluation accuracy; among equal accuracies the earliest depth wins.
    """
    # Shifted labels cover 0..16 (bin differences -8..8 plus 8), i.e. 17 classes.
    num_label_classes = 17

    def evaluate_max_depth(depth):
        # Create pipeline with current max_depth.
        # `n_estimators` is the estimator's parameter for the number of boosting rounds.
        xgb = SparkXGBClassifier(
            label_col="shifted_label",
            features_col="raw_features",
            num_class=num_label_classes,
            max_depth=depth,
            eta=0.1,
            n_estimators=100
        )

        current_pipeline = Pipeline(stages=[
            title_tokenizer,
            description_tokenizer,
            title_remover,
            description_remover,
            title_word2vec,
            description_word2vec,
            assembler,
            xgb
        ])

        # Train model
        model = current_pipeline.fit(train_data)
        predictions = model.transform(eval_data)

        # Calculate metrics
        evaluator = MulticlassClassificationEvaluator(
            labelCol="shifted_label",
            predictionCol="prediction",
            metricName="accuracy"
        )

        accuracy = evaluator.evaluate(predictions)

        # Calculate MAE in the original (unshifted) label range
        final_predictions = shift_predictions_back(predictions)
        mae = final_predictions.select(
            avg(abs(col("applicant_bin_difference") - col("adjusted_prediction"))).alias("mae")
        ).collect()[0]["mae"]

        return depth, accuracy, mae, model

    # Parallel execution of hyperparameter evaluation
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(
            evaluate_max_depth,
            max_depth_values
        ))

    # Find the best result by sorting on accuracy. The builtin max() is not used
    # because this notebook imports pyspark's `max` into the global namespace.
    results_sorted = sorted(results, key=lambda x: x[1], reverse=True)
    best_result = results_sorted[0]

    return best_result


def train_evaluate_tune_model(data):
    """
    Clean the data, tune max_depth on a train/evaluation split and score the best model on a test split.

    Returns:
        (best_model, test_accuracy, test_mae, final_predictions); `final_predictions` holds the
        test-set predictions plus `adjusted_prediction`, `prediction_difference` and
        `prediction_correct`.
    """
    print("Starting model training and evaluation pipeline...")

    # Remove unusable rows, then move the labels into the non-negative range
    cleaned_data = clean_data(data)
    print(f"Data size after cleaning: {cleaned_data.count()}")
    shifted_data = shift_labels(cleaned_data)

    # 60% train, then the remaining 40% is halved into evaluation and test sets
    train_data, remaining = shifted_data.randomSplit([0.6, 0.4], seed=42)
    eval_data, test_data = remaining.randomSplit([0.5, 0.5], seed=42)

    print(f"Training set size: {train_data.count()}")
    print(f"Evaluation set size: {eval_data.count()}")
    print(f"Test set size: {test_data.count()}")

    # Candidate depths 3 through 10
    max_depth_values = list(range(3, 11))

    print("\nPerforming hyperparameter tuning...")
    tuning_outcome = tune_max_depth(train_data, eval_data, max_depth_values)
    best_depth, best_eval_accuracy, best_eval_mae, best_model = tuning_outcome

    print(f"\nBest parameters found:")
    print(f"max_depth: {best_depth}")
    print(f"Evaluation accuracy: {best_eval_accuracy:.4f}")
    print(f"Evaluation MAE: {best_eval_mae:.4f}")

    # Score the selected model on data it has not seen during tuning
    print("\nEvaluating on test set...")
    test_predictions = best_model.transform(test_data)

    test_accuracy = MulticlassClassificationEvaluator(
        labelCol="shifted_label",
        predictionCol="prediction",
        metricName="accuracy",
    ).evaluate(test_predictions)

    # MAE is measured in the original (unshifted) label range
    final_predictions = shift_predictions_back(test_predictions)
    true_minus_predicted = col("applicant_bin_difference") - col("adjusted_prediction")
    test_mae = final_predictions.select(avg(abs(true_minus_predicted))).collect()[0][0]

    # Per-row absolute error and an exact-match flag for later analysis
    is_exact_match = col("applicant_bin_difference") == col("adjusted_prediction")
    final_predictions = (
        final_predictions
        .withColumn("prediction_difference", abs(true_minus_predicted))
        .withColumn("prediction_correct", is_exact_match.cast("integer"))
    )

    return best_model, test_accuracy, test_mae, final_predictions

def analyze_predictions(predictions_df):
    """
    Print three diagnostic tables for a DataFrame of scored predictions.

    Args:
        predictions_df: DataFrame that contains the columns
            "prediction_difference", "prediction_correct",
            "applicant_bin_difference" and "adjusted_prediction"
            (all four are read below).

    Returns:
        None. Every table is written to stdout through ``.show()``.
    """
    # Table 1: number of rows for each value of prediction_difference
    print("\nDistribution of prediction differences:")
    error_histogram = (
        predictions_df
        .groupBy("prediction_difference")
        .count()
        .orderBy("prediction_difference")
    )
    error_histogram.show()

    # Table 2: mean of prediction_correct and row count per actual bin
    print("\nAccuracy by original bin:")
    per_bin_metrics = [
        avg("prediction_correct").alias("accuracy"),
        count("*").alias("count"),
    ]
    accuracy_by_bin = (
        predictions_df
        .groupBy("applicant_bin_difference")
        .agg(*per_bin_metrics)
        .orderBy("applicant_bin_difference")
    )
    accuracy_by_bin.show()

    # Table 3: the ten most frequent (actual, predicted) pairs among the rows
    # whose prediction_difference is greater than zero
    print("\nMost common misclassifications:")
    wrong_rows = predictions_df.filter(col("prediction_difference") > 0)
    confusion_pairs = (
        wrong_rows
        .groupBy("applicant_bin_difference", "adjusted_prediction")
        .count()
        .orderBy(col("count").desc())
    )
    confusion_pairs.show(10)

In [None]:
# Run the train / tune / test workflow on the prepared dataset and unpack
# the four values it returns.
training_outcome = train_evaluate_tune_model(applicants_prediction_data_predictions)
model, accuracy, mae, jobs_predicted = training_outcome

# Report the test-set metrics returned by the workflow
print(f"\nModel Accuracy: {accuracy}")
print(f"Mean Absolute Error: {mae}")

# Preview a few rows: job title next to the actual and the predicted value
preview_columns = ["title", "applicant_bin_difference", "adjusted_prediction"]
jobs_predicted.select(*preview_columns).show(5)

# Full predictions table
jobs_predicted.display()

In [None]:
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import pandas as pd

jobs_predicted.createOrReplaceTempView("jobs_predicted")
# Signed error per row: predicted value minus actual value
df = spark.sql("SELECT *, (adjusted_prediction - applicant_bin_difference) AS difference FROM jobs_predicted")
# Count how many rows fall on each signed-error value
diff_counts = df.groupBy("difference").count().orderBy("difference")

# The aggregate has one row per distinct difference, so it is small enough to plot with pandas
diff_counts_pd = diff_counts.toPandas()
plt.figure(figsize=(10, 6))
plt.bar(diff_counts_pd['difference'], diff_counts_pd['count'], color='blue')
# Labels name the two columns that are actually subtracted in the query above
# (the previous labels referred to prediction_difference, which is not plotted here).
plt.xlabel('Difference (adjusted_prediction - applicant_bin_difference)')
plt.ylabel('Count')
plt.title('Difference between adjusted_prediction and applicant_bin_difference')
plt.xticks(range(-8, 7))  # Set x-ticks from -8 to 6
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()

In [None]:
# Getting similar jobs

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, levenshtein, row_number
from pyspark.sql.window import Window
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import VectorUDT, DenseVector
import numpy as np
from pyspark.sql.functions import concat_ws
from pyspark.ml.linalg import Vectors

# The 10 predicted rows with the highest adjusted_prediction; the text columns
# are renamed so they stay distinguishable after the cross join further down.
top_10_jobs = (
    jobs_predicted
    .orderBy(col("adjusted_prediction").desc())
    .limit(10)
    .withColumnRenamed("title", "jobs_title")
    .withColumnRenamed("description", "jobs_description")
)

# Reference rows: drop entries without applicant_bin_difference and rename the
# text columns in the same way.
applicants_prediction_data_predictions_adj = (
    applicants_prediction_data_predictions
    .dropna(subset=["applicant_bin_difference"])
    .withColumnRenamed("title", "applicants_title")
    .withColumnRenamed("description", "applicants_description")
)

# Split the reference rows by the two extremes of applicant_bin_difference
applicants_good = applicants_prediction_data_predictions_adj.filter("applicant_bin_difference <= -7")
applicants_bad = applicants_prediction_data_predictions_adj.filter("applicant_bin_difference >= 7")

def _build_tfidf_pipeline():
    """
    Return an unfitted pipeline that turns a "text" column into a "tfidf" column.

    Stages, in order: Tokenizer (text -> tokens), StopWordsRemover
    (tokens -> filtered), CountVectorizer (filtered -> tf), IDF (tf -> tfidf).
    Titles and descriptions use the same stages, so both pipelines are built here
    instead of being spelled out twice.
    """
    return Pipeline(stages=[
        Tokenizer(inputCol="text", outputCol="tokens"),
        StopWordsRemover(inputCol="tokens", outputCol="filtered"),
        CountVectorizer(inputCol="filtered", outputCol="tf"),
        IDF(inputCol="tf", outputCol="tfidf")
    ])


# Prepare combined data for the TF-IDF models: the pipelines are fitted on the
# text of both DataFrames so vectors from either side share one vocabulary.
# Combine titles from jobs and applicants
all_titles = (
    jobs_predicted.select(col("title").alias("text"))
    .union(applicants_prediction_data_predictions.select(col("title").alias("text")))
)
# Combine descriptions from jobs and applicants
all_descriptions = (
    jobs_predicted.select(col("description").alias("text"))
    .union(applicants_prediction_data_predictions.select(col("description").alias("text")))
)

# One pipeline per text field (separate objects, identical stages)
title_pipeline = _build_tfidf_pipeline()
desc_pipeline = _build_tfidf_pipeline()

# Fit models on combined data
title_model = title_pipeline.fit(all_titles)
desc_model = desc_pipeline.fit(all_descriptions)

# Columns kept after pairing; the first six entries describe the job side.
select_cols = [
    "jobs.company",
    "jobs.applicants_count",
    "jobs_title",
    "jobs_description",
    "jobs.applicant_bin_difference",
    "jobs.adjusted_prediction",
    "applicants.company",
    "applicants.applicants_count",
    "applicants_title",
    "applicants_description",
    "applicants.applicant_bin_difference",
]
jobs_col = select_cols[:6]

# Pair each of the top 10 jobs with every row of the good / bad reference sets.
# The aliases let the "jobs." and "applicants." prefixes above resolve.
jobs_side = top_10_jobs.alias("jobs")
cross_joined_good_df = jobs_side.crossJoin(applicants_good.alias("applicants")).select(select_cols)
cross_joined_bad_df = jobs_side.crossJoin(applicants_bad.alias("applicants")).select(select_cols)

# Define a UDF to calculate cosine similarity
def cosine_similarity(v1, v2):
    """
    Cosine similarity of two vectors, returned as a Python float.

    Args:
        v1, v2: objects exposing ``dot(other)`` and ``norm(p)`` (the only
            methods called here).

    Returns:
        ``v1.dot(v2) / (||v1||_2 * ||v2||_2)``, or 0.0 when either vector has
        zero L2 norm.
    """
    denominator = v1.norm(2) * v2.norm(2)
    # A zero-norm vector would make the division produce NaN (or raise). NaN
    # sorts above every number in a descending Spark sort, so such a pair would
    # be picked as the "best" match by the rank-1 filter applied later in this
    # cell. Treat it as "no similarity" instead.
    if denominator == 0:
        return 0.0
    return float(v1.dot(v2) / denominator)

# Wrap the Python function as a Spark UDF with a DoubleType result so it can be
# applied to pairs of DataFrame columns.
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# Process titles and descriptions using the fitted models
def _apply_tfidf_model(df, model, col_name):
    """
    Shared implementation behind apply_title_model and apply_desc_model.

    Temporarily renames ``col_name`` to "text", calls ``model.transform``,
    restores the original column name, renames the resulting "tfidf" column
    to "<col_name>_tfidf" and drops the "tokens", "filtered" and "tf" columns.
    """
    as_text = df.withColumnRenamed(col_name, "text")
    transformed = model.transform(as_text)
    restored = (
        transformed
        .withColumnRenamed("text", col_name)
        .withColumnRenamed("tfidf", f"{col_name}_tfidf")
    )
    # Dropping the intermediates lets the same model be applied to another
    # column of the same DataFrame without leftover columns from this call.
    return restored.drop("tokens", "filtered", "tf")


def apply_title_model(df, model, col_name):
    """
    Add a "<col_name>_tfidf" column computed by the fitted title model.

    Args:
        df: DataFrame containing ``col_name``.
        model: fitted model whose ``transform`` reads "text" and writes "tfidf".
        col_name: name of the title column to vectorise.

    Returns:
        ``df`` plus the "<col_name>_tfidf" column; intermediates are removed.
    """
    return _apply_tfidf_model(df, model, col_name)


def apply_desc_model(df, model, col_name):
    """
    Add a "<col_name>_tfidf" column computed by the fitted description model.

    Same contract as apply_title_model; kept as a separate name so call sites
    read naturally for description columns.
    """
    return _apply_tfidf_model(df, model, col_name)

def _best_match_per_job(pairs_df, best_first_window, title_weight=0.4, description_weight=0.6):
    """
    Score every (job, reference posting) pair and keep the best one per job.

    The good and the bad reference sets go through exactly the same steps, so
    the flow lives here once instead of being copy-pasted for each set.

    Args:
        pairs_df: cross-joined DataFrame with the columns "jobs_title",
            "applicants_title", "jobs_description" and "applicants_description".
        best_first_window: window spec used for ``row_number``; only the row
            ranked 1 in each partition is kept.
        title_weight: weight of the title similarity in the combined score.
        description_weight: weight of the description similarity.

    Returns:
        DataFrame with the added TF-IDF and similarity columns, reduced to the
        rank-1 row of every window partition.
    """
    # TF-IDF vectors for both sides of every pair
    featured = apply_title_model(pairs_df, title_model, "jobs_title")
    featured = apply_title_model(featured, title_model, "applicants_title")
    featured = apply_desc_model(featured, desc_model, "jobs_description")
    featured = apply_desc_model(featured, desc_model, "applicants_description")

    # Cosine similarity per text field, then the weighted combination
    scored = featured.withColumn(
        "title_similarity",
        cosine_similarity_udf(col("jobs_title_tfidf"), col("applicants_title_tfidf"))
    ).withColumn(
        "description_similarity",
        cosine_similarity_udf(col("jobs_description_tfidf"), col("applicants_description_tfidf"))
    ).withColumn(
        "combined_similarity",
        col("title_similarity") * title_weight + col("description_similarity") * description_weight
    )

    # Keep only the top-ranked reference posting in each partition
    return (
        scored
        .withColumn("rank", row_number().over(best_first_window))
        .filter(col("rank") == 1)
        .drop("rank")
    )


# Select the top entry for each job, highest combined similarity first.
# NOTE(review): partitioning by jobs_title means top-10 jobs that share a title
# collapse into a single result row — confirm this is intended.
window_spec = Window.partitionBy("jobs_title").orderBy(col("combined_similarity").desc())
window_spec_bad = Window.partitionBy("jobs_title").orderBy(col("combined_similarity").desc())

cross_joined_good_df = _best_match_per_job(cross_joined_good_df, window_spec)
cross_joined_bad_df = _best_match_per_job(cross_joined_bad_df, window_spec_bad)

# Display results
cross_joined_good_df = cross_joined_good_df.select(select_cols)
cross_joined_bad_df = cross_joined_bad_df.select(select_cols)
display(cross_joined_good_df) # An input with a good similar match
display(cross_joined_bad_df) # An input with a bad similar match

## Analyze the structure of a good description

Calculate the mean length and the common lengths of good descriptions

In [None]:
from pyspark.sql.functions import length, col

# Attach the length() of each description as "description_length"
description_chars = length(col("applicants_description"))
applicants_good = applicants_good.withColumn("description_length", description_chars)

# Summary statistics of the description length (single-row result)
length_summary_exprs = [
    "AVG(description_length) as avg_length",
    "MIN(description_length) as min_length",
    "MAX(description_length) as max_length",
    "STDDEV(description_length) as stddev_length",
]
applicants_good.selectExpr(*length_summary_exprs).show()

# Number of descriptions per exact length; the first 10 rows in ascending
# length order are printed
length_distribution = applicants_good.groupBy("description_length").count()
length_distribution.orderBy("description_length").show(10, truncate=False)


In [None]:
import matplotlib.pyplot as plt

# Collect the description lengths to pandas and draw a 20-bin histogram
length_data = applicants_good.select("description_length").toPandas()

fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(length_data["description_length"], bins=20, edgecolor="black", alpha=0.7)
ax.set_title("Distribution of Description Lengths", fontsize=16)
ax.set_xlabel("Description Length", fontsize=14)
ax.set_ylabel("Frequency", fontsize=14)
ax.grid(axis="y", linestyle="--", alpha=0.7)
fig.tight_layout()
plt.show()

Analyze the common words in a good description

In [None]:
from pyspark.sql.functions import explode, split, lower, regexp_replace, col

# Preprocess descriptions: lower-case, strip everything except letters and
# whitespace, then split on runs of whitespace.
# Columns left by a previous run of this cell are dropped first: without that,
# StopWordsRemover refuses to write to an already existing "filtered_words"
# column and re-running the cell fails. drop() ignores names that do not exist.
applicants_good = applicants_good.drop("words", "filtered_words").withColumn(
    "words",
    split(
        regexp_replace(lower(col("applicants_description")), "[^a-zA-Z\\s]", ""),
        "\\s+"
    )
)
# Remove stopwords
from pyspark.ml.feature import StopWordsRemover
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
applicants_good = remover.transform(applicants_good)

# One row per remaining word. Leading or trailing whitespace in a description
# makes split() emit empty-string tokens; they are not words, so they are
# filtered out before counting.
word_df = (
    applicants_good
    .select(explode(col("filtered_words")).alias("word"))
    .filter(col("word") != "")
)

# Count word frequencies, most frequent first
word_freq = word_df.groupBy("word").count()
word_freq = word_freq.orderBy(col("count").desc())
# Show the top 10 most common words and their count
word_freq.show(10, truncate=False)

Visualize the most common words as a word cloud

In [None]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

# Bring the word counts to the driver and map each word to its frequency
word_freq_pd = word_freq.toPandas()
word_dict = {
    word: freq
    for word, freq in zip(word_freq_pd["word"], word_freq_pd["count"])
}

# Build the cloud from the frequency mapping
cloud_renderer = WordCloud(width=800, height=400, background_color="white")
wordcloud = cloud_renderer.generate_from_frequencies(word_dict)

# Render the cloud as an image without axes
fig, ax = plt.subplots(figsize=(10, 6))
ax.imshow(wordcloud, interpolation="bilinear")
ax.axis("off")
ax.set_title("Word Cloud of Most Common Words", fontsize=16)
fig.tight_layout()
plt.show()