# **Classification using Pyspark**

## **Configuration**

In [None]:
#Installation
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
#spark
import findspark
findspark.init()

In [None]:
#Creating Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Fakenews").getOrCreate()

## **Data Load and Explore**

In [None]:
#import necessary functions
from itertools import chain
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract

In [None]:
#Data loading
df1 = spark.read.csv('/content/train.csv',
                     header=True, inferSchema=True)
df2 = spark.read.csv('/content/test.csv',
                     header=True, inferSchema=True)

In [None]:
#Printing information about each column
df1.printSchema()

root
 |-- Statement: string (nullable = true)
 |-- Label: string (nullable = true)



In [None]:
#Showing the dataframe
df1.show(4)

+--------------------+-----+
|           Statement|Label|
+--------------------+-----+
|Says the Annies L...|FALSE|
|When did the decl...| TRUE|
|"Hillary Clinton ...| TRUE|
|Health care refor...|FALSE|
+--------------------+-----+
only showing top 4 rows



In [None]:
#Converting dataset into pandas to observe the data in a more formatted way
df1.limit(100).toPandas()

Unnamed: 0,Statement,Label
0,Says the Annies List political group supports ...,FALSE
1,When did the decline of coal start? It started...,TRUE
2,"""Hillary Clinton agrees with John McCain """"by ...",TRUE
3,Health care reform legislation is likely to ma...,FALSE
4,The economic turnaround started at the end of ...,TRUE
...,...,...
95,Says David Jolly supports privatizing Social S...,FALSE
96,Did you know US population growth is at its lo...,TRUE
97,"The average student in Florida, what they actu...",TRUE
98,"Since 1968, more Americans have died from gunf...",TRUE


In [None]:
#Summarizing key information of the selected columns (describe() method can also be used)
df1.select('Statement', 'Label').summary().show()

+-------+--------------------+--------------------+
|summary|           Statement|               Label|
+-------+--------------------+--------------------+
|  count|               10269|               10260|
|   mean|                null|               564.5|
| stddev|                null|  232.63813101037414|
|    min|"(McCain) says ""...| 'The American pe...|
|    25%|                null|               400.0|
|    50%|                null|               400.0|
|    75%|                null|               729.0|
|    max|﻿﻿Since Mayor Ken...|transparency	news...|
+-------+--------------------+--------------------+



In [None]:
#To know the shape of the dataframe
print('Number of rows: \t', df1.count())
print('Number of columns: \t', len(df1.columns))

Number of rows: 	 10269
Number of columns: 	 2


## **Exploratory Data Analysis**

In [None]:
#Grouping a column based on the type of data present there
df1.groupBy('Label').count().show()

+--------------------+-----+
|               Label|count|
+--------------------+-----+
| offering basic c...|    1|
| against accounta...|    1|
|               FALSE| 4449|
| we could balance...|    1|
|               crime|    1|
| the percentage u...|    1|
| if that dorm has...|    1|
| better airport' ...|    1|
| but left it with...|    1|
| an award for 'Co...|    1|
|            preacher|    1|
|poverty	bernie-s	...|    1|
|"" says the solut...|    1|
| "" ""out of context|    1|
|             pundits|    1|
| which indicates ...|    1|
|000 to Al Gore's ...|    1|
|             however|    1|
|                Ohio|    1|
| but the threat t...|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
#Grouping a column based on categories of data present there (categorial variable)
# df1.groupBy('Label').pivot('Label').count().show()

In [None]:
#Printing number of column values that has null (Train set)
for col in df1.columns:
    print(col.ljust(15), df1.filter(df1[col].isNull()).count())

Statement       0
Label           9


In [None]:
#Printing number of column values that has null (Test set)
for col in df2.columns:
    print(col.ljust(15), df2.filter(df2[col].isNull()).count())

Statement       0
Label           4


In [None]:
#As only 2 value missing in embarked column, it can be filled with the most repeated value (one way)
df1.select('Label').summary('max', "50%", "mean").show()

+-------+--------------------+
|summary|               Label|
+-------+--------------------+
|    max|transparency	news...|
|    50%|               400.0|
|   mean|               564.5|
+-------+--------------------+



In [None]:
#The test dataset has some missing value in fare column
df1 = df1.fillna({'Label':'FALSE'})

In [None]:
#Printing number of column values that has null (Train set)
for col in df1.columns:
    print(col.ljust(15), df1.filter(df1[col].isNull()).count())

Statement       0
Label           0


## **Feature Engineering**

In [None]:
#Dropping unwanted columns
df1 = df1.drop('Title')

In [None]:
df1.show(5)

+--------------------+-----+
|           Statement|Label|
+--------------------+-----+
|Says the Annies L...|FALSE|
|When did the decl...| TRUE|
|"Hillary Clinton ...| TRUE|
|Health care refor...|FALSE|
|The economic turn...| TRUE|
+--------------------+-----+
only showing top 5 rows



## **Model Build**

In [None]:
# importing required libraries
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression,\
                    RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import re
