# DSCI632 - Fake News Detection

## PySpark Setup

In [1]:
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop2.7.tgz
!tar xf spark-3.2.0-bin-hadoop2.7.tgz

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop2.7"

!pip install -q findspark
!pip install pyspark

!git clone https://github.com/avivfaraj/DSCI632-Project.git

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 33 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=0d8144446f0984369cfdf074276e1382ef311216cd22c9e486e286ba3700262f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Cloning into 'DSCI632-Project'...
remote: Enumerating objects: 85, done.[K
remote: Counting objects: 100% (85/85), done.[K
remote: Compressing objects: 100% (57/57), done.[K
remote: Total 85 (delta 17), reused 65 (d

## Important packages

In [3]:
# NLP
import nltk
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords


# Pandas is required to read the data.
# For some reason pyspark can't read the csv file correctly
# So we have to read using pandas and then convert to spark DF
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.ml.feature import IDF, Tokenizer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import when, col, regexp_replace, concat, lit, length
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml.classification import NaiveBayesModel, NaiveBayes
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [4]:
def evaluate(df, labelCol = "label", predCol = "prediction"):
    TP = df.filter((col(labelCol) == 0) & (col(predCol) == 0)).count()
    FN = df.filter((col(labelCol) == 1) & (col(predCol) == 0)).count()
    FP = df.filter((col(labelCol) == 0) & (col(predCol) == 1)).count()
    TN = df.filter((col(labelCol) == 1) & (col(predCol) == 1)).count()

    precision = (TP)/(TP+FP)
    recall = (TP)/(TP+FN)
    print("Accuracy: %.3f" % float((TP+TN)/(TP+TN+FP+FN)))
    print("Recall: %.3f" % float(recall))
    print("Precision: %.3f" % float(precision))
    print("F1 Score: %.3f" % float(2*(precision * recall)/(precision +recall)))

    (df
        .crosstab('label','prediction')
        .withColumnRenamed("label_prediction", "label\prediction")
        .orderBy("label\prediction", asceding = False)
        .show()
    )

    return ([[TP, FP], [FN, TN]], precision, recall)

## Spark Session \& Reading Dataset

In [5]:
spark = SparkSession.builder.appName("Final Project").master("local[*]").getOrCreate()

mySchema = StructType([ StructField("index", IntegerType(), True)\
                       ,StructField("title", StringType(), True)\
                       ,StructField("author", StringType(), True)\
                       ,StructField("text", StringType(), True)\
                       ,StructField("label", IntegerType(), True)])

path = "./DSCI632-Project/data/train.csv"
pandas_df = pd.read_csv(path, header = 0)

spark_df = spark.createDataFrame(pandas_df, schema = mySchema)


## Pre-processing

### Cleaning Dataset

In [6]:
# Deleting all rows that are missing text
# df_rmv_nan_text = spark_df.filter(col("text") != "NaN")
df_rmv_nan_text = spark_df.filter(length(col("text")) > 60)

# There are a lot of NaN in the dataset.
# Those are Null values in pandas that were
# Converted to NaN string in spark df.
# Since it is a string, it will not be recognized by na() methods
# So, we have to manually change their value:
df_no_nan = (df_rmv_nan_text
             .withColumn("title", when(col("title") == "NaN", " ")
                                            .otherwise(col("title")))
             )


## NOTE: Later on we will use Tokenizer from PySpark MLlib. This tokenizer
##       takes care of converting all characters to lowercase, so it is
##       not required in this step.

# Remove non-character from title and text
df_clean = (df_no_nan

                 ## Removing any non-character from title
                .withColumn("title", 
                            regexp_replace(
                                col('title'),
                                r'[^\w\’ ]',''))
                
                ## Removing any non-character from text
                .withColumn("text", 
                            regexp_replace(
                                col('text'),
                                r'[^\w\’ ]',''))
                
                ## Replacing 2 or more whitespaces with 1 whitespace
                .withColumn("text", 
                            regexp_replace(
                                col('text'),
                                r'[ ]{2,}',' '))
                
                ## Replacing 2 or more whitespaces with 1 whitespace
                .withColumn("title", 
                            regexp_replace(
                                col('text'),
                                r'[ ]{2,}',' '))
                )


# Concatenation of title and text when title doesn't appear in text
df_combined = (df_clean
                    .withColumn('full_text',
                                  when(col("text").contains(
                                                    concat(col("title"))),
                                                    col("text"))
                                  
                                  .otherwise(concat(col("title"),
                                                    lit(" "),
                                                    col("text"))))
                    .select(["full_text","label"])
                    .withColumn("label", col("label").cast(DoubleType()))
                    .dropDuplicates()
                )


# Clean memory             
del df_rmv_nan_text, df_no_nan, df_clean

# Sanity Check
print(df_combined.count())
df_combined.show(7)

20210
+--------------------+-----+
|           full_text|label|
+--------------------+-----+
|0 0 AP N1 26 27 1...|  1.0|
|GREENBELT Md The ...|  0.0|
|The Minnesota off...|  0.0|
|GeoEngineering Un...|  1.0|
|Following a fight...|  0.0|
|The military indu...|  1.0|
|Insists Russia De...|  1.0|
+--------------------+-----+
only showing top 7 rows



### Check Class Balance

Still balanced!

In [7]:
df_combined.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|10385|
|  1.0| 9825|
+-----+-----+



### Stopwords

In [8]:
try:
    stopwords_ls = stopwords.words('english')
except:
    nltk.download("stopwords")
    stopwords_ls = stopwords.words('english')

# Sanity Check
stopwords_ls[:10]

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

### Stemmer Class

In [9]:
####################### Code Citation #######################
# Author: Clare S. Y. Huang
# Date: 01 Aug 2020
# Title: Custom Transformer that can be fitted into Pipeline
# URL: https://csyhuang.github.io/2020/08/01/custom-transformer/
#############################################################

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class Stemmer(Transformer, 
                 HasInputCol, 
                 HasOutputCol,
                 DefaultParamsReadable, 
                 DefaultParamsWritable):
    
    @keyword_only
    def __init__(self, inputCol = "input", outputCol = "output"):
        super(Stemmer, self).__init__()
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, inputCol = "input", outputCol = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)
    
    def get_input_col(self):
        return self.getOrDefault(self.inputCol)

    def get_output_col(self):
        return self.getOrDefault(self.outputCol)

    def _transform(self, df):

        # Input and output column
        input_col = self.get_input_col()
        output_col = self.get_output_col()

        # Initialize stemmer from nltk package
        ps = PorterStemmer()
        
        # User Defined Function: stemming every word in the input column
        transform_udf = F.udf(lambda x: [ps.stem(word) for word in x], ArrayType(StringType(), False))

        # Return the new df with the new column
        return df.withColumn(output_col, transform_udf(input_col))

# Sanity check
# words = Tokenizer(inputCol="text", outputCol="words").transform(spark_df)
# test = Stem(inputCol = "words", outputCol = "test").transform(words)
# test.select(["words", "test"]).show(4)

## Dummy Classifier

In [10]:
df_dummy = (spark_df
                .withColumn("prediction", 
                    when((col("author") == "NaN") | (col("title") == "NaN") , 1.0)
                    .otherwise(0.0))
                .withColumn("label", col("label").cast(FloatType()))
                .select(["label","prediction"])
            )

df_dummy.show(7)

# Sanity Check
evaluate(df_dummy, predCol = "prediction")

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       0.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
+-----+----------+
only showing top 7 rows

Accuracy: 0.618
Recall: 0.567
Precision: 0.997
F1 Score: 0.723
+----------------+-----+----+
|label\prediction|  0.0| 1.0|
+----------------+-----+----+
|             0.0|10361|  26|
|             1.0| 7924|2489|
+----------------+-----+----+



([[10361, 26], [7924, 2489]], 0.9974968710888611, 0.5666393218485097)

## Machine Learning

### Split Data

In [11]:
# Split data to train and test
train, test = df_combined.randomSplit([0.7,0.3], seed=2)

### Pipeline

***NOTE:*** Fitting the pipeline on the train set takes a while, so I ran it and saved it to a folder "pipeline" within the folder "model and pipeline". To test it, you can load the pipeline model and use it to transform the test and train dataframes in [Load Pipeline](#load). 

#### Pipeline Stages
Uncomment the below code to run define stages, and fit pipeline. If loading pipeline from the saved model, then skip this step.

In [12]:

# Define Stages for pipeline 

# # Stage 1 - Tokenizing words
# tokenizer = Tokenizer(inputCol="full_text", outputCol="full_text_words")

# # Stage 2 - Removing stop words (using nltk stop words)
# word_remover = StopWordsRemover(stopWords = stopwords_ls,
#                                 inputCol = "full_text_words",
#                                 outputCol = "full_text_words_clean")

# # Stage 3 - Lemmatizing each word using custom lemmatizer class
# stemmer = Stemmer(inputCol = "full_text_words_clean", outputCol = "stemmed")

# # Stage 4 - Term Frequency of every word
# tf = CountVectorizer(inputCol="stemmed", outputCol="features", vocabSize = 1e6)

# pipeline = Pipeline(stages= [tokenizer, word_remover, stemmer, tf]).fit(train)
# train_df = pipeline.transform(train).select(["full_text","features","label"])
# test_df = pipeline.transform(test).select(["full_text","features","label"])

#### Save Pipeline

In [13]:
# # #### Save Pipeline! ####
# pipeline.save("./DSCI632-Project/model and pipeline/pipeline")

<a name="load"></a>
#### Load Pipeline 

In [14]:
pipeline = PipelineModel.load("./DSCI632-Project/model and pipeline/pipeline")
train_df = pipeline.transform(train).select(["full_text","features","label"])
test_df = pipeline.transform(test).select(["full_text","features","label"])

### Naive Bayes

***NOTE:*** Fitting Naive Bayes model takes a while. I ran it and saved the model to a folder "Naive Bayes Model" within the folder "model and pipeline". 
To test it, skip the code under *Fit Naive Bayes*, and run [*Load Naive Bayes*](#load-na) instead.

#### Fit Naive Bayes

In [15]:
# from pyspark.ml.classification import NaiveBayes
# nb = NaiveBayes(labelCol="label", featuresCol="features", thresholds = [0.6, 0.4])
# nb_model = nb.fit(train_df)
# predictions_nb = nb_model.transform(test_df)

<a name="load-na"></a>
#### Load Naive Bayes

In [16]:
nb_model = NaiveBayesModel.load("./DSCI632-Project/model and pipeline/Naive Bayes Model")
predictions_nb = nb_model.transform(test_df)
predictions_nb.select(["label", "prediction"]).show()

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows



#### Save Naive Bayes

In [17]:
# nb_model.save("./DSCI632-Project/model and pipeline/Naive Bayes Model")

### Testing

In [18]:
evaluate(predictions_nb.select(["label","prediction"]))

Accuracy: 0.921
Recall: 0.885
Precision: 0.973
F1 Score: 0.927
+----------------+----+----+
|label\prediction| 0.0| 1.0|
+----------------+----+----+
|             0.0|2985|  84|
|             1.0| 387|2540|
+----------------+----+----+



([[2985, 84], [387, 2540]], 0.9726295210166178, 0.8852313167259787)

In [19]:
predictions_dummy_rdd = df_dummy.select(["label", "prediction"]).rdd.map(tuple)
predictions_nb_rdd = predictions_nb.select(["label", "prediction"]).rdd.map(tuple)

# Instantiate metrics object
metrics_dummy = BinaryClassificationMetrics(predictions_dummy_rdd)
metrics_nb = BinaryClassificationMetrics(predictions_nb_rdd)

print("Area under ROC Baseline: " , metrics_dummy.areaUnderROC)
print("Area under ROC Naive Bayes: " , metrics_nb.areaUnderROC)



Area under ROC Baseline:  0.7781506748407558
Area under ROC Naive Bayes:  0.9266095608020137
