In [0]:
import json
import pandas as pd
import re
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import (
    StringType, IntegerType, StructType, StructField, BooleanType, FloatType, ArrayType
)
from pyspark.sql.functions import (
    col, udf, when, lit, to_json, split, element_at, count, lower, trim, explode, from_json, sum as _sum,
    regexp_extract, create_map, levenshtein
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, VectorAssembler, OneHotEncoder, StopWordsRemover, Tokenizer, NGram, HashingTF,
    MinHashLSH, RegexTokenizer, SQLTransformer
)
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from json.decoder import JSONDecodeError
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark Session
spark = SparkSession.builder.appName("Salary Predictor").getOrCreate()

# Load datasets
jobs_in_data = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sewar.hino@campus.technion.ac.il/jobs_in_data.csv")

scrapped_big_companies = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sewar.hino@campus.technion.ac.il/LinkedIn_people_profiles_BIG_COMPANIES_ONLY.csv")

job_titles = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sewar.hino@campus.technion.ac.il/titles.csv")

companies = spark.read.parquet('/linkedin/companies')

profiles = spark.read.parquet('/linkedin/people')

extra_profiles = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/sewar.hino@campus.technion.ac.il/scrabbed_profiles.csv")


##########################################################################
# preapare data sets
# jobs_in_data = jobs_in_data.withColumnRenamed("employee_residence", "location__DUPLICATE")

profiles = profiles.withColumnRenamed("current_company:company_id", "current_company_company_id")
profiles = profiles.withColumnRenamed("current_company:name", "current_company_name")
profiles = profiles.withColumnRenamed("city", "location__DUPLICATE")
columns_of_interest = ['current_company_company_id', 'id', 'position', 'about','current_company','experience','location__DUPLICATE','current_company_name', 'country_code']  


profiles1 = profiles.select(columns_of_interest) # Original data
extra_profiles1 = extra_profiles.select(columns_of_interest) # Scraped data

profiles1 = profiles1.withColumn("is_original_profile", lit(1))
extra_profiles1 = extra_profiles1.withColumn("is_original_profile", lit(0))

profiles1 = profiles1.dropDuplicates(['id'])
extra_profiles1 = extra_profiles1.dropDuplicates(['id'])

profiles1 = profiles1.withColumn("current_company", to_json("current_company"))
profiles1 = profiles1.withColumn("experience", to_json("experience"))

combined_profiles = profiles1.unionByName(extra_profiles1, allowMissingColumns=True)

##########################################################################
# Employee Residence

# Define a mapping dictionary for country codes to country names
country_code_mapping = {
    "LT": "Lithuania",
    "FI": "Finland",
    "UA": "Ukraine",
    "RO": "Romania",
    "NL": "Netherlands",
    "PL": "Poland",
    "AT": "Austria",
    "RU": "Russia",
    "HR": "Croatia",
    "CZ": "Czech Republic",
    "PT": "Portugal",
    "BY": "Belarus",
    "MD": "Moldova",
    "DE": "Germany",
    "ES": "Spain",
    "ME": "Montenegro",
    "RS": "Serbia",
    "FR": "France",
    "CH": "Switzerland",
    "GR": "Greece",
    "US": "United States"
}

# Convert the mapping dictionary to a Spark DataFrame
country_mapping_df = spark.createDataFrame(country_code_mapping.items(), ["country_code", "country_name"])

# Join the sampled DataFrame with the country mapping DataFrame to get the country names
df_with_country_names = combined_profiles.join(country_mapping_df, on="country_code", how="left")

# Use when() to handle missing values and fill them with "Unknown"
df_with_country_names = df_with_country_names.withColumn("country_name", 
                                                         when(col("country_name").isNull(), "Unknown")
                                                         .otherwise(col("country_name")))

# Extract the distinct values from the 'employee_residence' column in 'jobs_in_data'
employee_residence_values = jobs_in_data.select('employee_residence').distinct().rdd.flatMap(lambda x: x).collect()

# Filter 'df_with_country_names' based on 'country_name' being in the values list of 'employee_residence' in 'jobs_in_data'
filtered_df = df_with_country_names.filter(col('country_name').isin(employee_residence_values))

# Rename the column country_name to employee_residence
filtered_df = filtered_df.withColumnRenamed("country_name", "employee_residence")

##########################################################################
# # keep profiles realted to Data 

# Rename 'id' column
filtered_df = filtered_df.withColumnRenamed("id", "profile_id")

# List of keywords related to the data domain
data_domain_keywords = ["data", "analyst", "scientist", "engineer", "machine learning", "AI", "big data"]

