## Decision Trees

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load training data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Tokenize text data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Compute TF-IDF
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
df = hashing_tf.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(df)  # Fit once and reuse
df = idf_model.transform(df)

# Prepare dataset
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Train Decision Tree model
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dt_model = dt.fit(train)

# Evaluate
predictions = dt_model.transform(test)
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
auc = evaluator_auc.evaluate(predictions)

print(f"Decision Tree Accuracy: {accuracy}")
print(f"Decision Tree F1-score: {f1_score}")
print(f"Decision Tree Precision: {precision}")
print(f"Decision Tree Recall: {recall}")
print(f"Decision Tree AUC-ROC: {auc}")

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Apply same preprocessing steps to new data
df_predict = tokenizer.transform(df_predict)
df_predict = hashing_tf.transform(df_predict)
df_predict = idf_model.transform(df_predict)  # Use trained idf_model

# Make Predictions
df_predict = dt_model.transform(df_predict).select("text", "prediction")

# Display results
for row in df_predict.collect():
    text, prediction = row["text"], row["prediction"]
    status = "Stressed" if prediction == 1 else "Not Stressed"
    print(f"Text: {text} --> Prediction: {status}\n")

    if prediction == 1:
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


25/04/04 12:49:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
25/04/04 12:49:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Decision Tree Accuracy: 0.6261682242990654
Decision Tree F1-score: 0.6119050206476854
Decision Tree Precision: 0.6453037574532903
Decision Tree Recall: 0.6261682242990654
Decision Tree AUC-ROC: 0.5911949685534592
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 (Image of you working or something that repr

## Gradient Boosted Trees

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load training data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Tokenize text data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Compute TF-IDF
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
df = hashing_tf.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(df)  # Fit once and reuse
df = idf_model.transform(df)

# Prepare dataset
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Train Gradient Boosted Trees model
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
gbt_model = gbt.fit(train)

# Evaluate
predictions = gbt_model.transform(test)
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
auc = evaluator_auc.evaluate(predictions)

print(f"Gradient Boosted Trees Accuracy: {accuracy}")
print(f"Gradient Boosted Trees F1-score: {f1_score}")
print(f"Gradient Boosted Trees Precision: {precision}")
print(f"Gradient Boosted Trees Recall: {recall}")
print(f"Gradient Boosted Trees AUC-ROC: {auc}")

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Apply same preprocessing steps to new data
df_predict = tokenizer.transform(df_predict)
df_predict = hashing_tf.transform(df_predict)
df_predict = idf_model.transform(df_predict) 

# Make Predictions
df_predict = gbt_model.transform(df_predict).select("text", "prediction")

# Display results
for row in df_predict.collect():
    text, prediction = row["text"], row["prediction"]
    status = "Stressed" if prediction == 1 else "Not Stressed"
    print(f"Text: {text} --> Prediction: {status}\n")

    if prediction == 1:
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Gradient Boosted Trees Accuracy: 0.5981308411214953
Gradient Boosted Trees F1-score: 0.5895139736033284
Gradient Boosted Trees Precision: 0.6054021685664792
Gradient Boosted Trees Recall: 0.5981308411214953
Gradient Boosted Trees AUC-ROC: 0.653214535290007
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 

## Support Vector Machine

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load training data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Tokenize text data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Compute TF-IDF
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
df = hashing_tf.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(df)  # Fit once and reuse
df = idf_model.transform(df)

# Prepare dataset
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Train SVM model
svm = LinearSVC(featuresCol="features", labelCol="label", maxIter=10)
svm_model = svm.fit(train)

# Evaluate
predictions = svm_model.transform(test)
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
auc = evaluator_auc.evaluate(predictions)

print(f"SVM Accuracy: {accuracy}")
print(f"SVM F1-score: {f1_score}")
print(f"SVM Precision: {precision}")
print(f"SVM Recall: {recall}")
print(f"SVM AUC-ROC: {auc}")

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Apply same preprocessing steps to new data
df_predict = tokenizer.transform(df_predict)
df_predict = hashing_tf.transform(df_predict)
df_predict = idf_model.transform(df_predict)  

# Make Predictions
df_predict = svm_model.transform(df_predict).select("text", "prediction")

# Display results
for row in df_predict.collect():
    text, prediction = row["text"], row["prediction"]
    status = "Stressed" if prediction == 1 else "Not Stressed"
    print(f"Text: {text} --> Prediction: {status}\n")

    if prediction == 1:
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")

SVM Accuracy: 0.6074766355140186
SVM F1-score: 0.6043690437825145
SVM Precision: 0.6120618737441168
SVM Recall: 0.6074766355140186
SVM AUC-ROC: 0.6502445842068483
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Stressed

[91mALERT: The text is classified as 'Stressed'![0m

Text: 📸 (Image of you working or something that rep

## Logistic Regression

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load training data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Tokenize text data
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Compute TF-IDF
hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
df = hashing_tf.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(df)  # Fit once and reuse
df = idf_model.transform(df)

# Prepare dataset
train, test = df.randomSplit([0.8, 0.2], seed=42)

# Train Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
lr_model = lr.fit(train)

# Evaluate
predictions = lr_model.transform(test)
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", metricName="f1")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")
evaluator_auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)
auc = evaluator_auc.evaluate(predictions)

print(f"Logistic Regression Accuracy: {accuracy}")
print(f"Logistic Regression F1-score: {f1_score}")
print(f"Logistic Regression Precision: {precision}")
print(f"Logistic Regression Recall: {recall}")
print(f"Logistic Regression AUC-ROC: {auc}")

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Apply same preprocessing steps to new data
df_predict = tokenizer.transform(df_predict)
df_predict = hashing_tf.transform(df_predict)
df_predict = idf_model.transform(df_predict)  

# Make Predictions
df_predict = lr_model.transform(df_predict).select("text", "prediction")

# Display results
for row in df_predict.collect():
    text, prediction = row["text"], row["prediction"]
    status = "Stressed" if prediction == 1 else "Not Stressed"
    print(f"Text: {text} --> Prediction: {status}\n")

    if prediction == 1:
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Logistic Regression Accuracy: 0.6074766355140186
Logistic Regression F1-score: 0.6064458493677845
Logistic Regression Precision: 0.6092586725803897
Logistic Regression Recall: 0.6074766355140186
Logistic Regression AUC-ROC: 0.6401118099231305
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Stressed

[91mALERT: The text is classified as 'Stressed'![0m

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning -

## Hard Voting Classifier

#### Combine the predictions of multiple classifiers. The algorithm works by first having each classifier make a prediction. The ensemble’s prediction is then simply the majority vote of the individual classifiers.

In [14]:
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType

# Initialize models
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# Train models
dt_model = dt.fit(train)
gbt_model = gbt.fit(train)
lr_model = lr.fit(train)

# Get predictions for each model
dt_pred = dt_model.transform(test).select("features", "label", "prediction").withColumnRenamed("prediction", "dt_pred")
gbt_pred = gbt_model.transform(test).select("features", "prediction").withColumnRenamed("prediction", "gbt_pred")
lr_pred = lr_model.transform(test).select("features", "prediction").withColumnRenamed("prediction", "lr_pred")

# Combine predictions using "features" as the join key
combined = dt_pred.join(gbt_pred, "features").join(lr_pred, "features").select("features", "label", "dt_pred", "gbt_pred", "lr_pred")

# Ensure `label` is of DoubleType
combined = combined.withColumn("label", col("label").cast(DoubleType()))

# Majority voting (hard voting)
combined = combined.withColumn(
    "final_prediction",
    when((col("dt_pred") + col("gbt_pred") + col("lr_pred")) >= 2, 1).otherwise(0).cast(DoubleType())
)

# Evaluate majority vote accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="final_prediction", metricName="accuracy")
accuracy = evaluator.evaluate(combined)

# Compute F1-score, Precision, Recall, AUC-ROC
f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="final_prediction", metricName="f1")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="final_prediction", metricName="precisionByLabel")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="final_prediction", metricName="recallByLabel")
auc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="final_prediction", metricName="areaUnderROC")

