# Sentiment Analysis Dataset reddit-depression-dataset Dengan Menggunakan PySpark

## Setup Requirement

In [1]:
!pip install pyspark
!pip install findspark
!pip install matplotlib seaborn scikit-learn

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os

spark = SparkSession.builder \
    .appName('dt_rf_gbt') \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()


Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=30f3a99b00177475ee2c8e89a736f51c6ef16af50c4c1194f88a6066fdfe07f1
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/12 03:13:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import pandas as pd
import os

## Load Dataset

In [3]:
import pandas as pd
import os

# Load dataset
file_path = "/kaggle/input/reddit-depression-dataset/reddit_depression_dataset.csv"
df_pandas = pd.read_csv(file_path)

file_size = os.path.getsize(file_path)
memory_usage_bytes = df_pandas.memory_usage(deep=True).sum()
memory_usage_mb = memory_usage_bytes / (1024 * 1024)
print(f"Size Dataset: {memory_usage_mb:.2f} MB.")

# Pisahkan data berdasarkan label
df_label_0 = df_pandas[df_pandas['label'] == 0]
df_label_1 = df_pandas[df_pandas['label'] == 1]

# Tentukan jumlah minimum dari kedua label untuk membuat dataset seimbang
min_count = min(len(df_label_0), len(df_label_1))

# Lakukan undersampling pada label mayoritas
df_label_0_balanced = df_label_0.sample(n=min_count, random_state=42)
df_label_1_balanced = df_label_1.sample(n=min_count, random_state=42)

# Gabungkan kembali data yang seimbang
df_balanced = pd.concat([df_label_0_balanced, df_label_1_balanced])

# Tampilkan jumlah data untuk setiap label setelah penyeimbangan
count_label_0 = df_balanced[df_balanced['label'] == 0].shape[0]
count_label_1 = df_balanced[df_balanced['label'] == 1].shape[0]
print(f"Jumlah data setelah penyeimbangan - Label 0: {count_label_0}, Label 1: {count_label_1}")

# Hitung ukuran dataset dan tentukan berapa baris yang diperlukan untuk mencapai 100 MB
file_size = os.path.getsize(file_path)
target_size = 500 * 1024 * 1024  # 100 MB
total_rows = len(df_balanced)
rows_for_100mb = int(total_rows * (target_size / file_size))

# Ambil sampel sebesar 100 MB dari data yang sudah seimbang
df_pandas_sampled = df_balanced.sample(n=rows_for_100mb, random_state=42)

# Cek ukuran dataset setelah disampling
memory_usage_bytes = df_pandas_sampled.memory_usage(deep=True).sum()
memory_usage_mb = memory_usage_bytes / (1024 * 1024)
print(f"Size Dataset setelah dipotong dan seimbang: {memory_usage_mb:.2f} MB.")

# Tampilkan jumlah data dengan label 0 dan 1 pada sampel akhir
count_label_0_sampled = df_pandas_sampled[df_pandas_sampled['label'] == 0].shape[0]
count_label_1_sampled = df_pandas_sampled[df_pandas_sampled['label'] == 1].shape[0]
print(f"Jumlah data pada sampel 100MB - Label 0: {count_label_0_sampled}, Label 1: {count_label_1_sampled}")

output_path = "/kaggle/working/reddit_depression_dataset_sampled.csv"
df_pandas_sampled.to_csv(output_path, index=False)

  df_pandas = pd.read_csv(file_path)


Size Dataset: 1921.14 MB.
Jumlah data setelah penyeimbangan - Label 0: 480411, Label 1: 480411
Size Dataset setelah dipotong dan seimbang: 452.18 MB.
Jumlah data pada sampel 100MB - Label 0: 216280, Label 1: 216018


## Analysis Dataset

### Info Dataset

In [None]:
df_pandas.info()

### Info Data Kosong

In [None]:
df_pandas.isnull().sum()

### Visualisasi Distribusi Label

In [None]:
sns.countplot(x='label', data=df_pandas_sampled)
plt.title('Distribusi Label')
plt.xlabel('Label')
plt.ylabel('Jumlah')
plt.show()

# Pre-Processing

## Drop data yang kosong

In [None]:
df = spark.createDataFrame(df_pandas_sampled)

df_clean = df.dropna(subset=['label', 'title', 'body'])
df_clean = df.dropna(subset=['upvotes', 'num_comments'])

## Tokenisasi

In [None]:
df_clean = df_clean.withColumn('text', F.concat(F.col('title'), F.lit(' '), F.col('body')))

tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
df_tokenized = tokenizer.transform(df_clean)

## Visualisasi Distribusi Panjang Teks

In [None]:
df_pandas_clean = df_clean.toPandas()
df_pandas_clean['text_length'] = df_pandas_clean['text'].apply(lambda x: len(x.split()))

plt.figure(figsize=(10,6))
sns.histplot(df_pandas_clean['text_length'], bins=30, kde=True)
plt.title('Distribusi Panjang Teks')
plt.xlabel('Jumlah Kata dalam Teks')
plt.ylabel('Frekuensi')
plt.show()

## Hapus StopWords

In [None]:
remover = StopWordsRemover(inputCol='tokens', outputCol='filtered_tokens')
df_no_stopwords = remover.transform(df_tokenized)

## Vektorisasi

In [None]:
hashing_tf = HashingTF(inputCol='filtered_tokens', outputCol='raw_features', numFeatures=10000)
df_featurized = hashing_tf.transform(df_no_stopwords)

idf = IDF(inputCol='raw_features', outputCol='features')
idf_model = idf.fit(df_featurized)
df = idf_model.transform(df_featurized)