# Define a function to check if a profile is related to the data domain
def is_related_to_data_domain(position):
    for keyword in data_domain_keywords:
        if keyword.lower() in position.lower():
            return True
    return False

# Register the UDF
is_related_udf = udf(is_related_to_data_domain, BooleanType())

# Apply the UDF to filter out unrelated profiles
filtered_df = filtered_df.filter(is_related_udf(col("position")))

# Pipeline for creating vectors of job_title in jobs_in_data
model = Pipeline(stages=[
    SQLTransformer(statement="SELECT *, lower(job_title) lower FROM __THIS__"),
    Tokenizer(inputCol="lower", outputCol="token"),
    StopWordsRemover(inputCol="token", outputCol="stop"),
    SQLTransformer(statement="SELECT *, concat_ws(' ', stop) concat FROM __THIS__"),
    RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1),
    NGram(n=2, inputCol="char", outputCol="ngram"),
    HashingTF(inputCol="ngram", outputCol="vector"),
    MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3)
]).fit(jobs_in_data)

result_jobs = model.transform(jobs_in_data)
result_jobs = result_jobs.filter(F.size(F.col("ngram")) > 0)

# Pipeline for creating vectors of position in filtered_df
model = Pipeline(stages=[
    SQLTransformer(statement="SELECT *, lower(position) lower FROM __THIS__"),
    Tokenizer(inputCol="lower", outputCol="token"),
    StopWordsRemover(inputCol="token", outputCol="stop"),
    SQLTransformer(statement="SELECT *, concat_ws(' ', stop) concat FROM __THIS__"),
    RegexTokenizer(pattern="", inputCol="concat", outputCol="char", minTokenLength=1),
    NGram(n=2, inputCol="char", outputCol="ngram"),
    HashingTF(inputCol="ngram", outputCol="vector"),
    MinHashLSH(inputCol="vector", outputCol="lsh", numHashTables=3)
]).fit(filtered_df)

result_filtered = model.transform(filtered_df)
result_filtered = result_filtered.filter(F.size(F.col("ngram")) > 0)

# Filter out rows with null values in the 'position' column
result_filtered = result_filtered.filter(col("position").isNotNull())

# Perform the similarity join operation
result = model.stages[-1].approxSimilarityJoin(result_filtered, result_jobs, 0.5, "jaccardDist")

# Select and sort the relevant columns
result = (result
          .select('datasetA.profile_id', 'datasetA.position', 'datasetB.job_title', 'jaccardDist')
          .sort(col('datasetA.position')))

w = Window.partitionBy('profile_id')
result = (result
           .withColumn('minDist', F.min('jaccardDist').over(w))
           .where(F.col('jaccardDist') == F.col('minDist'))
           .drop('minDist'))
(result
 .select('position', 'job_title', 'jaccardDist')
 .sort(F.col('profile_id'))).dropDuplicates()

result = result.withColumnRenamed('profile_id', 'result_profile_id').withColumnRenamed('position', 'result_position')

# Join filtered_df with result based on the position column
filtered_df = filtered_df.join(result, filtered_df.position == result.result_position, "inner")

filtered_df = filtered_df.dropDuplicates()

##########################################################################

def clean_company_name(name):
    if name is not None:
        return re.sub(r'[\W_]+', '', name.lower()).strip()
    return None

clean_company_name_udf = udf(clean_company_name, StringType())

filtered_profiles = filtered_df.withColumn('cleaned_company_name', clean_company_name_udf(col('current_company_name')))
companies = companies.withColumn('cleaned_name', clean_company_name_udf(col('name')))

joined_prof = filtered_profiles.join(companies, filtered_profiles['cleaned_company_name'] == companies['cleaned_name'], 'inner')


filtered_profiles = joined_prof
##########################################################################
# comapny size 

# UDF to extract and categorize company size
def categorize_company_size(size):
    if not size:
        return None
    num_match = re.findall(r'\d+', size)
    num_match = [int(num) for num in num_match]
    if not num_match:
        return None

    max_size = max(map(int, num_match))
    if max_size <= 50:
        return 'S'
    elif max_size <= 250:
        return 'M'
    else:
        return 'L'

categorize_company_size_udf = udf(categorize_company_size, StringType())

profiles_with_company_size = filtered_profiles.withColumn(
    "company_size_category",
    categorize_company_size_udf(filtered_profiles["company_size"])
)

filtered_profiles = profiles_with_company_size

###########################################################################

# parse 'duration_short' into total years
@udf(FloatType())
def parse_duration_short_udf(duration_str):
    import re 
    if not duration_str:
        return 0.0
    
    # Regex to extract years and months
    years = re.search(r'(\d+)\s+year', duration_str)
    months = re.search(r'(\d+)\s+month', duration_str)
    
    year_value = int(years.group(1)) if years else 0
    month_value = int(months.group(1)) if months else 0
    
    return year_value + month_value / 12.0

