In [0]:
import pyspark

In [0]:
# import the dataset
# Pull only the `text` column from the two registered tables, one holding fake
# articles and one holding true articles.
# NOTE(review): `sqlContext` is not defined anywhere in this notebook, so it must
# be injected by the runtime (presumably a Databricks-style predefined
# SQLContext) — confirm before running elsewhere.
df_fake = sqlContext.sql("SELECT text FROM fake_3_csv")
df_true = sqlContext.sql("SELECT text FROM true_2_csv")

In [0]:
# add labels
# Tag every row with its class as a constant column so the two sets can be
# merged later: 1 = row from the fake set, 0 = row from the true set.
from pyspark.sql.functions import lit

df_fake = df_fake.withColumn(colName='label', col=lit(1))
df_true = df_true.withColumn(colName='label', col=lit(0))

In [0]:
# properties of data
# Print summary statistics for each labelled set, true articles first.
for _frame in (df_true, df_fake):
    _frame.describe().show()

In [0]:
# remove duplicates
# Drop rows that are exact duplicates within each set, then re-print the
# summaries so the effect on the row counts is visible.
df_fake = df_fake.dropDuplicates()
df_true = df_true.dropDuplicates()
for _frame in (df_fake, df_true):
    _frame.describe().show()

In [0]:
# drop null values
# Remove every row containing a null, printing the summary of each set right
# after it has been cleaned.
df_fake = df_fake.na.drop()
df_fake.describe().show()

df_true = df_true.na.drop()
df_true.describe().show()

In [0]:
# balance the imbalanced part
# Downsample the true set to roughly the size of the fake set.
# The sampling fraction is computed from the actual row counts instead of being
# hard-coded, so it stays correct when the source tables or the cleaning steps
# above change. A fixed seed makes the sample reproducible across runs.
SAMPLE_SEED = 42

n_fake = df_fake.count()
n_true = df_true.count()

# sample() needs a fraction in [0, 1]: cap at 1.0 when the true set is already
# the smaller one, and avoid dividing by zero when it is empty.
balance_fraction = min(1.0, n_fake / n_true) if n_true else 1.0

df_true = df_true.sample(withReplacement=False, fraction=balance_fraction, seed=SAMPLE_SEED)
# sample() is probabilistic, so the resulting count is approximate, not exact.
df_true.count()

In [0]:
# merge entire data
# Stack the true rows and the fake rows into a single DataFrame.
# union() matches columns by position, not by name; both inputs were built the
# same way above (text, then label), so the columns line up.
df = df_true.union(df_fake)

In [0]:
# Preview the merged data (show() prints the first 20 rows by default).
df.show()

In [0]:
# Summary statistics of the merged data; the count confirms the combined size.
df.describe().show()

In [0]:
# text preprocessing
from pyspark.ml.feature import (
    RegexTokenizer,
    StopWordsRemover,
    CountVectorizer,
    IDF,
    StringIndexer,
)

# Stage 1: split the raw text into tokens on non-word characters.
regex_tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="token_text",
    pattern="\\W",
)

# Stage 2: filter stop words out of the token list.
stop_word_remover = StopWordsRemover(
    inputCol='token_text',
    outputCol='cleaned_tokens',
)

# Stage 3: turn the remaining tokens into term-count vectors.
count_vec = CountVectorizer(
    inputCol='cleaned_tokens',
    outputCol='c_vec',
)

# Stage 4: rescale the term counts by inverse document frequency.
idf = IDF(
    inputCol="c_vec",
    outputCol="tf_idf",
)

In [0]:
# import necessary module
from pyspark.ml import Pipeline

# Chain the four feature stages in the order they must run:
# tokenise -> remove stop words -> count terms -> apply IDF weighting.
data_prep_pipe = Pipeline(
    stages=[
        regex_tokenizer,
        stop_word_remover,
        count_vec,
        idf,
    ]
)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator , BinaryClassificationEvaluator

