In [5]:
!pip install kagglehub



In [6]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("zongaobian/h1b-lca-disclosure-data-2020-2024")

print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/zongaobian/h1b-lca-disclosure-data-2020-2024?dataset_version_number=1...


100%|██████████| 1.27G/1.27G [00:45<00:00, 29.8MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/zongaobian/h1b-lca-disclosure-data-2020-2024/versions/1


In [7]:
# List the files in the downloaded directory to see what files are available
import os
import pandas as pd
files = os.listdir(path)
print("Files in the dataset directory:", files)

Files in the dataset directory: ['Combined_LCA_Disclosure_Data_FY2024.csv', 'Combined_LCA_Disclosure_Data_FY2023.csv', 'Combined_LCA_Disclosure_Data_FY2022.csv', 'Combined_LCA_Disclosure_Data_FY2021.csv', 'Combined_LCA_Disclosure_Data_FY2020_to_FY2024.csv', 'Combined_LCA_Disclosure_Data_FY2020.csv']


In [8]:
from pyspark.sql import SparkSession

if 'spark' in locals():
    spark.stop()

In [9]:
spark = SparkSession.builder.master("local[*]").appName("DataFrame").getOrCreate()

# Join the tables
References: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html

Before doing EDA, we need to join the data of each year in 2020-2024 using union join.

In [10]:
# Read and union all CSVs using Spark
# Initialize union DataFrame
df_union = None

# List all relevant CSV files, excluding the summary file
files = [
    f for f in os.listdir(path)
    if f.endswith(".csv") and "to_FY2024" not in f
]

# Read and union all CSVs using Spark with schema inference
for file in files:
    file_path = f"file://{os.path.join(path, file)}"
    print("Joining:", file_path)

    df = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .csv(file_path)

    if df_union is None:
        df_union = df
    else:
        df_union = df_union.unionByName(df)

# Show result
df_union.show(10)

Joining: file:///root/.cache/kagglehub/datasets/zongaobian/h1b-lca-disclosure-data-2020-2024/versions/1/Combined_LCA_Disclosure_Data_FY2024.csv
Joining: file:///root/.cache/kagglehub/datasets/zongaobian/h1b-lca-disclosure-data-2020-2024/versions/1/Combined_LCA_Disclosure_Data_FY2023.csv
Joining: file:///root/.cache/kagglehub/datasets/zongaobian/h1b-lca-disclosure-data-2020-2024/versions/1/Combined_LCA_Disclosure_Data_FY2022.csv
Joining: file:///root/.cache/kagglehub/datasets/zongaobian/h1b-lca-disclosure-data-2020-2024/versions/1/Combined_LCA_Disclosure_Data_FY2021.csv
Joining: file:///root/.cache/kagglehub/datasets/zongaobian/h1b-lca-disclosure-data-2020-2024/versions/1/Combined_LCA_Disclosure_Data_FY2020.csv
+------------------+-----------+-------------+-------------+------------------+--------------+--------------------+----------+--------------------+------------------+----------+----------+----------------------+--------------+--------------------+--------------------------+------

# data cleaning and EDA

In [11]:
df_union.printSchema()

root
 |-- CASE_NUMBER: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- RECEIVED_DATE: date (nullable = true)
 |-- DECISION_DATE: date (nullable = true)
 |-- ORIGINAL_CERT_DATE: date (nullable = true)
 |-- VISA_CLASS: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- SOC_CODE: string (nullable = true)
 |-- SOC_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- BEGIN_DATE: string (nullable = true)
 |-- END_DATE: string (nullable = true)
 |-- TOTAL_WORKER_POSITIONS: string (nullable = true)
 |-- NEW_EMPLOYMENT: string (nullable = true)
 |-- CONTINUED_EMPLOYMENT: string (nullable = true)
 |-- CHANGE_PREVIOUS_EMPLOYMENT: string (nullable = true)
 |-- NEW_CONCURRENT_EMPLOYMENT: string (nullable = true)
 |-- CHANGE_EMPLOYER: string (nullable = true)
 |-- AMENDED_PETITION: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- TRADE_NAME_DBA: string (nullable = true)
 |-- EMPLOYER_ADDRESS1: st

In [12]:
num_rows = df_union.count()
num_columns = len(df_union.columns)

print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_columns}")

Number of rows: 3564698
Number of columns: 96


In [13]:
from pyspark.sql.functions import col

# Get the schema as a list of tuples (column_name, data_type)
schema_info = df_union.dtypes

# Initialize counters
num_numeric = 0
num_string = 0
num_other = 0

# Classify columns based on their data types
for col_name, col_type in schema_info:
    if col_type in ['int', 'bigint', 'double', 'float', 'decimal', 'long', 'short']:
        num_numeric += 1
    elif col_type in ['string']:
        num_string += 1
    else:
        num_other += 1

# Display the summary
print(f"Number of Numerical Columns: {num_numeric}")
print(f"Number of String (Object) Columns: {num_string}")
print(f"Number of Other Data Type Columns: {num_other}")

Number of Numerical Columns: 1
Number of String (Object) Columns: 92
Number of Other Data Type Columns: 3


### Adress missing data

delete columns that have more than 50% null values

In [14]:
## Null values (percentage)- delete columns that have more than 50% null values
from pyspark.sql.functions import col, sum, when

# Calculate the percentage of missing values for each column
num_rows = df_union.count()
null_count_df = df_union.select([
    (sum(when(col(c).isNull(), 1).otherwise(0)) / num_rows * 100).alias(c)
    for c in df_union.columns
])

# Collect the missing percentage as a dictionary
missing_percentage = null_count_df.collect()[0].asDict()

# Filter out columns with more than 50% missing values
columns_to_keep = [col_name for col_name, perc in missing_percentage.items() if perc <= 50]

# Create a new DataFrame with the filtered columns
filtered_df = df_union.select(columns_to_keep)

# Display the first few rows to verify the content
filtered_df.show(10)

+------------------+-----------+-------------+-------------+--------------+--------------------+----------+--------------------+------------------+----------+----------+----------------------+--------------+--------------------+--------------------------+-------------------------+---------------+----------------+--------------------+--------------------+-------------+--------------+--------------------+--------------------+--------------+----------+----------------------+-----------------------+----------------------+---------------------+-----------------+------------------+------------------------+--------------------+------------------+--------------------+---------------------------+------------------------+-------------------------+-----------------------+-----------------------+-------------------+--------------------+--------------------------+----------------------+--------------------+----------------------------+--------------------------+----------------------+--------------

#### Filter out only the full time employee

In [15]:
# Check how many people are full time
from pyspark.sql.functions import col, count, round

# Count total number of rows
total_count = filtered_df.count()

# Count Y and N values, and calculate their percentage
filtered_df.groupBy("FULL_TIME_POSITION") \
    .agg(count("*").alias("count")) \
    .withColumn("percentage", round((col("count") / total_count) * 100, 2)) \
    .show()

+--------------------+-------+----------+
|  FULL_TIME_POSITION|  count|percentage|
+--------------------+-------+----------+
|                NULL|     59|       0.0|
|                   Y|3506603|     98.37|
| Materials Engineers|      3|       0.0|
|                   N|  57978|      1.63|
|Health Specialtie...|      1|       0.0|
|000 or higher in ...|     26|       0.0|
|  Financial Analysts|      1|       0.0|
|Software Develope...|      3|       0.0|
|                LLC"|      2|       0.0|
|000 or higher ann...|     12|       0.0|
|               Ren 2|      2|       0.0|
|               INC."|      2|       0.0|
|Environmental Eng...|      1|       0.0|
|",Rancho Cordova,...|      1|       0.0|
| Department of Ch...|      1|       0.0|
|Regulatory Affair...|      1|       0.0|
|          Suite 100"|      1|       0.0|
|          27-1014.00|      1|       0.0|
+--------------------+-------+----------+



In [16]:
# Filter only rows where the FULL_TIME_POSITION value is 'Y'
df_full_time_only = filtered_df.filter(col("FULL_TIME_POSITION") == "Y")

#### Calculate years of experience as one of our features for predictive modeling using the 'begin_date' and 'end_date' columns

In [17]:
# Calculate years of experience
from pyspark.sql.functions import to_date, datediff, round, col

# Step 1: Parse ISO date strings and filter rows with missing dates
df_dates = df_full_time_only.withColumn("BEGIN_DATE_PARSED", to_date(col("BEGIN_DATE"))) \
                   .withColumn("END_DATE_PARSED", to_date(col("END_DATE"))) \
                   .filter(col("BEGIN_DATE_PARSED").isNotNull() & col("END_DATE_PARSED").isNotNull())

# Step 2: Calculate experience in years (rounded to 2 decimals)
df_dates = df_dates.withColumn("yrs_of_experience",
                               round(datediff(col("END_DATE_PARSED"), col("BEGIN_DATE_PARSED")) / 365.0, 2))

# Step 3: Show result
df_dates.select("BEGIN_DATE", "END_DATE", "yrs_of_experience").show()

+----------+----------+-----------------+
|BEGIN_DATE|  END_DATE|yrs_of_experience|
+----------+----------+-----------------+
|2023-12-21|2026-12-20|              3.0|
|2023-12-21|2025-12-20|              2.0|
|2024-04-01|2027-03-31|              3.0|
|2024-03-23|2027-03-22|              3.0|
|2024-06-18|2027-06-17|              3.0|
|2024-03-01|2025-02-28|              1.0|
|2024-06-18|2027-06-16|             2.99|
|2024-01-01|2026-12-31|              3.0|
|2024-01-22|2027-01-21|              3.0|
|2024-01-02|2026-12-31|              3.0|
|2023-12-29|2026-12-28|              3.0|
|2024-01-08|2027-01-07|              3.0|
|2023-12-21|2026-12-20|              3.0|
|2024-03-23|2027-03-22|              3.0|
|2023-12-25|2026-12-24|              3.0|
|2024-06-18|2027-06-17|              3.0|
|2024-01-01|2026-12-31|              3.0|
|2024-01-18|2027-01-17|              3.0|
|2024-06-19|2027-06-18|              3.0|
|2024-06-07|2027-06-06|              3.0|
+----------+----------+-----------

