In [None]:
# Import necessary libraries
import re

import nltk
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, rand, udf
from pyspark.sql.types import StringType

# Initialize SparkSession
spark = SparkSession.builder.appName("NewsClassification").getOrCreate()

# Sampling settings, kept together so they are easy to tune.
SAMPLE_SEED = 0    # seeds the shuffle so the sampled rows are repeatable across runs
SAMPLE_SIZE = 200  # number of shuffled rows kept for the rest of the notebook

# Load the datasets into Spark DataFrames
true_df = spark.read.csv('/content/True.csv', header=True, inferSchema=True)
fake_df = spark.read.csv('/content/Fake.csv', header=True, inferSchema=True)

# Add label columns: 1 for rows from True.csv, 0 for rows from Fake.csv
true_df = true_df.withColumn('label', lit(1))
fake_df = fake_df.withColumn('label', lit(0))

# Combine the sub-datasets into one.
# union() matches columns by position, not by name.
# NOTE(review): assumes both CSVs share the same column order - confirm.
news_df = true_df.union(fake_df)

# Shuffle with a fixed seed, then keep only the first SAMPLE_SIZE rows.
# An unseeded rand() would pick a different sample on every run.
news_df = news_df.orderBy(rand(seed=SAMPLE_SEED))
news_df = news_df.limit(SAMPLE_SIZE)

# Prepare the NLTK resources used by the text preprocessing function
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))
ps = PorterStemmer()

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
def preprocess_text(text):
    """Normalize one article body for vectorization.

    Steps: replace every non-alphabetic character with a space, lowercase,
    split on whitespace, drop words found in the module-level ``stop_words``
    set, and stem the remaining words with the module-level ``ps`` stemmer.

    Args:
        text: The raw text, or ``None`` for a missing value.

    Returns:
        The processed words joined by single spaces. An empty string is
        returned for ``None`` or for text with no alphabetic content.
    """
    # A null cell reaches a Spark UDF as None; re.sub would raise TypeError
    # on it and fail the whole job, so treat it as empty text.
    if text is None:
        return ''
    text = re.sub('[^a-zA-Z]', ' ', text)  # Remove non-alphabetic characters
    text = text.lower().split()  # Convert to lowercase and split into words
    text = [ps.stem(word) for word in text if word not in stop_words]  # Stemming and remove stopwords
    return ' '.join(text)

In [None]:
# Wrap the Python preprocessing function as a string-returning Spark UDF,
# then store its result for the 'text' column in a new 'processed_text' column.
preprocess_udf = udf(f=preprocess_text, returnType=StringType())
raw_text_col = col('text')
news_df = news_df.withColumn(
    'processed_text',
    preprocess_udf(raw_text_col),
)

In [None]:
# TF-IDF feature stages (Spark MLlib), listed in the order they are chained:
#   processed_text -> words -> filtered_words -> raw_features -> features

# Split the preprocessed text into word tokens.
tokenizer = Tokenizer(
    outputCol='words',
    inputCol='processed_text',
)

# Drop stop words from the token list.
remover = StopWordsRemover(
    outputCol='filtered_words',
    inputCol='words',
)

# Hash the remaining tokens into a fixed-size term-frequency vector.
hashing_tf = HashingTF(
    numFeatures=5000,
    outputCol='raw_features',
    inputCol='filtered_words',
)

# Re-weight the term frequencies by inverse document frequency.
idf = IDF(
    outputCol='features',
    inputCol='raw_features',
)


In [None]:
# Chain the feature stages into one pipeline and fit it on the sampled data.
# NOTE(review): the pipeline (including IDF) is fitted on every row of news_df,
# and the train/test split only happens afterwards, so test rows contribute to
# the IDF weights. Consider splitting first and fitting on the training part.
feature_stages = [tokenizer, remover, hashing_tf, idf]
pipeline = Pipeline(stages=feature_stages)
pipeline_model = pipeline.fit(news_df)

# Transform the data and keep only the columns the classifier needs.
news_df = pipeline_model.transform(news_df).select('features', 'label')

In [None]:
# Split the dataset into training and test sets
train_df, test_df = news_df.randomSplit([0.8, 0.2], seed=0)

# Train a logistic regression model
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

# Make predictions on the test set
predictions = lr_model.transform(test_df)

# BinaryClassificationEvaluator scores area under the ROC curve by default,
# not accuracy, so name the metric explicitly and report it under its own label.
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC')
auc = evaluator.evaluate(predictions)
print(f"Area under ROC: {auc * 100}")

# Accuracy (share of rows where prediction equals label) needs the
# multiclass evaluator with metricName='accuracy'.
accuracy_evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
accuracy = accuracy_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy *100}")

# Preview the per-row predictions
predictions.show()

Accuracy: 94.23076923076923
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(5000,[6,88,95,98...|    1|[-0.9061859304392...|[0.28778094747659...|       1.0|
|(5000,[6,122,158,...|    0|[0.32763983290708...|[0.58118500126319...|       0.0|
|(5000,[7,19,132,1...|    0|[0.32763983290708...|[0.58118500126319...|       0.0|
|(5000,[7,29,51,64...|    1|[-0.9061859304392...|[0.28778094747659...|       1.0|
|(5000,[7,63,89,12...|    1|[-0.9061859304392...|[0.28778094747659...|       1.0|
|(5000,[8,30,33,41...|    0|[0.32763983290708...|[0.58118500126319...|       0.0|
|(5000,[8,46,64,95...|    1|[-0.2892730487661...|[0.42818184607372...|       1.0|
|(5000,[8,146,171,...|    1|[-0.9061859304392...|[0.28778094747659...|       1.0|
|(5000,[15,29,87,9...|    1|[-0.9061859304392...|[0.28778094747659...|