# NLP for detect fake news using PySpark

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load in 

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the "../input/" directory.
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Any results you write to the current directory are saved as output.

In [2]:
!pip install pyspark



In [3]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [4]:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

# Initialize Spark session with increased timeout settings
ss = SparkSession.builder \
    .appName("Fake and Real News") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.network.timeout", "120s") \
    .config("spark.ui.port", "4040") \
    .config("spark.driver.memory", "15g") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

sc = ss.sparkContext

## 1. Read data set  
Reading data using the SparkSession.read.csv method causes structural errors for the data file. So read the data using pandas.read_csv method and convert to Spark Dataframe.


In [5]:
# Funtion for conver Pandas Dataframe to Spark Dataframe
from pyspark.sql.types import StringType, IntegerType, StructField, StructType
def read_data(path):
  schema= StructType(
      [StructField('title',StringType(),True),
      StructField('text',StringType(),True),
      StructField('label',IntegerType(),True)])
  pd_df= pd.read_csv(path).drop('Unnamed: 0', axis= 1)
  sp_df= ss.createDataFrame(pd_df, schema= schema)
  return sp_df

In [6]:
# Read data set
path_data= 'WELFake_Dataset.csv'
data= read_data(path_data)

## 2. Check the data set

In [7]:
data["text", "label"].show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|No comment is exp...|    1|
|Did they post the...|    1|
| Now, most of the...|    1|
|A dozen political...|    0|
|The RS-28 Sarmat ...|    1|
+--------------------+-----+
only showing top 5 rows



## 3. Create objects for processing data

In [8]:
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DoubleType, ArrayType
import numpy as np