#### Categorize job titles

In [18]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Define your categories as a list of tuples (already correct)
categories = [
    ("Computer Science", ["software", "developer", "computer", "programmer", "programming", "architect", "elect", "information","security"]),
    ("Business & Accounting", ["account", "audit","tax","business", "finan", "market", "purchas", "supply chain", "estate", "broker", "cost", "budget", "buyer", "management", "sale", "risk", "invest", "operation", "econ", "human resour", "credit", "manager", "project manager","intelligence","trader","treasu","talent","insur"]),
    ("Data", ["data", "statistic"]),
    ("Law", ["law", "police", "political", "regulat", "compliance",'legal']),
    ("Healthcare", ["medical", "physician", "doctor", "nurse", "therap", "animal", "surgeon", "health", "cardiovascular", "clini", "hospital", "dentist", "pediatri", "pharm", "epidemio", "dental","internist","psychiatrist","vet","fitness","patient","opticians","prosthodontist"]),
    ("Bio-chemical", ["bio", "chem", "plastic", "nuclear", "materi", "natural"]),
    ("Art/commu", ["design", "singer", "designer", "art", "audio", "jewel", "radio", "media", "public relation", "relig", "music", "actor","producer","dance","film","model","choreographer","photo"]),
    ("Higher Management", ["chief", "vice president", "vp"]),
    ("Education", ["education", "educator", "tutor", "drafter", "pathologist", "teacher", "train", "instruction", "professor","learn","translat"]),
    ("Production, Construction,& Manufacturing", ["production", "assembler", "fabricator", "quality control", "operator", "machinist","mechan","repair", "machine", "manufacturing", "millwright", "weld","civil", "architect", "maintenance", "construct", "floor","technician"]),
    ("Geo & Transportation", ["pilot", "captain", "vessel", "driver", "transportation", "truck", "courier","logistic","freight","transport","city","geo"]),
    ("Agriculture & Forestry & environment", ["agricultural", "forester", "conservation", "horticulture", "farming", "agronomist","landscaping", "groundskeeping","sustain", "envir", "soil","recyc","tree","climate"]),
    ("Office & Administrative", ["office", "administrative", "clerk", "secretary", "procurement", "payroll", "file","clerical", "customer service", "assistant","editor","writer"]),
    ("Social & Community Services", ["social", "counselor", "therapist", "community", "child", "residential","coach", "scout", "rehabilitation", "community service","firefighter", "protective", "coroner","food", "librar", "museum", "bake", "travel", "hotel", "cook","planner","family","curators","clergy","concierges","entertain"]),
    ("Sciences", ["physicist", "mathematician", "actuary", "astronomer", "geoscientist", "hydrologist","genetic", "social scientist", "anthropologist", "archaeologist", "historian","remote sensing", "conservation", "cytogenetic", "cytotechnologist","scientist","ists","research","actuaries","recreat"]),
    ("engineer", ["engineer"])
]

# Convert list of tuples to a UDF-compatible function
def assign_category(title):
    if title is None:
        return "Other"
    title_lower = title.lower()
    for category, keywords in categories:
        for keyword in keywords:
            if keyword in title_lower:
                return category
    return "Other"

# Register the function as a UDF
assign_category_udf = udf(assign_category, StringType())

# Apply the UDF to create a new column 'Job_Group'
df_with_group = df_dates.withColumn("Job_Group", assign_category_udf(col("SOC_TITLE")))

# Remove rows where Job_Group is "Other"
df_with_group = df_with_group.filter(col("Job_Group") != "Other")

# Show a sample
df_with_group.select("SOC_TITLE", "Job_Group").show(20)

+--------------------+--------------------+
|           SOC_TITLE|           Job_Group|
+--------------------+--------------------+
|   Registered Nurses|          Healthcare|
|Network and Compu...|    Computer Science|
|Software Quality ...|    Computer Science|
| Software Developers|    Computer Science|
|Database Administ...|                Data|
|            Chemists|        Bio-chemical|
|Information Techn...|    Computer Science|
| Software Developers|    Computer Science|
|Computer Systems ...|    Computer Science|
|Electrical Engineers|    Computer Science|
|Mechanical Engineers|Production, Const...|
| Software Developers|    Computer Science|
|Computer User Sup...|    Computer Science|
|Computer and Info...|    Computer Science|
|Financial and Inv...|Business & Accoun...|
|Accountants and A...|Business & Accoun...|
| Software Developers|    Computer Science|
|Financial Quantit...|Business & Accoun...|
|Mechanical Engineers|Production, Const...|
|Computer Systems ...|    Comput

### remove outliers from wage column

In [19]:
# filter only H1-B visa
df_h1b = df_with_group.filter(col("VISA_CLASS") == "H-1B")

# filter certified H1-B
df_certified = df_h1b.filter(col("CASE_STATUS") == "Certified")

# filter out null values in states
df_cleaned = df_certified.filter(
    col("WORKSITE_STATE").isNotNull() & col("EMPLOYER_STATE").isNotNull()
)

# Cast the wage column to FloatType
df_cleaned = df_cleaned.withColumn("WAGE_RATE_OF_PAY_FROM", col("WAGE_RATE_OF_PAY_FROM").cast("float"))

# filter out null values in wage column
df_cleaned = df_cleaned.filter(col("WAGE_RATE_OF_PAY_FROM").isNotNull())

# filter out yearly pay < 60k
df_cleaned = df_cleaned.filter(col("WAGE_UNIT_OF_PAY") == "Year")

In [20]:
# Calculate IQR (Q1 and Q3)
quantiles = df_cleaned.approxQuantile("WAGE_RATE_OF_PAY_FROM", [0.25, 0.75], 0.05)
q1, q3 = quantiles
iqr = q3 - q1

# Define bounds
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

print(f"Lower Bound: {lower_bound}, Upper Bound: {upper_bound}")

Lower Bound: 9000.0, Upper Bound: 213000.0


In [21]:
df_cleaned = df_cleaned.filter(
    (col("WAGE_RATE_OF_PAY_FROM") >= 60000) & (col("WAGE_RATE_OF_PAY_FROM") <= upper_bound)
)

In [22]:
df_cleaned.select("WAGE_RATE_OF_PAY_FROM").summary("count", "min", "max").show()

+-------+---------------------+
|summary|WAGE_RATE_OF_PAY_FROM|
+-------+---------------------+
|  count|              2800139|
|    min|              60000.0|
|    max|             213000.0|
+-------+---------------------+



### detrend the salary

In [23]:
from pyspark.sql.functions import year, col, avg, broadcast

# Step 1: Extract the year from RECEIVED_DATE
df_with_year = df_cleaned.withColumn("YEAR", year(col("RECEIVED_DATE")))

In [24]:
# Step 2: Calculate mean wage for each (Job_Group, YEAR)
mean_wages = df_with_year.groupBy("Job_Group", "YEAR") \
    .agg(avg("WAGE_RATE_OF_PAY_FROM").alias("mean_wage"))

# Step 3: Filter out 2019 mean wages as the baseline
baseline_2019 = mean_wages.filter(col("YEAR") == 2019) \
    .select("Job_Group", col("mean_wage").alias("baseline_wage"))

# Step 4: Join 2019 baseline wages back to all year-group mean wages
mean_wages_with_baseline = mean_wages.join(
    broadcast(baseline_2019), on="Job_Group", how="inner"
)

# Step 5: Compute multiplier = baseline / current year mean
mean_wages_with_baseline = mean_wages_with_baseline.withColumn(
    "multiplier", col("baseline_wage") / col("mean_wage")
)

# Step 6: Join multiplier back to main data on (Job_Group, YEAR)
df_detrend_ready = df_with_year.join(
    broadcast(mean_wages_with_baseline.select("Job_Group", "YEAR", "multiplier")),
    on=["Job_Group", "YEAR"],
    how="inner"
)

# Step 7: Apply the multiplier to get detrended wage
df_detrended = df_detrend_ready.withColumn(
    "WAGE_DETRENDED", col("WAGE_RATE_OF_PAY_FROM") * col("multiplier")
)

# Preview the result
df_detrended.select("Job_Group", "YEAR", "WAGE_RATE_OF_PAY_FROM", "WAGE_DETRENDED").show(20)

+--------------------+----+---------------------+------------------+
|           Job_Group|YEAR|WAGE_RATE_OF_PAY_FROM|    WAGE_DETRENDED|
+--------------------+----+---------------------+------------------+
|    Computer Science|2023|             133000.0|119649.47147181997|
|    Computer Science|2023|             130000.0|116950.61121305711|
|                Data|2023|             112000.0| 96262.31232656725|
|    Computer Science|2023|             194479.0| 174957.2147546472|
|    Computer Science|2023|             175000.0|157433.51509449995|
|    Computer Science|2023|             121347.0|109166.19860669877|
|    Computer Science|2023|             169560.0|152539.58182527663|
|Production, Const...|2023|             131500.0|120057.04808719583|
|    Computer Science|2023|             112000.0|100757.44966047998|
|    Computer Science|2023|             100848.0| 90724.88645857217|
|    Computer Science|2023|             169776.0|152733.89976390757|
|Business & Accoun...|2023|       

### add wage bucket
- df_cleaned is the df after clean.
- df_bucket is the df after adding wage bucket.
We add 2 columns: Wage Bucket (Categorical Label) and wage_bucket (Numerical Range). In detail, Wage Bucket represents a human-readable label for the wage range. wage_bucket shows the actual numerical range corresponding to the wage bucket.


In [25]:
# df_buckets = df_buckets.drop("Wage Bucket")

In [26]:
from pyspark.sql.functions import col, when, lit, count

# Define bucket ranges and labels
bucket_intervals = [(60000, 80000), (80000, 100000), (100000, 120000), (120000, 140000),
                    (140000, 160000), (160000, 180000), (180000, 200000), (200000, 220000),
                    (220000, 240000)]
bucket_labels = [f"{lower//1000}k-{upper//1000}k" for lower, upper in bucket_intervals]