import string
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import nltk
nltk.download('stopwords')

# Initialize PorterStemmer
portstem = PorterStemmer()
stop_words = set(stopwords.words('english'))

# Define the stemming function
def stemming(statement):
    # Preprocessing steps
    statement = re.sub('[^a-zA-Z]', ' ', statement)  # Remove non-alphabetic characters
    statement = statement.lower()  # Convert text to lowercase
    words = statement.split()  # Tokenize the text

    # Apply stemming and remove stopwords
    stemmed_words = [portstem.stem(word) for word in words if word not in stop_words]

    return ' '.join(stemmed_words)

# Convert the stemming function to a UDF
stemming_udf = udf(stemming, StringType())

# Apply the UDF to the 'Statement' column
df1 = df1.withColumn("Statement", stemming_udf(col("Statement")))

# Show the updated DataFrame
df1.show(truncate=False)



[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


+-----------------------------------------------------------------------------------------------------------+-----+
|Statement                                                                                                  |Label|
+-----------------------------------------------------------------------------------------------------------+-----+
|say anni list polit group support third trimest abort demand                                               |FALSE|
|declin coal start start natur ga took start begin presid georg w bush administr                            |TRUE |
|hillari clinton agr john mccain vote give georg bush benefit doubt iran                                    |TRUE |
|health care reform legisl like mandat free sex chang surgeri                                               |FALSE|
|econom turnaround start end term                                                                           |TRUE |
|chicago bear start quarterback last year total number tenur uw faculti 

In [None]:
#Converting categorical values to integer values (encoding)
stringIndexer_statement = StringIndexer(inputCol="Statement", outputCol="StatementIndex", handleInvalid="keep")
stringIndexer_label = StringIndexer(inputCol="Label", outputCol="LabelIndex", handleInvalid="keep")

# stringIndex_model = stringIndex.fit(df1)

# df1_ = stringIndex_model.transform(df1).drop('Statement', 'Label')
# df1_.show(4)

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[stringIndexer_statement, stringIndexer_label])

# Fit and transform the data using the pipeline
df1_transformed = pipeline.fit(df1).transform(df1)

# Show the transformed DataFrame
df1_transformed.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------+-----+--------------+----------+
|Statement                                                                                                  |Label|StatementIndex|LabelIndex|
+-----------------------------------------------------------------------------------------------------------+-----+--------------+----------+
|say anni list polit group support third trimest abort demand                                               |FALSE|6109.0        |1.0       |
|declin coal start start natur ga took start begin presid georg w bush administr                            |TRUE |1389.0        |0.0       |
|hillari clinton agr john mccain vote give georg bush benefit doubt iran                                    |TRUE |2767.0        |0.0       |
|health care reform legisl like mandat free sex chang surgeri                                               |FALSE|2713.0        |1.0       |
|econo

In [None]:
#converitng dataset for model
vec_asmbl = VectorAssembler(inputCols=df1_transformed.columns[3:],
                           outputCol='features')

df1_ = vec_asmbl.transform(df1_transformed).select('features', 'LabelIndex')
df1_.show(4, truncate=False)

+--------+----------+
|features|LabelIndex|
+--------+----------+
|[1.0]   |1.0       |
|[0.0]   |0.0       |
|[0.0]   |0.0       |
|[1.0]   |1.0       |
+--------+----------+
only showing top 4 rows



In [None]:
df1_.show(3)

+--------+----------+
|features|LabelIndex|
+--------+----------+
|   [1.0]|       1.0|
|   [0.0]|       0.0|
|   [0.0]|       0.0|
+--------+----------+
only showing top 3 rows



In [None]:
# Creating train and validation set
train_df, valid_df = df1_.randomSplit([0.8, 0.2])

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol='LabelIndex',
                                          metricName='accuracy')

In [None]:
#Ridge Regression model
ridge = LogisticRegression(labelCol='LabelIndex',
                        maxIter=100,
                        elasticNetParam=0,
                        regParam=0.03)

model = ridge.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.5612395474667978

In [None]:
#Lasso Regression model
lasso = LogisticRegression(labelCol='LabelIndex',
                           maxIter=100,
                           elasticNetParam=1,
                           regParam=0.0003)

model = lasso.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.9896704377766847

In [None]:
#Random Forest Classifier Model
rf = RandomForestClassifier(labelCol='LabelIndex', numTrees=100, maxDepth=3, maxBins=150)
model = rf.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.9901623216920806

In [None]:
#naive Classifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
nb = NaiveBayes(featuresCol="features", labelCol="LabelIndex", predictionCol="prediction", smoothing=1.0)
model = nb.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.5612395474667978

## **Test Data Prepare**

In [None]:
df2 = spark.read.csv('/content/test.csv',
                     header=True, inferSchema=True)

In [None]:
df2.show(4)

+--------------------+-----+
|           Statement|Label|
+--------------------+-----+
|Building a wall o...| TRUE|
|Wisconsin is on p...|FALSE|
|Says John McCain ...|FALSE|
|Suzanne Bonamici ...| TRUE|
+--------------------+-----+
only showing top 4 rows



