In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=f35f4bccd46f08e01d941f545eb605bc417dc6a543f2aa4b68dfcae8c407ecbc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:

import nltk
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.ml.feature import IDF, Tokenizer, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import when, col, regexp_replace, concat, lit, length
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml.classification import NaiveBayesModel, NaiveBayes
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix


In [None]:
# Local Spark session for the notebook ("local[*]" master).
spark = (
    SparkSession.builder
    .appName("Fake News Detection")
    .master("local[*]")
    .getOrCreate()
)

# Column names and types used for the Spark DataFrame built below.
mySchema = StructType([
    StructField("index", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("author", StringType(), True),
    StructField("text", StringType(), True),
    StructField("label", IntegerType(), True),
])

path = "/content/train.csv"
# Only the separator is passed; every other read_csv option keeps its default
# (removed options such as `squeeze` must not be passed on pandas 2.x).
pandas_df = pd.read_csv(path, sep=',')

spark_df = spark.createDataFrame(pandas_df, schema=mySchema)




  pandas_df = pd.read_csv(path, sep=',', delimiter=None, header='infer', names=None, index_col=None, usecols=None, squeeze=False,engine=None)


In [None]:
# Preview the loaded frame (first 20 rows, long cells truncated).
spark_df.show(n=20, truncate=True)

+-----+--------------------+--------------------+--------------------+-----+
|index|               title|              author|                text|label|
+-----+--------------------+--------------------+--------------------+-----+
|    0|House Dem Aide: W...|       Darrell Lucus|House Dem Aide: W...|    1|
|    1|FLYNN: Hillary Cl...|     Daniel J. Flynn|Ever get the feel...|    0|
|    2|Why the Truth Mig...|  Consortiumnews.com|Why the Truth Mig...|    1|
|    3|15 Civilians Kill...|     Jessica Purkiss|Videos 15 Civilia...|    1|
|    4|Iranian woman jai...|      Howard Portnoy|Print \nAn Irania...|    1|
|    5|Jackie Mason: Hol...|     Daniel Nussbaum|In these trying t...|    0|
|    6|Life: Life Of Lux...|                 NaN|Ever wonder how B...|    1|
|    7|Benoît Hamon Wins...|     Alissa J. Rubin|PARIS  —   France...|    0|
|    8|Excerpts From a D...|                 NaN|Donald J. Trump i...|    0|
|    9|A Back-Channel Pl...|Megan Twohey and ...|A week before Mic...|    0|

In [None]:
# Keep only rows whose article body is longer than 60 characters.
df_rmv_nan_text = spark_df.filter(length(col("text")) > 60)

# Blank out missing titles: both real nulls and the literal string "NaN".
df_no_nan = df_rmv_nan_text.withColumn(
    "title",
    when(col("title").isNull() | (col("title") == "NaN"), " ")
    .otherwise(col("title")),
)

# Remove every character that is not a word character, a right single quote
# or a space, then collapse runs of spaces. Each column is cleaned from its
# OWN content -- the title must never be rebuilt from the text column.
df_clean = (
    df_no_nan
    .withColumn("title", regexp_replace(col('title'), r'[^\w\’ ]', ''))
    .withColumn("text", regexp_replace(col('text'), r'[^\w\’ ]', ''))
    .withColumn("text", regexp_replace(col('text'), r'[ ]{2,}', ' '))
    .withColumn("title", regexp_replace(col('title'), r'[ ]{2,}', ' '))
)

# Prepend the title to the text unless the text already contains it, keep
# only the model inputs, cast the label to double and drop exact duplicates.
df_combined = (
    df_clean
    .withColumn(
        'full_text',
        when(col("text").contains(col("title")), col("text"))
        .otherwise(concat(col("title"), lit(" "), col("text"))),
    )
    .select(["full_text", "label"])
    .withColumn("label", col("label").cast(DoubleType()))
    .dropDuplicates()
)


# Drop the intermediate frame references; only df_combined is used afterwards.
del df_rmv_nan_text, df_no_nan, df_clean

print(df_combined.count())
df_combined.show(7)

20210
+--------------------+-----+
|           full_text|label|
+--------------------+-----+
|0 0 AP N1 26 27 1...|  1.0|
|GREENBELT Md The ...|  0.0|
|The Minnesota off...|  0.0|
|GeoEngineering Un...|  1.0|
|Following a fight...|  0.0|
|The military indu...|  1.0|
|Insists Russia De...|  1.0|
+--------------------+-----+
only showing top 7 rows



In [None]:
# Class balance: number of rows per label value.
df_combined.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|10385|
|  1.0| 9825|
+-----+-----+



In [None]:
# NLTK raises LookupError when a corpus has not been downloaded yet. Catch
# only that case so unrelated failures are not silently retried.
try:
    stopwords_ls = stopwords.words('english')
except LookupError:
    nltk.download("stopwords")
    stopwords_ls = stopwords.words('english')

stopwords_ls[:10]

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're"]

In [None]:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class Stemmer(Transformer,
              HasInputCol,
              HasOutputCol,
              DefaultParamsReadable,
              DefaultParamsWritable):
    """Transformer that applies NLTK's PorterStemmer to every token of a column.

    The input column is expected to hold arrays of strings; the output column
    receives an array with each element passed through ``PorterStemmer.stem``.

    Params:
        inputCol: name of the column to read tokens from (default "input").
        outputCol: name of the column to write stems to (default "output").
    """

    @keyword_only
    def __init__(self, inputCol = "input", outputCol = "output"):
        super(Stemmer, self).__init__()
        # @keyword_only records only the kwargs the caller actually passed, so
        # the signature defaults must be registered explicitly; otherwise
        # Stemmer() would have no input column at all.
        self._setDefault(inputCol="input", outputCol="output")
        kwargs = self._input_kwargs
        self.set_params(**kwargs)

    @keyword_only
    def set_params(self, inputCol = "input", outputCol = "output"):
        """Set the explicitly supplied params on this instance."""
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        """Return the configured (or default) input column name."""
        return self.getOrDefault(self.inputCol)

    def get_output_col(self):
        """Return the configured (or default) output column name."""
        return self.getOrDefault(self.outputCol)

    def _transform(self, df):
        """Return ``df`` with the output column holding the stemmed tokens."""
        input_col = self.get_input_col()
        output_col = self.get_output_col()

        ps = PorterStemmer()

        def stem_tokens(words):
            # A null array stays null instead of raising inside the UDF.
            if words is None:
                return None
            return [ps.stem(word) for word in words]

        transform_udf = F.udf(stem_tokens, ArrayType(StringType(), False))

        return df.withColumn(output_col, transform_udf(input_col))

In [None]:
# Seeded 70/30 split into training and test frames.
train, test = df_combined.randomSplit(weights=[0.7, 0.3], seed=2)

In [None]:

# Feature stages: tokenise -> remove stop words -> stem -> term counts.
tokenizer = Tokenizer(inputCol="full_text", outputCol="full_text_words")
word_remover = StopWordsRemover(
    inputCol="full_text_words",
    outputCol="full_text_words_clean",
    stopWords=stopwords_ls,
)
stemmer = Stemmer(inputCol="full_text_words_clean", outputCol="stemmed")
tf = CountVectorizer(inputCol="stemmed", outputCol="features", vocabSize=1e6)

# Fit the stages on the training split only, then apply the fitted model to
# both splits and keep just the columns the classifier needs.
feature_stages = [tokenizer, word_remover, stemmer, tf]
pipeline = Pipeline(stages=feature_stages).fit(train)

model_columns = ["full_text", "features", "label"]
train_df = pipeline.transform(train).select(model_columns)
test_df = pipeline.transform(test).select(model_columns)

In [None]:
# Naive Bayes on the count features, with per-class thresholds [0.6, 0.4].
nb = NaiveBayes(
    featuresCol="features",
    labelCol="label",
    thresholds=[0.6, 0.4],
)
nb_model = nb.fit(train_df)
predictions_nb = nb_model.transform(test_df)

In [None]:
def evaluate(df, labelCol="label", predCol="prediction"):
    """Print classification metrics for a frame and return the key ones.

    Args:
        df: object indexable by column name whose columns provide
            ``.tolist()`` (the notebook passes a pandas DataFrame).
        labelCol: name of the column holding the true labels.
        predCol: name of the column holding the predicted labels.

    Returns:
        Tuple ``(confusion, precision, recall)``.
    """
    y_true = df[labelCol].tolist()
    y_pred = df[predCol].tolist()

    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    confusion = confusion_matrix(y_true, y_pred)

    # Report lines, in display order.
    report = (
        ("Accuracy: %.3f", accuracy),
        ("Recall: %.3f", recall),
        ("Precision: %.3f", precision),
        ("F1 Score: %.3f", f1),
    )
    for template, value in report:
        print(template % value)
    print("Confusion Matrix:")
    print(confusion)

    return confusion, precision, recall



In [None]:
# Collect only the two columns the metrics need into pandas, then score them.
nb_results_pdf = predictions_nb.select("label", "prediction").toPandas()
evaluate(nb_results_pdf)

Accuracy: 0.921
Recall: 0.868
Precision: 0.968
F1 Score: 0.915
Confusion Matrix:
[[2985   84]
 [ 387 2540]]


(array([[2985,   84],
        [ 387, 2540]]),
 0.9679878048780488,
 0.867782712675094)