# Start with an empty "Wage Bucket" column
df_buckets = df_detrended.withColumn("Wage Bucket", lit(None)).withColumn("wage_bucket", lit(None))

# Use when conditions to assign the correct bucket
for i, ((lower, upper), label) in enumerate(zip(bucket_intervals, bucket_labels)):
    if i < len(bucket_intervals) - 1:
        # For all but the last bucket, use exclusive upper bound
        df_buckets = df_buckets.withColumn(
            "Wage Bucket",
            when((col("WAGE_RATE_OF_PAY_FROM") >= lower) & (col("WAGE_RATE_OF_PAY_FROM") < upper), label)
            .otherwise(col("Wage Bucket"))
        ).withColumn(
            "wage_bucket",
            when((col("WAGE_RATE_OF_PAY_FROM") >= lower) & (col("WAGE_RATE_OF_PAY_FROM") < upper), f"({lower}, {upper})")
            .otherwise(col("wage_bucket"))
        )
    else:
        # For the last bucket, use inclusive upper bound
        df_buckets = df_buckets.withColumn(
            "Wage Bucket",
            when((col("WAGE_RATE_OF_PAY_FROM") >= lower) & (col("WAGE_RATE_OF_PAY_FROM") <= upper), label)
            .otherwise(col("Wage Bucket"))
        ).withColumn(
            "wage_bucket",
            when((col("WAGE_RATE_OF_PAY_FROM") >= lower) & (col("WAGE_RATE_OF_PAY_FROM") <= upper), f"({lower}, {upper})")
            .otherwise(col("wage_bucket"))
        )

# Handle any values that did not fall into the defined ranges
df_buckets = df_buckets.withColumn(
    "Wage Bucket",
    when(col("Wage Bucket").isNull(), "Out of Range").otherwise(col("Wage Bucket"))
).withColumn(
    "wage_bucket",
    when(col("wage_bucket").isNull(), "Out of Range").otherwise(col("wage_bucket"))
)

# Group by bucket and count occurrences
bucket_counts = df_buckets.groupBy("Wage Bucket", "wage_bucket").agg(count("*").alias("Count")).orderBy("Wage Bucket")

# Show the result
bucket_counts.show()

+-----------+----------------+------+
|Wage Bucket|     wage_bucket| Count|
+-----------+----------------+------+
|  100k-120k|(100000, 120000)|566317|
|  120k-140k|(120000, 140000)|433006|
|  140k-160k|(140000, 160000)|303666|
|  160k-180k|(160000, 180000)|199786|
|  180k-200k|(180000, 200000)|118603|
|  200k-220k|(200000, 220000)| 54383|
|    60k-80k|  (60000, 80000)|437496|
|   80k-100k| (80000, 100000)|686882|
+-----------+----------------+------+



In [27]:
df_cleaned.printSchema()

root
 |-- CASE_NUMBER: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- RECEIVED_DATE: date (nullable = true)
 |-- DECISION_DATE: date (nullable = true)
 |-- VISA_CLASS: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- SOC_CODE: string (nullable = true)
 |-- SOC_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- BEGIN_DATE: string (nullable = true)
 |-- END_DATE: string (nullable = true)
 |-- TOTAL_WORKER_POSITIONS: string (nullable = true)
 |-- NEW_EMPLOYMENT: string (nullable = true)
 |-- CONTINUED_EMPLOYMENT: string (nullable = true)
 |-- CHANGE_PREVIOUS_EMPLOYMENT: string (nullable = true)
 |-- NEW_CONCURRENT_EMPLOYMENT: string (nullable = true)
 |-- CHANGE_EMPLOYER: string (nullable = true)
 |-- AMENDED_PETITION: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- EMPLOYER_ADDRESS1: string (nullable = true)
 |-- EMPLOYER_CITY: string (nullable = true)
 |-- EMPLOYER_STATE: strin

In [28]:
# # Set checkpoint directory
# CHECKPOINT_PATH = "file:///tmp/spark_checkpoints"
# spark.sparkContext.setCheckpointDir(CHECKPOINT_PATH)

# df_cleaned = df_cleaned.checkpoint(eager=True)

### raw features and derived features planed to use:
begin_date, end_date, years_of_experience, wage, soc_title, job category, worksite_state, employer_state

### Plot of Distribution of Wage

This histogram displays the distribution of wage for certified H-1B visa positions, reflecting the offered annual salaries.

In [29]:
# import matplotlib.pyplot as plt
# import seaborn as sns

# # Step 1: Convert to Pandas (only the needed column)
# wages_pd = df_cleaned.select("WAGE_RATE_OF_PAY_FROM") \
#                     .toPandas()

In [30]:
# # Step 2: Plot using seaborn
# plt.figure(figsize=(10, 5))
# sns.distplot(wages_pd["WAGE_RATE_OF_PAY_FROM"], bins=50, kde=True, color='skyblue')
# plt.title("Distribution of WAGE_RATE_OF_PAY_FROM")
# plt.xlabel("Wage ($)")
# plt.ylabel("Count")
# plt.grid(True)
# plt.tight_layout()
# plt.show()

#### Plot of Average Wage by Job Category

In [31]:
# # Convert needed columns from PySpark to Pandas
# wages_group_pd = df_cleaned.select("WAGE_RATE_OF_PAY_FROM", "Job_Group") \
#     .toPandas()

# # Group and compute mean wage
# avg_wage_by_group = wages_group_pd.groupby("Job_Group")["WAGE_RATE_OF_PAY_FROM"].mean().sort_values(ascending=False)

# # Plot
# plt.figure(figsize=(12, 6))
# sns.barplot(x=avg_wage_by_group.index, y=avg_wage_by_group.values, palette="coolwarm")
# plt.xticks(rotation=45, ha="right")
# plt.title("Average Wage by Job Category (H-1B Certified)")
# plt.xlabel("Job Category")
# plt.ylabel("Average Wage ($)")
# plt.tight_layout()
# plt.show()

## Model: Linear Regression

## Objective

- **Predict:** `WAGE_RATE_OF_PAY_FROM` (salary)
- **Based on:** Job category, years of experience, and region (`WORKSITE_STATE`)

---

## Model Overview

- **Applied Linear Regression:**
  - Captured linear relationships between salary and predictors.
  - Evaluated predictor significance using statistical tests (p-values, coefficients).

- **Implementation Steps:**
  1. **Feature Selection & Preprocessing:**
     - Selected features: `Job_Group`, `yrs_of_experience`, `WORKSITE_STATE`
     - Encoded categorical variables using `StringIndexer` and `OneHotEncoder`
  2. **Data Splitting:**
     - Split dataset into 80% training and 20% testing subsets.
  3. **Model Fitting:**
     - Fitted the linear regression model on the training data.
  4. **Model Evaluation:**
     - Assessed predictive performance using R², RMSE, and MAE on both training and test sets.

In [32]:
# from pyspark.sql import SparkSession
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
# from pyspark.ml.regression import LinearRegression
# from pyspark.ml import Pipeline
# from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
# from pyspark.sql.functions import when, col

# # Initialize Spark session
# spark = SparkSession.builder.appName("H1BLinearRegression").getOrCreate()

# # Load the dataset (here df_cleaned is assumed to be your pre-loaded Spark DataFrame)
# data = df_cleaned

# # Select relevant columns (make sure these columns exist in your DataFrame)
# data = data.select("Job_Group", "yrs_of_experience", "WORKSITE_STATE", "WAGE_RATE_OF_PAY_FROM")

# # Define categorical columns (for this example, we use Job_Group, WORKSITE_STATE, and EMPLOYER_STATE)
# categorical_columns = ["Job_Group", "WORKSITE_STATE"]

# # Create StringIndexers for categorical columns
# indexers = [
#     StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
#     for col in categorical_columns
# ]

# # Create OneHotEncoders for the indexed columns
# encoders = [
#     OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + "_encoded")
#     for indexer in indexers
# ]

# # Assemble features: encoded categorical columns plus numeric column "yrs_of_experience"
# assembler_inputs = [col + "_index_encoded" for col in categorical_columns] + ["yrs_of_experience"]
# assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# # Define the Linear Regression model
# lr = LinearRegression(featuresCol="features", labelCol="WAGE_RATE_OF_PAY_FROM")

# print("here")

# # Build the pipeline with indexers, encoders, assembler, and the regression model
# pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

# # Split the data into training and testing sets
# train, test = data.randomSplit([0.8, 0.2], seed=42)

# # Fit the pipeline model on the training data
# model = pipeline.fit(train)

# print("here")

# # Make predictions on both the training and test sets
# train_predictions = model.transform(train)
# test_predictions = model.transform(test)

# # -------------------------
# # Regression Metrics
# # -------------------------

# # Define RegressionEvaluators for RMSE, MAE, and R²
# reg_evaluator_rmse = RegressionEvaluator(labelCol="WAGE_RATE_OF_PAY_FROM", predictionCol="prediction", metricName="rmse")
# reg_evaluator_mae = RegressionEvaluator(labelCol="WAGE_RATE_OF_PAY_FROM", predictionCol="prediction", metricName="mae")
# reg_evaluator_r2  = RegressionEvaluator(labelCol="WAGE_RATE_OF_PAY_FROM", predictionCol="prediction", metricName="r2")

# # Evaluate on training set
# print("Regression Metrics on Training Data:")
# train_rmse = reg_evaluator_rmse.evaluate(train_predictions)
# print("  RMSE:", train_rmse)
# train_mae  = reg_evaluator_mae.evaluate(train_predictions)
# print("  MAE :", train_mae)
# train_r2   = reg_evaluator_r2.evaluate(train_predictions)
# print("  R²  :", train_r2)

# # Evaluate on test set
# test_rmse = reg_evaluator_rmse.evaluate(test_predictions)
# test_mae  = reg_evaluator_mae.evaluate(test_predictions)
# test_r2   = reg_evaluator_r2.evaluate(test_predictions)

# print("\nRegression Metrics on Test Data:")
# print("  RMSE:", test_rmse)
# print("  MAE :", test_mae)
# print("  R²  :", test_r2)

## Performance Metrics