class TfidfFilter(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def __init__(self, inputCol=None, outputCol=None, threshold=0.1):
        super(TfidfFilter, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.threshold = threshold
    
    def _transform(self, dataset):
        def filter_tfidf(vec):
            indices = [i for i, v in enumerate(vec.toArray()) if v > self.threshold]
            values = [v for v in vec.toArray() if v > self.threshold]
            return Vectors.sparse(vec.size, indices, values)
        
        filter_udf = udf(filter_tfidf, VectorUDT())
        return dataset.withColumn(self.outputCol, filter_udf(self.inputCol))

In [9]:
from pyspark.ml.feature import SQLTransformer, RegexTokenizer, StopWordsRemover, CountVectorizer, Imputer, IDF
from pyspark.ml.feature import StringIndexer, VectorAssembler
StopWordsRemover.loadDefaultStopWords('english')

# 0. Extract tokens from title
title_tokenizer= RegexTokenizer(inputCol= 'title', outputCol= 'title_words',
                                pattern= '\\W', toLowercase= True)
# 1. Remove stop words from title
title_sw_remover= StopWordsRemover(inputCol= 'title_words', outputCol= 'title_sw_removed')
# 2. Compute Term frequency from title
title_count_vectorizer= CountVectorizer(inputCol= 'title_sw_removed', outputCol= 'tf_title')
# 3. Compute Term frequency-inverse document frequency from title
title_tfidf= IDF(inputCol= 'tf_title', outputCol= 'tf_idf_title')
# 4. Extract tokens from text
text_tokenizer= RegexTokenizer(inputCol= 'text', outputCol= 'text_words',
                                pattern= '\\W', toLowercase= True)
# 5. Remove stop words from text
text_sw_remover= StopWordsRemover(inputCol= 'text_words', outputCol= 'text_sw_removed')
# 6. Compute Term frequency from text
text_count_vectorizer= CountVectorizer(inputCol= 'text_sw_removed', outputCol= 'tf_text')
# 7. Compute Term frequency-inverse document frequency text
text_tfidf= IDF(inputCol= 'tf_text', outputCol= 'tf_idf_text')

# Create the custom TF-IDF filter transformer
tfidf_filter = TfidfFilter(inputCol='tf_idf_text', outputCol='filtered_tf_idf_text', threshold=0.15)

# 9. VectorAssembler
vec_assembler= VectorAssembler(inputCols=['tf_idf_title', 'filtered_tf_idf_text'], outputCol= 'features')

## 4. Create object for Random Forest Classifier model

In [10]:
from pyspark.ml.classification import RandomForestClassifier
# 10 Random Forest Classifier
rf= RandomForestClassifier(featuresCol= 'features', labelCol= 'label', predictionCol= 'label_predict', maxDepth= 7, numTrees= 20)

### 4.1. Create Pipeline for processing and fitting data to model

In [11]:
from pyspark.ml import Pipeline
rf_pipe= Pipeline(stages=[title_tokenizer, # 0
                title_sw_remover, # 1
                title_count_vectorizer, # 2
                title_tfidf, # 3
                text_tokenizer, # 4
                text_sw_remover, # 5
                text_count_vectorizer, # 6
                text_tfidf, # 7
                tfidf_filter,         # 8
                vec_assembler,        # 9
                rf]) # 10 model

## 5. Create object for Logistic Regression model

In [12]:
from pyspark.ml.classification import LogisticRegression

# Define the Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label', predictionCol='label_predict', maxIter=10)

### 5.1 Create Pipeline for processing and fitting data to model (LR)

In [13]:
# Create the pipeline
lr_pipe = Pipeline(stages=[
    title_tokenizer,      # 0
    title_sw_remover,     # 1
    title_count_vectorizer, # 2
    title_tfidf,          # 3
    text_tokenizer,       # 4
    text_sw_remover,      # 5
    text_count_vectorizer, # 6
    text_tfidf,           # 7
    tfidf_filter,         # 8
    vec_assembler,        # 9
    lr                    # 10 model
])

## 6. Splitting the dataset into the training set and test set

In [14]:
train, test= data.randomSplit([0.8, 0.2])

## 7. Fitting the models

In [15]:
rf_model= rf_pipe.fit(train)

In [16]:
lr_model = lr_pipe.fit(train)

## 8. Evaluate classification model

In [17]:
# Function for evaluating classification model
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator, BinaryClassificationEvaluator

accuracy= MulticlassClassificationEvaluator(labelCol= 'label', predictionCol= 'label_predict', metricName= 'accuracy')
f1= MulticlassClassificationEvaluator(labelCol= 'label', predictionCol= 'label_predict', metricName= 'f1')
areaUnderROC= BinaryClassificationEvaluator(labelCol= 'label', metricName= 'areaUnderROC')

def classification_evaluator(data_result):
    data_result.crosstab(col1= 'label_predict', col2= 'label').show()
    print('accuracy:' ,accuracy.evaluate(data_result))
    print('f1:' ,f1.evaluate(data_result))
    print('areaUnderROC:' ,areaUnderROC.evaluate(data_result))

### 8.1 Evaluation of final model fit on the training data set

#### Random Forest

In [18]:
# Predict on training data set
rf_train_result= rf_model.transform(train)

In [19]:
classification_evaluator(rf_train_result)

+-------------------+-----+-----+
|label_predict_label|    0|    1|
+-------------------+-----+-----+
|                1.0| 8675|26772|
|                0.0|19353| 2883|
+-------------------+-----+-----+

accuracy: 0.7996290068130992
f1: 0.796997785382555
areaUnderROC: 0.9013619254026799


#### Logistic Regression

In [20]:
lr_train_result = lr_model.transform(train)

In [21]:
classification_evaluator(lr_train_result)

+-------------------+-----+-----+
|label_predict_label|    0|    1|
+-------------------+-----+-----+
|                1.0|    7|29655|
|                0.0|28021|    0|
+-------------------+-----+-----+

accuracy: 0.9998786470883969
f1: 0.9998786466709
areaUnderROC: 0.999967578847917


### 8.2 evaluation of final model fit on the test data set

In [22]:
# Predict on test data set
rf_test_result= rf_model.transform(test)

In [23]:
classification_evaluator(rf_test_result)

+-------------------+----+----+
|label_predict_label|   0|   1|
+-------------------+----+----+
|                1.0|2182|6658|
|                0.0|4818| 793|
+-------------------+----+----+

accuracy: 0.7941318939865754
f1: 0.7915708804829462
areaUnderROC: 0.8955413271468835


In [24]:
lr_test_result = lr_model.transform(test)

In [25]:
classification_evaluator(lr_test_result)

+-------------------+----+----+
|label_predict_label|   0|   1|
+-------------------+----+----+
|                1.0| 245|7218|
|                0.0|6755| 233|
+-------------------+----+----+

accuracy: 0.9669227043111204
f1: 0.9669218233789496
areaUnderROC: 0.9908905228444894
