# **Sarcasm detection on Reddit**

In this notebook, we address a binary classification problem: detecting sarcasm in Reddit comments.

We chose 3 feature engineering methods and 4 classification models and compared them to find out which combination performs best:
- Feature engineering: TF-IDF, Word2Vec, and BERT.

- Classification models: Logistic regression, Random forest, Linear SVC, and Multilayer perceptron.

## **Spark Setup**


### 1. Install PySpark and related dependencies

In [None]:
# java
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop2.7.tgz
# !tar xf spark-3.2.4-bin-hadoop2.7.tgz
# !rm -rf /content/spark-3.2.4-bin-hadoop2.7.tgz
!pip install findspark

In [None]:
# !update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version

In [None]:
# setting java path as environment variable
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
!pip install pyspark
!pip install spark-nlp==4.4.2

In [None]:
!python --version

In [None]:
!pyspark --version

### 2. Import useful Python packages

In [None]:
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import DataFrame
from pyspark.ml.feature import VectorAssembler, SQLTransformer, Normalizer
from pyspark.sql.functions import udf, col, transform

import sparknlp

import gc

### 3. Create Spark context

In [None]:
import findspark
findspark.init()

In [None]:
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[4]") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.executor.memoryOverhead", "1g") \
    .config("spark.driver.memory","32G") \
    .config("spark.python.worker.memory","32G") \
    .config("spark.sql.analyzer.maxIterations", "6000") \
    .config("spark.driver.cores", "10") \
    .config("spark.driver.maxResultSize", "32G") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.2") \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .getOrCreate()

print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

In [None]:
sc = spark.sparkContext
type(sc)

## **Dataset**

The original dataset used in this notebook is available [here](https://www.kaggle.com/datasets/toygarr/datasets-for-natural-language-processing), in the *sarcasm* folder.
This dataset contains 30k sarcastic comments from the Internet commentary website Reddit.
The comments have been pre-processed by lowercasing them and removing punctuation, hashtags, usernames, and HTML tags.
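
The pre-processing described above was applied by the dataset's authors, and their script is not part of this notebook. As a rough illustration only, a cleaner with the same effect could look like the sketch below; the function name `normalize_comment` and the regular expressions are assumptions, not the original code.

```python
import re
import string

def normalize_comment(text: str) -> str:
    """Illustrative cleaner (hypothetical, not the dataset authors' script)."""
    text = re.sub(r"<[^>]+>", " ", text)   # drop HTML tags
    text = re.sub(r"[@#]\w+", " ", text)   # drop @usernames and #hashtags
    text = text.lower()                    # lowercase everything
    # drop every ASCII punctuation character
    text = text.translate(str.maketrans("", "", string.punctuation))
    return " ".join(text.split())          # collapse repeated whitespace

print(normalize_comment("<b>Oh GREAT</b>, thanks @bob! #blessed"))
# prints: oh great thanks
```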

### Download dataset from Kaggle

To download the dataset with the Kaggle library, a kaggle.json file is required. It can be downloaded from your personal Kaggle account page and must be uploaded to the same folder as this notebook.
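
Before running the download cell, it can help to confirm that the token file is in place. The sketch below is an optional check, assuming the usual token layout with a `username` and a `key` field; the helper name `check_kaggle_credentials` is hypothetical.

```python
import json
from pathlib import Path

def check_kaggle_credentials(path="kaggle.json"):
    """Return True if the file exists and holds non-empty 'username' and 'key' fields."""
    token_file = Path(path)
    if not token_file.is_file():
        return False
    try:
        token = json.loads(token_file.read_text())
    except json.JSONDecodeError:
        return False  # the file is present but is not valid JSON
    return isinstance(token, dict) and all(token.get(k) for k in ("username", "key"))

print(check_kaggle_credentials("kaggle.json"))
```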

In [None]:
# 1. Install the Kaggle library
! pip install kaggle

# 2. Make a directory named ".kaggle" (-p: no error if it already exists)
! mkdir -p ~/.kaggle

# 3. Copy "kaggle.json" into this new directory
! cp kaggle.json ~/.kaggle/

# 4. Restrict the file's permissions to its owner (read/write only)
! chmod 600 ~/.kaggle/kaggle.json

# 5. Download the dataset
! kaggle datasets download toygarr/datasets-for-natural-language-processing

# unzip the directory
! unzip /content/datasets-for-natural-language-processing.zip

# delete zip file
! rm -rf /content/datasets-for-natural-language-processing.zip

# Make a directory to save final models (-p: no error if it already exists)
! mkdir -p models

### Data loading

In [None]:
def load_dataset():

    df1 = spark.read.csv("data/sarcasm/train.csv", header=True)
    df2 = spark.read.csv("data/sarcasm/test.csv", header=True)

    df = df1.union(df2)

    #####################################

    print("Dataframe size (n. of rows): {:d}\n".format(df.count()))
    # rename columns
    df = df.withColumnRenamed("Y", "label")
    df = df.withColumnRenamed("text", "comment")
    print("Dataframe schema:")
    df.printSchema()
    df.show(10)

    #####################################

    print("Number of NULL comments: {:d}".format(df.where(col("comment").isNull()).count()))
    print("Number of NULL labels: {:d}\n".format(df.where(col("label").isNull()).count()))
    # remove rows with NULL comments
    df = df.na.drop(subset=["comment"])
    # remove rows with NULL labels
    df = df.na.drop(subset=["label"])

    return df

### Data Pre-processing

In [None]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col, rand

In [None]:
def clean_text(df, column_name="comment"):

    # Clean text
    print("***** Clean text *****\n")

    cleaned_df = df.dropDuplicates(["comment", "label"])

    # remove comments that appear with both labels: after dropDuplicates on
    # (comment, label), a comment occurring more than once has conflicting labels
    cleaned_df_2 = cleaned_df.groupby("comment").agg(count("comment").alias("n_labels")).filter(col("n_labels") > 1)
    cleaned_df = cleaned_df.join(cleaned_df_2, ["comment"], 'left_anti')

    cleaned_df_2.unpersist()
    del cleaned_df_2

    cleaned_df.show(10)

    print("Let's see our dataset's class distribution:")
    cleaned_df.groupBy('label').count().show()

    # The dataset is not balanced: we downsample class 0 so that the two classes have the same number of samples.
    df_0 = cleaned_df.filter(col("label") == 0).orderBy(rand()).limit(cleaned_df.filter(col("label") == 1).count())
    df_1 = cleaned_df.filter(col("label") == 1)

    cleaned_df = df_0.union(df_1)

    print("Let's check if our dataset is now balanced:")
    # Now we have 13_552 samples per class
    cleaned_df.groupBy('label').count().show()

    return cleaned_df
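
The three cleaning steps in `clean_text` (drop exact duplicates, drop comments that carry conflicting labels, downsample to balance the classes) can be followed more easily on a toy example. The sketch below is a plain-Python analogue, not the Spark code: the helper name `clean_and_balance` is hypothetical, and unlike the notebook it downsamples whichever class is larger rather than assuming class 0 is the majority.

```python
import random
from collections import defaultdict

def clean_and_balance(rows, seed=42):
    """rows: iterable of (comment, label) pairs with labels 0 or 1."""
    unique = set(rows)                               # 1. drop exact duplicates
    labels_per_comment = defaultdict(set)
    for comment, label in unique:
        labels_per_comment[comment].add(label)
    # 2. keep only comments that carry a single label
    kept = [(c, l) for c, l in unique if len(labels_per_comment[c]) == 1]
    by_label = defaultdict(list)
    for c, l in kept:
        by_label[l].append((c, l))
    n = min(len(group) for group in by_label.values())  # minority class size
    rng = random.Random(seed)
    balanced = []
    for group in by_label.values():
        group.sort()                                 # fixed order, so the seed is reproducible
        balanced.extend(rng.sample(group, n))        # 3. downsample every class to n rows
    return balanced

rows = [("a", 0), ("a", 0), ("b", 0), ("b", 1), ("c", 0), ("d", 0), ("e", 1)]
print(sorted(clean_and_balance(rows), key=lambda r: r[1]))
```

Here "a" is deduplicated, "b" is removed because it appears with both labels, and one of the three remaining class-0 comments is kept to match the single class-1 comment.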

## **Feature Engineering**

### **BERT Sentence**

*Bidirectional Encoder Representations from Transformers* (BERT) is a state-of-the-art transformer-based model for natural language processing (NLP). It is designed to capture contextual information from text by using a deep bidirectional architecture.
BERT is pre-trained on large amounts of unlabeled text and can then be fine-tuned for specific NLP tasks; the contextualized word representations it produces are known as BERT embeddings.

The BERT model used in this notebook provides a **sentence-level embedding**. It takes a sentence (a sequence of tokens) as input and outputs a single embedding vector for the entire sentence: a fixed-length representation (768 dimensions for this model) that captures the sentence's context and overall meaning.
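
To make "fixed-length representation" concrete: one common way to turn a variable number of token vectors into a single sentence vector is mean pooling. The sketch below illustrates that idea only; it is an assumption for illustration, and this notebook does not establish which pooling strategy `sent_bert_base_uncased` uses internally. The 3-dimensional toy vectors stand in for the 768-dimensional ones.

```python
def mean_pool(token_vectors):
    """Average equal-length token vectors into one sentence vector."""
    dim = len(token_vectors[0])
    n_tokens = len(token_vectors)
    return [sum(vec[i] for vec in token_vectors) / n_tokens for i in range(dim)]

two_tokens = [[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]
three_tokens = [[0.0, 0.0, 3.0], [3.0, 0.0, 0.0], [0.0, 3.0, 0.0]]

# Both sentences map to a vector of the same length, whatever their token count.
print(mean_pool(two_tokens))    # [2.0, 3.0, 4.0]
print(mean_pool(three_tokens))  # [1.0, 1.0, 1.0]
```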

In [None]:
from sparknlp.annotator import BertSentenceEmbeddings
from sparknlp import EmbeddingsFinisher

In [None]:
def bert_sentence_embedding(df, save_parquet=False):

    # convert the text into a Spark-NLP annotator-ready form
    documentAssembler = DocumentAssembler() \
     .setInputCol('comment') \
     .setOutputCol('document')

    # sent_bert_base_uncased -> the model is imported from https://tfhub.dev/google/bert_uncased_L-12_H-768_A-12/1
    embeddings = BertSentenceEmbeddings.pretrained("sent_bert_base_uncased", "en") \
                .setInputCols("document") \
                .setOutputCol("embeddings")

    finisher = EmbeddingsFinisher() \
              .setInputCols("embeddings") \
              .setOutputCols("embeddings_result")

    bert_pipeline = Pipeline(stages=[documentAssembler, embeddings, finisher])

    df_bert = bert_pipeline.fit(df).transform(df) \
                         .select(col('comment'), col('label').cast(FloatType()), explode("embeddings_result").alias('features')) \
                         .select(['comment'] + ['label'] + [expr('features[' + str(x) + ']') for x in range(768)])

    ### VectorAssembler ###
    vector_assembler = VectorAssembler(inputCols=df_bert.columns[2:], outputCol='features').setHandleInvalid("skip")

    features_assembled = vector_assembler.transform(df_bert).select('comment', 'features', 'label')

    df_bert.unpersist()
    del df_bert

    print("Dataframe schema:")
    features_assembled.printSchema()

    # garbage collector
    gc.collect()

    if save_parquet:
        path_parquet = "bert_sentence_features.parquet"
        features_assembled.write.mode("overwrite").parquet(path_parquet)

    return features_assembled

### Split dataset
Split the original dataset into two subsets: training and test.

In [None]:
def data_split(df_features):
    print("Dataframe size in data_split: {:d} instances".format(df_features.count()))

    # Randomly split our original dataset 80/20 into training and test sets, respectively
    train_df, test_df = df_features.randomSplit([0.8, 0.2], seed = 42)

    df_features.unpersist()
    del df_features

    print("Training set size: {:d} instances".format(train_df.count()))
    print("Test set size: {:d} instances".format(test_df.count()))

    print("\nLet's verify our datasets are still balanced:")
    train_df.groupBy('label').count().show()
    test_df.groupBy('label').count().show()

    return train_df, test_df
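
The set sizes printed by `data_split` are usually close to, but not exactly, 80% and 20%, because `randomSplit` assigns rows to splits by random sampling rather than by cutting at a fixed index. The sketch below mimics that behaviour in plain Python; the helper name `bernoulli_split` is hypothetical, and Spark's actual sampler may differ in detail.

```python
import random

def bernoulli_split(rows, train_fraction=0.8, seed=42):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

train, test = bernoulli_split(range(10_000))
print(len(train), len(test))  # near 8000 and 2000, but not exactly
```

Fixing the seed makes the split reproducible across runs, which is why the notebook passes `seed = 42`.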

## **Classification models**

### **Logistic regression**

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
def logistic_regression_classifier(train_df, test_df, suffix_path="", save_model=False):

    lr = LogisticRegression(featuresCol='features',
                          labelCol='label',
                          maxIter=100,
                          regParam=0.3,
                          elasticNetParam=0.8)

    ### Search for the best model's parameters. ###

    # We use a ParamGridBuilder to construct a grid of parameters to search over
    param_grid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.0, 0.05, 0.1]) \
                .addGrid(lr.elasticNetParam, [0.5, 0.8, 1.0]) \
                .build()

    cross_val = CrossValidator(estimator=lr,
                             estimatorParamMaps=param_grid,
                             evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                             numFolds=5,
                             parallelism=5)

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train_df)

    best_lr = cv_model.bestModel

    print("--- Best model's parameters: ---\n- regParam = {:f}\n- elasticNetParam = {:f}".format(best_lr.getRegParam(), best_lr.getElasticNetParam()))

    lr_predictions = best_lr.transform(test_df)
    lr_predictions.show(10)

    if save_model:
        path = "models/logisticRegression_" + suffix_path
        best_lr.write().overwrite().save(path)

    # garbage collector
    gc.collect()

    return lr_predictions
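
The grid above has 3 x 3 = 9 parameter combinations, so 5-fold cross-validation trains 45 models before refitting the best one. The sketch below enumerates the grid and evaluates the elastic-net penalty in the form given in the Spark MLlib documentation, `regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)`; the weight vector is made up for illustration.

```python
from itertools import product

def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """regParam * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)

grid = list(product([0.0, 0.05, 0.1], [0.5, 0.8, 1.0]))  # (regParam, elasticNetParam)
weights = [0.5, -1.0, 2.0]                               # illustrative values only

for reg_param, alpha in grid:
    print(reg_param, alpha, round(elastic_net_penalty(weights, reg_param, alpha), 4))
print("fits during cross-validation:", len(grid) * 5)
```

With `regParam = 0.0` the penalty is zero whatever `elasticNetParam` is, so three of the nine grid points describe the same unregularized model.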

### **Random forest**

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

In [None]:
def random_forest_classifier(train_df, test_df, suffix_path='', save_model=False):

    rf = RandomForestClassifier(featuresCol='features',
                              labelCol='label')

    ### Search for the best model's parameters. ###

    # We use a ParamGridBuilder to construct a grid of parameters to search over
    param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3, 5, 8]) \
    .addGrid(rf.numTrees, [5, 10]) \
    .build()

    cross_val = CrossValidator(estimator=rf,
                             estimatorParamMaps=param_grid,
                             evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                             numFolds=5,
                             parallelism=5)

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train_df)

    best_rf = cv_model.bestModel

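    # getNumTrees may be exposed as a property (not callable) on the fitted model,
    # depending on the PySpark version, so the numTrees param is read with getOrDefault
    print("--- Best model's parameters: ---\n- maxDepth = {:d}\n- numTrees = {:d}".format(best_rf.getMaxDepth(), best_rf.getOrDefault(best_rf.numTrees)))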

    rf_predictions = best_rf.transform(test_df)
    rf_predictions.show(10)

    if save_model:
        path = "models/randomForest_" + suffix_path
        best_rf.write().overwrite().save(path)

    # garbage collector
    gc.collect()

    return rf_predictions

### **Linear Support Vector Machine**

In [None]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
def linear_svc_classifier(train_df, test_df, suffix_path='', save_model=False):

    lsvc = LinearSVC(featuresCol='features', labelCol='label',
                   maxIter=10, regParam=0.1)

    ### Search for the best model's parameters. ###
    
    # We use a ParamGridBuilder to construct a grid of parameters to search over
    param_grid = ParamGridBuilder() \
                .addGrid(lsvc.regParam, [0.0, 0.05, 0.1]) \
                .addGrid(lsvc.maxIter, [50, 100]) \
                .build()

    cross_val = CrossValidator(estimator=lsvc,
                             estimatorParamMaps=param_grid,
                             evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                             numFolds=5,
                             parallelism=5)

    # Run cross-validation, and choose the best set of parameters.
    cv_model = cross_val.fit(train_df)

    best_lsvc = cv_model.bestModel

    print("--- Best model's parameters: ---\n- regParam = {:f}\n- maxIter = {:d}".format(best_lsvc.getRegParam(), best_lsvc.getMaxIter()))

    lsvc_predictions = best_lsvc.transform(test_df)
    lsvc_predictions.show(10)

    if save_model:
        path = "models/linearSVC_" + suffix_path
        best_lsvc.write().overwrite().save(path)

    # garbage collector
    gc.collect()

    return lsvc_predictions

### **Multilayer perceptron**

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [None]:
def mlp_classifier(train_df, test_df, suffix_path='', save_model=False):

    # specify layers for the neural network:
    input_layer = 768 # features
    output_layer = 2 # number of classes
    layers = [input_layer, 64, 32, output_layer]

    mlp = MultilayerPerceptronClassifier(featuresCol='features', labelCol='label',
                                       maxIter=100, layers=layers, blockSize=256, seed=42)

    mlp_model = mlp.fit(train_df)

    print("--- Model's parameters: ---\n- layers = {:s}".format(str(mlp_model.getLayers())))

    mlp_predictions = mlp_model.transform(test_df)
    mlp_predictions.show(10)

    if save_model:
        path = "models/mlp_" + suffix_path
        mlp_model.write().overwrite().save(path)

    # garbage collector
    gc.collect()

    return mlp_predictions
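
To get a feel for the size of this network, the number of trainable parameters can be derived from `layers` alone. The sketch below assumes fully connected layers with one bias per output unit; the helper name `count_mlp_parameters` is hypothetical.

```python
def count_mlp_parameters(layers):
    """Sum (inputs + 1 bias) * outputs over every pair of consecutive layers."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

layers = [768, 64, 32, 2]
print(count_mlp_parameters(layers))  # 49216 + 2080 + 66 = 51362
```

Almost all of the parameters sit in the first layer, which maps the 768 BERT features to 64 hidden units.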

## **Evaluation**

In [None]:
from sklearn.metrics import classification_report, matthews_corrcoef, roc_auc_score, average_precision_score
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
def evaluation(model_predictions):
    print("***** Test Set *****")

    preds_df = model_predictions.select("comment", "label", "prediction").toPandas()

    print("\nShow some examples of misclassified comments: ")
    display(preds_df[preds_df['prediction'] != preds_df['label']].head(100))

    print("\nClassification report:")
    clf_report = classification_report(y_true=preds_df['label'], y_pred=preds_df['prediction'], zero_division=0)
    print(clf_report)

    evaluator = BinaryClassificationEvaluator()
    evaluator.setRawPredictionCol('rawPrediction')

    # calculate AUROC
    print('Area Under ROC Curve (AUROC): %.3f' % evaluator.evaluate(model_predictions, {evaluator.metricName: "areaUnderROC"}))

    # calculate AUPR
    print('Area Under Precision-Recall Curve (AUPR): %.3f' % evaluator.evaluate(model_predictions, {evaluator.metricName: "areaUnderPR"}))

    # calculate MCC
    print("Matthews Correlation Coefficient (MCC): ", matthews_corrcoef(preds_df['label'], preds_df['prediction']))

    print("\n***** Test Set *****")
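
The Matthews Correlation Coefficient reported by `evaluation` summarises the whole confusion matrix in one number between -1 and 1. As a sketch of what `matthews_corrcoef` computes in the binary case, it can be written directly from the four confusion counts; the helper name `mcc` and the toy labels are for illustration only.

```python
from math import sqrt

def mcc(y_true, y_pred):
    """Binary MCC: (TP*TN - FP*FN) / sqrt((TP+FP)(TP+FN)(TN+FP)(TN+FN))."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    denominator = sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    # By convention the score is 0 when any marginal count is zero
    return (tp * tn - fp * fn) / denominator if denominator else 0.0

y_true = [1, 1, 1, 0, 0, 0]
y_pred = [1, 1, 0, 0, 0, 1]
print(mcc(y_true, y_pred))  # TP=2, TN=2, FP=1, FN=1 -> 3 / 9
```

A value of 1 means perfect prediction, 0 is no better than chance, and -1 means every prediction is wrong.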

## **Experiments**

### **Data loading and pre-processing**

In [None]:
df = load_dataset()

In [None]:
cleaned_df = clean_text(df)

In [None]:
df.unpersist()
del df

print("Garbage collector: collected %d objects" % (gc.collect()))

### **Models + BERT Sentence**

In [None]:
# features_df_bert = spark.read.parquet("bert_sentence_features.parquet")
features_df_bert = bert_sentence_embedding(cleaned_df, save_parquet=False)

In [None]:
cleaned_df.unpersist()
del cleaned_df

gc.collect()

In [None]:
train_df_bert, test_df_bert = data_split(features_df_bert)

In [None]:
features_df_bert.unpersist()
del features_df_bert

In [None]:
model_predictions = logistic_regression_classifier(train_df_bert, test_df_bert, "BERT_sentence")
evaluation(model_predictions)

In [None]:
model_predictions = random_forest_classifier(train_df_bert, test_df_bert, "BERT_sentence")
evaluation(model_predictions)

In [None]:
model_predictions = linear_svc_classifier(train_df_bert, test_df_bert, "BERT_sentence")
evaluation(model_predictions)

In [None]:
model_predictions = mlp_classifier(train_df_bert, test_df_bert, "BERT_sentence", save_model=True)
evaluation(model_predictions)

In [None]:
train_df_bert.unpersist()
test_df_bert.unpersist()

del train_df_bert
del test_df_bert