**Training Data:**
- **RMSE:** 34,943.61  
- **MAE:** 27,123.98  
- **R²:** 0.205

**Test Data:**
- **RMSE:** 34,913.56  
- **MAE:** 27,105.75  
- **R²:** 0.203

---

## Interpretation

- **RMSE & MAE:**  
  - Represent the average error in salary predictions.
- **R² (~20%):**  
  - Indicates that approximately 20% of the variance in salary is explained by the current predictors.
  - Suggests potential for model improvement by incorporating additional features or exploring alternative modeling techniques.

## Retrain

In [33]:
# from pyspark.sql import SparkSession
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
# from pyspark.ml.regression import LinearRegression
# from pyspark.ml import Pipeline
# from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
# from pyspark.sql.functions import when, col

# # Initialize Spark session
# spark = SparkSession.builder.appName("H1BLinearRegression").getOrCreate()

# # Load the dataset (here df_buckets is the one with wage bucket)
# data = df_buckets

# # Select relevant columns (make sure these columns exist in your DataFrame)
# data = data.select("Job_Group", "yrs_of_experience", "WORKSITE_STATE", "wage_bucket")

# # Define categorical columns (for this example, we use Job_Group, WORKSITE_STATE, and EMPLOYER_STATE)
# categorical_columns = ["Job_Group", "WORKSITE_STATE"]

# # Create StringIndexers for categorical columns
# indexers = [
#     StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="keep")
#     for col in categorical_columns
# ]

# # Create OneHotEncoders for the indexed columns
# encoders = [
#     OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + "_encoded")
#     for indexer in indexers
# ]

# # Assemble features: encoded categorical columns plus numeric column "yrs_of_experience"
# assembler_inputs = [col + "_index_encoded" for col in categorical_columns] + ["yrs_of_experience"]
# assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# # Define the Linear Regression model
# lr = LinearRegression(featuresCol="features", labelCol="wage_bucket")

# print("here")

# # Build the pipeline with indexers, encoders, assembler, and the regression model
# pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

# # Split the data into training and testing sets
# train, test = data.randomSplit([0.8, 0.2], seed=42)

# # Fit the pipeline model on the training data
# model = pipeline.fit(train)

# print("here")

# # Make predictions on both the training and test sets
# train_predictions = model.transform(train)
# test_predictions = model.transform(test)

# # -------------------------
# # Regression Metrics
# # -------------------------

# # Define RegressionEvaluators for RMSE, MAE, and R²
# reg_evaluator_rmse = RegressionEvaluator(labelCol="wage_bucket", predictionCol="prediction", metricName="rmse")
# reg_evaluator_mae = RegressionEvaluator(labelCol="wage_bucket", predictionCol="prediction", metricName="mae")
# reg_evaluator_r2  = RegressionEvaluator(labelCol="wage_bucket", predictionCol="prediction", metricName="r2")

# # Evaluate on training set
# print("Regression Metrics on Training Data:")
# train_rmse = reg_evaluator_rmse.evaluate(train_predictions)
# print("  RMSE:", train_rmse)
# train_mae  = reg_evaluator_mae.evaluate(train_predictions)
# print("  MAE :", train_mae)
# train_r2   = reg_evaluator_r2.evaluate(train_predictions)
# print("  R²  :", train_r2)

# # Evaluate on test set
# print("\nRegression Metrics on Test Data:")
# test_rmse = reg_evaluator_rmse.evaluate(test_predictions)
# print("  RMSE:", test_rmse)
# test_mae  = reg_evaluator_mae.evaluate(test_predictions)
# print("  MAE :", test_mae)
# test_r2   = reg_evaluator_r2.evaluate(test_predictions)
# print("  R²  :", test_r2)

# Tree Model

In [34]:
# column_names = df_cleaned.columns
# print(column_names)

## 1. Baseline

### adress non-numerical features

In [35]:
# 1. string index and one-hot encode each nominal feature
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# 1. Encode 'Job_Group'
job_indexer = StringIndexer(inputCol="Job_Group", outputCol="Job_Group_Index")
df_job_indexed = job_indexer.fit(df_cleaned).transform(df_cleaned)

job_encoder = OneHotEncoder(inputCol="Job_Group_Index", outputCol="Job_Group_Encoded")
df_job_encoded = job_encoder.fit(df_job_indexed).transform(df_job_indexed)

# 2. Encode 'EMPLOYER_STATE'
employer_indexer = StringIndexer(inputCol="EMPLOYER_STATE", outputCol="Employer_Location_Index")
df_employer_indexed = employer_indexer.fit(df_job_encoded).transform(df_job_encoded)

employer_encoder = OneHotEncoder(inputCol="Employer_Location_Index", outputCol="Employer_Location_Encoded")
df_employer_encoded = employer_encoder.fit(df_employer_indexed).transform(df_employer_indexed)

# 3. Encode 'WORKSITE_STATE'
worksite_indexer = StringIndexer(inputCol="WORKSITE_STATE", outputCol="Worksite_Location_Index")
df_worksite_indexed = worksite_indexer.fit(df_employer_encoded).transform(df_employer_encoded)

worksite_encoder = OneHotEncoder(inputCol="Worksite_Location_Index", outputCol="Worksite_Location_Encoded")
final_encoded_df = worksite_encoder.fit(df_worksite_indexed).transform(df_worksite_indexed)

# Show encoded result (optional: adjust number of rows)
final_encoded_df.select("Job_Group", "Job_Group_Encoded",
                        "EMPLOYER_STATE", "Employer_Location_Encoded",
                        "WORKSITE_STATE", "Worksite_Location_Encoded").show(10, truncate=False)


+----------------------------------------+-----------------+--------------+-------------------------+--------------+-------------------------+
|Job_Group                               |Job_Group_Encoded|EMPLOYER_STATE|Employer_Location_Encoded|WORKSITE_STATE|Worksite_Location_Encoded|
+----------------------------------------+-----------------+--------------+-------------------------+--------------+-------------------------+
|Computer Science                        |(15,[0],[1.0])   |NJ            |(56,[2],[1.0])           |PA            |(54,[9],[1.0])           |
|Computer Science                        |(15,[0],[1.0])   |NY            |(56,[4],[1.0])           |NJ            |(54,[4],[1.0])           |
|Data                                    |(15,[2],[1.0])   |NJ            |(56,[2],[1.0])           |NJ            |(54,[4],[1.0])           |
|Computer Science                        |(15,[0],[1.0])   |WA            |(56,[3],[1.0])           |CA            |(54,[0],[1.0])           |

In [36]:
# 2. bundle all the 4 input features together
from pyspark.ml.feature import VectorAssembler

# Bundle the encoded features into a single vector column
assembler = VectorAssembler(
    inputCols=["Job_Group_Encoded", "Employer_Location_Encoded", "Worksite_Location_Encoded", "yrs_of_experience"],
    outputCol="features"
)

# Apply the assembler to the final encoded DataFrame
vectorized_df = assembler.transform(final_encoded_df)

In [37]:
# 3. choose the data
final_data = vectorized_df.select("features",'WAGE_RATE_OF_PAY_FROM')
final_data.show(5)