experience_schema = ArrayType(StructType([
    StructField("duration_short", StringType(), True),
]))

filtered_profiles = filtered_profiles.withColumn("experience_details", from_json("experience", experience_schema))
filtered_profiles = filtered_profiles.withColumn("single_experience", explode("experience_details"))
filtered_profiles = filtered_profiles.withColumn("years", parse_duration_short_udf(col("single_experience.duration_short")))

@udf(StringType())
def categorize_experience(years):
    if years <= 2:
        return 'Entry-Level'
    elif years <= 5:
        return 'Mid-Level'
    else:
        return 'Senior'
    

total_experience = filtered_profiles.groupBy(filtered_profiles["profile_id"]) \
    .agg(_sum("years").alias("total_experience_years"))

filtered_profiles = filtered_profiles.join(total_experience, "profile_id", "left")

filtered_profiles = filtered_profiles.withColumn("experience_level", categorize_experience(col("total_experience_years")))

filtered_profiles = filtered_profiles.dropDuplicates(['experience_level', 'position','company_size'])

jobs_in_data = jobs_in_data.withColumn("salary_in_usd", col("salary_in_usd").cast(FloatType()))
jobs_in_data = jobs_in_data.withColumnRenamed("job_title", "position")
jobs_in_data = jobs_in_data.withColumnRenamed("company_size", "company_size_category")

##########################################################################

stages = []

# Indexing and encoding for "experience_level", "company_size", "position"
for feature_name in ["experience_level", "company_size_category", "position"]:
    indexer = StringIndexer(inputCol=feature_name, outputCol=f"{feature_name}_index", handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[f"{feature_name}_index"], outputCols=[f"{feature_name}_vec"])
    stages += [indexer, encoder]

# Vector assembler
assembler = VectorAssembler(inputCols=[f"{feature_name}_vec" for feature_name in ["experience_level", "company_size_category", "position"]], outputCol="features")
stages += [assembler]

# Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="salary_in_usd")
gbt = GBTRegressor(featuresCol="features", labelCol="salary_in_usd")

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(gbt.maxDepth, [2, 5]) \
    .addGrid(gbt.maxIter, [10, 20]) \
    .build()

# stages += [rf]

evaluator = RegressionEvaluator(labelCol="salary_in_usd", predictionCol="prediction", metricName="rmse")

# Using RandomForestRegressor in the pipeline for demonstration
pipeline = Pipeline(stages=stages + [rf])

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

##########################################################################

experience_levels = jobs_in_data.select("experience_level").distinct().count()

if experience_levels > 1:

    cvModel = crossval.fit(jobs_in_data)
    bestModel = cvModel.bestModel
    predictions = bestModel.transform(filtered_profiles)
    predictions = predictions.dropDuplicates(['experience_level', 'position','company_size'])
    predictions.select("experience_level", "company_size_category", "position", "prediction").show()
    
else:
    print("Insufficient diversity in 'experience_level'.")

##############################################################################
# spark.stop()

from pyspark.ml.evaluation import RegressionEvaluator

(train_data, test_data) = jobs_in_data.randomSplit([0.8, 0.2], seed=1234)

cvModel = crossval.fit(train_data)

# Predict and evaluate on test data which must contain 'salary_in_usd'
predictions = cvModel.bestModel.transform(test_data)

# Evaluators for RMSE, MAE, and R²
evaluator_rmse = RegressionEvaluator(labelCol="salary_in_usd", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="salary_in_usd", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="salary_in_usd", predictionCol="prediction", metricName="r2")


rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"Coefficient of Determination (R²): {r2}")


+----------------+---------------------+--------------------+------------------+
|experience_level|company_size_category|            position|        prediction|
+----------------+---------------------+--------------------+------------------+
|     Entry-Level|                    S|--Software Engine...|117515.18209311957|
|     Entry-Level|                    S|AI Scientist at R...|117515.18209311957|
|     Entry-Level|                    S|AWS /DevOps Engineer|117515.18209311957|
|     Entry-Level|                    S|Air management sp...|117515.18209311957|
|     Entry-Level|                    S|             Analyst|117515.18209311957|
|     Entry-Level|                    S|             Analyst|117515.18209311957|
|     Entry-Level|                    S|             Analyst|117515.18209311957|
|     Entry-Level|                    S|      Analyst at Abc|117515.18209311957|
|     Entry-Level|                    M|Analytics Enginee...|142859.65785400855|
|     Entry-Level|          