**Mounting google drive so i can access the data**

In [1]:
from google.colab import drive
import os
drive.mount('/content/gdrive')
data_dir = os.path.join(os.getcwd(),'gdrive','My Drive','Colab Notebooks','Text_Data')
file_path = os.path.join(data_dir,'sarcasm_headline_dataset.csv')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


**Setting up pyspark and spark-nlp on google-colabs**

In [2]:
# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version
# Install pyspark
! pip install --ignore-installed -q pyspark==2.4.4
# Install Spark NLP
! pip install --ignore-installed -q spark-nlp==2.4.5 

openjdk version "1.8.0_272"
OpenJDK Runtime Environment (build 1.8.0_272-8u272-b10-0ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.272-b10, mixed mode)


**Importing spark-nlp libraries**

In [3]:
import sparknlp
from sparknlp.base import DocumentAssembler,Finisher
from sparknlp.annotator import SentenceDetector, Tokenizer, Normalizer, StopWordsCleaner, Lemmatizer

**Importing pyspark libraries**

In [4]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import *
from pyspark.ml.feature import Word2Vec,StringIndexer
from pyspark.ml import Pipeline

**Loading data and doing some sanity checking**

In [5]:
spark = sparknlp.start() 

df = spark.read.csv(file_path,inferSchema=True,header=True)
df.printSchema()

root
 |-- HEADLINE: string (nullable = true)
 |-- IS_SARCASTIC: string (nullable = true)



In [6]:
print("Number of rows(sentences): {}".format(df.count()))


Number of rows(sentences): 26709


In [7]:
df.toPandas()['IS_SARCASTIC'].value_counts()

0                                                      14976
1                                                      11723
 honey?"""                                                 1
 report finds"                                             1
 hate the sin"" won't cut it anymore"                      1
"" says stern woman who likely does not menstruate"        1
 i'm sorry                                                 1
"" sugar ""daddy                                           1
 they're a part of it"""                                   1
 at $40                                                    1
"" but nobody's laughing"                                  1
"" ""the mapmaker's opera                                  1
Name: IS_SARCASTIC, dtype: int64

**The "IS_SARCASTIC" column should contain either a 0 (not a sarcastic headline) or 1 (a sarcastic headline). Use filter to drop rows(10) that aren't a 0 or 1**

In [8]:
df = df.filter((df["IS_SARCASTIC"] == '0') | (df["IS_SARCASTIC"] == '1'))
df.toPandas()['IS_SARCASTIC'].value_counts()
print("Number of rows(sentences) after filtering: {}".format(df.count()))

# Splitting the data into a train and test split.
# My main focus here is on preprocessing not model performance.
# If model performance were my main focus, i would do a train, test, validation split
train,test = df.randomSplit([0.7,0.3],seed=42)

Number of rows(sentences) after filtering: 26699


**Building the spark-nlp pipeline that will preprocess the sentences into the format they need to be in before deriving word2vec embeddings from them**

In [9]:
docAssembler = DocumentAssembler().setInputCol("HEADLINE") \
                                  .setOutputCol("document") \
                                  .setCleanupMode("shrink_full")

sentence_detector = SentenceDetector().setInputCols(["document"]).setOutputCol("sentence")

tokenizer = Tokenizer().setInputCols(['sentence']).setOutputCol("tokens")

normalizer = Normalizer().setInputCols(["tokens"]) \
                         .setOutputCol("normalized_tokens") \
                         .setLowercase(True) \
                         .setCleanupPatterns(["[^\w\d\s]"])

remove_stopwords = StopWordsCleaner().setInputCols(["normalized_tokens"]) \
                                    .setOutputCol("clean_tokens") \
                                    .setCaseSensitive(False)

# Get lemmatizer dictionary
!wget -q https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt
lemmatizer = Lemmatizer().setInputCols(["clean_tokens"]) \
                         .setOutputCol("lemmatized_token") \
                         .setDictionary("./AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")

finisher = Finisher().setInputCols(["lemmatized_token"]) \
                     .setOutputCols("lemma_bow") \
                     .setIncludeMetadata(False)

pipeline_stages =[docAssembler, sentence_detector, 
                  tokenizer, normalizer, remove_stopwords, 
                  lemmatizer, finisher]

In [10]:
# fitting the preprocessing pipeline to the train data
nlpPipeline = Pipeline(stages=pipeline_stages)
preprocessingModel = nlpPipeline.fit(train)

# using the preprocessing pipeline to transform the train and test split
preprocessed_train = preprocessingModel.transform(train)
preprocessed_test = preprocessingModel.transform(test)

preprocessed_train.printSchema()
preprocessed_test.printSchema()

root
 |-- HEADLINE: string (nullable = true)
 |-- IS_SARCASTIC: string (nullable = true)
 |-- lemma_bow: array (nullable = true)
 |    |-- element: string (containsNull = true)

root
 |-- HEADLINE: string (nullable = true)
 |-- IS_SARCASTIC: string (nullable = true)
 |-- lemma_bow: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [14]:
#Identifying the longest sentence (by number of words after preprocessing) and using the size to set the
#word2vec vector size.
vector_dimension = preprocessed_train.withColumn("lemma_bow_size",size("lemma_bow")).agg({"lemma_bow_size":"max"}).collect()[0][0]

print(vector_dimension)

27


**Building the Spark MLLib Binary Text Classification pipe line**

In [15]:
indexer = StringIndexer(inputCol="IS_SARCASTIC", outputCol="label")

word2Vec = Word2Vec(vectorSize= vector_dimension, minCount=0, inputCol="lemma_bow", outputCol="features")

classifier = LogisticRegression()

**Training and evaluating a basic logistic regression model for binary classification**

In [16]:
ml_stages =[indexer,word2Vec,classifier]

ml_pipeline = Pipeline(stages=ml_stages)

In [17]:
model = ml_pipeline.fit(preprocessed_train)
predictionAndLabels = model.transform(preprocessed_test)

In [18]:
auc_roc_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',metricName='areaUnderROC')
pr_roc_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',metricName='areaUnderPR')
precision_evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(metricName="f1")

In [20]:
print("AUC ROC: {0:.2f}".format(auc_roc_evaluator.evaluate(predictionAndLabels)))
print("AUC PR: {0:.2f}".format(pr_roc_evaluator.evaluate(predictionAndLabels)))
print("Precision: {0:.2f}".format(precision_evaluator.evaluate(predictionAndLabels)))
print("Recall ROC: {0:.2f}".format(recall_evaluator.evaluate(predictionAndLabels)))
print("F1: {0:.2f}".format(f1_evaluator.evaluate(predictionAndLabels)))

AUC ROC: 0.60
AUC PR: 0.54
Precision: 0.62
Recall ROC: 0.62
F1: 0.61