+--------------------+---------------------+
|            features|WAGE_RATE_OF_PAY_FROM|
+--------------------+---------------------+
|(126,[0,17,80,125...|             133000.0|
|(126,[0,19,75,125...|             130000.0|
|(126,[2,17,75,125...|             112000.0|
|(126,[0,18,71,125...|             194479.0|
|(126,[0,24,71,125...|             175000.0|
+--------------------+---------------------+
only showing top 5 rows



In [38]:
# 4. split the train and test data
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [39]:
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from pyspark.sql import SparkSession
from tqdm import tqdm
import time

# 5. train the model
def print_progress(message):
    """Helper function to print progress with a timestamp."""
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Step 1: Train Decision Tree Regressor
print_progress("Starting training: Decision Tree Regressor...")
dtr = DecisionTreeRegressor(labelCol='WAGE_RATE_OF_PAY_FROM', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Decision Tree model...")
for _ in tqdm(range(1), desc="Training Decision Tree"):
    dtr_model = dtr.fit(train_data)
print_progress("Training completed: Decision Tree Regressor.")

# Make predictions
print_progress("Predicting with Decision Tree Regressor...")
dtr_predictions = dtr_model.transform(test_data)
print_progress("Prediction completed: Decision Tree Regressor.")

# Step 2: Train Random Forest Regressor
print_progress("Starting training: Random Forest Regressor...")
rfr = RandomForestRegressor(labelCol='WAGE_RATE_OF_PAY_FROM', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Random Forest model...")
for _ in tqdm(range(1), desc="Training Random Forest"):
    rfr_model = rfr.fit(train_data)
print_progress("Training completed: Random Forest Regressor.")

# Make predictions
print_progress("Predicting with Random Forest Regressor...")
rfr_predictions = rfr_model.transform(test_data)
print_progress("Prediction completed: Random Forest Regressor.")

# Step 3: Train Gradient-Boosted Tree Regressor
print_progress("Starting training: Gradient-Boosted Tree Regressor...")
gbtr = GBTRegressor(labelCol='WAGE_RATE_OF_PAY_FROM', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Gradient-Boosted Tree model...")
for _ in tqdm(range(1), desc="Training Gradient-Boosted Tree"):
    gbtr_model = gbtr.fit(train_data)
print_progress("Training completed: Gradient-Boosted Tree Regressor.")

# Make predictions
print_progress("Predicting with Gradient-Boosted Tree Regressor...")
gbtr_predictions = gbtr_model.transform(test_data)
print_progress("Prediction completed: Gradient-Boosted Tree Regressor.")

print_progress("All models trained and predictions made successfully.")


[2025-04-12 19:20:19] Starting training: Decision Tree Regressor...
[2025-04-12 19:20:19] Fitting the Decision Tree model...


Training Decision Tree: 100%|██████████| 1/1 [01:20<00:00, 80.68s/it]


[2025-04-12 19:21:40] Training completed: Decision Tree Regressor.
[2025-04-12 19:21:40] Predicting with Decision Tree Regressor...
[2025-04-12 19:21:40] Prediction completed: Decision Tree Regressor.
[2025-04-12 19:21:40] Starting training: Random Forest Regressor...
[2025-04-12 19:21:40] Fitting the Random Forest model...


Training Random Forest: 100%|██████████| 1/1 [01:40<00:00, 100.56s/it]


[2025-04-12 19:23:21] Training completed: Random Forest Regressor.
[2025-04-12 19:23:21] Predicting with Random Forest Regressor...
[2025-04-12 19:23:21] Prediction completed: Random Forest Regressor.
[2025-04-12 19:23:21] Starting training: Gradient-Boosted Tree Regressor...
[2025-04-12 19:23:21] Fitting the Gradient-Boosted Tree model...


Training Gradient-Boosted Tree: 100%|██████████| 1/1 [07:10<00:00, 430.20s/it]

[2025-04-12 19:30:31] Training completed: Gradient-Boosted Tree Regressor.
[2025-04-12 19:30:31] Predicting with Gradient-Boosted Tree Regressor...
[2025-04-12 19:30:31] Prediction completed: Gradient-Boosted Tree Regressor.
[2025-04-12 19:30:31] All models trained and predictions made successfully.





In [40]:
# 6. Compare the different models
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator for RMSE
rmse_evaluator = RegressionEvaluator(labelCol="WAGE_RATE_OF_PAY_FROM", predictionCol="prediction", metricName="rmse")

# Evaluate the models using RMSE
dtr_rmse = rmse_evaluator.evaluate(dtr_predictions)
rfr_rmse = rmse_evaluator.evaluate(rfr_predictions)
gbtr_rmse = rmse_evaluator.evaluate(gbtr_predictions)

print("Here are the results!")
print('-'*80)
print('A single decision tree had an RMSE of: {0:2.2f}'.format(dtr_rmse))
print('-'*80)
print('A random forest ensemble had an RMSE of: {0:2.2f}'.format(rfr_rmse))
print('-'*80)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse))
print('-'*80)

Here are the results!
--------------------------------------------------------------------------------
A single decision tree had an RMSE of: 30806.11
--------------------------------------------------------------------------------
A random forest ensemble had an RMSE of: 30680.90
--------------------------------------------------------------------------------
A gradient-boosted tree ensemble had an RMSE of: 29763.33
--------------------------------------------------------------------------------


In [41]:
# Evaluate the models using RMSE
print_progress("Predicting with Decision Tree Regressor...")
dtr_predictions_train = dtr_model.transform(train_data)

print_progress("Predicting with Random Forest Regressor...")
rfr_predictions_train = rfr_model.transform(train_data)

print_progress("Predicting with Gradient-Boosted Tree Regressor...")
gbtr_predictions_train = gbtr_model.transform(train_data)

dtr_rmse_train = rmse_evaluator.evaluate(dtr_predictions_train)
print('A single decision tree had an RMSE of: {0:2.2f}'.format(dtr_rmse_train))

rfr_rmse_train = rmse_evaluator.evaluate(rfr_predictions_train)
print('A random forest ensemble had an RMSE of: {0:2.2f}'.format(rfr_rmse_train))

gbtr_rmse_train = rmse_evaluator.evaluate(gbtr_predictions_train)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_train))


[2025-04-12 19:31:42] Predicting with Decision Tree Regressor...
[2025-04-12 19:31:42] Predicting with Random Forest Regressor...
[2025-04-12 19:31:42] Predicting with Gradient-Boosted Tree Regressor...
A single decision tree had an RMSE of: 30860.31
A random forest ensemble had an RMSE of: 30739.24
A gradient-boosted tree ensemble had an RMSE of: 29803.27


In [42]:
# r2
r2_evaluator = RegressionEvaluator(labelCol="WAGE_RATE_OF_PAY_FROM", predictionCol="prediction", metricName="r2")
# Evaluate the models using r2_evaluator (test data)
dtr_rmse_r2 = r2_evaluator.evaluate(dtr_predictions)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_r2))

rfr_rmse_r2 = r2_evaluator.evaluate(rfr_predictions)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_r2))

gbtr_rmse_r2 = r2_evaluator.evaluate(gbtr_predictions)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_r2))

# Evaluate the models using r2_evaluator (train data)
dtr_rmse_r2_train = r2_evaluator.evaluate(dtr_predictions_train)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_r2_train))

rfr_rmse_r2_train = r2_evaluator.evaluate(rfr_predictions_train)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_r2_train))

gbtr_rmse_r2_train = r2_evaluator.evaluate(gbtr_predictions_train)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_r2_train))


A single decision tree had an r2 of: 0.22
A random forest ensemble had an r2 of: 0.23
A gradient-boosted tree ensemble had an RMSE of: 0.28
A single decision tree had an r2 of: 0.22
A random forest ensemble had an r2 of: 0.23
A gradient-boosted tree ensemble had an RMSE of: 0.28


In [43]:
# MAE
mae_evaluator = RegressionEvaluator(labelCol="WAGE_RATE_OF_PAY_FROM", predictionCol="prediction", metricName="mae")

# Evaluate the models using r2_evaluator (test data)
dtr_rmse_mae = mae_evaluator.evaluate(dtr_predictions)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_mae))

rfr_rmse_mae = mae_evaluator.evaluate(rfr_predictions)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_mae))

gbtr_rmse_mae = mae_evaluator.evaluate(gbtr_predictions)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_mae))

# Evaluate the models using r2_evaluator (train data)
dtr_rmse_train_mae = mae_evaluator.evaluate(dtr_predictions_train)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_train_mae))

rfr_rmse_train_mae = mae_evaluator.evaluate(rfr_predictions_train)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_train_mae))

gbtr_rmse_train_mae = mae_evaluator.evaluate(gbtr_predictions_train)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_train_mae))

A single decision tree had an r2 of: 24568.15
A random forest ensemble had an r2 of: 24579.61
A gradient-boosted tree ensemble had an RMSE of: 23512.30
A single decision tree had an r2 of: 24599.37
A random forest ensemble had an r2 of: 24613.53
A gradient-boosted tree ensemble had an RMSE of: 23537.02


### Interpretation:

- Decision Tree Regressor

The difference between training and testing metrics is small, indicating that the Decision Tree model is not overfitting.
Both R² values are quite low (0.22), indicating that the model is not capturing much of the variance, suggesting underfitting.

Similar RMSE and MAE values in both training and testing imply that the model is consistently performing poorly on both datasets.

- Random Forest Regressor

The training and testing metrics are very similar, indicating that the Random Forest model generalizes well.

The low R² (0.22) shows that the model is not explaining much variance, indicating potential underfitting.

Despite being more robust than a single Decision Tree, the model may not be complex enough to capture patterns in the data.

- Gradient-boosted Tree Regressor

The Gradient-Boosted model has the lowest RMSE and MAE among the three models, indicating the best performance.

The difference between training and testing metrics is still small, which means the model is not overfitting.

The slightly higher R² value (0.26) indicates a marginal improvement over the other models, but it still struggles to capture variance, indicating some level of underfitting.


## 2. Retrain (Use detrended salary, do regression)

In [44]:
# column_names = df_detrended.columns
# print(column_names)

In [45]:
# 1. string index and one-hot encode each nominal feature
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# 1. Encode Job_Group
job_indexer = StringIndexer(inputCol="Job_Group", outputCol="Job_Group_Index")
df_indexed_job = job_indexer.fit(df_detrended).transform(df_detrended)

job_encoder = OneHotEncoder(inputCol="Job_Group_Index", outputCol="Job_Group_Encoded")
df_encoded_job = job_encoder.fit(df_indexed_job).transform(df_indexed_job)

df_encoded_job.select("Job_Group", "Job_Group_Index", "Job_Group_Encoded").show(10)

# 2. Encode EMPLOYER_STATE
print("Encoding EMPLOYER_STATE...")
employer_indexer = StringIndexer(inputCol="EMPLOYER_STATE", outputCol="Employer_Location_Index")
df_indexed_employer = employer_indexer.fit(df_encoded_job).transform(df_encoded_job)

employer_encoder = OneHotEncoder(inputCol="Employer_Location_Index", outputCol="Employer_Location_Encoded")
df_encoded_employer = employer_encoder.fit(df_indexed_employer).transform(df_indexed_employer)

# 3. Encode WORKSITE_STATE
print("Encoding WORKSITE_STATE...")
worksite_indexer = StringIndexer(inputCol="WORKSITE_STATE", outputCol="Worksite_Location_Index")
df_indexed_worksite = worksite_indexer.fit(df_encoded_employer).transform(df_encoded_employer)

worksite_encoder = OneHotEncoder(inputCol="Worksite_Location_Index", outputCol="Worksite_Location_Encoded")
final_encoded_df = worksite_encoder.fit(df_indexed_worksite).transform(df_indexed_worksite)

# Show the result
final_encoded_df.select(
    "Job_Group", "Job_Group_Encoded",
    "EMPLOYER_STATE", "Employer_Location_Encoded",
    "WORKSITE_STATE", "Worksite_Location_Encoded"
).show(truncate=False)