f1_score = f1_evaluator.evaluate(combined)
precision = precision_evaluator.evaluate(combined)
recall = recall_evaluator.evaluate(combined)
auc_roc = auc_evaluator.evaluate(combined)

print(f"Voting Classifier Accuracy: {accuracy}")
print(f"Voting Classifier F1-score: {f1_score}")
print(f"Voting Classifier Precision: {precision}")
print(f"Voting Classifier Recall: {recall}")
print(f"Voting Classifier AUC-ROC: {auc_roc}")

# **Make Predictions on New Data**
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Ensure text column exists
if "text" not in df_predict.columns:
    print("\033[91mError: 'text' column missing in prediction dataset.\033[0m")
    exit()

# Apply preprocessing steps
df_predict = tokenizer.transform(df_predict)
df_predict = hashing_tf.transform(df_predict)
df_predict = idf_model.transform(df_predict)

# Get predictions from each model
dt_pred = dt_model.transform(df_predict).select("features", "prediction").withColumnRenamed("prediction", "dt_pred")
gbt_pred = gbt_model.transform(df_predict).select("features", "prediction").withColumnRenamed("prediction", "gbt_pred")
lr_pred = lr_model.transform(df_predict).select("features", "prediction").withColumnRenamed("prediction", "lr_pred")

