# JOB PLACEMENT CLASSIFICATION

### Step 1: Importing Libraries

In [103]:
# Let's start by importing necessary libraries
# Using Spark ML library instead of MLlib 

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col, round
from pyspark.sql.types import IntegerType, FloatType


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2: Initilizing Spark Session


In [82]:
spark = SparkSession.builder.appName("JobPlacement").getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 3: Load Cleaned Data from S3 (Cleaned Data from Hive)

In [83]:
# Load the Hive cleaned file (no headers)
df = spark.read.csv('s3://group4project1/Processed/', header=False, inferSchema=True)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [84]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: integer (nullable = true)

In [85]:
df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+---+---+---+---+---+---+---+---+---+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|
+---+---+---+---+---+---+---+---+---+---+----+----+
| \N| \N| \N| \N| \N| \N| \N|  0|  0| \N|  \N|   0|
|  1|7.5|  1|  1|  1| 65|4.4|  0|  0| 61|  79|   0|
|  2|8.9|  0|  3|  2| 90|4.0|  1|  1| 78|  82|   1|
|  3|7.3|  1|  2|  2| 82|4.8|  1|  0| 79|  80|   0|
|  4|7.5|  1|  1|  2| 85|4.4|  1|  1| 81|  80|   1|
+---+---+---+---+---+---+---+---+---+---+----+----+
only showing top 5 rows

In [86]:
# Now, let's provide the columns name

columns = ["std_id", "cgpa", "internship", "project", "certification", "testscore", 
           "softskillsrating", "extra", "placement", "ssc", "hsc", "placementstatus"]

df = df.toDF(*columns)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [87]:
df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----+----------+-------+-------------+---------+----------------+-----+---------+---+---+---------------+
|std_id|cgpa|internship|project|certification|testscore|softskillsrating|extra|placement|ssc|hsc|placementstatus|
+------+----+----------+-------+-------------+---------+----------------+-----+---------+---+---+---------------+
|    \N|  \N|        \N|     \N|           \N|       \N|              \N|    0|        0| \N| \N|              0|
|     1| 7.5|         1|      1|            1|       65|             4.4|    0|        0| 61| 79|              0|
|     2| 8.9|         0|      3|            2|       90|             4.0|    1|        1| 78| 82|              1|
|     3| 7.3|         1|      2|            2|       82|             4.8|    1|        0| 79| 80|              0|
|     4| 7.5|         1|      1|            2|       85|             4.4|    1|        1| 81| 80|              1|
+------+----+----------+-------+-------------+---------+----------------+-----+---------

### Step 4: Perform Data Preprocessing