+--------------------+---------------+-----------------+
|           Job_Group|Job_Group_Index|Job_Group_Encoded|
+--------------------+---------------+-----------------+
|    Computer Science|            0.0|   (15,[0],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
|                Data|            2.0|   (15,[2],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
|Production, Const...|            3.0|   (15,[3],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
|    Computer Science|            0.0|   (15,[0],[1.0])|
+--------------------+---------------+-----------------+
only showing top 10 rows

Encoding EMPLOYER_STATE...
Encoding WORKSITE_STATE...
+----------------------------------------+-----------------+--------------+-------------------------+--------------+------

In [46]:
# 2. bundle all the 4 input features together
from pyspark.ml.feature import VectorAssembler

# Bundle the encoded features into a single vector column
assembler = VectorAssembler(
    inputCols=["Job_Group_Encoded", "Employer_Location_Encoded", "Worksite_Location_Encoded", "yrs_of_experience"],
    outputCol="features"
)

# Apply the assembler to the final encoded DataFrame
vectorized_df = assembler.transform(final_encoded_df)

In [47]:
# 3. choose the data
final_data_detrended = vectorized_df.select("features",'WAGE_DETRENDED')
final_data_detrended.show(5)

+--------------------+------------------+
|            features|    WAGE_DETRENDED|
+--------------------+------------------+
|(126,[0,17,80,125...|119649.47147181997|
|(126,[0,19,75,125...|116950.61121305711|
|(126,[2,17,75,125...| 96262.31232656725|
|(126,[0,18,71,125...| 174957.2147546472|
|(126,[0,24,71,125...|157433.51509449995|
+--------------------+------------------+
only showing top 5 rows



In [48]:
# 4. split the train and test data
train_data,test_data = final_data_detrended.randomSplit([0.7,0.3])

In [49]:
from pyspark.ml.regression import DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from pyspark.sql import SparkSession
from tqdm import tqdm
import time

# 5. train the model
def print_progress(message):
    """Helper function to print progress with a timestamp."""
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Step 1: Train Decision Tree Regressor
print_progress("Starting training: Decision Tree Regressor...")
dtr = DecisionTreeRegressor(labelCol='WAGE_DETRENDED', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Decision Tree model...")
for _ in tqdm(range(1), desc="Training Decision Tree"):
    dtr_model = dtr.fit(train_data)
print_progress("Training completed: Decision Tree Regressor.")

# Make predictions
print_progress("Predicting with Decision Tree Regressor...")
dtr_predictions = dtr_model.transform(test_data)
print_progress("Prediction completed: Decision Tree Regressor.")

# Step 2: Train Random Forest Regressor
print_progress("Starting training: Random Forest Regressor...")
rfr = RandomForestRegressor(labelCol='WAGE_DETRENDED', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Random Forest model...")
for _ in tqdm(range(1), desc="Training Random Forest"):
    rfr_model = rfr.fit(train_data)
print_progress("Training completed: Random Forest Regressor.")

# Make predictions
print_progress("Predicting with Random Forest Regressor...")
rfr_predictions = rfr_model.transform(test_data)
print_progress("Prediction completed: Random Forest Regressor.")

# Step 3: Train Gradient-Boosted Tree Regressor
print_progress("Starting training: Gradient-Boosted Tree Regressor...")
gbtr = GBTRegressor(labelCol='WAGE_DETRENDED', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Gradient-Boosted Tree model...")
for _ in tqdm(range(1), desc="Training Gradient-Boosted Tree"):
    gbtr_model = gbtr.fit(train_data)
print_progress("Training completed: Gradient-Boosted Tree Regressor.")

# Make predictions
print_progress("Predicting with Gradient-Boosted Tree Regressor...")
gbtr_predictions = gbtr_model.transform(test_data)
print_progress("Prediction completed: Gradient-Boosted Tree Regressor.")

print_progress("All models trained and predictions made successfully.")


[2025-04-12 19:41:38] Starting training: Decision Tree Regressor...
[2025-04-12 19:41:38] Fitting the Decision Tree model...


Training Decision Tree: 100%|██████████| 1/1 [01:59<00:00, 119.43s/it]


[2025-04-12 19:43:37] Training completed: Decision Tree Regressor.
[2025-04-12 19:43:37] Predicting with Decision Tree Regressor...
[2025-04-12 19:43:37] Prediction completed: Decision Tree Regressor.
[2025-04-12 19:43:37] Starting training: Random Forest Regressor...
[2025-04-12 19:43:37] Fitting the Random Forest model...


Training Random Forest: 100%|██████████| 1/1 [02:17<00:00, 137.92s/it]


[2025-04-12 19:45:55] Training completed: Random Forest Regressor.
[2025-04-12 19:45:55] Predicting with Random Forest Regressor...
[2025-04-12 19:45:55] Prediction completed: Random Forest Regressor.
[2025-04-12 19:45:55] Starting training: Gradient-Boosted Tree Regressor...
[2025-04-12 19:45:55] Fitting the Gradient-Boosted Tree model...


Training Gradient-Boosted Tree: 100%|██████████| 1/1 [09:00<00:00, 540.89s/it]

[2025-04-12 19:54:56] Training completed: Gradient-Boosted Tree Regressor.
[2025-04-12 19:54:56] Predicting with Gradient-Boosted Tree Regressor...
[2025-04-12 19:54:56] Prediction completed: Gradient-Boosted Tree Regressor.
[2025-04-12 19:54:56] All models trained and predictions made successfully.





In [50]:
# 6. Compare the different models
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize the evaluator for RMSE
rmse_evaluator = RegressionEvaluator(labelCol="WAGE_DETRENDED", predictionCol="prediction", metricName="rmse")

# Evaluate the models using RMSE
dtr_rmse = rmse_evaluator.evaluate(dtr_predictions)
rfr_rmse = rmse_evaluator.evaluate(rfr_predictions)
gbtr_rmse = rmse_evaluator.evaluate(gbtr_predictions)

print("Here are the results!")
print('-'*80)
print('A single decision tree had an RMSE of: {0:2.2f}'.format(dtr_rmse))
print('-'*80)
print('A random forest ensemble had an RMSE of: {0:2.2f}'.format(rfr_rmse))
print('-'*80)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse))
print('-'*80)

Here are the results!
--------------------------------------------------------------------------------
A single decision tree had an RMSE of: 28604.10
--------------------------------------------------------------------------------
A random forest ensemble had an RMSE of: 28468.69
--------------------------------------------------------------------------------
A gradient-boosted tree ensemble had an RMSE of: 27611.71
--------------------------------------------------------------------------------


In [51]:
# Evaluate the models using RMSE-continue
print_progress("Predicting with Decision Tree Regressor...")
dtr_predictions_train = dtr_model.transform(train_data)

print_progress("Predicting with Random Forest Regressor...")
rfr_predictions_train = rfr_model.transform(train_data)

print_progress("Predicting with Gradient-Boosted Tree Regressor...")
gbtr_predictions_train = gbtr_model.transform(train_data)

dtr_rmse_train = rmse_evaluator.evaluate(dtr_predictions_train)
print('A single decision tree had an RMSE of: {0:2.2f}'.format(dtr_rmse_train))

rfr_rmse_train = rmse_evaluator.evaluate(rfr_predictions_train)
print('A random forest ensemble had an RMSE of: {0:2.2f}'.format(rfr_rmse_train))

gbtr_rmse_train = rmse_evaluator.evaluate(gbtr_predictions_train)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_train))


[2025-04-12 19:57:50] Predicting with Decision Tree Regressor...
[2025-04-12 19:57:51] Predicting with Random Forest Regressor...
[2025-04-12 19:57:51] Predicting with Gradient-Boosted Tree Regressor...
A single decision tree had an RMSE of: 28571.50
A random forest ensemble had an RMSE of: 28434.47
A gradient-boosted tree ensemble had an RMSE of: 27576.20


In [52]:
# r2
r2_evaluator = RegressionEvaluator(labelCol="WAGE_DETRENDED", predictionCol="prediction", metricName="r2")
# Evaluate the models using r2_evaluator (test data)
dtr_rmse_r2 = r2_evaluator.evaluate(dtr_predictions)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_r2))

rfr_rmse_r2 = r2_evaluator.evaluate(rfr_predictions)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_r2))

gbtr_rmse_r2 = r2_evaluator.evaluate(gbtr_predictions)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_r2))

# Evaluate the models using r2_evaluator (train data)
dtr_rmse_r2_train = r2_evaluator.evaluate(dtr_predictions_train)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_r2_train))

rfr_rmse_r2_train = r2_evaluator.evaluate(rfr_predictions_train)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_r2_train))

gbtr_rmse_r2_train = r2_evaluator.evaluate(gbtr_predictions_train)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_r2_train))


A single decision tree had an r2 of: 0.22
A random forest ensemble had an r2 of: 0.23
A gradient-boosted tree ensemble had an RMSE of: 0.27
A single decision tree had an r2 of: 0.22
A random forest ensemble had an r2 of: 0.23
A gradient-boosted tree ensemble had an RMSE of: 0.28


In [53]:
# MAE
mae_evaluator = RegressionEvaluator(labelCol="WAGE_DETRENDED", predictionCol="prediction", metricName="mae")

# Evaluate the models using r2_evaluator (test data)
dtr_rmse_mae = mae_evaluator.evaluate(dtr_predictions)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_mae))

rfr_rmse_mae = mae_evaluator.evaluate(rfr_predictions)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_mae))

gbtr_rmse_mae = mae_evaluator.evaluate(gbtr_predictions)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_mae))

# Evaluate the models using r2_evaluator (train data)
dtr_rmse_train_mae = mae_evaluator.evaluate(dtr_predictions_train)
print('A single decision tree had an r2 of: {0:2.2f}'.format(dtr_rmse_train_mae))

rfr_rmse_train_mae = mae_evaluator.evaluate(rfr_predictions_train)
print('A random forest ensemble had an r2 of: {0:2.2f}'.format(rfr_rmse_train_mae))

gbtr_rmse_train_mae = mae_evaluator.evaluate(gbtr_predictions_train)
print('A gradient-boosted tree ensemble had an RMSE of: {0:2.2f}'.format(gbtr_rmse_train_mae))

A single decision tree had an r2 of: 22677.41
A random forest ensemble had an r2 of: 22649.50
A gradient-boosted tree ensemble had an RMSE of: 21712.26
A single decision tree had an r2 of: 22656.87
A random forest ensemble had an r2 of: 22623.68
A gradient-boosted tree ensemble had an RMSE of: 21692.12


In [58]:
df_detrended.select('yrs_of_experience').show()

+-----------------+
|yrs_of_experience|
+-----------------+
|              3.0|
|              3.0|
|              3.0|
|             2.99|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
|              3.0|
+-----------------+
only showing top 20 rows



In [59]:
# 7.1 Finetune decision tree (maxDepth)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
import time

# Print progress function
def print_progress(message):
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Initialize Decision Tree Regressor
dtr = DecisionTreeRegressor(labelCol='WAGE_DETRENDED', featuresCol='features')

# Build parameter grid with only two maxDepth values
paramGrid_dtr = (ParamGridBuilder()
                 .addGrid(dtr.maxDepth, [5, 10])
                 .build())

