# Imports and SparkSession setup

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType
import json
import os
import math
import re

In [2]:
# Add here your team number
team = 14

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName(f"Team {team} - spark ML Job Descriptions")\
        .master("yarn")\
        .config("spark.submit.deployMode", "client")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/23 19:49:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/23 19:49:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/04/23 19:49:03 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
25/04/23 19:49:03 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


# List Hive databases and tables

In [3]:
print("Listing available databases:")
print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

Listing available databases:


                                                                                

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse'), Database(name='retake1', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team36/retakedb1'), Database(name='root_db', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db'), Database(name='show', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team36/data2'), Database(name='team0_projectdb', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse'), Database(name='team11_projectdb', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team11/project/hive/warehouse'), Database(name='team12_hive_projectdb', catalog='spark_catalog', description='', locationUri

In [4]:
db = 'team14_projectdb'
print(f"\nListing tables in {db}:")
print(spark.catalog.listTables(db))
spark.sql(f"USE {db};")
spark.sql("SHOW TABLES;").show()


Listing tables in team14_projectdb:
[Table(name='job_descriptions_part', catalog='spark_catalog', namespace=['team14_projectdb'], description=None, tableType='EXTERNAL', isTemporary=False)]
+----------------+--------------------+-----------+
|       namespace|           tableName|isTemporary|
+----------------+--------------------+-----------+
|team14_projectdb|job_descriptions_...|      false|
+----------------+--------------------+-----------+



# Read Hive table

In [5]:
table_name = 'job_descriptions_part'
df = spark.read.format("avro").table(f'{db}.{table_name}')
print("\nDataframe schema:")
df.printSchema()


Dataframe schema:
root
 |-- id: integer (nullable = true)
 |-- job_id: long (nullable = true)
 |-- experience: string (nullable = true)
 |-- qualifications: string (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: decimal(9,6) (nullable = true)
 |-- longitude: decimal(9,6) (nullable = true)
 |-- company_size: integer (nullable = true)
 |-- job_posting_date: date (nullable = true)
 |-- contact_person: string (nullable = true)
 |-- preference: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- role: string (nullable = true)
 |-- job_portal: string (nullable = true)
 |-- job_description: string (nullable = true)
 |-- benefits: string (nullable = true)
 |-- skills: string (nullable = true)
 |-- responsibilities: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_profile: string (nullable = true)


In [6]:
print("\nSample data:")
df.show(5, truncate=False)


Sample data:


25/04/23 19:50:47 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
[Stage 5:>                                                          (0 + 1) / 1]

+-------+----------------+-------------+--------------+------------+------------+----------------------------+----------+----------+------------+----------------+---------------+----------+--------------------+------------------------+--------------------------------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------

                                                                                

In [7]:
print(f"\nTotal rows before cleaning: {df.count()}")




Total rows before cleaning: 1615940


                                                                                

# Data Preprocessing and Feature Engineering

In [8]:
# Handle missing values
df = df.na.drop()
print(f"Total rows after removing nulls: {df.count()}")



Total rows after removing nulls: 1615940


                                                                                

# Parse salary_range

In [11]:
def extract_salary_range(salary):
    try:
        if salary is None:
            return (None, None)
        
        # Remove the currency symbol and 'K'
        # Example: "$59K-$99K" -> ["59", "99"]
        salary = salary.replace('$', '')
        parts = salary.split('-')
        
        min_salary = float(parts[0].replace('K', '')) * 1000 if 'K' in parts[0] else float(parts[0])
        max_salary = float(parts[1].replace('K', '')) * 1000 if 'K' in parts[1] else float(parts[1])
        
        return (min_salary, max_salary)
    except:
        return (None, None)

In [12]:
extract_salary_udf = F.udf(extract_salary_range, StructType([
    StructField("min", DoubleType(), True),
    StructField("max", DoubleType(), True)
]))

In [13]:
df = df.withColumn("salary_parsed", extract_salary_udf(F.col("salary_range")))
df = df.withColumn("salary_min", F.col("salary_parsed.min"))
df = df.withColumn("salary_max", F.col("salary_parsed.max"))
df = df.withColumn("salary_avg", (F.col("salary_min") + F.col("salary_max"))/2)

In [14]:
# Drop rows with null salary values
df = df.filter(F.col("salary_min").isNotNull() & F.col("salary_max").isNotNull())

In [15]:
df.select("salary_avg").show(5, truncate=False)

[Stage 12:>                                                         (0 + 1) / 1]

+----------+
|salary_avg|
+----------+
|90000.0   |
|91000.0   |
|93000.0   |
|77500.0   |
|81000.0   |
+----------+
only showing top 5 rows



                                                                                

# Parse experience

In [16]:
def extract_experience(exp_str):
    try:
        if exp_str is None:
            return (None, None)
        
        # Find all numbers in the string
        numbers = re.findall(r'\d+', exp_str)
        
        if len(numbers) >= 2:
            min_exp = int(numbers[0])
            max_exp = int(numbers[1])
            return (min_exp, max_exp)
        elif len(numbers) == 1:
            # If only one number, use it for both min and max
            exp = int(numbers[0])
            return (exp, exp)
        else:
            return (None, None)
    except:
        return (None, None)

In [17]:
extract_experience_udf = F.udf(extract_experience, StructType([
    StructField("min", IntegerType(), True),
    StructField("max", IntegerType(), True)
]))

In [18]:
df = df.withColumn("experience_parsed", extract_experience_udf(F.col("experience")))
df = df.withColumn("experience_min", F.col("experience_parsed.min"))
df = df.withColumn("experience_max", F.col("experience_parsed.max"))
df = df.withColumn("experience_avg", (F.col("experience_min") + F.col("experience_max"))/2)

In [19]:
df.select("experience_avg").show(5, truncate=False)

[Stage 13:>                                                         (0 + 1) / 1]

+--------------+
|experience_avg|
+--------------+
|7.5           |
|8.5           |
|8.5           |
|7.5           |
|8.0           |
+--------------+
only showing top 5 rows



                                                                                

# Process company_profile JSON field

In [20]:
# Define a UDF to check if a string is valid JSON
def is_valid_json(s):
    try:
        json.loads(s)
        return True
    except:
        return False

In [21]:
is_valid_json_udf = F.udf(is_valid_json)

In [22]:
# Clean company_profile JSON field
df = df.withColumn("is_valid_json", is_valid_json_udf(F.col("company_profile")))
df = df.filter(F.col("is_valid_json") == True)

In [23]:
print(f"Total rows after json validation: {df.count()}")



Total rows after json validation: 1608618


                                                                                

In [24]:
# Clean JSON string by replacing single quotes with double quotes
df = df.withColumn("company_profile_cleaned", F.regexp_replace(F.col("company_profile"), "'", '"'))

In [25]:
# Define schema for company profile
company_profile_schema = StructType([
    StructField("Sector", StringType(), True),
    StructField("Industry", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Zip", StringType(), True),
    StructField("Website", StringType(), True),
    StructField("Ticker", StringType(), True),
    StructField("CEO", StringType(), True)
])

In [26]:
# Parse the JSON into separate columns
df = df.withColumn("company_profile_parsed", F.from_json(F.col("company_profile_cleaned"), company_profile_schema))

In [27]:
# Extract each field from the parsed struct
df = df.withColumn("sector", F.col("company_profile_parsed.Sector"))
df = df.withColumn("industry", F.col("company_profile_parsed.Industry"))
df = df.withColumn("company_city", F.col("company_profile_parsed.City"))
df = df.withColumn("company_state", F.col("company_profile_parsed.State"))
df = df.withColumn("company_zip", F.col("company_profile_parsed.Zip"))

In [31]:
df.select("company_state").show(5, truncate=False)

+-------------+
|company_state|
+-------------+
|Telangana    |
|N/A          |
|Pennsylvania |
|Texas        |
|Illinois     |
+-------------+
only showing top 5 rows



### It is experimental feature FIX ???

In [32]:
df = df.withColumn("is_public", 
    F.when(
        F.col("company_profile_parsed.Ticker").isNotNull() & (F.length(F.col("company_profile_parsed.Ticker")) > 0), 
        1
    ).otherwise(0)
)

In [33]:
# Infer CEO gender
def infer_gender(name):
    if name is None or name.strip() == "":
        return "unknown"
    # Very simple gender inference based on first name
    # This is a simplified approach - consider using a proper gender detection library in production
    male_names = {"john", "david", "michael", "robert", "james", "william", "mark", "richard", "thomas", "charles", 
                 "steven", "kevin", "joseph", "brian", "jeff", "scott", "mike", "paul", "dan", "chris", "tim", "greg"}
    female_names = {"mary", "patricia", "jennifer", "linda", "elizabeth", "barbara", "susan", "jessica", "sarah", 
                   "karen", "lisa", "nancy", "betty", "margaret", "sandra", "ashley", "kimberly", "emily", "donna", 
                   "michelle", "carol", "amanda", "melissa", "deborah", "stephanie"}
    
    first_name = name.split()[0].lower()
    if first_name in male_names:
        return "male"
    elif first_name in female_names:
        return "female"
    return "unknown"


In [34]:
infer_gender_udf = F.udf(infer_gender, StringType())
df = df.withColumn("ceo_gender", infer_gender_udf(F.col("company_profile_parsed.Ceo")))

In [35]:
# Drop temporary columns
df = df.drop("is_valid_json", "company_profile_cleaned", "company_profile_parsed", "experience_parsed", "salary_parsed")

In [44]:
df.select("ceo_gender").distinct().show(5, truncate=False)



+----------+
|ceo_gender|
+----------+
|unknown   |
|female    |
|male      |
+----------+



                                                                                

# Process benefits field ???? (FIX)

In [37]:
# Clean benefits field
df = df.withColumn("benefits_cleaned", 
                   F.regexp_replace(F.regexp_replace(F.col("benefits"), "\\{\\\'|\\\'\\\'\\\\'", ""), ",\\s", ","))

In [38]:
# Count the number of benefits listed
df = df.withColumn("benefits_count", F.size(F.split(F.col("benefits_cleaned"), ",")))

In [39]:
# Extract common benefits as binary flags
common_benefits = ["health insurance", "dental", "vision", "401k", "retirement", "pto", "paid time off", 
                   "flexible", "remote", "bonus", "education", "training", "insurance", "life insurance"]


In [40]:
for benefit in common_benefits:
    df = df.withColumn(f"has_{benefit.replace(' ', '_')}", 
                        F.when(F.lower(F.col("benefits")).contains(benefit), 1).otherwise(0))


In [41]:
df.select("benefits_cleaned").show(5, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------+
|benefits_cleaned                                                                                                                                |
+------------------------------------------------------------------------------------------------------------------------------------------------+
|Casual Dress Code,Social and Recreational Activities,Employee Referral Programs,Health and Wellness Facilities,Life and Disability Insurance'}  |
|Childcare Assistance,Paid Time Off (PTO),Relocation Assistance,Flexible Work Arrangements,Professional Development'}                            |
|Life and Disability Insurance,Stock Options or Equity Grants,Employee Recognition Programs,Health Insurance,Social and Recreational Activities'}|
|Employee Referral Programs,Financial Counseling,Health and Wellness Facilities,Casual Dress Code,Flexible Spending Ac

In [43]:
df.select("benefits_count").distinct().show(5, truncate=False)



+--------------+
|benefits_count|
+--------------+
|5             |
+--------------+



                                                                                

# Process skills and responsibilities fields

In [46]:
# Count the number of skills and responsibilities listed
df = df.withColumn("skills_count", F.size(F.split(F.col("skills"), " ")))
df = df.withColumn("responsibilities_count", F.size(F.split(F.col("responsibilities"), "\\.")))

In [51]:
df.select("skills_count").distinct().show(5, truncate=False)



+------------+
|skills_count|
+------------+
|31          |
|34          |
|28          |
|27          |
|26          |
+------------+
only showing top 5 rows



                                                                                

In [50]:
df.select("skills_count").distinct().show(5, truncate=False)



+------------+
|skills_count|
+------------+
|31          |
|34          |
|28          |
|27          |
|26          |
+------------+
only showing top 5 rows



                                                                                

# Process job_posting_date (temporal features)

In [54]:
# Extract year, month, day features from job_posting_date
df = df.withColumn("job_posting_year", F.year(F.col("job_posting_date")))
df = df.withColumn("job_posting_month", F.month(F.col("job_posting_date")))
df = df.withColumn("job_posting_day", F.dayofmonth(F.col("job_posting_date")))

In [55]:
# Create cyclical features for month and day
df = df.withColumn("month_sin", F.sin(2 * math.pi * F.col("job_posting_month") / 12))
df = df.withColumn("month_cos", F.cos(2 * math.pi * F.col("job_posting_month") / 12))
df = df.withColumn("day_sin", F.sin(2 * math.pi * F.col("job_posting_day") / 31))
df = df.withColumn("day_cos", F.cos(2 * math.pi * F.col("job_posting_day") / 31))

In [56]:
df.select("month_sin").distinct().show(5, truncate=False)



+-----------------------+
|month_sin              |
+-----------------------+
|-1.0                   |
|1.0                    |
|0.8660254037844386     |
|-2.4492935982947064E-16|
|-0.4999999999999997    |
+-----------------------+
only showing top 5 rows



                                                                                

# Process geospatial data (latitude and longitude)

In [57]:
# Convert latitude and longitude to ECEF (Earth-Centered, Earth-Fixed) coordinates
# WGS84 ellipsoid constants
a = 6378137.0  # semi-major axis in meters
b = 6356752.314245  # semi-minor axis in meters
f = (a - b) / a  # flattening
e_sq = f * (2 - f)  # eccentricity squared

In [58]:
def lat_lon_to_ecef(lat, lon, alt=0):
    # Convert geodetic coordinates to ECEF coordinates
    # lat, lon in degrees, alt in meters
    lat_rad = lat * math.pi / 180.0
    lon_rad = lon * math.pi / 180.0
    
    N = a / math.sqrt(1 - e_sq * math.sin(lat_rad)**2)
    
    x = (N + alt) * math.cos(lat_rad) * math.cos(lon_rad)
    y = (N + alt) * math.cos(lat_rad) * math.sin(lon_rad)
    z = (N * (1 - e_sq) + alt) * math.sin(lat_rad)
    
    return (x, y, z)


In [59]:
# Create UDF for the conversion
ecef_schema = StructType([
    StructField("x", DoubleType(), False),
    StructField("y", DoubleType(), False),
    StructField("z", DoubleType(), False)
])

In [60]:
def convert_to_ecef(lat, lon):
    if lat is None or lon is None:
        return None
    try:
        x, y, z = lat_lon_to_ecef(float(lat), float(lon))
        return (x, y, z)
    except:
        return None

In [61]:
ecef_udf = F.udf(convert_to_ecef, ecef_schema)

In [62]:
# Apply the UDF to create ECEF coordinates
df = df.withColumn("ecef", ecef_udf(F.col("latitude"), F.col("longitude")))

In [63]:
# Extract individual coordinates
df = df.withColumn("ecef_x", F.col("ecef.x"))
df = df.withColumn("ecef_y", F.col("ecef.y"))
df = df.withColumn("ecef_z", F.col("ecef.z"))

In [64]:
# Drop the temporary struct column
df = df.drop("ecef")

In [65]:
df.select("ecef_x").distinct().show(5, truncate=False)



+------------------+
|ecef_x            |
+------------------+
|-5915153.491310677|
|6176305.363394763 |
|3341897.406283359 |
|159804.00269604026|
|1675260.9402652734|
+------------------+
only showing top 5 rows



                                                                                

# Feature Selection and Categorization

In [69]:
# Define feature types for building the ML pipeline based on the transformation proposal
numerical_features = [
    'company_size', 'benefits_count', 'skills_count', 'responsibilities_count',
    'job_posting_year', 'experience_min', 'experience_max', 'experience_avg',
    'ecef_x', 'ecef_y', 'ecef_z', 'is_public'
]

categorical_features = [
    'qualifications', 'work_type', 'preference', 'job_portal', 'sector', 
    'industry', 'company_state', 'ceo_gender'
]

text_features = [
    'job_title', 'role', 'job_description'
]

cyclical_features = [
    'month_sin', 'month_cos', 'day_sin', 'day_cos'
]

binary_features = [col for col in df.columns if col.startswith('has_')]

In [70]:
# Target is salary_avg (average of min and max salary)
target = 'salary_avg'

In [71]:
# All features to use in the model
all_features = numerical_features + categorical_features + text_features + cyclical_features + binary_features
selected_columns = all_features + [target]

In [72]:
# Select only the columns we need
df_selected = df.select(selected_columns)
print(f"Selected {len(selected_columns)} columns for the ML pipeline")

Selected 42 columns for the ML pipeline


In [75]:
df_selected.columns

['company_size',
 'benefits_count',
 'skills_count',
 'responsibilities_count',
 'job_posting_year',
 'experience_min',
 'experience_max',
 'experience_avg',
 'ecef_x',
 'ecef_y',
 'ecef_z',
 'is_public',
 'qualifications',
 'work_type',
 'preference',
 'job_portal',
 'sector',
 'industry',
 'company_state',
 'ceo_gender',
 'job_title',
 'role',
 'job_description',
 'month_sin',
 'month_cos',
 'day_sin',
 'day_cos',
 'has_health_insurance',
 'has_dental',
 'has_vision',
 'has_401k',
 'has_retirement',
 'has_pto',
 'has_paid_time_off',
 'has_flexible',
 'has_remote',
 'has_bonus',
 'has_education',
 'has_training',
 'has_insurance',
 'has_life_insurance',
 'salary_avg']

# Build Feature Extraction Pipeline

In [85]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, 
    Tokenizer, StopWordsRemover, CountVectorizer, IDF,
    StandardScaler
)

In [86]:
# Process categorical features
categorical_indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_indexed", handleInvalid="skip") 
                       for c in categorical_features]
categorical_encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=f"{indexer.getOutputCol()}_encoded") 
                       for indexer in categorical_indexers]


