In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb+srv://****:****@*****.xpwsv.mongodb.net/******.**********?retryWrites=true&w=majority") \
    .config("spark.mongodb.output.uri", "mongodb+srv://****:****@*****.xpwsv.mongodb.net/******.********?retryWrites=true&w=majority") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1608152380437_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
#this will load data from mongoDB database 
df = spark.read.format("mongo").load()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
import pyspark.sql.functions
from pyspark.sql import functions as F
from pyspark.sql.functions import lower, col
from pyspark.sql.functions import trim,regexp_replace,concat_ws
from pyspark.ml.feature import SQLTransformer, RegexTokenizer, StopWordsRemover, CountVectorizer, Imputer, IDF
from pyspark.ml.feature import StringIndexer, VectorAssembler
StopWordsRemover.loadDefaultStopWords('english')
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
from nltk.stem.porter import *
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import udf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
#converting labels data from string to numeric  
df = df.withColumn("label", F.when(F.col("label")=='true', 0).otherwise(F.col("label")))
df = df.withColumn("label", F.when(F.col("label")=='fake', 1).otherwise(F.col("label")))
df = df.withColumn("label", df["label"].cast(IntegerType()))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
#checking null values again to verify
df.filter(df.label.isNull()).count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [6]:
#converting data into lowercase
df = df.select("*", lower(col('content'))).drop('content')
df = df.select("*", lower(col('title'))).drop('title')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
#making column names more readable 
df = df.select('*',col("lower(content)").alias("content"), col("lower(title)").alias("title")).drop('lower(content)','lower(title)')


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
#function for removing punctuation 
def removePunctuation(column):
     return split(trim(lower(regexp_replace(concat_ws("SEPARATORSTRING", column),'[^\sa-zA-Z0-9]', ''))), "SEPARATORSTRING").alias('stopped')

#making column names more readable
df = df.withColumn("title_r", removePunctuation(col('title')))
df = df.withColumn("content_r", removePunctuation(col('content')))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
#making column names more readable
df = df.withColumn("title_p",concat_ws("",col("title_r"))).drop("title_r")
df = df.withColumn("content_p",concat_ws("",col("content_r"))).drop("content_r")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
#splitting sentences to words in title column
split_col = pyspark.sql.functions.split(df['title_p'], ' ')
df = df.withColumn("title_split",split_col).drop('title_p')

#splitting sentences to words in content column
split_col = pyspark.sql.functions.split(df['content_p'], ' ')
df = df.withColumn("content_split",split_col).drop('content_p')
#display(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df = df.withColumn("title_stemmed", stemmer_udf("title_split"))
df = df.withColumn("content_stemmed", stemmer_udf("content_split"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
#Remove stop words from title
title_sw_remover= StopWordsRemover(inputCol= 'title_stemmed', outputCol= 'title_sw_removed')

#computing frequency of the words for title
title_count_vectorizer= CountVectorizer(inputCol= 'title_sw_removed', outputCol= 'tf_title')

#Computing frequency-inverse document frequency from title
title_tfidf= IDF(inputCol= 'tf_title', outputCol= 'tf_idf_title')

#Remove stop words from content
text_sw_remover= StopWordsRemover(inputCol= 'content_stemmed', outputCol= 'text_sw_removed')

#computing frequency of the words for content
text_count_vectorizer= CountVectorizer(inputCol= 'text_sw_removed', outputCol= 'tf_text')

#Computing frequency-inverse document frequency from content
text_tfidf= IDF(inputCol= 'tf_text', outputCol= 'tf_idf_text')

#VectorAssembler
vec_assembler= VectorAssembler(inputCols=['tf_idf_title', 'tf_idf_text'], outputCol= 'features')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20, regParam=0.3, elasticNetParam=0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
from pyspark.ml import Pipeline
lr_pipe= Pipeline(stages=[
                title_sw_remover, 
                title_count_vectorizer, 
                title_tfidf, 
                text_sw_remover, 
                text_count_vectorizer, 
                text_tfidf,
                vec_assembler, 
                lr]) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
#setting up hyper parameters for cross validation 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
train, test= df.randomSplit([0.7, 0.3])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
crossval = CrossValidator(estimator=lr_pipe,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#fitting the training data into our model 
lr_model=crossval.fit(train).bestModel

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
lr_model.write().overwrite().save("s3://fakenewstkbucket/dataset/finalmodel_cr")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
#lr_model.avgMetrices()
predictions = lr_model.transform(test)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

rmse_value = RegressionEvaluator(metricName="rmse")
r2_value = RegressionEvaluator(metricName="r2")

rmse_value.evaluate(predictions)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.15723096484614485

In [22]:
r2_value.evaluate(predictions)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.8849717811851903

In [23]:
#trainingSummary = lr_model.bestModel.summary
#predictions.head(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Instantiate metrics object
metrics = BinaryClassificationMetrics(predictions['label','prediction'].rdd)

# Accuracy                                      
print('Accuracy:', evaluator.evaluate(predictions))
                                      
# Area under ROC curve
#print("Area under ROC = %s" % metrics.areaUnderROC)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.9983021783881841

In [25]:
from pyspark.ml.evaluation import  MulticlassClassificationEvaluator, BinaryClassificationEvaluator

accuracy= MulticlassClassificationEvaluator(labelCol= 'label', predictionCol= 'prediction', metricName= 'accuracy')
f1= MulticlassClassificationEvaluator(labelCol= 'label', predictionCol= 'prediction', metricName= 'f1')
areaUnderROC= BinaryClassificationEvaluator(labelCol= 'label', metricName= 'areaUnderROC')

def classification_evaluator(data_result):
    data_result.crosstab(col1= 'prediction', col2= 'label').show()
    print('accuracy:' ,accuracy.evaluate(data_result))
    print('f1:' ,f1.evaluate(data_result))
    print('areaUnderROC:' ,areaUnderROC.evaluate(data_result))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
classification_evaluator(predictions)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def test_data(test_df):

  
   #lowercase the title and content column
  test_df = test_df.select("*", lower(col('content'))).drop('content')
  test_df = test_df.select("*", lower(col('title'))).drop('title')
  test_df = test_df.select('*',col("lower(content)").alias("content"), col("lower(title)").alias("title")).drop('lower(content)','lower(title)')
  
  #spliting the title and contetnt column
  split_col = pyspark.sql.functions.split(test_df['title'], ' ')
  test_df = test_df.withColumn("title_split",split_col).drop('title')
  split_col = pyspark.sql.functions.split(test_df['content'], ' ')
  test_df = test_df.withColumn("content_split",split_col).drop('content')
  
  # Stem title and content column
  stemmer = SnowballStemmer(language='english')
  stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
  
  test_df = test_df.withColumn("title_stemmed", stemmer_udf("title_split"))
  test_df = test_df.withColumn("content_stemmed", stemmer_udf("content_split"))
    
  return test_df

In [None]:
#Validation

df_test = spark.read.format('csv').options(header='true', inferSchema='true').load('s3://*********/*******')
df_test.show()

In [None]:
result = test_data(df_test)

In [None]:
result = result.withColumn("label", result["label"].cast(IntegerType()))

In [None]:
validation_prediction = lr_model.transform(result)
validation_prediction

In [None]:
classification_evaluator(validation_prediction)