# Regression evaluator using RMSE
reg_evaluator = RegressionEvaluator(
    labelCol="WAGE_DETRENDED",
    predictionCol="prediction",
    metricName="rmse"
)

# Set up cross-validation
cv_dtr = CrossValidator(
    estimator=dtr,
    estimatorParamMaps=paramGrid_dtr,
    evaluator=reg_evaluator,
    numFolds=3,
    parallelism=2
)

# Train with cross-validation
print_progress("Starting Decision Tree Regressor training with cross-validation (maxDepth = 5, 10)...")
cv_dtr_model = cv_dtr.fit(train_data)
print_progress("Training completed with cross-validation.")

# Get best model and its parameters
best_dtr_model = cv_dtr_model.bestModel
best_max_depth = best_dtr_model.getOrDefault("maxDepth")

# Make predictions and evaluate RMSE on test set
dtr_predictions = best_dtr_model.transform(test_data)
rmse = reg_evaluator.evaluate(dtr_predictions)

# Print results
print_progress(f"Best Decision Tree Regressor Parameter: maxDepth = {best_max_depth}")
print_progress(f"Test RMSE of best model: {rmse:.4f}")

# Make predictions on the training dataset using the best model
dtr_train_predictions = best_dtr_model.transform(train_data)

# Evaluate RMSE on the training dataset
train_rmse = reg_evaluator.evaluate(dtr_train_predictions)

# Print training RMSE
print_progress(f"Training RMSE of best model: {train_rmse:.4f}")


[2025-04-12 20:29:21] Starting Decision Tree Regressor training with cross-validation (maxDepth = 5, 10)...
[2025-04-12 20:39:48] Training completed with cross-validation.
[2025-04-12 20:40:46] Best Decision Tree Regressor Parameter: maxDepth = 10
[2025-04-12 20:40:46] Test RMSE of best model: 27806.1708
[2025-04-12 20:41:44] Training RMSE of best model: 27767.1092


In [60]:
# 7.2 Finetune random forest
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import time

# Print progress function
def print_progress(message):
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Initialize Random Forest Regressor
rfr = RandomForestRegressor(labelCol='WAGE_DETRENDED', featuresCol='features')

# Build parameter grid (e.g., maxDepth and numTrees)
paramGrid_rfr = (ParamGridBuilder()
                 .addGrid(rfr.maxDepth, [5, 10])
                 .addGrid(rfr.numTrees, [20, 50])
                 .build())

# Regression evaluator using RMSE
reg_evaluator = RegressionEvaluator(
    labelCol="WAGE_DETRENDED",
    predictionCol="prediction",
    metricName="rmse"
)

# Set up cross-validation
cv_rfr = CrossValidator(
    estimator=rfr,
    estimatorParamMaps=paramGrid_rfr,
    evaluator=reg_evaluator,
    numFolds=3,
    parallelism=2
)

# Train with cross-validation
print_progress("Starting Random Forest Regressor training with cross-validation...")
cv_rfr_model = cv_rfr.fit(train_data)
print_progress("Training completed with cross-validation.")

# Get best model and parameters
best_rfr_model = cv_rfr_model.bestModel
best_max_depth = best_rfr_model.getOrDefault("maxDepth")
best_num_trees = best_rfr_model.getOrDefault("numTrees")

# Evaluate on test data
rfr_predictions = best_rfr_model.transform(test_data)
test_rmse = reg_evaluator.evaluate(rfr_predictions)
print_progress(f"Best Random Forest Parameters: maxDepth = {best_max_depth}, numTrees = {best_num_trees}")
print_progress(f"Test RMSE of best Random Forest model: {test_rmse:.4f}")

# Evaluate on training data
rfr_train_predictions = best_rfr_model.transform(train_data)
train_rmse = reg_evaluator.evaluate(rfr_train_predictions)
print_progress(f"Training RMSE of best Random Forest model: {train_rmse:.4f}")


[2025-04-12 20:53:32] Starting Random Forest Regressor training with cross-validation...
[2025-04-12 21:17:36] Training completed with cross-validation.
[2025-04-12 21:18:38] Best Random Forest Parameters: maxDepth = 10, numTrees = 50
[2025-04-12 21:18:38] Test RMSE of best Random Forest model: 27717.0163
[2025-04-12 21:19:45] Training RMSE of best Random Forest model: 27677.8554


In [61]:
# 7.3 Finetune Gradient-Boosted Tree Regressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import time

# Print progress function
def print_progress(message):
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Initialize GBT Regressor
gbt = GBTRegressor(labelCol='WAGE_DETRENDED', featuresCol='features')

# Build parameter grid (e.g., maxDepth and maxIter)
paramGrid_gbt = (ParamGridBuilder()
                 .addGrid(gbt.maxDepth, [5, 10])
                 .addGrid(gbt.maxIter, [20, 50])  # maxIter = number of boosting rounds
                 .build())

# Regression evaluator using RMSE
reg_evaluator = RegressionEvaluator(
    labelCol="WAGE_DETRENDED",
    predictionCol="prediction",
    metricName="rmse"
)

# Set up cross-validation
cv_gbt = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid_gbt,
    evaluator=reg_evaluator,
    numFolds=3,
    parallelism=2
)

# Train with cross-validation
print_progress("Starting GBT Regressor training with cross-validation...")
cv_gbt_model = cv_gbt.fit(train_data)
print_progress("Training completed with cross-validation.")

# Get best model and parameters
best_gbt_model = cv_gbt_model.bestModel
best_max_depth = best_gbt_model.getOrDefault("maxDepth")
best_max_iter = best_gbt_model.getOrDefault("maxIter")

# Evaluate on test data
gbt_predictions = best_gbt_model.transform(test_data)
test_rmse = reg_evaluator.evaluate(gbt_predictions)
print_progress(f"Best GBT Parameters: maxDepth = {best_max_depth}, maxIter = {best_max_iter}")
print_progress(f"Test RMSE of best GBT model: {test_rmse:.4f}")

# Evaluate on training data
gbt_train_predictions = best_gbt_model.transform(train_data)
train_rmse = reg_evaluator.evaluate(gbt_train_predictions)
print_progress(f"Training RMSE of best GBT model: {train_rmse:.4f}")

[2025-04-12 21:20:39] Starting GBT Regressor training with cross-validation...
[2025-04-13 01:04:55] Training completed with cross-validation.
[2025-04-13 01:05:59] Best GBT Parameters: maxDepth = 10, maxIter = 50
[2025-04-13 01:05:59] Test RMSE of best GBT model: 27041.1945
[2025-04-13 01:07:10] Training RMSE of best GBT model: 26960.2027


## 3. Retrain (Classification)
We train the tree model again after we change output from continous variable (wage) to catogory (wage bucket).

In [62]:
column_names = df_buckets.columns
print(column_names)