In [87]:
# Process text features
tokenizers = [Tokenizer(inputCol=c, outputCol=f"{c}_tokens") for c in text_features]
remover = [StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol=f"{c}_filtered") 
          for tokenizer, c in zip(tokenizers, text_features)]
count_vectorizers = [CountVectorizer(inputCol=remover[i].getOutputCol(), outputCol=f"{c}_counted", minDF=5.0) 
                    for i, c in enumerate(text_features)]
idfs = [IDF(inputCol=count_vectorizer.getOutputCol(), outputCol=f"{c}_tfidf") 
       for count_vectorizer, c in  zip(count_vectorizers, text_features)]


In [90]:
# Get all feature columns after transformations
transformed_categorical = [encoder.getOutputCol() for encoder in categorical_encoders]
transformed_text = [idf.getOutputCol() for idf in idfs]
all_features = numerical_features + cyclical_features + binary_features + transformed_categorical + transformed_text


In [92]:
# First assemble just the numerical features for scaling
numerical_assembler = VectorAssembler(inputCols=numerical_features, outputCol="numerical_features")
numerical_scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features", withStd=True, withMean=True)

# Then assemble all features with the scaled numerical ones
all_features_for_assembly = ["scaled_numerical_features"] + cyclical_features + transformed_categorical + transformed_text + binary_features
assembler = VectorAssembler(inputCols=all_features_for_assembly, outputCol="features")