df.show()

## Hapus Column yang tidak perlu

In [None]:
df = df.drop("Unnamed: 0", "subreddit", "title", "body", "tokens", "filtered_tokens", "text", "raw_features", "created_utc")
df.printSchema()

## Cek Data

In [None]:
from pyspark.sql.functions import col, sum

# Count null or NaN values in each column
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

In [None]:
df.show()

# Processing

## Splitting

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['upvotes', 'num_comments', 'features'], outputCol='final_features')

df_combined = assembler.transform(df)

df_combined = df_combined.select('final_features', 'label')

train_data, test_data = df_combined.randomSplit([0.8, 0.2], seed=42)

## Random Forest

### Import Modul

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Jalankan Algo

In [None]:
rf_classifier = RandomForestClassifier(labelCol='label', featuresCol='final_features', numTrees=100)

rf_model = rf_classifier.fit(train_data)

predictions = rf_model.transform(test_data)

predictions.show(10)

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

f1_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
f1_score = f1_evaluator.evaluate(predictions)
print(f"F1-Score: {f1_score:.4f}")

### Visulisasi Confusion Matrix 

In [None]:
from sklearn.metrics import confusion_matrix

# Mengambil prediksi dan label sebenarnya dari PySpark ke Pandas
predictions_pd = predictions.select('label', 'prediction').toPandas()

# Membuat confusion matrix
cm = confusion_matrix(predictions_pd['label'], predictions_pd['prediction'])

# Visualisasi confusion matrix
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Prediksi')
plt.ylabel('Label Sebenarnya')
plt.show()

### Visualisasi ROC Curve and AUC

In [None]:
from sklearn.metrics import roc_curve, auc

# Mengambil probabilitas prediksi dari PySpark ke Pandas
y_true = predictions_pd['label']
y_scores = predictions.select('probability').toPandas()['probability'].apply(lambda x: x[1])

# Menghitung ROC Curve dan AUC
fpr, tpr, thresholds = roc_curve(y_true, y_scores)
roc_auc = auc(fpr, tpr)

# Visualisasi ROC Curve
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

### Save Model

In [None]:
rf_model_path = "/kaggle/working/random_forest_model"
rf_model.save(rf_model_path)

print(f"Random Forest model saved at {rf_model_path}")

## Decision Tree

### Import Modul

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

### Jalankan Algo

In [None]:
dt_classifier = DecisionTreeClassifier(labelCol='label', featuresCol='final_features')

dt_model = dt_classifier.fit(train_data)

predictions = dt_model.transform(test_data)

predictions.select('label', 'prediction', 'probability').show(10)

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

f1_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
f1_score = f1_evaluator.evaluate(predictions)
print(f"F1-Score: {f1_score:.4f}")

### Visualisasi Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix

# Mengambil prediksi dan label sebenarnya dari PySpark ke Pandas
predictions_pd = predictions.select('label', 'prediction').toPandas()

# Membuat confusion matrix
cm = confusion_matrix(predictions_pd['label'], predictions_pd['prediction'])

# Visualisasi confusion matrix
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Prediksi')
plt.ylabel('Label Sebenarnya')
plt.show()

### Visualisasi ROC Curve and AUC

In [None]:
from sklearn.metrics import roc_curve, auc

# Mengambil probabilitas prediksi dari PySpark ke Pandas
y_true = predictions_pd['label']
y_scores = predictions.select('probability').toPandas()['probability'].apply(lambda x: x[1])

# Menghitung ROC Curve dan AUC
fpr, tpr, thresholds = roc_curve(y_true, y_scores)
roc_auc = auc(fpr, tpr)

# Visualisasi ROC Curve
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

### Simpan Model

In [None]:
dt_model_path = "/kaggle/working/decision_tree_model"
dt_model.save(dt_model_path)

print(f"Decision Tree model saved at {dt_model_path}")

## Gradient Boosted Tree

### Import Modul

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Jalankan Algo

In [None]:
gbt_classifier = GBTClassifier(labelCol='label', featuresCol='final_features', maxIter=10)

gbt_model = gbt_classifier.fit(train_data)

predictions = gbt_model.transform(test_data)

predictions.select('label', 'prediction', 'probability').show(10)

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")

f1_evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='f1')
f1_score = f1_evaluator.evaluate(predictions)
print(f"F1-Score: {f1_score:.4f}")

### Visualisasi Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix

# Mengambil prediksi dan label sebenarnya dari PySpark ke Pandas
predictions_pd = predictions.select('label', 'prediction').toPandas()

# Membuat confusion matrix
cm = confusion_matrix(predictions_pd['label'], predictions_pd['prediction'])

# Visualisasi confusion matrix
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Prediksi')
plt.ylabel('Label Sebenarnya')
plt.show()

### Visualisasi ROC Curve and AUC

In [None]:
from sklearn.metrics import roc_curve, auc

# Mengambil probabilitas prediksi dari PySpark ke Pandas
y_true = predictions_pd['label']
y_scores = predictions.select('probability').toPandas()['probability'].apply(lambda x: x[1])

# Menghitung ROC Curve dan AUC
fpr, tpr, thresholds = roc_curve(y_true, y_scores)
roc_auc = auc(fpr, tpr)

# Visualisasi ROC Curve
plt.figure(figsize=(8,6))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

### Simpan Model

In [None]:
gbt_model_path = "/kaggle/working/gbt_model"
gbt_model.save(gbt_model_path)

print(f"GBT model saved at {gbt_model_path}")