# Combine predictions for voting
df_predict = df_predict.join(dt_pred, "features").join(gbt_pred, "features").join(lr_pred, "features")
df_predict = df_predict.withColumn(
    "final_prediction",
    when((col("dt_pred") + col("gbt_pred") + col("lr_pred")) >= 2, 1).otherwise(0).cast(DoubleType())
)

# Display results
for row in df_predict.select("text", "final_prediction").collect():
    text, final_pred = row["text"], row["final_prediction"]
    status = "Stressed" if final_pred == 1 else "Not Stressed"
    
    print(f"Text: {text} --> Prediction: {status}\n")
    if final_pred == 1:
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Voting Classifier Accuracy: 0.616822429906542
Voting Classifier F1-score: 0.6086063469241039
Voting Classifier Precision: 0.6578947368421053
Voting Classifier Recall: 0.4716981132075472
Voting Classifier AUC-ROC: 0.6154786862334032
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthink

## Stacking Classifier

#### Stacked generalization consists in stacking the output of individual estimator and use a classifier to compute the final prediction. Stacking allows to use the strength of each individual estimator by using their output as input of a final estimator.

In [15]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.sql.functions import col

# Train base models
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
rf_model = rf.fit(train)

gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
gbt_model = gbt.fit(train)

# Get predictions from base models
rf_pred = rf_model.transform(test).select("features", "prediction").withColumnRenamed("prediction", "rf_pred")
gbt_pred = gbt_model.transform(test).select("features", "prediction").withColumnRenamed("prediction", "gbt_pred")

# Prepare meta-training data (Ensure all required columns exist)
meta_train = test.select("features", "label").join(rf_pred, "features").join(gbt_pred, "features")

# Assemble meta-features
meta_assembler = VectorAssembler(inputCols=["rf_pred", "gbt_pred"], outputCol="meta_features")
meta_train = meta_assembler.transform(meta_train)

# Train meta-classifier (Logistic Regression)
meta_lr = LogisticRegression(featuresCol="meta_features", labelCol="label", maxIter=10)
meta_model = meta_lr.fit(meta_train)

# Evaluate stacking classifier
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(meta_model.transform(meta_train))

f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="precisionByLabel")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="recallByLabel")
auc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")

f1_score = f1_evaluator.evaluate(meta_model.transform(meta_train))
precision = precision_evaluator.evaluate(meta_model.transform(meta_train))
recall = recall_evaluator.evaluate(meta_model.transform(meta_train))
auc_roc = auc_evaluator.evaluate(meta_model.transform(meta_train))

print(f"Stacking Classifier Accuracy: {accuracy}")
print(f"Stacking Classifier F1-score: {f1_score}")
print(f"Stacking Classifier Precision: {precision}")
print(f"Stacking Classifier Recall: {recall}")
print(f"Stacking Classifier AUC-ROC: {auc_roc}")

# **Make Predictions on New Data**
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Ensure `text` column exists
if "text" not in df_predict.columns:
    print("\033[91mError: 'text' column missing in prediction dataset.\033[0m")
    exit()