In [99]:
# Create the pipeline stages
stages = categorical_indexers + categorical_encoders + tokenizers + remover + count_vectorizers + idfs
stages.append(numerical_assembler)
stages.append(numerical_scaler)
stages.append(assembler)

In [100]:
# Create the pipeline
pipeline = Pipeline(stages=stages)
print(f"Created pipeline with {len(stages)} stages")

Created pipeline with 31 stages


# Fit the Pipeline and Transform Data

In [101]:
# Fit the pipeline to the data
pipeline_model = pipeline.fit(df_selected)

                                                                                

In [102]:
# Transform the data
transformed_data = pipeline_model.transform(df_selected)

In [104]:
# Select only the columns we need for ML
ml_data = transformed_data.select("features", F.col(target).alias("label"))
print("Pipeline fitted and data transformed successfully")

Pipeline fitted and data transformed successfully


# Split into Train and Test Sets

In [105]:
# Split the data into training (70%) and test (30%) sets
(train_data, test_data) = ml_data.randomSplit([0.7, 0.3], seed=42)

In [79]:
# Show sizes of train and test sets
print(f"Training set size: {train_data.count()}")
print(f"Test set size: {test_data.count()}")

                                                                                

Training set size: 1111430




Test set size: 475255


                                                                                

In [106]:
train_data.columns

['features', 'label']

# Save Data to HDFS

In [109]:
# Save training data to HDFS in JSON format
train_data.repartition(3) \
    .write \
    .mode("overwrite") \
    .format("json") \
    .save("project/data/train")

                                                                                

In [110]:
# Save test data to HDFS in JSON format
test_data.repartition(2) \
    .write \
    .mode("overwrite") \
    .format("json") \
    .save("project/data/test")

                                                                                