# Suicide Ideation Detection (Binary Text Classification)

## Project Goal

The goal of this project is to design and evaluate a binary text classification model
that detects whether a given user-generated text indicates suicidal ideation (`suicidal`)
or does not (`non-suicidal`).

The model is trained offline using historical Reddit posts and subsequently deployed
for real-time inference as part of a streaming data processing pipeline
(Web Application → Apache Kafka → Apache Spark Structured Streaming → Apache Kafka → Web Application).

The system is intended to support early detection and prioritization of potentially
high-risk content in real-time environments.


## Design Assumptions and Research Theses

**Thesis A:**  
Linear models trained on TF-IDF representations (e.g., Logistic Regression or Linear Support Vector Machines)
provide strong and scalable baselines for large-scale text classification tasks.

**Thesis B:**  
More complex models, such as tree-based classifiers, may capture non-linear patterns in data;
however, when applied to high-dimensional and sparse text features, they often incur
significantly higher computational costs with limited performance gains.

**Thesis C:**  
In the application domain of suicide ideation detection, the cost of classification errors
is asymmetric. False negatives (failing to detect suicidal content) are considerably more
harmful than false positives; therefore, recall for the suicidal class is a critical
evaluation criterion alongside overall discrimination performance (AUC).

## Dataset
The dataset is a collection of Reddit posts (e.g., from r/SuicideWatch and r/depression), collected via the Pushshift API and published on Kaggle.  
We treat the dataset as **secondary data** with semi-structured text fields and an explicit binary label.


In [1]:
!pip -q install pyspark==3.5.1

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
         .appName("SuicideWatchBinaryClassification")
         .config("spark.sql.shuffle.partitions", "64")
         .config("spark.default.parallelism", "64")
         .getOrCreate())

spark.sparkContext.setLogLevel("WARN")
spark


### Expected CSV schema (will be verified in EDA)
Typical columns seen in this dataset:
- `text` (merged title + body or post content)
- `class` or `label` (e.g., "suicide"/"non-suicide" or 0/1)


In [2]:
DATA_PATH = "/content/Suicide_Detection.csv"

In [3]:
df_raw = (spark.read
          .option("header", True)
          .option("multiLine", True)
          .option("escape", "\"")
          .option("quote", "\"")
          .csv(DATA_PATH))

# set manually if needed:
text_col = "text"
label_col = "class"

df1 = (df_raw
       .withColumn("text_raw", F.col(text_col).cast("string"))
       .withColumn("text_raw", F.regexp_replace(F.col("text_raw"), r"\s+", " "))
       .withColumn("text_raw", F.trim(F.col("text_raw")))
       .filter(F.length("text_raw") > 0)
      )

df2 = df1.withColumn(
    "label",
    F.when(F.col(label_col) == "suicide", F.lit(1.0))
     .when(F.col(label_col) == "non-suicide", F.lit(0.0))
     .otherwise(None)
).filter(F.col("label").isNotNull())

df2 = df2.select("text_raw", "label").cache()
df2.count()

train_df, test_df = df2.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.cache()
test_df = test_df.cache()
# Trigger an action so that both splits are materialized in the cache
train_df.count()
test_df.count()

train_df.groupBy("label").count().show()
test_df.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  1.0|92870|
|  0.0|92532|
+-----+-----+

+-----+-----+
|label|count|
+-----+-----+
|  1.0|23167|
|  0.0|23505|
+-----+-----+



In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

tokenizer = RegexTokenizer(inputCol="text_raw", outputCol="tokens", pattern=r"\W+", minTokenLength=2)
stopwords = StopWordsRemover(inputCol="tokens", outputCol="tokens_clean")

hashingTF = HashingTF(inputCol="tokens_clean", outputCol="tf", numFeatures=2**16)
idf = IDF(inputCol="tf", outputCol="features")


In [5]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

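def evaluate_binary_light(pred_df, label_col="label", pred_col="prediction"):
    """Return AUC, weighted summary metrics, and confusion-matrix counts for a prediction DataFrame."""
    # AUC, computed from the rawPrediction column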
    auc_eval = BinaryClassificationEvaluator(
        labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC"
    )
    auc = auc_eval.evaluate(pred_df)

    # summary metrics
    acc_eval = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=pred_col, metricName="accuracy")
    f1_eval  = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=pred_col, metricName="f1")
    wp_eval  = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=pred_col, metricName="weightedPrecision")
    wr_eval  = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol=pred_col, metricName="weightedRecall")

    metrics = {
        "AUC": float(auc),
        "accuracy": float(acc_eval.evaluate(pred_df)),
        "f1": float(f1_eval.evaluate(pred_df)),
        "weightedPrecision": float(wp_eval.evaluate(pred_df)),
        "weightedRecall": float(wr_eval.evaluate(pred_df)),
    }

    cm = pred_df.select(
        F.sum(F.when((F.col(label_col) == 0.0) & (F.col(pred_col) == 0.0), 1).otherwise(0)).alias("TN"),
        F.sum(F.when((F.col(label_col) == 0.0) & (F.col(pred_col) == 1.0), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col(label_col) == 1.0) & (F.col(pred_col) == 0.0), 1).otherwise(0)).alias("FN"),
        F.sum(F.when((F.col(label_col) == 1.0) & (F.col(pred_col) == 1.0), 1).otherwise(0)).alias("TP"),
    ).collect()[0]

    metrics.update({k: int(cm[k]) for k in ["TN", "FP", "FN", "TP"]})
    return metrics


### Naive Bayes (TF-IDF)

In [6]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline

nb = NaiveBayes(featuresCol="features", labelCol="label", modelType="multinomial", smoothing=1.0)

pipeline_nb = Pipeline(stages=[tokenizer, stopwords, hashingTF, idf, nb])

model_nb = pipeline_nb.fit(train_df)
pred_nb = model_nb.transform(test_df)

metrics_nb = evaluate_binary_light(pred_nb)
metrics_nb


{'AUC': 0.2673110505211704,
 'accuracy': 0.8784281796366129,
 'f1': 0.8779492937751632,
 'weightedPrecision': 0.8853231873714257,
 'weightedRecall': 0.8784281796366129,
 'TN': 19119,
 'FP': 4386,
 'FN': 1288,
 'TP': 21879}

In [7]:
pred_nb.sample(False, 0.01, seed=42).groupBy("prediction").count().orderBy("prediction").show()


+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  229|
|       1.0|  268|
+----------+-----+



### LinearSVC

In [8]:
from pyspark.ml.classification import LinearSVC

svc = LinearSVC(featuresCol="features", labelCol="label", maxIter=50, regParam=0.1)

pipeline_svc = Pipeline(stages=[tokenizer, stopwords, hashingTF, idf, svc])

model_svc = pipeline_svc.fit(train_df)
pred_svc = model_svc.transform(test_df)

metrics_svc = evaluate_binary_light(pred_svc)
metrics_svc


{'AUC': 0.9698043864831427,
 'accuracy': 0.9084247514569763,
 'f1': 0.9081921155289472,
 'weightedPrecision': 0.9119827931958826,
 'weightedRecall': 0.9084247514569763,
 'TN': 22461,
 'FP': 1044,
 'FN': 3230,
 'TP': 19937}

### Logistic Regression

In [11]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50, regParam=0.0, elasticNetParam=0.0)

pipeline_lr = Pipeline(stages=[tokenizer, stopwords, hashingTF, idf, lr])
model_lr = pipeline_lr.fit(train_df)

pred_lr = model_lr.transform(test_df)
metrics_lr = evaluate_binary_light(pred_lr)
metrics_lr


{'AUC': 0.9276661434088257,
 'accuracy': 0.8751499828591018,
 'f1': 0.8751160155607656,
 'weightedPrecision': 0.8753902303992547,
 'weightedRecall': 0.8751499828591018,
 'TN': 20901,
 'FP': 2604,
 'FN': 3223,
 'TP': 19944}

In [12]:
import pandas as pd

results = pd.DataFrame([
    {"model": "LogisticRegression", **metrics_lr},
    {"model": "NaiveBayes", **metrics_nb},
    {"model": "LinearSVC", **metrics_svc},
]).sort_values(by="AUC", ascending=False)

results


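|   | model | AUC | accuracy | f1 | weightedPrecision | weightedRecall | TN | FP | FN | TP |
|---|-------|-----|----------|----|-------------------|----------------|----|----|----|----|
| 2 | LinearSVC | 0.969804 | 0.908425 | 0.908192 | 0.911983 | 0.908425 | 22461 | 1044 | 3230 | 19937 |
| 0 | LogisticRegression | 0.927666 | 0.87515 | 0.875116 | 0.87539 | 0.87515 | 20901 | 2604 | 3223 | 19944 |
| 1 | NaiveBayes | 0.267311 | 0.878428 | 0.877949 | 0.885323 | 0.878428 | 19119 | 4386 | 1288 | 21879 |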


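Based on the comparative evaluation, LinearSVC was selected as the final model.
It achieved the highest AUC (0.970) and accuracy (0.908) and the fewest false positives (1,044).
Its false negative count (3,230) is close to that of Logistic Regression (3,223) but well above that of Naive Bayes (1,288),
so recall for the suicidal class remains its main weakness in suicide ideation detection.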


## Confusion Matrix Analysis (LinearSVC)

The confusion matrix summarizes the classification results:

|                | Predicted Non-Suicidal | Predicted Suicidal |
|----------------|------------------------|--------------------|
| **Actual Non-Suicidal** | TN = 22,461 | FP = 1,044 |
| **Actual Suicidal**     | FN = 3,230  | TP = 19,937 |

This matrix provides insight into the types of classification errors made by the model.
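
The evaluation helper in this notebook reports only weighted precision and recall, so the rates for the suicidal class have to be derived from the four counts. A minimal sketch, using the LinearSVC counts from the table above; the function name `class_metrics` is hypothetical.

```python
def class_metrics(tn, fp, fn, tp):
    """Derive rates for the positive (suicidal) class from confusion counts."""
    return {
        "recall": tp / (tp + fn),               # share of suicidal posts detected
        "precision": tp / (tp + fp),            # share of alerts that are correct
        "false_negative_rate": fn / (tp + fn),  # share of suicidal posts missed
        "false_positive_rate": fp / (fp + tn),  # share of non-suicidal posts flagged
    }

svc = class_metrics(tn=22461, fp=1044, fn=3230, tp=19937)
for name, value in svc.items():
    print(f"{name}: {value:.4f}")
```

For LinearSVC this gives a recall of roughly 0.86 for the suicidal class, which is the quantity Thesis C singles out.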


### Interpretation of Classification Errors

**False Positives (FP = 1,044)**  
These are non-suicidal texts incorrectly classified as suicidal.
In practice, this may result in:
- unnecessary escalation or moderation actions,
- increased workload for human reviewers.

However, in the application domain of suicide prevention, false positives are generally
considered less harmful than false negatives.

**False Negatives (FN = 3,230)**  
These are suicidal texts incorrectly classified as non-suicidal.
This type of error is particularly critical because:
- it may lead to missed detection of individuals at risk,
- it can prevent timely intervention or support.

Therefore, minimizing false negatives is a key priority in this project.
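
One common way to reduce false negatives with a margin-based classifier is to lower the decision threshold on the raw score, accepting more false positives in return. The sketch below illustrates the mechanism on synthetic margins, which are not values from this dataset; the function name is hypothetical. Spark's `LinearSVC` exposes a `threshold` parameter for this purpose, but no suitable value has been determined in this notebook.

```python
def confusion_at_threshold(labels, margins, threshold=0.0):
    """Count TN/FP/FN/TP when predicting positive for margin > threshold."""
    counts = {"TN": 0, "FP": 0, "FN": 0, "TP": 0}
    for y, m in zip(labels, margins):
        predicted = 1 if m > threshold else 0
        if y == 1:
            counts["TP" if predicted else "FN"] += 1
        else:
            counts["FP" if predicted else "TN"] += 1
    return counts

# Synthetic example: 4 positive and 4 negative posts with made-up margins.
labels = [1, 1, 1, 1, 0, 0, 0, 0]
margins = [1.2, 0.4, -0.2, -0.6, 0.1, -0.3, -0.9, -1.5]

default = confusion_at_threshold(labels, margins, threshold=0.0)
lowered = confusion_at_threshold(labels, margins, threshold=-0.5)
print(default)
print(lowered)
```

In this toy case, lowering the threshold converts one false negative into a true positive and one true negative into a false positive.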


### Cost-Sensitive Perspective

In suicide ideation detection, the cost of classification errors is asymmetric:

- Cost(FN) >> Cost(FP)

A false negative may result in missing a person in immediate need of help,
while a false positive typically results only in additional review or alerting.

From this perspective, a model with slightly higher false positive rate
but lower false negative rate may be preferred, depending on system requirements.
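
The asymmetry can be made concrete by weighting the two error types. The sketch below applies a linear cost to the test-set error counts reported in this notebook. The 5:1 cost ratio is an illustrative assumption, not a value derived from the application, and `total_cost` is a hypothetical helper.

```python
def total_cost(fn, fp, cost_fn=5.0, cost_fp=1.0):
    """Weighted error cost; the default 5:1 ratio is an illustrative assumption."""
    return cost_fn * fn + cost_fp * fp

# Test-set error counts reported in this notebook.
errors = {
    "LinearSVC": {"fn": 3230, "fp": 1044},
    "LogisticRegression": {"fn": 3223, "fp": 2604},
    "NaiveBayes": {"fn": 1288, "fp": 4386},
}

costs = {model: total_cost(**e) for model, e in errors.items()}
for model, cost in sorted(costs.items(), key=lambda kv: kv[1]):
    print(f"{model}: {cost:.0f}")
```

The ordering depends on the chosen ratio: with equal costs LinearSVC has the lowest total (4,274 errors), whereas a sufficiently large weight on false negatives favors the model with the fewest false negatives.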


### Comparison with Other Models

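Compared to Logistic Regression, LinearSVC achieved a higher AUC (0.970 vs. 0.928) and fewer false positives (1,044 vs. 2,604)
while producing a similar number of false negatives (3,230 vs. 3,223).
Naive Bayes produced far fewer false negatives (1,288) but many more false positives (4,386).
Its reported AUC of 0.267 is below chance level even though its accuracy is 0.878, which suggests that the `rawPrediction` scores
passed to the evaluator do not rank posts consistently with the predicted labels. As evaluated here, the Naive Bayes AUC is therefore
not directly comparable with the other models, and its raw scores are unsuitable for threshold-based decision systems.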
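
An AUC well below 0.5 together with high accuracy can arise when the score used for ranking is not the quantity that decides the label. The sketch below shows this on synthetic per-class log-scores: if rows are ranked by the class-1 score alone, long posts with very negative scores for both classes sink to the bottom even when class 1 wins the comparison. This is offered as a hypothesis to check, not as a diagnosis of the Naive Bayes result in this notebook, and all values and names below are made up for illustration.

```python
def auc_from_scores(labels, scores):
    """Rank-based AUC: chance that a random positive outscores a random negative."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Synthetic (class-0 score, class-1 score) pairs; positives mimic long posts.
labels = [1, 1, 1, 0, 0, 0]
raw = [(-120.0, -100.0), (-90.0, -80.0), (-150.0, -140.0),
       (-20.0, -30.0), (-15.0, -25.0), (-40.0, -45.0)]

predictions = [1 if r1 > r0 else 0 for r0, r1 in raw]
accuracy = sum(p == y for p, y in zip(predictions, labels)) / len(labels)

auc_class1_only = auc_from_scores(labels, [r1 for _, r1 in raw])
auc_margin = auc_from_scores(labels, [r1 - r0 for r0, r1 in raw])
print(accuracy, auc_class1_only, auc_margin)
```

If this is what happens here, evaluating AUC on a normalized score such as the model's `probability` column would be the natural check.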


### Conclusion

The confusion matrix analysis shows that LinearSVC combines the highest AUC with the lowest false positive count,
while missing about 13.9% of suicidal posts in the test set (3,230 of 23,167).
Among the compared models, its performance characteristics make it the most suitable for deployment in a real-time
suicide ideation monitoring system.


## Hyperparameter Tuning (LinearSVC)

The goal of hyperparameter tuning is to improve model performance and stability
by selecting optimal regularization strength and convergence settings.

For LinearSVC, the most important hyperparameters are:
- `regParam`: regularization strength (controls overfitting),
- `maxIter`: maximum number of optimization iterations.


In [15]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

svc_tune = LinearSVC(
    featuresCol="features",
    labelCol="label"
)

pipeline_svc_tune = Pipeline(stages=[
    tokenizer,
    stopwords,
    hashingTF,
    idf,
    svc_tune
])


In [16]:
paramGrid = (ParamGridBuilder()
    .addGrid(svc_tune.regParam, [0.01, 0.1, 1.0])
    .addGrid(svc_tune.maxIter, [20, 50])
    .build()
)


In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

cv = CrossValidator(
    estimator=pipeline_svc_tune,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2,
    seed=42
)


In [19]:
cv_model_svc = cv.fit(train_df)


In [20]:
best_svc_model = cv_model_svc.bestModel

pred_svc_tuned = best_svc_model.transform(test_df)
metrics_svc_tuned = evaluate_binary_light(pred_svc_tuned)

metrics_svc_tuned


{'AUC': 0.9698063835803823,
 'accuracy': 0.9084247514569763,
 'f1': 0.9081921155289472,
 'weightedPrecision': 0.9119827931958826,
 'weightedRecall': 0.9084247514569763,
 'TN': 22461,
 'FP': 1044,
 'FN': 3230,
 'TP': 19937}

In [21]:
import pandas as pd

comparison = pd.DataFrame([
    {"variant": "LinearSVC (baseline)", **metrics_svc},
    {"variant": "LinearSVC (tuned)", **metrics_svc_tuned},
])

comparison


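|   | variant | AUC | accuracy | f1 | weightedPrecision | weightedRecall | TN | FP | FN | TP |
|---|---------|-----|----------|----|-------------------|----------------|----|----|----|----|
| 0 | LinearSVC (baseline) | 0.969804 | 0.908425 | 0.908192 | 0.911983 | 0.908425 | 22461 | 1044 | 3230 | 19937 |
| 1 | LinearSVC (tuned) | 0.969806 | 0.908425 | 0.908192 | 0.911983 | 0.908425 | 22461 | 1044 | 3230 | 19937 |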


## Hyperparameter Tuning Results

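Hyperparameter tuning of the LinearSVC model did not change the evaluation metrics
in any meaningful way: the test-set confusion matrix is identical to the baseline,
and AUC differs only in the sixth decimal place (0.969806 vs. 0.969804).

The baseline configuration (`regParam=0.1`, `maxIter=50`) is itself one of the grid points,
so the most likely explanation is that cross-validation selected the same or a nearly equivalent configuration.
The selected parameters and the per-configuration cross-validation scores were not inspected in this notebook,
so this result alone does not show how sensitive LinearSVC is to regularization strength
or number of iterations on this dataset.

The tuning process therefore confirmed the baseline choice
rather than improving raw performance metrics.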
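
To see which grid point was selected and how the configurations compare, the mean cross-validation metric can be listed per parameter map. A minimal sketch, assuming the fitted `cv_model_svc` from this notebook; `summarize_cv` is a hypothetical helper that only relies on each parameter object having a `name` attribute.

```python
def summarize_cv(param_maps, avg_metrics):
    """Pair each parameter map with its mean CV metric, best first."""
    rows = []
    for params, metric in zip(param_maps, avg_metrics):
        row = {param.name: value for param, value in params.items()}
        row["avg_metric"] = metric
        rows.append(row)
    return sorted(rows, key=lambda r: r["avg_metric"], reverse=True)

# Intended use with the fitted CrossValidatorModel:
#   for row in summarize_cv(cv_model_svc.getEstimatorParamMaps(),
#                           cv_model_svc.avgMetrics):
#       print(row)
```

A wide spread in `avg_metric` across the grid would indicate sensitivity to the hyperparameters, while a narrow spread would support a stability claim.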


## Final Model Selection

The tuned LinearSVC model was selected as the final classifier.
Among the compared models it achieved the highest AUC (0.970) and accuracy (0.908),
with a false negative rate of about 13.9% on the test set.

The model is suitable for deployment in a real-time streaming architecture
based on Apache Kafka and Apache Spark Structured Streaming.
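
A rough sketch of how the saved pipeline could be applied in the Kafka → Spark Structured Streaming → Kafka leg of the architecture. Everything specific here is an assumption: the function names, topic names, model path, and checkpoint directory are hypothetical, the Kafka connector package for Spark must be on the classpath, and the model is assumed to have been saved beforehand (for example with `best_svc_model.write().save(path)`). The whitespace cleanup mirrors the preprocessing applied before training.

```python
def kafka_source_options(bootstrap_servers, topic):
    """Options for reading one Kafka topic as a streaming DataFrame."""
    return {"kafka.bootstrap.servers": bootstrap_servers, "subscribe": topic}

def start_scoring_stream(spark, model_path, bootstrap_servers,
                         in_topic, out_topic, checkpoint_dir):
    """Read posts from Kafka, score them with the saved pipeline, write results back."""
    # Imported lazily so the helper above can be used without Spark installed.
    from pyspark.ml import PipelineModel
    from pyspark.sql import functions as F

    model = PipelineModel.load(model_path)

    posts = (spark.readStream.format("kafka")
             .options(**kafka_source_options(bootstrap_servers, in_topic))
             .load()
             # Same cleanup as in training: collapse whitespace, then trim.
             .select(
                 F.col("key"),
                 F.trim(F.regexp_replace(F.col("value").cast("string"), r"\s+", " "))
                  .alias("text_raw")))

    scored = model.transform(posts)

    # The Kafka sink expects a string or binary column named "value".
    out = scored.select(
        F.col("key"),
        F.to_json(F.struct("text_raw", "prediction")).alias("value"))

    return (out.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("topic", out_topic)
            .option("checkpointLocation", checkpoint_dir)
            .start())
```

Loading the whole fitted pipeline, rather than only the classifier, keeps tokenization, hashing, and IDF weighting identical between training and inference.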