# Apply preprocessing steps
df_predict = tokenizer.transform(df_predict)
df_predict = hashing_tf.transform(df_predict)
df_predict = idf_model.transform(df_predict)

# Get predictions from base models
rf_pred = rf_model.transform(df_predict).select("features", "prediction").withColumnRenamed("prediction", "rf_pred")
gbt_pred = gbt_model.transform(df_predict).select("features", "prediction").withColumnRenamed("prediction", "gbt_pred")

# Prepare data for meta-classifier
df_predict = df_predict.join(rf_pred, "features").join(gbt_pred, "features")
df_predict = meta_assembler.transform(df_predict)

# Get final predictions
final_predictions = meta_model.transform(df_predict)

# Display results
for row in final_predictions.select("text", "prediction").collect():
    text, final_pred = row["text"], row["prediction"]
    status = "Stressed" if final_pred == 1 else "Not Stressed"
    
    print(f"Text: {text} --> Prediction: {status}\n")
    if final_pred == 1:
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")

Stacking Classifier Accuracy: 0.6261682242990654
Stacking Classifier F1-score: 0.6251865232074139
Stacking Classifier Precision: 0.6101694915254238
Stacking Classifier Recall: 0.6792452830188679
Stacking Classifier AUC-ROC: 0.626659678546471
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)

In [21]:
import torch
torch.cuda.empty_cache()

## XLNet

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Convert Spark DataFrame to Pandas
data_pd = df.select("text", "label").toPandas()

# Choose XLNet model
model_name = "xlnet-base-cased"

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Tokenize text data
encodings = tokenizer(list(data_pd["text"]), truncation=True, padding=True, max_length=128, return_tensors="pt")
labels = list(data_pd["label"])

# Create dataset class
class StressDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

dataset = StressDataset(encodings, labels)

# Split dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the XLNet model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    per_device_train_batch_size=4,  
    per_device_eval_batch_size=4,  
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    learning_rate=2e-5,
    fp16=True 
)


# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

# Train the model
trainer.train()

# Evaluate
results = trainer.evaluate()
predictions = trainer.predict(test_dataset)
y_pred = torch.argmax(torch.tensor(predictions.predictions), dim=-1).tolist()
y_true = [data["labels"].item() for data in test_dataset]

accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
auc_roc = roc_auc_score(y_true, y_pred)