In [None]:
#Same as for Train set
df2 = df2.withColumn('Title', regexp_extract(df2['Statement'],\
                '([A-Za-z]+)\.', 1))


df2.groupBy('Label').agg(count('Label')).show()

+--------------------+------------+
|               Label|count(Label)|
+--------------------+------------+
|               FALSE|        1158|
| as well as the v...|           1|
| something that I...|           1|
| there were 16 st...|           1|
| it only got wors...|           1|
|  000"" the hardest"|           1|
| it's not Sen. Ob...|           1|
|           education|           1|
|             if ever|           1|
|               quote|           1|
|                null|           0|
|        a Republican|           1|
|                 400|           1|
| make you pay $10...|           1|
|                TRUE|        1363|
| and probably for...|           1|
| but because ""th...|           1|
| because of a dec...|           1|
|000 for AstroTurf...|           1|
| you should just ...|           1|
+--------------------+------------+
only showing top 20 rows



In [None]:
df2 = df2.drop('Title')

In [None]:
for col in df2.columns:
    print(col.ljust(15), df2.filter(df2[col].isNull()).count())

Statement       0
Label           4


In [None]:
df2 = df2.fillna({'Label':'FALSE'})

In [None]:
for col in df2.columns:
    print(col.ljust(15), df2.filter(df2[col].isNull()).count())

Statement       0
Label           0


In [None]:
for col in df1.columns:
    print(col.ljust(15), df1.filter(df1[col].isNull()).count())

Statement       0
Label           0


## **Introduction of Pipeline**

In [None]:
#Creating pipelne
pipeline_rf = Pipeline(stages=[stringIndexer_statement, stringIndexer_label, vec_asmbl, rf])

#Creating Grid
paramGrid = ParamGridBuilder().\
            addGrid(rf.maxDepth, [3]).\
            addGrid(rf.minInfoGain, [0.001]).\
            addGrid(rf.numTrees, [100]).\
            build()

#Initializing crossvalidator
selected_model = CrossValidator(estimator=pipeline_rf,
                                estimatorParamMaps=paramGrid,
                                evaluator=evaluator,
                                numFolds=5)
subset_data = df1.sample(withReplacement=False, fraction=0.1, seed=42)
model_final = selected_model.fit(subset_data)
# model_final = selected_model.fit(df1)
pred_train = model_final.transform(df1)
evaluator.evaluate(pred_train)

0.9892881487973513

In [None]:
#Predicting from Test Data
pred_test = model_final.transform(df2)

#Getting only predictions
predictions = pred_test.select('Statement', 'prediction')
predictions = predictions.\
                withColumn('Label', predictions['prediction'].\
                cast('integer')).drop('prediction')
predictions.show(5)

+--------------------+-----+
|           Statement|Label|
+--------------------+-----+
|Building a wall o...|    0|
|Wisconsin is on p...|    1|
|Says John McCain ...|    1|
|Suzanne Bonamici ...|    0|
|When asked by a r...|    1|
+--------------------+-----+
only showing top 5 rows



In [None]:
import string
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import MultinomialNB  # Assuming you're using Multinomial Naive Bayes

# Assuming you have defined your classifiers (ridge, lasso, rf, nb) elsewhere in your code

def output(n):
    if n == 0:
        return "The news is Real"
    elif n == 1:
        return "The news is Fake"

from pyspark.sql import Row

def manual_testing(news, pipeline, rf, nb):
    # Create a PySpark DataFrame with the user input
    test_data = [Row(Statement=news)]
    test_df = spark.createDataFrame(test_data)

    # Transform the test data using the same pipeline
    test_df_transformed = pipeline.transform(test_df)

    # Make predictions using each model
    pred_rf_value = rf.transform(test_df_transformed)
    pred_nb_value = nb.transform(test_df_transformed)

    # Output the predictions
    print("Random Forest Prediction: {}".format(output(pred_rf_value.select('prediction').collect()[0][0])))
    print("Naive Bayes Prediction: {}".format(output(pred_nb_value.select('prediction').collect()[0][0])))


In [None]:
user_input = input("Enter the news text: ")

# Call manual_testing with the user-provided news text and fitted pipeline and models
manual_testing(user_input, pipeline, model_ridge, model_rf, model_nb)

Enter the news text: cvbnm


NameError: ignored

In [None]:
# Example usage for taking input and performing manual_testing
# Assuming you have initialized and trained your models (ridge, lasso, rf, nb) elsewhere in your code

# Take input from the user for news text


In [None]:
news_input = str(input()) # Replace with your actual news input
manual_testing(news_input, pipeline)

In [None]:
#Saving the Model
model_final.write().save('fake_news_detection.model')

In [None]:
# Load the saved model
from pyspark.ml.tuning import CrossValidatorModel

loaded_model = CrossValidatorModel.load('fake_news_detection.model')