# We will build a Naive Bayes classifier for detecting fake news
## For this, we will use a TF-IDF strategy.
## Some of the labels are incorrect because they were saved without stripping punctuation: some include commas, which produce values other than "true" or "fake". Fortunately, there are only a few of them, so we can drop them. Keep in mind that this notebook runs only on a machine with PySpark and its required dependencies correctly installed.
## The data was downloaded from Kaggle and preprocessed in another notebook. You can find the dataset here: https://www.kaggle.com/datasets/clmentbisaillon/fake-and-real-news-dataset

In [78]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tweet_classifier').getOrCreate()

In [79]:
data = spark.read.csv("data/fake_true_curated.csv",header=True)

In [80]:
data.show()

+--------------------+-----+
|               title|label|
+--------------------+-----+
|Ben Stein Calls O...| fake|
|Trump drops Steve...| true|
|Puerto Rico expec...| true|
| OOPS: Trump Just...| fake|
|Donald Trump head...| true|
| Paul Ryan Respon...| fake|
|AWESOME! DIAMOND ...| fake|
|STAND UP AND CHEE...| fake|
|North Korea shows...| true|
|Trump signals wil...| true|
|New Jersey's Chri...| true|
|WHERE’S HILLARY? ...| fake|
|France, Germany w...| true|
|Aide to EU Commis...| true|
|Trump Issues Warn...| fake|
|U.S. gives Laos e...| true|
|JUDGE DECLARES BA...| fake|
| Paul Ryan Takes ...| fake|
| Republicans Dine...| fake|
|U.S. House panel ...| true|
+--------------------+-----+
only showing top 20 rows



In [81]:
# Keep only the rows whose label is exactly "fake" or "true"

data = data.filter((data["label"] == "fake") | (data["label"] == "true"))

In [82]:
data.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
| fake|23477|
| true|21411|
+-----+-----+



In [83]:
from pyspark.sql.functions import length
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import (CountVectorizer, IDF, StopWordsRemover, StringIndexer,
                                Tokenizer, VectorAssembler)

In [84]:
# Add the character length of every title as a new "length" column
data = data.withColumn("length",length(data["title"]))
data.head(5)

[Row(title='Ben Stein Calls Out 9th Circuit Court: Committed a ‘Coup d’état’ Against the Constitution', label='fake', length=89),
 Row(title='Trump drops Steve Bannon from National Security Council', label='true', length=55),
 Row(title='Puerto Rico expects U.S. to lift Jones Act shipping restrictions', label='true', length=64),
 Row(title=' OOPS: Trump Just Accidentally Confirmed He Leaked Israeli Intelligence To Russia (VIDEO)', label='fake', length=89),
 Row(title='Donald Trump heads for Scotland to reopen a golf resort', label='true', length=55)]

In [85]:
data.groupby('label').mean().show()

+-----+-----------------+
|label|      avg(length)|
+-----+-----------------+
| fake|94.19623461259957|
| true|64.67885666246322|
+-----+-----------------+



# Preprocess the data
We will create an input pipeline that:
- indexes the string labels ("fake"/"true") into a numeric `class` column,
- tokenizes every title,
- removes stop words,
- builds a bag-of-words count vector,
- computes the term frequency-inverse document frequency (TF-IDF) weights,
- assembles the TF-IDF vector and the title length into a single `features` vector.
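To make the pipeline steps concrete, here is a minimal pure-Python sketch of what they do to a few titles (taken from the sample rows above, the third one shortened). It mimics `Tokenizer` (lowercase, split on whitespace) and uses the IDF formula from the Spark MLlib guide, idf(t) = ln((m + 1) / (df(t) + 1)); the tiny `STOP_WORDS` set and the helper names are hypothetical, and this is not Spark's actual code.

```python
import math
from collections import Counter

# Hypothetical mini stop-word list; Spark's StopWordsRemover uses a much longer English list.
STOP_WORDS = {"a", "for", "from", "he", "the", "to"}


def tokenize(title):
    # Like pyspark.ml.feature.Tokenizer: lowercase, then split on whitespace.
    return title.lower().split()


def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]


def fit_vocabulary(docs):
    # Most frequent term gets index 0, as in CountVectorizer (tie order here is arbitrary).
    totals = Counter(t for doc in docs for t in doc)
    return {term: i for i, (term, _) in enumerate(totals.most_common())}


def fit_idf(docs, vocab):
    # idf(t) = ln((m + 1) / (df(t) + 1)), where m is the number of documents.
    m = len(docs)
    df = Counter(t for doc in docs for t in set(doc))
    return [math.log((m + 1) / (df[term] + 1)) for term in vocab]


def featurize(title, vocab, idf):
    # Returns (size, {index: value}), similar in shape to a Spark SparseVector.
    tokens = remove_stop_words(tokenize(title))
    counts = Counter(t for t in tokens if t in vocab)  # unknown terms are ignored
    vec = {vocab[t]: c * idf[vocab[t]] for t, c in counts.items()}
    # VectorAssembler-like step: append the title length as one extra feature.
    vec[len(vocab)] = float(len(title))
    return len(vocab) + 1, dict(sorted(vec.items()))


titles = [
    "Trump drops Steve Bannon from National Security Council",
    "Donald Trump heads for Scotland to reopen a golf resort",
    "OOPS: Trump Just Accidentally Confirmed He Leaked Israeli Intelligence",
]
docs = [remove_stop_words(tokenize(t)) for t in titles]
vocab = fit_vocabulary(docs)
idf = fit_idf(docs, vocab)
size, vec = featurize(titles[0], vocab, idf)
print(size, vec)
```

Note that "trump" appears in every title, so its IDF weight is ln(4/4) = 0: words that occur everywhere carry no weight, while rarer words such as "drops" get ln(4/2).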

In [86]:

tokenizer = Tokenizer(inputCol="title", outputCol="token_title")

stopremove = StopWordsRemover(inputCol='token_title',outputCol='stop_tokens')

count_vec = CountVectorizer(inputCol='stop_tokens',outputCol='c_vec')

idf = IDF(inputCol="c_vec", outputCol="tf_idf")

true_fake_num = StringIndexer(inputCol='label',outputCol='class')
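`StringIndexer` uses `stringOrderType="frequencyDesc"` by default, so the most frequent label gets index 0.0. Since "fake" (23477 rows) outnumbers "true" (21411 rows), "fake" becomes class 0.0 and "true" becomes 1.0, which matches the `class` column shown later. A small pure-Python sketch of that ordering (the `string_index` helper is hypothetical, and the alphabetical tie-breaking is an assumption of the sketch):

```python
from collections import Counter


def string_index(labels):
    # Most frequent label -> 0.0, next -> 1.0, ... (frequencyDesc ordering).
    # Ties are broken alphabetically here; treat that as an assumption.
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


mapping = string_index(["fake"] * 23477 + ["true"] * 21411)
print(mapping)  # {'fake': 0.0, 'true': 1.0}
```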

In [99]:
assembler = VectorAssembler(inputCols=['tf_idf','length'],outputCol='features')

In [100]:
nb = NaiveBayes(labelCol="class")
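`NaiveBayes` is created here with its defaults, `modelType="multinomial"` and `smoothing=1.0`. The sketch below is a minimal pure-Python multinomial Naive Bayes with additive smoothing, meant only to show what the model learns and what the `rawPrediction` column later holds (a log prior plus feature-weighted log likelihoods). It assumes dense, non-negative feature lists; the toy data and helper names are hypothetical, and smoothing the class prior is an assumption of this sketch rather than a claim about Spark's internals.

```python
import math


def fit_multinomial_nb(X, y, smoothing=1.0):
    """X: list of equal-length, non-negative feature lists; y: list of class ids."""
    classes = sorted(set(y))
    n_features = len(X[0])
    log_prior, log_theta = {}, {}
    for c in classes:
        rows = [x for x, label in zip(X, y) if label == c]
        # Smoothed class prior (smoothing the prior is an assumption of this sketch).
        log_prior[c] = math.log((len(rows) + smoothing) / (len(X) + smoothing * len(classes)))
        # Per-class feature totals with additive (Laplace) smoothing.
        totals = [sum(col) for col in zip(*rows)]
        denom = sum(totals) + smoothing * n_features
        log_theta[c] = [math.log((t + smoothing) / denom) for t in totals]
    return log_prior, log_theta


def raw_prediction(x, log_prior, log_theta):
    # Log prior + sum of feature_value * log(theta): an unnormalised log joint probability.
    return {c: log_prior[c] + sum(v * w for v, w in zip(x, log_theta[c])) for c in log_prior}


# Toy data: 3 features, class 0 = "fake", class 1 = "true" (values are made up).
X = [[2, 0, 1], [3, 0, 0], [0, 2, 1], [0, 3, 0]]
y = [0, 0, 1, 1]
log_prior, log_theta = fit_multinomial_nb(X, y)
scores = raw_prediction([1, 0, 0], log_prior, log_theta)
prediction = max(scores, key=scores.get)
print(scores, prediction)
```

Because TF-IDF weights and title lengths are non-negative, they are valid inputs for the multinomial model, which treats each feature value like a (possibly fractional) count.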

In [101]:
pipeline = Pipeline(stages=[true_fake_num,tokenizer,stopremove,count_vec,idf,assembler])

In [102]:
cleaner = pipeline.fit(data)


In [103]:
clean_data = cleaner.transform(data)

In [104]:
clean_data = clean_data.select(['class','features'])

In [105]:
clean_data.show()

+-----+--------------------+
|class|            features|
+-----+--------------------+
|  0.0|(42584,[23,597,97...|
|  1.0|(42584,[0,71,133,...|
|  1.0|(42584,[3,257,342...|
|  0.0|(42584,[0,1,5,18,...|
|  1.0|(42584,[0,22,954,...|
|  0.0|(42584,[1,5,118,1...|
|  0.0|(42584,[2,169,442...|
|  0.0|(42584,[2,53,62,7...|
|  1.0|(42584,[3,17,29,9...|
|  1.0|(42584,[0,3,893,1...|
|  1.0|(42584,[8,16,174,...|
|  0.0|(42584,[12,3369,5...|
|  1.0|(42584,[46,124,37...|
|  1.0|(42584,[10,39,62,...|
|  0.0|(42584,[0,2,89,35...|
|  1.0|(42584,[3,88,140,...|
|  0.0|(42584,[80,665,10...|
|  0.0|(42584,[1,171,177...|
|  0.0|(42584,[1,5,33,98...|
|  1.0|(42584,[3,7,190,4...|
+-----+--------------------+
only showing top 20 rows





In [106]:
# 70/30 train/test split; no seed is set, so the split (and the score) varies between runs
(training, testing) = clean_data.randomSplit([0.7, 0.3])
fake_predictor = nb.fit(training)
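One caveat about the order of the cells above: `cleaner` (including `CountVectorizer` and `IDF`) was fit on all of `data` before `randomSplit`, so the vocabulary and IDF weights were computed with the test titles included, which may make the test score slightly optimistic. A leakage-free variant would split `data` first and fit the pipeline on the training part only (`pipeline.fit(training)`, then `.transform(testing)`). The pure-Python sketch below illustrates the idea for IDF; the token lists and the `fit_idf`/`transform` helpers are hypothetical, and a fixed split stands in for `randomSplit`.

```python
import math
from collections import Counter


def fit_idf(train_docs):
    # Document frequencies come from the training split only.
    m = len(train_docs)
    df = Counter(t for doc in train_docs for t in set(doc))
    return {t: math.log((m + 1) / (n + 1)) for t, n in df.items()}


def transform(doc, idf):
    # Terms never seen in training are dropped instead of leaking into the model.
    counts = Counter(t for t in doc if t in idf)
    return {t: c * idf[t] for t, c in counts.items()}


docs = [["trump", "drops", "bannon"], ["trump", "heads", "scotland"],
        ["oops", "trump", "leaked"], ["bannon", "eu", "resigns"]]
train, test = docs[:3], docs[3:]  # fixed split standing in for randomSplit
idf = fit_idf(train)
test_vectors = [transform(d, idf) for d in test]
print(test_vectors)
```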


In [107]:
test_results = fake_predictor.transform(testing)

In [108]:
test_results.show()


+-----+--------------------+--------------------+--------------------+----------+
|class|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(42584,[0,1,3,5,5...|[-677.26912394716...|[1.0,2.6866504884...|       0.0|
|  0.0|(42584,[0,1,3,234...|[-572.50698495562...|[0.05714634598650...|       1.0|
|  0.0|(42584,[0,1,3,200...|[-666.30832030604...|[1.64458583267815...|       1.0|
|  0.0|(42584,[0,1,4,5,6...|[-572.81706032244...|[1.0,1.5070499112...|       0.0|
|  0.0|(42584,[0,1,4,5,1...|[-670.18614278553...|[1.0,1.4497711665...|       0.0|
|  0.0|(42584,[0,1,4,5,3...|[-651.51560832364...|[1.0,5.6352274027...|       0.0|
|  0.0|(42584,[0,1,4,5,1...|[-875.34878753948...|[1.42131778547011...|       1.0|
|  0.0|(42584,[0,1,4,5,4...|[-423.73744738305...|[0.99998284724447...|       0.0|
|  0.0|(42584,[0,1,4,5,5...|[-608.09492588725...|[1.0,5.0776477565...|       0.0|
|  0.0|(42584,[0

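In the output above, `rawPrediction` holds log-scale scores hundreds of units below zero, while `probability` holds values that are almost exactly 0 or 1. A sketch of how such log scores can be turned into probabilities, assuming the standard log-sum-exp normalisation (Spark's internal code may differ in detail); the input scores are made up, on the same scale as the table:

```python
import math


def probabilities_from_raw(raw):
    # Log-sum-exp normalisation: shift by the max score so the largest term is exp(0) = 1.
    top = max(raw)
    exps = [math.exp(r - top) for r in raw]
    total = sum(exps)
    return [e / total for e in exps]


# Hypothetical log scores on the scale seen in the rawPrediction column.
raw = [-875.3, -899.0]
print(math.exp(raw[0]))  # naive exponentiation underflows to 0.0 at this scale
print(probabilities_from_raw(raw))
```

A gap of about 24 log units between the two classes already yields probabilities like [0.99999999995, 5e-11], which is why the `probability` column shows so many values of exactly 1.0 or tiny exponents.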

In [109]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [110]:
# The evaluator's default metric is the weighted F1 score ("f1"), not accuracy;
# pass metricName="accuracy" to get plain accuracy instead.
f1_eval = MulticlassClassificationEvaluator(labelCol="class", metricName="f1")
f1 = f1_eval.evaluate(test_results)
print("Weighted F1 score on the test set: {}".format(f1))

Weighted F1 score on the test set: 0.9583390605161378

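Accuracy and weighted F1 are often close on a roughly balanced dataset like this one, but they are not the same number. The pure-Python sketch below computes both from label/prediction pairs; the weighted F1 averages each class's F1 weighted by how often that class occurs in the true labels. The toy labels are hypothetical.

```python
from collections import Counter


def accuracy(labels, preds):
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)


def weighted_f1(labels, preds):
    # Per-class F1, weighted by the class's share of the true labels.
    support = Counter(labels)
    total = 0.0
    for c, n in support.items():
        tp = sum(l == c and p == c for l, p in zip(labels, preds))
        predicted_c = sum(p == c for p in preds)
        precision = tp / predicted_c if predicted_c else 0.0
        recall = tp / n
        f1 = 2 * precision * recall / (precision + recall) if tp else 0.0
        total += n / len(labels) * f1
    return total


labels = [0, 0, 0, 0, 1, 1]
preds = [0, 0, 0, 0, 0, 1]
print(accuracy(labels, preds), weighted_f1(labels, preds))
```

Here accuracy is 5/6 (about 0.833) while the weighted F1 is 22/27 (about 0.815), because the minority class has a lower recall.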