print(f"Test Accuracy: {accuracy}")
print(f"F1-score: {f1}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"AUC-ROC: {auc_roc}")


# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prediction function
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    outputs = model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_label = torch.argmax(probs).item()
    return "Stressed" if pred_label == 1 else "Not Stressed"

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas for easier processing
prediction_pd = df_predict.select("text").toPandas()

# **Make Predictions on the New Data**
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"Text: {text} --> Prediction: {prediction}\n")

    if prediction == "Stressed":
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Some weights of XLNetForSequenceClassification were not initialized from the model checkpoint at xlnet-base-cased and are newly initialized: ['logits_proj.bias', 'logits_proj.weight', 'sequence_summary.summary.bias', 'sequence_summary.summary.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss
1,0.691,0.64748
2,0.5309,0.718872
3,0.43,1.06704


Test Accuracy: 0.7803030303030303
F1-score: 0.7913669064748201
Precision: 0.7142857142857143
Recall: 0.8870967741935484
AUC-ROC: 0.7864055299539171
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 (Image of you working or something that represents your current journey)" --> Prediction: Not Stressed

Text:

In [24]:
device = torch.device("cpu")
model.to(device)

XLNetForSequenceClassification(
  (transformer): XLNetModel(
    (word_embedding): Embedding(32000, 768)
    (layer): ModuleList(
      (0-11): 12 x XLNetLayer(
        (rel_attn): XLNetRelativeAttention(
          (layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
          (dropout): Dropout(p=0.1, inplace=False)
        )
        (ff): XLNetFeedForward(
          (layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
          (layer_1): Linear(in_features=768, out_features=3072, bias=True)
          (layer_2): Linear(in_features=3072, out_features=768, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (activation_function): GELUActivation()
        )
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (sequence_summary): SequenceSummary(
    (summary): Linear(in_features=768, out_features=768, bias=True)
    (activation): Tanh()
    (first_dropout): Identity()
    (last

## BERT

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Convert Spark DataFrame to Pandas
data_pd = df.select("text", "label").toPandas()

# Choose a model
model_name = "bert-base-uncased"

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Tokenize text data
encodings = tokenizer(list(data_pd["text"]), truncation=True, padding=True, max_length=128, return_tensors="pt")
labels = list(data_pd["label"])

# Create dataset class
class StressDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

dataset = StressDataset(encodings, labels)

# Split dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    per_device_train_batch_size=8,  
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    learning_rate=2e-5,
    fp16=True  
)

# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

# Train the model
trainer.train()

# **Evaluate the model**
results = trainer.predict(test_dataset)
y_pred = torch.argmax(torch.tensor(results.predictions), dim=-1).tolist()
y_true = [data['labels'].item() for data in test_dataset]

# Compute evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
auc_roc = roc_auc_score(y_true, y_pred)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"AUC-ROC: {auc_roc:.4f}")

# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# **Prediction function**
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    outputs = model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_label = torch.argmax(probs).item()
    return "Stressed" if pred_label == 1 else "Not Stressed"

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas for easier processing
prediction_pd = df_predict.select("text").toPandas()

# **Make Predictions on the New Data**
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"Text: {text} --> Prediction: {prediction}\n")

    if prediction == "Stressed":
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss
1,No log,0.518986
2,0.574500,0.437745
3,0.574500,0.445013


Test Accuracy: 0.8333
F1-score: 0.8382
Precision: 0.8507
Recall: 0.8261
AUC-ROC: 0.8337
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 (Image of you working or something that represents your current journey)" --> Prediction: Not Stressed

Text: I'm feeling exhausted and can't focus. --> Prediction: Stre

## DistilBERT

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Convert Spark DataFrame to Pandas
data_pd = df.select("text", "label").toPandas()

# Choose a model
model_name = "distilbert-base-uncased"

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Tokenize text data
encodings = tokenizer(list(data_pd["text"]), truncation=True, padding=True, max_length=128, return_tensors="pt")
labels = list(data_pd["label"])

# Create dataset class
class StressDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

dataset = StressDataset(encodings, labels)

# Split dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    per_device_train_batch_size=8,  # Reduce batch size for memory efficiency
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    learning_rate=2e-5,
    fp16=True  # Use mixed precision for better performance
)

# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

# Train the model
trainer.train()

# **Evaluate the model**
results = trainer.predict(test_dataset)
y_pred = torch.argmax(torch.tensor(results.predictions), dim=-1).tolist()
y_true = [data['labels'].item() for data in test_dataset]

# Compute evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
auc_roc = roc_auc_score(y_true, y_pred)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"AUC-ROC: {auc_roc:.4f}")

# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# **Prediction function**
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    outputs = model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_label = torch.argmax(probs).item()
    return "Stressed" if pred_label == 1 else "Not Stressed"

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas for easier processing
prediction_pd = df_predict.select("text").toPandas()

# **Make Predictions on the New Data**
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"Text: {text} --> Prediction: {prediction}\n")

    if prediction == "Stressed":
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss
1,No log,0.516546
2,0.566200,0.407399
3,0.566200,0.429251


Test Accuracy: 0.7652
F1-score: 0.7669
Precision: 0.7183
Recall: 0.8226
AUC-ROC: 0.7684
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 (Image of you working or something that represents your current journey)" --> Prediction: Not Stressed

Text: I'm feeling exhausted and can't focus. --> Prediction: Stre

## RoBERTa

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Convert Spark DataFrame to Pandas
data_pd = df.select("text", "label").toPandas()

# Choose a model
model_name = "roberta-base"  # Using RoBERTa for improved performance

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Tokenize text data
encodings = tokenizer(list(data_pd["text"]), truncation=True, padding=True, max_length=128, return_tensors="pt")
labels = list(data_pd["label"])

# Create dataset class
class StressDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

dataset = StressDataset(encodings, labels)

# Split dataset into training and testing
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    per_device_train_batch_size=8,  # Reduced batch size for memory efficiency
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    learning_rate=2e-5,
    fp16=True  # Mixed precision for better performance
)

# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

# Train the model
trainer.train()

# **Evaluate the model**
results = trainer.predict(test_dataset)
y_pred = torch.argmax(torch.tensor(results.predictions), dim=-1).tolist()
y_true = [data['labels'].item() for data in test_dataset]

# Compute evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
auc_roc = roc_auc_score(y_true, y_pred)

# Display results
print(f"Test Accuracy: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"AUC-ROC: {auc_roc:.4f}")

# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# **Prediction function**
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    outputs = model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_label = torch.argmax(probs).item()
    return "Stressed" if pred_label == 1 else "Not Stressed"

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas for easier processing
prediction_pd = df_predict.select("text").toPandas()

# **Make Predictions on the New Data**
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"Text: {text} --> Prediction: {prediction}\n")

    if prediction == "Stressed":
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss
1,No log,0.54394
2,0.614600,0.448078
3,0.614600,0.521296


Test Accuracy: 0.8258
F1-score: 0.8296
Precision: 0.7671
Recall: 0.9032
AUC-ROC: 0.8302
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Stressed

[91mALERT: The text is classified as 'Stressed'![0m

Text: Currently --> Prediction: Not Stressed

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 (Image of you working or something that represents your current journey)" --> Prediction: Not Stressed

Text: I'm feel

## Electra

In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Convert Spark DataFrame to Pandas
data_pd = df.select("text", "label").toPandas()

# Choose a model
model_name = "google/electra-small-discriminator"  # Using ELECTRA for efficient classification

# Load tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Tokenize text data
encodings = tokenizer(list(data_pd["text"]), truncation=True, padding=True, max_length=128, return_tensors="pt")
labels = list(data_pd["label"])

# Create dataset class
class StressDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = torch.tensor(labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

dataset = StressDataset(encodings, labels)

# Split dataset into training and testing
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    eval_strategy="epoch",
    per_device_train_batch_size=8,  # Reduced batch size for memory efficiency
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    learning_rate=2e-5,
    fp16=True  # Mixed precision for better performance
)

# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

# Train the model
trainer.train()

# **Evaluate the model**
results = trainer.predict(test_dataset)
y_pred = torch.argmax(torch.tensor(results.predictions), dim=-1).tolist()
y_true = [data['labels'].item() for data in test_dataset]

# Compute evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
auc_roc = roc_auc_score(y_true, y_pred)

# Display results
print(f"Test Accuracy: {accuracy:.4f}")
print(f"F1-score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"AUC-ROC: {auc_roc:.4f}")

# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# **Prediction function**
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    outputs = model(**inputs)
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_label = torch.argmax(probs).item()
    return "Stressed" if pred_label == 1 else "Not Stressed"

# **Load New Data for Prediction**
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas for easier processing
prediction_pd = df_predict.select("text").toPandas()

# **Make Predictions on the New Data**
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"Text: {text} --> Prediction: {prediction}\n")

    if prediction == "Stressed":
        print("\033[91mALERT: The text is classified as 'Stressed'!\033[0m\n")


Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at google/electra-small-discriminator and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss
1,No log,0.674365
2,0.673300,0.643821
3,0.673300,0.625202


Test Accuracy: 0.6667
F1-score: 0.6986
Precision: 0.6071
Recall: 0.8226
AUC-ROC: 0.6756
Text: hello --> Prediction: Not Stressed

Text: hello --> Prediction: Not Stressed

Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny --> Prediction: Not Stressed

Text: Currently --> Prediction: Stressed

[91mALERT: The text is classified as 'Stressed'![0m

Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" --> Prediction: Not Stressed

Text: """Stress level: 100. But at least I can laugh about it --> Prediction: Not Stressed

Text: """Grateful for all the amazing things happening around me! Learning --> Prediction: Not Stressed

Text: 📸 (Image of you working or something that represents your current journey)" --> Prediction: Not Stressed

Text: I'm feel

### Fine tune Electra

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score

# Initialize Spark session
spark = SparkSession.builder.appName("StressDetection").getOrCreate()

# Load data from HDFS
data_path = "hdfs://localhost:9000/inputs/dreaddit_StressAnalysis-Sheet1.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Clean and preprocess labels
df = df.withColumn("label", when(col("label") == "0", 0).when(col("label") == "1", 1).otherwise(None))
df = df.withColumn("label", col("label").cast("integer"))
df = df.na.drop(subset=["label"])

# Convert Spark DataFrame to Pandas
data_pd = df.select("text", "label").toPandas()

# Load pre-trained model and tokenizer
model_name = "google/electra-small-discriminator"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Create dataset class
class StressDataset(Dataset):
    def __init__(self, texts, labels):
        self.encodings = tokenizer(texts, truncation=True, padding="max_length", max_length=128)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

# Prepare dataset
dataset = StressDataset(data_pd["text"].tolist(), data_pd["label"].tolist())

# Split dataset
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    save_strategy="epoch",
    save_total_limit=2,  # Keep only the last 2 checkpoints
    logging_dir="./logs",
    logging_steps=100,
    learning_rate=2e-5,
    report_to="none"  # Disable logging to external services
)

# Define the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset
)

# Train the model
trainer.train()

# Evaluate the model
results = trainer.predict(test_dataset)
y_pred = torch.argmax(torch.tensor(results.predictions), dim=-1).tolist()
y_true = [data["labels"].item() for data in test_dataset]

# Compute evaluation metrics
accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
auc_roc = roc_auc_score(y_true, y_pred)

# Display results
print("\n\033[94mModel Evaluation Metrics:\033[0m")
print(f"🔹 Accuracy: {accuracy:.4f}")
print(f"🔹 F1-score: {f1:.4f}")
print(f"🔹 Precision: {precision:.4f}")
print(f"🔹 Recall: {recall:.4f}")
print(f"🔹 AUC-ROC: {auc_roc:.4f}\n")

# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Prediction function
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding="max_length", max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}
    
    with torch.no_grad():
        outputs = model(**inputs)
        probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
        pred_label = torch.argmax(probs).item()
    
    return "Stressed" if pred_label == 1 else "Not Stressed"

# Load new data for prediction
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas
prediction_pd = df_predict.select("text").toPandas()

# Make predictions on new data
print("\033[94mStress Prediction Results:\033[0m\n")
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"🔸 Text: {text} \n   → Prediction: {prediction}\n")

    # Generate alert if stressed
    if prediction == "Stressed":
        print("\033[91m ALERT: This text is classified as 'Stressed'!\033[0m\n")


Some weights of ElectraForSequenceClassification were not initialized from the model checkpoint at google/electra-small-discriminator and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss
1,No log,0.686057
2,No log,0.678888
3,No log,0.675977



[94mModel Evaluation Metrics:[0m
🔹 Accuracy: 0.6742
🔹 F1-score: 0.7571
🔹 Precision: 0.6204
🔹 Recall: 0.9710
🔹 AUC-ROC: 0.6601

[94mStress Prediction Results:[0m

🔸 Text: hello 
   → Prediction: Not Stressed

🔸 Text: hello 
   → Prediction: Not Stressed

🔸 Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny 
   → Prediction: Not Stressed

🔸 Text: Currently 
   → Prediction: Stressed

[91m ALERT: This text is classified as 'Stressed'![0m

🔸 Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" 
   → Prediction: Not Stressed

🔸 Text: """Stress level: 100. But at least I can laugh about it 
   → Prediction: Not Stressed

🔸 Text: """Grateful for all the amazing things happening around me! Learning 
   → Prediction: Stressed

[91m ALERT: This tex

### Fine tune BERT

In [36]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score
import numpy as np

# Load model & tokenizer
model_name = "bert-base-uncased"  
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Dataset Class
class StressDataset(Dataset):
    def __init__(self, texts, labels):
        self.encodings = tokenizer(texts, truncation=True, padding="longest", max_length=128)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["labels"] = self.labels[idx]
        return item

    def __len__(self):
        return len(self.labels)

# Compute class weights for imbalanced dataset
class_weights = compute_class_weight("balanced", classes=np.array([0,1]), y=data_pd["label"])
class_weights = torch.tensor(class_weights, dtype=torch.float).to("cuda")

# Split dataset
dataset = StressDataset(data_pd["text"].tolist(), data_pd["label"].tolist())
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# Load the model
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)

# Use class weights in loss function
from torch.nn import CrossEntropyLoss
loss_fn = CrossEntropyLoss(weight=class_weights)

# Training arguments (adjusted)
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=5,  # More epochs
    logging_steps=50,
    learning_rate=5e-6,  # Lower learning rate for better fine-tuning
    warmup_steps=500,
    weight_decay=0.01,
    report_to="none"
)

# Define Trainer with weighted loss
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=lambda pred: {"accuracy": (pred.predictions.argmax(-1) == pred.label_ids).mean()}
)

# Train
trainer.train()

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Define prediction function
def predict_stress(text):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    inputs = {key: val.to(device) for key, val in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)
    
    probs = torch.nn.functional.softmax(outputs.logits, dim=-1)
    pred_label = torch.argmax(probs).item()
    
    return "Stressed" if pred_label == 1 else "Not Stressed"

# Load new data for prediction
prediction_data_path = "hdfs://localhost:9000/stress_analyse/input_/output_messages.csv"
df_predict = spark.read.csv(prediction_data_path, header=True, inferSchema=True)

# Convert Spark DataFrame to Pandas
prediction_pd = df_predict.select("text").toPandas()

# Make predictions
print("\n\033[94mStress Prediction Results:\033[0m\n")
for text in prediction_pd["text"]:
    prediction = predict_stress(text)
    print(f"🔹 Text: {text} \n   → Prediction: {prediction}\n")

    if prediction == "Stressed":
        print("\033[91m ALERT: This text is classified as 'Stressed'!\033[0m\n")


# **Evaluation Function with Accuracy, F1-score, Precision, Recall, and AUC-ROC**
def evaluate_model(test_dataset):
    model.eval()
    all_preds, all_labels = [], []

    test_loader = DataLoader(test_dataset, batch_size=16)
    
    for batch in test_loader:
        inputs = {key: val.to(device) for key, val in batch.items() if key != "labels"}
        labels = batch["labels"].to(device)

        with torch.no_grad():
            outputs = model(**inputs)

        preds = torch.argmax(outputs.logits, dim=-1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    # Compute metrics
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    auc_roc = roc_auc_score(all_labels, all_preds)

    return acc, f1, precision, recall, auc_roc

# Compute final metrics
final_accuracy, final_f1, final_precision, final_recall, final_auc = evaluate_model(test_dataset)

# Print final results
print("\n\033[94mModel Evaluation Metrics:\033[0m")
print(f"🔹 Accuracy: {final_accuracy:.4f}")
print(f"🔹 F1-score: {final_f1:.4f}")
print(f"🔹 Precision: {final_precision:.4f}")
print(f"🔹 Recall: {final_recall:.4f}")
print(f"🔹 AUC-ROC: {final_auc:.4f}\n")


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Epoch,Training Loss,Validation Loss,Accuracy
1,No log,0.709467,0.515152
2,0.714000,0.7005,0.537879
3,0.714000,0.693787,0.545455
4,0.697100,0.681428,0.568182
5,0.662700,0.666027,0.613636



[94mStress Prediction Results:[0m

🔹 Text: hello 
   → Prediction: Not Stressed

🔹 Text: hello 
   → Prediction: Not Stressed

🔹 Text: """Trying to decide between a mental breakdown or a stress nap...  #LifeChoices #Overthinking #Stressed""  (A funny 
   → Prediction: Not Stressed

🔹 Text: Currently 
   → Prediction: Not Stressed

🔹 Text: "That 3 AM realization that the deadline is in 3 hours... 😅 Time to activate superhuman focus mode. #DeadlineStress #LateNightWork #FocusMode"" 📸 (Image of a clock showing the time or a shot of your workspace at night)" 
   → Prediction: Not Stressed

🔹 Text: """Stress level: 100. But at least I can laugh about it 
   → Prediction: Stressed

[91m ALERT: This text is classified as 'Stressed'![0m

🔹 Text: """Grateful for all the amazing things happening around me! Learning 
   → Prediction: Stressed

[91m ALERT: This text is classified as 'Stressed'![0m

🔹 Text: 📸 (Image of you working or something that represents your current journey)" 
   → Pre