['Job_Group', 'YEAR', 'CASE_NUMBER', 'CASE_STATUS', 'RECEIVED_DATE', 'DECISION_DATE', 'VISA_CLASS', 'JOB_TITLE', 'SOC_CODE', 'SOC_TITLE', 'FULL_TIME_POSITION', 'BEGIN_DATE', 'END_DATE', 'TOTAL_WORKER_POSITIONS', 'NEW_EMPLOYMENT', 'CONTINUED_EMPLOYMENT', 'CHANGE_PREVIOUS_EMPLOYMENT', 'NEW_CONCURRENT_EMPLOYMENT', 'CHANGE_EMPLOYER', 'AMENDED_PETITION', 'EMPLOYER_NAME', 'EMPLOYER_ADDRESS1', 'EMPLOYER_CITY', 'EMPLOYER_STATE', 'EMPLOYER_POSTAL_CODE', 'EMPLOYER_COUNTRY', 'EMPLOYER_PHONE', 'NAICS_CODE', 'EMPLOYER_POC_LAST_NAME', 'EMPLOYER_POC_FIRST_NAME', 'EMPLOYER_POC_JOB_TITLE', 'EMPLOYER_POC_ADDRESS1', 'EMPLOYER_POC_CITY', 'EMPLOYER_POC_STATE', 'EMPLOYER_POC_POSTAL_CODE', 'EMPLOYER_POC_COUNTRY', 'EMPLOYER_POC_PHONE', 'EMPLOYER_POC_EMAIL', 'AGENT_REPRESENTING_EMPLOYER', 'AGENT_ATTORNEY_LAST_NAME', 'AGENT_ATTORNEY_FIRST_NAME', 'AGENT_ATTORNEY_ADDRESS1', 'AGENT_ATTORNEY_ADDRESS2', 'AGENT_ATTORNEY_CITY', 'AGENT_ATTORNEY_STATE', 'AGENT_ATTORNEY_POSTAL_CODE', 'AGENT_ATTORNEY_COUNTRY', 'AGENT_ATTO

In [64]:
# 1. string index and one-hot encode each nominal feature (same as before)
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# 1. Job Group
indexer = StringIndexer(inputCol="Job_Group", outputCol="Job_Group_Index")
indexed_df = indexer.fit(df_buckets).transform(df_buckets)

encoder = OneHotEncoder(inputCols=["Job_Group_Index"], outputCols=["Job_Group_Encoded"])
encoded_df = encoder.fit(indexed_df).transform(indexed_df)

# 2. Employer Location
employer_indexer = StringIndexer(inputCol="EMPLOYER_STATE", outputCol="Employer_Location_Index")
employer_indexed_df = employer_indexer.fit(encoded_df).transform(encoded_df)

employer_encoder = OneHotEncoder(inputCols=["Employer_Location_Index"], outputCols=["Employer_Location_Encoded"])
employer_encoded_df = employer_encoder.fit(employer_indexed_df).transform(employer_indexed_df)

# 3. Worksite Location
worksite_indexer = StringIndexer(inputCol="WORKSITE_STATE", outputCol="Worksite_Location_Index")
worksite_indexed_df = worksite_indexer.fit(employer_encoded_df).transform(employer_encoded_df)

worksite_encoder = OneHotEncoder(inputCols=["Worksite_Location_Index"], outputCols=["Worksite_Location_Encoded"])
final_encoded_df = worksite_encoder.fit(worksite_indexed_df).transform(worksite_indexed_df)

print("here")


here


In [65]:
# 2. bundle all the 4 input features together
from pyspark.ml.feature import VectorAssembler

# Bundle the encoded features into a single vector column
assembler = VectorAssembler(
    inputCols=["Job_Group_Encoded", "Employer_Location_Encoded", "Worksite_Location_Encoded", "yrs_of_experience"],
    outputCol="features"
)

# Apply the assembler to the final encoded DataFrame
vectorized_df = assembler.transform(final_encoded_df)
print("here")

here


In [66]:
# 3. string indexer output feature (we do not need to do one hot encoder)
bucket_indexer = StringIndexer(inputCol="wage_bucket", outputCol="wage_bucket_index")
vectorized_df = bucket_indexer.fit(vectorized_df).transform(vectorized_df)
vectorized_df.select("wage_bucket", "wage_bucket_index").show(10)
print("here")

+----------------+-----------------+
|     wage_bucket|wage_bucket_index|
+----------------+-----------------+
|(120000, 140000)|              3.0|
|(120000, 140000)|              3.0|
|(100000, 120000)|              1.0|
|(180000, 200000)|              6.0|
|(160000, 180000)|              5.0|
|(120000, 140000)|              3.0|
|(160000, 180000)|              5.0|
|(120000, 140000)|              3.0|
|(100000, 120000)|              1.0|
|(100000, 120000)|              1.0|
+----------------+-----------------+
only showing top 10 rows

here


In [67]:
# 4. choose the data
final_data = vectorized_df.select("features",'wage_bucket_index')
final_data.show(5)

+--------------------+-----------------+
|            features|wage_bucket_index|
+--------------------+-----------------+
|(126,[0,17,80,125...|              3.0|
|(126,[0,19,75,125...|              3.0|
|(126,[2,17,75,125...|              1.0|
|(126,[0,18,71,125...|              6.0|
|(126,[0,24,71,125...|              5.0|
+--------------------+-----------------+
only showing top 5 rows



In [68]:
# 5. split the train and test data
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [69]:
# 6. train decision tree classifier and random forest classifier
from pyspark.ml.classification import DecisionTreeClassifier,  RandomForestClassifier
from pyspark.sql import SparkSession
from tqdm import tqdm
import time

# Helper function to print progress with a timestamp
def print_progress(message):
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Step 1: Train Decision Tree Classifier
print_progress("Starting training: Decision Tree Classifier...")
dtr = DecisionTreeClassifier(labelCol='wage_bucket_index', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Decision Tree model...")
for _ in tqdm(range(1), desc="Training Decision Tree"):
    dtr_model = dtr.fit(train_data)
print_progress("Training completed: Decision Tree Classifier.")

# Make predictions
print_progress("Predicting with Decision Tree Classifier...")
dtr_predictions = dtr_model.transform(test_data)
print_progress("Prediction completed: Decision Tree Classifier.")

# Step 2: Train Random Forest Classifier
print_progress("Starting training: Random Forest Classifier...")
rfc = RandomForestClassifier(labelCol='wage_bucket_index', featuresCol='features')

# Progress bar for training
print_progress("Fitting the Random Forest model...")
for _ in tqdm(range(1), desc="Training Random Forest"):
    rfc_model = rfc.fit(train_data)
print_progress("Training completed: Random Forest Classifier.")

# Make predictions
print_progress("Predicting with Random Forest Classifier...")
rfc_predictions = rfc_model.transform(test_data)
print_progress("Prediction completed: Random Forest Classifier.")


[2025-04-13 03:26:26] Starting training: Decision Tree Classifier...
[2025-04-13 03:26:26] Fitting the Decision Tree model...


Training Decision Tree: 100%|██████████| 1/1 [02:57<00:00, 177.24s/it]


[2025-04-13 03:29:23] Training completed: Decision Tree Classifier.
[2025-04-13 03:29:23] Predicting with Decision Tree Classifier...
[2025-04-13 03:29:24] Prediction completed: Decision Tree Classifier.
[2025-04-13 03:29:24] Starting training: Random Forest Classifier...
[2025-04-13 03:29:24] Fitting the Random Forest model...


Training Random Forest: 100%|██████████| 1/1 [04:10<00:00, 250.53s/it]


[2025-04-13 03:33:34] Training completed: Random Forest Classifier.
[2025-04-13 03:33:34] Predicting with Random Forest Classifier...
[2025-04-13 03:33:35] Prediction completed: Random Forest Classifier.


In [70]:
# 6. Compare the different models
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize the evaluator for accuracy
acc_evaluator = MulticlassClassificationEvaluator(labelCol="wage_bucket_index", predictionCol="prediction", metricName="accuracy")

# Evaluate the models using accuracy on training data
dtr_train_acc = acc_evaluator.evaluate(dtr_model.transform(train_data))
print('Decision Tree Classifier - Training Accuracy: {0:2.2f}'.format(dtr_train_acc))
rfc_train_acc = acc_evaluator.evaluate(rfc_model.transform(train_data))
print('Random Forest Classifier - Training Accuracy: {0:2.2f}'.format(rfc_train_acc))


Decision Tree Classifier - Training Accuracy: 0.29
Random Forest Classifier - Training Accuracy: 0.28


In [71]:
# Initialize the evaluator for accuracy
acc_evaluator = MulticlassClassificationEvaluator(labelCol="wage_bucket_index", predictionCol="prediction", metricName="accuracy")

# Evaluate the models using accuracy on testing data
dtr_test_acc = acc_evaluator.evaluate(dtr_predictions)
print('Decision Tree Classifier - Testing Accuracy: {0:2.2f}'.format(dtr_test_acc))
rfc_test_acc = acc_evaluator.evaluate(rfc_predictions)
print('Random Forest Classifier - Testing Accuracy: {0:2.2f}'.format(rfc_test_acc))

Decision Tree Classifier - Testing Accuracy: 0.29
Random Forest Classifier - Testing Accuracy: 0.28


In [73]:
# 7.1 finetune the parameters- decision tree
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
import time

# Utility function to print progress with timestamps
def print_progress(message):
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Initialize Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol='wage_bucket_index', featuresCol='features')

# Create parameter grid for tuning
paramGrid_dt = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [5, 10])
                .build())

# Set up evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="wage_bucket_index",
    predictionCol="prediction",
    metricName="accuracy"
)

# Set up cross-validation
cv_dt = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid_dt,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

# Train the model with cross-validation
print_progress("Starting Decision Tree training with cross-validation...")
dt_model = cv_dt.fit(train_data)
print_progress("Training completed.")

# Extract the best model and its parameters
best_model = dt_model.bestModel
best_max_depth = best_model.getMaxDepth()
print_progress(f"Best maxDepth: {best_max_depth}")

# Evaluate on training data
train_predictions = best_model.transform(train_data)
train_accuracy = evaluator.evaluate(train_predictions)
print_progress(f"Training Accuracy: {train_accuracy:.4f}")

# Evaluate on test data
test_predictions = best_model.transform(test_data)
test_accuracy = evaluator.evaluate(test_predictions)
print_progress(f"Test Accuracy: {test_accuracy:.4f}")

# Cross-validation best accuracy
best_index = dt_model.avgMetrics.index(max(dt_model.avgMetrics))
best_cv_accuracy = dt_model.avgMetrics[best_index]
print_progress(f"Best Cross-Validation Accuracy: {best_cv_accuracy:.4f}")


[2025-04-13 03:44:23] Starting Decision Tree training with cross-validation...
[2025-04-13 03:57:24] Training completed.
[2025-04-13 03:57:24] Best maxDepth: 10
[2025-04-13 03:58:24] Training Accuracy: 0.3119
[2025-04-13 03:59:24] Test Accuracy: 0.3110
[2025-04-13 03:59:24] Best Cross-Validation Accuracy: 0.3122


In [74]:
# 7.2 finetune the parameters- random forest
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

# Function to print progress
def print_progress(message):
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}")

# Initialize Random Forest Classifier
rf = RandomForestClassifier(labelCol='wage_bucket_index', featuresCol='features')

# Create parameter grid
paramGrid_rf = (ParamGridBuilder()
                .addGrid(rf.numTrees, [20, 50])
                .addGrid(rf.maxDepth, [5, 10])
                .build())

# Define evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="wage_bucket_index",
    predictionCol="prediction",
    metricName="accuracy"
)

# Set up cross-validation
cv_rf = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid_rf,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

# Fit model using training data
print_progress("Starting Random Forest training with cross-validation...")
rf_model = cv_rf.fit(train_data)
print_progress("Random Forest training completed.")

# Get best model and parameters
best_rf_model = rf_model.bestModel
print_progress(f"Best maxDepth: {best_rf_model.getMaxDepth()}")
print_progress(f"Best numTrees: {best_rf_model.getNumTrees}")

# Evaluate on training data
train_predictions = best_rf_model.transform(train_data)
train_accuracy = evaluator.evaluate(train_predictions)
print_progress(f"Training Accuracy: {train_accuracy:.4f}")

# Evaluate on test data
test_predictions = best_rf_model.transform(test_data)
test_accuracy = evaluator.evaluate(test_predictions)
print_progress(f"Test Accuracy: {test_accuracy:.4f}")

# Cross-validation best accuracy
best_index = rf_model.avgMetrics.index(max(rf_model.avgMetrics))
best_cv_accuracy = rf_model.avgMetrics[best_index]
print_progress(f"Best Cross-Validation Accuracy: {best_cv_accuracy:.4f}")


[2025-04-13 04:42:10] Starting Random Forest training with cross-validation...
[2025-04-13 05:25:38] Random Forest training completed.
[2025-04-13 05:25:38] Best maxDepth: 10
[2025-04-13 05:25:38] Best numTrees: 50
[2025-04-13 05:26:53] Training Accuracy: 0.3023
[2025-04-13 05:28:00] Test Accuracy: 0.3020
[2025-04-13 05:28:00] Best Cross-Validation Accuracy: 0.3006