In [88]:
# Check for nulls
df.select([df[c].isNull().alias(c) for c in df.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+-----+----------+-------+-------------+---------+----------------+-----+---------+-----+-----+---------------+
|std_id| cgpa|internship|project|certification|testscore|softskillsrating|extra|placement|  ssc|  hsc|placementstatus|
+------+-----+----------+-------+-------------+---------+----------------+-----+---------+-----+-----+---------------+
| false|false|     false|  false|        false|    false|           false|false|    false|false|false|          false|
| false|false|     false|  false|        false|    false|           false|false|    false|false|false|          false|
| false|false|     false|  false|        false|    false|           false|false|    false|false|false|          false|
| false|false|     false|  false|        false|    false|           false|false|    false|false|false|          false|
| false|false|     false|  false|        false|    false|           false|false|    false|false|false|          false|
| false|false|     false|  false|        false| 

In [89]:
# Remove the first row with consits of \N
# Replace all '\N' with None (null)
for c in df.columns:
    df = df.withColumn(c, when(col(c) == "\\N", None).otherwise(col(c)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [90]:
# Now we can drop the column
df = df.na.drop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----+----------+-------+-------------+---------+----------------+-----+---------+---+---+---------------+
|std_id|cgpa|internship|project|certification|testscore|softskillsrating|extra|placement|ssc|hsc|placementstatus|
+------+----+----------+-------+-------------+---------+----------------+-----+---------+---+---+---------------+
|     1| 7.5|         1|      1|            1|       65|             4.4|    0|        0| 61| 79|              0|
|     2| 8.9|         0|      3|            2|       90|             4.0|    1|        1| 78| 82|              1|
|     3| 7.3|         1|      2|            2|       82|             4.8|    1|        0| 79| 80|              0|
|     4| 7.5|         1|      1|            2|       85|             4.4|    1|        1| 81| 80|              1|
|     5| 8.3|         1|      2|            2|       86|             4.5|    1|        1| 74| 88|              1|
+------+----+----------+-------+-------------+---------+----------------+-----+---------

In [92]:
# Now to convert string data type to integer

# Cast each column to the correct type
df = df.withColumn("std_id", col("std_id").cast(IntegerType())) \
       .withColumn("cgpa", round(col("cgpa").cast(FloatType()),2)) \
       .withColumn("internship", col("internship").cast(IntegerType())) \
       .withColumn("project", col("project").cast(IntegerType())) \
       .withColumn("certification", col("certification").cast(IntegerType())) \
       .withColumn("testscore", col("testscore").cast(IntegerType())) \
       .withColumn("softskillsrating", round(col("softskillsrating").cast(FloatType()),2)) \
       .withColumn("extra", col("extra").cast(IntegerType())) \
       .withColumn("placement", col("placement").cast(IntegerType())) \
       .withColumn("ssc", col("ssc").cast(IntegerType())) \
       .withColumn("hsc", col("hsc").cast(IntegerType())) \
       .withColumn("placementstatus", col("placementstatus").cast(IntegerType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 5: Exploratory Data Analysis (EDA)

In [93]:
# Summerize the dataset
df.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+------------------+------------------+------------------+------------------+-----------------+-------------------+-------------------+-------------------+------------------+-----------------+------------------+
|summary|            std_id|              cgpa|        internship|           project|     certification|        testscore|   softskillsrating|              extra|          placement|               ssc|              hsc|   placementstatus|
+-------+------------------+------------------+------------------+------------------+------------------+-----------------+-------------------+-------------------+-------------------+------------------+-----------------+------------------+
|  count|             10000|             10000|             10000|             10000|             10000|            10000|              10000|              10000|              10000|             10000|            10000|             10000|
|   mean|            5000.5| 7.6980100034236

In [94]:
# Check for class imbalance

df.groupBy("placementstatus").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-----+
|placementstatus|count|
+---------------+-----+
|              1| 4197|
|              0| 5803|
+---------------+-----+

### Step 6: Assemble Initial Features

In [95]:

assembler = VectorAssembler(
    inputCols=["cgpa", "internship", "project", "certification", 
               "testscore", "softskillsrating", "extra", 
               "placement", "ssc", "hsc"], 
    outputCol="features"
)

# Assemble features
final_data = assembler.transform(df).select("features", "placementstatus")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 7: Feature Selection using Random Forest

In [96]:
# First, Train Random Forest

rf = RandomForestClassifier(labelCol="placementstatus", featuresCol="features", numTrees=50)
rf_model = rf.fit(final_data)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [97]:
# Feature names
feature_names = ["cgpa", "internship", "project", "certification", 
                 "testscore", "softskillsrating", "extra", 
                 "placement", "ssc", "hsc"]

# Extract feature importances
importances = rf_model.featureImportances

# Print feature importance nicely
for idx, importance in enumerate(importances):
    print(f"{feature_names[idx]}: {importance}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

cgpa: 0.044870793266754694
internship: 0.0006021943500142089
project: 0.16259823358528794
certification: 0.0541487966224545
testscore: 0.1943758967442116
softskillsrating: 0.027508224277824737
extra: 0.2190301527676093
placement: 0.013211577375403988
ssc: 0.03672588438461719
hsc: 0.24692824662582172

#### So, from the above results we will exclude the features with value less then 0.04

In [98]:
# Updated VectorAssembler after features selection


assembler = VectorAssembler(
    inputCols=["cgpa", "project", "certification", "testscore", "extra", "hsc"], 
    outputCol="features"
)

final_data = assembler.transform(df).select("features", "placementstatus")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [99]:
final_data.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------------+
|            features|placementstatus|
+--------------------+---------------+
|[7.5,1.0,1.0,65.0...|              0|
|[8.89999961853027...|              1|
|[7.30000019073486...|              0|
|[7.5,1.0,2.0,85.0...|              1|
|[8.30000019073486...|              1|
+--------------------+---------------+
only showing top 5 rows

### Step 8: Test-Train Split

In [100]:
# Split data 80% training, 20% testing
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

print(f"Training Data Count: {train_data.count()}")
print(f"Test Data Count: {test_data.count()}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Training Data Count: 8079
Test Data Count: 1921

### Step 9: Model Training

In [101]:
# A. Decision Tree

# Initialize Decision Tree
decision_tree = DecisionTreeClassifier(labelCol="placementstatus", featuresCol="features")

# Train the model
dt_model = decision_tree.fit(train_data)

# Predict on test data
dt_predictions = dt_model.transform(test_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [102]:
# B. Random Forest

# Initialize Random Forest
random_Forest = RandomForestClassifier(labelCol="placementstatus", featuresCol="features", numTrees=50)

# Train the model
rf_model = random_Forest.fit(train_data)

# Predict on test data
rf_predictions = rf_model.transform(test_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [104]:
# Initialize Logistic Regression
lr = LogisticRegression(labelCol="placementstatus", featuresCol="features", maxIter=50)

# Train the model
lr_model = lr.fit(train_data)

# Predict on test data
lr_predictions = lr_model.transform(test_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 10: Model Evaluation

In [106]:
# Define function to calculate AUC, Accuracy, Precision, Recall, and F1 Score


def evaluate_model(predictions, label_col="placementstatus", prediction_col="prediction", raw_prediction_col="rawPrediction"):
    binary_evaluator = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol=raw_prediction_col, metricName="areaUnderROC")
    multi_evaluator_accuracy = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName="accuracy")
    multi_evaluator_precision = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName="weightedPrecision")
    multi_evaluator_recall = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName="weightedRecall")
    multi_evaluator_f1 = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=prediction_col, metricName="f1")
    
    return {
        "AUC": binary_evaluator.evaluate(predictions),
        "Accuracy": multi_evaluator_accuracy.evaluate(predictions),
        "Precision": multi_evaluator_precision.evaluate(predictions),
        "Recall": multi_evaluator_recall.evaluate(predictions),
        "F1 Score": multi_evaluator_f1.evaluate(predictions)
    }

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [107]:
# Evaluate models

# Evaluate Decision Tree
dt_metrics = evaluate_model(dt_predictions)
print("Decision Tree Metrics:", dt_metrics)

# Evaluate Random Forest
rf_metrics = evaluate_model(rf_predictions)
print("Random Forest Metrics:", rf_metrics)

# Evaluate Logistic Regression
lr_metrics = evaluate_model(lr_predictions)
print("Logistic Regression Metrics:", lr_metrics)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Decision Tree Metrics: {'AUC': 0.6163654103479036, 'Accuracy': 0.7829255596043727, 'Precision': 0.781739708331313, 'Recall': 0.7829255596043728, 'F1 Score': 0.7817660714033596}
Random Forest Metrics: {'AUC': 0.8648338537020519, 'Accuracy': 0.797501301405518, 'Precision': 0.7967568268915035, 'Recall': 0.7975013014055179, 'F1 Score': 0.7969682095763184}
Logistic Regression Metrics: {'AUC': 0.8658067573595006, 'Accuracy': 0.7954190525767829, 'Precision': 0.7956100204151382, 'Recall': 0.795419052576783, 'F1 Score': 0.795509081333289}

#### Result: Random Forest and Logistic Regression performed well then the Decison Tree. More Specificially, Random Forest accuracy is slightly more than the Logistic Regresssion while othe metrics are similar between logistic and random forest. Decision Tree is weaker because of lower AUC as well other metrics.

### Step 11: Save Models Predictions

In [109]:
# Save Decision Tree Predictions
dt_predictions.select("placementstatus", "prediction") \
    .write.csv('s3://group4project1/Predictions/DecisionTree/', header=True)

# Save Random Forest Predictions
rf_predictions.select("placementstatus", "prediction") \
    .write.csv('s3://group4project1/Predictions/RandomForest/', header=True)

# Save Logistic Regression Predictions
lr_predictions.select("placementstatus", "prediction") \
    .write.csv('s3://group4project1/Predictions/LogisticRegression/', header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [110]:
import pandas as pd

# Organize your model metrics
model_metrics = {
    "Model": ["Decision Tree", "Random Forest", "Logistic Regression"],
    "AUC": [dt_metrics["AUC"], rf_metrics["AUC"], lr_metrics["AUC"]],
    "Accuracy": [dt_metrics["Accuracy"], rf_metrics["Accuracy"], lr_metrics["Accuracy"]],
    "Precision": [dt_metrics["Precision"], rf_metrics["Precision"], lr_metrics["Precision"]],
    "Recall": [dt_metrics["Recall"], rf_metrics["Recall"], lr_metrics["Recall"]],
    "F1 Score": [dt_metrics["F1 Score"], rf_metrics["F1 Score"], lr_metrics["F1 Score"]]
}

metrics_df = pd.DataFrame(model_metrics)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [112]:
# Save locally
metrics_df.to_csv('model_evaluation_results.csv', index=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [116]:
import os
print(os.getcwd())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

/home/hadoop

In [117]:
import boto3

# Create S3 client
s3 = boto3.client('s3')

# Open and upload the file
with open('/home/hadoop/model_evaluation_results.csv', 'rb') as data:
    s3.upload_fileobj(data, 'group4project1', 'EvaluationResults/model_evaluation_results.csv')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

