In [94]:
!pip install pandas

[0m

In [95]:
import pandas as pd

# Raw dataset; the split cell below derives its files from this frame.
df = pd.read_csv("/data/AI_Impact_on_Jobs_2030.csv")

# Only a cell's last expression is auto-rendered, so the preview and dtypes must be
# displayed explicitly (previously they were silently discarded).
display(df.head(), df.dtypes)
df.shape


(3000, 18)

# STEP 2 — Manual data split (PROJECT REQUIREMENT)

In [96]:
from pathlib import Path

SPLIT_DIR = Path("/data/split")

# Shuffle into a NEW frame instead of overwriting `df`: re-running the cell then
# always yields the same permutation (random_state=42) and the same split files.
df_shuffled = df.sample(frac=1.0, random_state=42).reset_index(drop=True)

SPLIT_DIR.mkdir(parents=True, exist_ok=True)

# Three contiguous thirds of the shuffled rows: [0, a), [a, b), [b, n).
n = len(df_shuffled)
a, b = n // 3, 2 * n // 3

df_shuffled.iloc[:a].to_csv(SPLIT_DIR / "jobs_master.csv", index=False)
df_shuffled.iloc[a:b].to_csv(SPLIT_DIR / "jobs_worker1.csv", index=False)
df_shuffled.iloc[b:].to_csv(SPLIT_DIR / "jobs_worker2.csv", index=False)

# Pure-Python listing of the written split files (replaces the `!ls` shell call).
sorted(p.name for p in SPLIT_DIR.glob("*.csv"))


jobs_master.csv  jobs_worker1.csv  jobs_worker2.csv


# STEP 3 — Start Spark (connect the notebook driver to the standalone cluster)

In [97]:
from pyspark.sql import SparkSession

# Executor/shuffle settings applied to the session, in this order.
spark_settings = {
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    "spark.sql.shuffle.partitions": "8",
}

# Connect the notebook driver to the standalone Spark master.
session_builder = (
    SparkSession.builder
    .appName("AI_Jobs_Final_Project")
    .master("spark://bd-spark-master:7077")
)
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()


# STEP 4 — Load split data into Spark (distributed)

In [None]:
def _read_split(csv_path):
    """Load one split CSV (with header) into a Spark DataFrame, inferring column types."""
    return spark.read.csv(csv_path, header=True, inferSchema=True)


m, w1, w2 = (
    _read_split(csv_path)
    for csv_path in (
        "/data/split/jobs_master.csv",
        "/data/split/jobs_worker1.csv",
        "/data/split/jobs_worker2.csv",
    )
)

# Recombine the three splits by column name into the full dataset.
df_all = m.unionByName(w1)
df_all = df_all.unionByName(w2)
df_all.count()


# STEP 5 — Spark SQL exploration + repartition

In [None]:
df_all.createOrReplaceTempView("jobs")

# ORDER BY makes the rows printed by show() (first 20) deterministic across runs;
# without it the order of GROUP BY output is unspecified.
# NOTE(review): if AI_Exposure_Index is a continuous score, most groups will have
# cnt = 1 and a binned histogram would be more informative — confirm the column type.
spark.sql("""
SELECT AI_Exposure_Index, COUNT(*) AS cnt
FROM jobs
GROUP BY AI_Exposure_Index
ORDER BY cnt DESC, AI_Exposure_Index
""").show()

# Kept equal to spark.sql.shuffle.partitions from the session configuration.
NUM_PARTITIONS = 8

# Guard keeps the cell idempotent: re-running it must not stack another
# repartition + cache on top of an already-cached df_all.
_storage = df_all.storageLevel
if not (_storage.useMemory or _storage.useDisk):
    df_all = df_all.repartition(NUM_PARTITIONS, "AI_Exposure_Index").cache()

# count() is an action, so it materializes the cache for the cells below.
df_all.count()



# STEP 6 — Spark MLlib pipeline (PART 1: Global model)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer,
    OneHotEncoder,
    VectorAssembler
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [None]:
def _make_indexer(source_col, target_col):
    """StringIndexer that maps unseen/invalid values to an extra index instead of failing."""
    return StringIndexer(
        inputCol=source_col,
        outputCol=target_col,
        handleInvalid="keep",
    )


# Target: Risk_Category -> numeric `label`.
# NOTE(review): "keep" on the label gives unseen test labels an index no model can
# predict; consider whether "skip"/"error" is intended for the target column.
label_indexer = _make_indexer("Risk_Category", "label")

# Categorical features, one-hot encoded in the next step.
education_indexer = _make_indexer("Education_Level", "education_idx")
job_indexer = _make_indexer("Job_Title", "job_idx")


In [None]:
# Index columns produced by the StringIndexers and their one-hot vector outputs (pairwise).
categorical_idx_cols = ["education_idx", "job_idx"]
categorical_vec_cols = ["education_vec", "job_vec"]

encoder = OneHotEncoder(inputCols=categorical_idx_cols, outputCols=categorical_vec_cols)


In [None]:
# Numeric columns fed straight into the VectorAssembler.
# NOTE(review): if Risk_Category is derived from Automation_Probability_2030, including
# it here leaks the target into the features — confirm against the dataset docs.
numeric_features = [
    "Average_Salary",
    "Years_Experience",
    "AI_Exposure_Index",
    "Tech_Growth_Factor",
    "Automation_Probability_2030",
    *(f"Skill_{skill_no}" for skill_no in range(1, 11)),
]


In [None]:
# Numeric columns first, then the one-hot vectors, all packed into one `features` vector.
assembler_inputs = [*numeric_features, "education_vec", "job_vec"]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")


In [None]:
# Upper bound on optimizer iterations for the classifier.
LR_MAX_ITER = 50

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=LR_MAX_ITER)


In [None]:
# Stage order matters: indexers produce the columns the encoder reads, the encoder
# produces the vectors the assembler reads, and the assembler feeds the classifier.
pipeline_stages = [label_indexer, education_indexer, job_indexer]
pipeline_stages += [encoder, assembler, lr]

pipeline = Pipeline(stages=pipeline_stages)


# Train/test split, training and prediction

In [None]:
# 80/20 split with a fixed seed so the global model is reproducible.
train_df, test_df = df_all.randomSplit(weights=[0.8, 0.2], seed=42)

model = pipeline.fit(dataset=train_df)
pred = model.transform(dataset=test_df)


# Evaluation

In [None]:
# F1 over the predicted vs indexed label; reused for every model in this notebook.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="label",
    predictionCol="prediction",
)

global_model_f1 = evaluator.evaluate(pred)
print("GLOBAL MODEL F1:", global_model_f1)


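# PART 2 — Separate models per data split (master / worker1 / worker2 files)
The split files are named after cluster nodes, but each model is still trained by the whole Spark cluster; only the training data differs.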

## Helper function (train & evaluate ONE split)

In [None]:
def train_eval_split(path, name, *, num_partitions=4, split_weights=(0.8, 0.2), seed=42):
    """Train the shared ``pipeline`` on one CSV split and report its score.

    The split is read, repartitioned and cached once, then divided into train/test
    parts. A fresh model is fitted on the train part and scored on the test part
    with the shared ``evaluator`` (configured for F1 in this notebook).

    Args:
        path: CSV file (with header row) to load.
        name: Label printed in front of the score.
        num_partitions: Partitions to spread the split over before training.
        split_weights: Train/test weights passed to ``randomSplit``.
        seed: Seed for ``randomSplit`` so the split is reproducible.

    Returns:
        The evaluator's metric on the held-out part of the split.
    """
    split_df = (
        spark.read.csv(path, header=True, inferSchema=True)
        # Optional repartition (shows parallelism even on small splits)
        .repartition(num_partitions)
        # Cache so the pipeline's stage fits and the later transform reuse one read
        # instead of re-reading and re-shuffling the CSV each time (the global
        # model's input is cached the same way).
        .cache()
    )
    try:
        train_split, test_split = split_df.randomSplit(list(split_weights), seed=seed)
        split_model = pipeline.fit(train_split)
        split_pred = split_model.transform(test_split)
        f1 = evaluator.evaluate(split_pred)
    finally:
        # Release the cached split before the next one is processed.
        split_df.unpersist()

    print(f"{name} MODEL F1:", f1)
    return f1


## Train the three independent models

In [None]:
# One independent model per split file; each call prints and returns its F1.
f1_master = train_eval_split(path="/data/split/jobs_master.csv", name="MASTER")
f1_worker1 = train_eval_split(path="/data/split/jobs_worker1.csv", name="WORKER 1")
f1_worker2 = train_eval_split(path="/data/split/jobs_worker2.csv", name="WORKER 2")


## Comparing results

In [None]:
# Re-score the global model's test predictions (`pred` from the global training cell).
global_f1 = evaluator.evaluate(dataset=pred)


In [None]:
# One row per model: its F1 and the difference from the global model's F1.
# Rendered as a rounded table instead of raw float reprs from print().
model_comparison = pd.DataFrame(
    {"F1": [global_f1, f1_master, f1_worker1, f1_worker2]},
    index=pd.Index(["Global", "Master", "Worker1", "Worker2"], name="Model"),
)
model_comparison["Delta_vs_Global"] = model_comparison["F1"] - global_f1
model_comparison.round(4)