# MulticlassClassificationEvaluator defaults to metricName="f1". The cells below
# print this evaluator's result as "Accuracy", so request accuracy explicitly;
# otherwise an F1 score would be reported under the wrong name.
acc_eval = MulticlassClassificationEvaluator(metricName="accuracy")

# areaUnderROC is already this evaluator's default metric; it is spelled out so
# the printed "area under curve" is unambiguous.
bin_score = BinaryClassificationEvaluator(metricName="areaUnderROC")

In [0]:
# tf-idf encoding
# Fit the preprocessing pipeline, encode every document, and keep only the two
# columns the classifiers need, with the TF-IDF vector exposed as `features`.
# NOTE(review): the pipeline is fitted on the full `df` and the train/test split
# happens afterwards, so the CountVectorizer vocabulary and the IDF weights also
# see the test rows. Consider splitting first and fitting on the training part.
data_gen = data_prep_pipe.fit(df)
transformed_df = data_gen.transform(df)
data = (
    transformed_df
    .withColumnRenamed('tf_idf', 'features')
    .select('label', 'features')
)
data.show()

In [0]:
# split the data to train and test
# A fixed seed makes the 70/30 split repeatable, so the metrics reported by the
# cells below can be compared between runs instead of shifting with every
# execution.
SPLIT_SEED = 42
train_df , test_df = data.randomSplit([0.7,0.3], seed=SPLIT_SEED)
print(f'train_df size is {train_df.count()}')
print(f'test_df size is {test_df.count()}')
# Check that both classes are still represented evenly in the training part.
train_df.groupBy('label').count().show()

In [0]:
# classification with Naive Bayes
from pyspark.ml.classification import NaiveBayes
# Estimator with all-default parameters; it reads the `features` and `label`
# columns prepared above.
nb = NaiveBayes()
# Train on the training split only.
fake_news_classifier_nb = nb.fit(train_df)
# Score the held-out split; transform() appends the model's output columns
# (including `prediction`, which the following cells group by and evaluate).
fake_news_classifier_nb_pred = fake_news_classifier_nb.transform(test_df)
fake_news_classifier_nb_pred.show()

In [0]:
# model evaluation
# Score the Naive Bayes predictions with the shared `acc_eval` evaluator
# defined earlier in the notebook and report the result.
acc_nb = acc_eval.evaluate(fake_news_classifier_nb_pred)
print(f"Accuracy of Naive Bayes model at predicting fake news is: {acc_nb}")

In [0]:
# Compare how many test rows carry each actual label with how many rows the
# Naive Bayes model assigned to each predicted class.
for _column in ('label', 'prediction'):
    fake_news_classifier_nb_pred.groupBy(_column).count().show()

In [0]:
# classification with Linear SVC
from pyspark.ml.classification import LinearSVC

# Train a default-configured linear SVM on the training split and score the
# held-out split.
svc = LinearSVC()
fake_news_classifier_svc = svc.fit(train_df)
fake_news_classifier_svc_pred = fake_news_classifier_svc.transform(test_df)

# Evaluate with the two shared evaluators defined earlier in the notebook.
acc_svc = acc_eval.evaluate(fake_news_classifier_svc_pred)
bin_score_svc = bin_score.evaluate(fake_news_classifier_svc_pred)
print(
    f"Accuracy of Linear SVC model at predicting fake news is: {acc_svc} "
    f"and area under curve is {bin_score_svc}"
)

In [0]:
# classification with Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Train a default-configured logistic regression on the training split and
# score the held-out split.
lr = LogisticRegression()
fake_news_classifier_lr = lr.fit(train_df)
fake_news_classifier_lr_pred = fake_news_classifier_lr.transform(test_df)

# Evaluate with the two shared evaluators defined earlier in the notebook.
acc_lr = acc_eval.evaluate(fake_news_classifier_lr_pred)
bin_score_lr = bin_score.evaluate(fake_news_classifier_lr_pred)
print(
    f"Accuracy of Logistic Regression model at predicting fake news is: {acc_lr} "
    f"and area under curve is {bin_score_lr}"
)