## Fake News prediction using Spark

### Read data set
1. Kaggle fake news data set
    a. True.csv – 21192 unique real records
    b. Fake.csv – 22851 unique fake records
2. Research Articles
    a. 249 research articles
3. Data (Some news data)
    a. 72103 unique fake and real news records
4. News data set
    a. 166355 unique fake and real news records
5. PolitiFact data set
    a. 625 unique real records
    b. 433 unique fake records
6. Train and test data set
    a. 146373 unique fake and real news records

The data from all of the sources above was combined into a single CSV file, which is read into a Spark DataFrame with `spark.read.csv`.

In [0]:
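# Read the combined CSV; the first row is the header and column types are inferred
allData_df = spark.read.csv("/FileStore/tables/all_data.csv", header=True, inferSchema=True)
# Total number of records
allData_df.count()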
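
The notebook does not show how `all_data.csv` was assembled. As a rough sketch of one way it could be done, the block below labels each source, concatenates them, and drops rows whose text is missing or duplicated. The in-memory CSV strings, the helper names `load_labelled` and `combine_unique`, and the encoding 0 = real, 1 = fake are all assumptions made for illustration, not taken from the original data.

```python
import csv
import io

# Hypothetical in-memory stand-ins for two labelled source files.
REAL_CSV = "text\nSenate passes budget bill\nSenate passes budget bill\n"
FAKE_CSV = "text\nAliens endorse candidate\n"

def load_labelled(csv_text, label):
    """Read a CSV with a 'text' column and attach a numeric label."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [{"text": row["text"], "label": label} for row in reader]

def combine_unique(*sources):
    """Concatenate sources, dropping rows with missing or duplicate text."""
    seen, combined = set(), []
    for rows in sources:
        for row in rows:
            text = (row["text"] or "").strip()
            if text and text not in seen:
                seen.add(text)
                combined.append({"text": text, "label": row["label"]})
    return combined

# Assumption: 0 = real, 1 = fake.
all_rows = combine_unique(load_labelled(REAL_CSV, 0), load_labelled(FAKE_CSV, 1))
print(len(all_rows), "unique records")
```

On full-size data the same steps would more naturally be done with `pandas.concat` and `DataFrame.drop_duplicates`, or with Spark's `unionByName` and `dropDuplicates`.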

In [0]:
from pyspark.sql.functions import rand
# Shuffle the rows into random order
data = allData_df.orderBy(rand())

In [0]:
data.groupBy('label').count().show()

### Data Exploration, Cleaning, and Preprocessing

In [0]:
from pyspark.ml.feature import SQLTransformer, RegexTokenizer, StopWordsRemover, CountVectorizer, Imputer, IDF
from pyspark.ml.feature import StringIndexer, VectorAssembler
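# Default English stop-word list shipped with Spark
english_stop_words = StopWordsRemover.loadDefaultStopWords('english')

# Drop rows with missing values in any column
data = data.dropna()
# Extract lowercase tokens from text, splitting on non-word characters
text_tokenizer= RegexTokenizer(inputCol= 'text', outputCol= 'text_words',
                                pattern= '\\W', toLowercase= True)
# Remove stop words from text
text_sw_remover= StopWordsRemover(inputCol= 'text_words', outputCol= 'text_sw_removed',
                                  stopWords= english_stop_words)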
# Compute term frequencies (raw counts) from the remaining tokens
text_count_vectorizer= CountVectorizer(inputCol= 'text_sw_removed', outputCol= 'tf_text')
# Weight the term frequencies by inverse document frequency (TF-IDF)
text_tfidf= IDF(inputCol= 'tf_text', outputCol= 'tf_idf_text')

# Assemble the TF-IDF vector into the 'features' column used by the classifier
vec_assembler= VectorAssembler(inputCols=['tf_idf_text'], outputCol= 'features')
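
To make the preprocessing stages above more concrete, here is a small pure-Python sketch of the same chain on two toy sentences: split on non-word characters, drop stop words, count terms, and weight by IDF. It assumes Spark's documented IDF formula, log((m + 1) / (df + 1)) with m documents and document frequency df. The stop-word set, the toy sentences, and the function names are illustrative only, and the sketch returns plain dicts rather than Spark's sparse vectors.

```python
import math
import re
from collections import Counter

# Tiny illustrative stop-word list; Spark's default English list is much longer.
STOP_WORDS = {"the", "a", "is", "of", "and", "to", "in"}

def tokenize(text):
    """Lowercase, split on non-word characters, drop empty tokens."""
    # re.ASCII keeps \W ASCII-only, as Java regexes are by default.
    return [t for t in re.split(r"\W", text.lower(), flags=re.ASCII) if t]

def remove_stop_words(tokens):
    """Keep only tokens that are not in the stop-word set."""
    return [t for t in tokens if t not in STOP_WORDS]

def tf_idf(docs):
    """Return one {term: tf-idf weight} dict per document."""
    token_lists = [remove_stop_words(tokenize(d)) for d in docs]
    m = len(token_lists)
    # Document frequency: number of documents containing each term.
    doc_freq = Counter(term for tokens in token_lists for term in set(tokens))
    idf = {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}
    return [{term: count * idf[term] for term, count in Counter(tokens).items()}
            for tokens in token_lists]

docs = ["The president is in Washington", "Aliens land in Washington"]
weights = tf_idf(docs)
for doc, w in zip(docs, weights):
    print(doc, "->", w)
```

A term that occurs in every document, such as "washington" here, gets an IDF of log(1) = 0 and so carries no weight in the features.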

### Algorithms

In [0]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest classifier: 20 trees, maximum depth 7
rf= RandomForestClassifier(featuresCol= 'features', labelCol= 'label', predictionCol= 'fake_predict', maxDepth= 7, numTrees= 20)


In [0]:
from pyspark.ml import Pipeline
rf_pipe= Pipeline(stages=[
                text_tokenizer,
                text_sw_remover,
                text_count_vectorizer,
                text_tfidf,
                vec_assembler,
                rf])

In [0]:
# Random 80/20 split into training and test sets
train, test= data.randomSplit([0.8, 0.2])

In [0]:
rf_model= rf_pipe.fit(train)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Evaluators for accuracy, F1, and area under the ROC curve
accuracy= MulticlassClassificationEvaluator(labelCol= 'label', predictionCol= 'fake_predict', metricName= 'accuracy')
f1= MulticlassClassificationEvaluator(labelCol= 'label', predictionCol= 'fake_predict', metricName= 'f1')
# Uses the default rawPredictionCol ('rawPrediction') produced by the classifier
areaUnderROC= BinaryClassificationEvaluator(labelCol= 'label', metricName= 'areaUnderROC')

# Print a prediction-vs-label contingency table and the three metrics
def classification_evaluator(data_result):
    data_result.crosstab(col1= 'fake_predict', col2= 'label').show()
    print('accuracy:', accuracy.evaluate(data_result))
    print('f1:', f1.evaluate(data_result))
    print('areaUnderROC:', areaUnderROC.evaluate(data_result))
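
As a sanity check on what the first two numbers mean, the following pure-Python sketch computes accuracy and a support-weighted F1 from a handful of (prediction, label) pairs. It assumes that Spark's `f1` metric is the per-class F1 averaged with weights equal to each class's share of the true labels, which is our reading of the documentation. The function names and the toy pairs are hypothetical.

```python
from collections import Counter

def accuracy_score(pairs):
    """Fraction of (prediction, label) pairs that agree."""
    return sum(p == y for p, y in pairs) / len(pairs)

def weighted_f1(pairs):
    """Per-class F1, averaged with weights equal to each class's share of the labels."""
    label_counts = Counter(y for _, y in pairs)
    total = 0.0
    for cls, support in label_counts.items():
        tp = sum(1 for p, y in pairs if p == cls and y == cls)
        predicted = sum(1 for p, _ in pairs if p == cls)
        precision = tp / predicted if predicted else 0.0
        recall = tp / support
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * support / len(pairs)
    return total

# Toy (prediction, label) pairs, assuming 1 = fake and 0 = real.
pairs = [(1, 1), (1, 1), (0, 1), (0, 0), (1, 0), (0, 0)]
print("accuracy:", accuracy_score(pairs))
print("f1:", weighted_f1(pairs))
```

Comparing these values on a small sample of `fake_predict` and `label` against the evaluator output is a quick way to confirm the columns are wired up as intended.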

In [0]:
# Predict on training data set
rf_train_result= rf_model.transform(train)
classification_evaluator(rf_train_result)

In [0]:
# Predict on test data set
rf_test_result= rf_model.transform(test)
classification_evaluator(rf_test_result)