In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Step 1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("Fake News Detection") \
    .master("local[*]") \
    .getOrCreate()

# Step 2: Load Data
file_paths = {
    "train": "Constraint_Train.csv",
    "test": "english_test_with_labels.csv"
}


# Step 3: Data Cleaning
def _load_labelled_tweets(path):
    """Read one CSV split and return it cleaned, with a numeric ``label``.

    The same steps are applied to every split so train and test can never be
    prepared differently by accident:

    1. read the CSV with a header row and inferred column types;
    2. replace null ``tweet`` values with the placeholder text ``"empty"``;
    3. drop rows whose ``label`` is null;
    4. map ``"real"`` -> 1 and ``"fake"`` -> 0;
    5. drop rows whose label matched neither value (the ``when`` chain has no
       ``otherwise`` branch, so such rows come out null).

    Args:
        path: Location of the CSV file to read.

    Returns:
        A DataFrame whose ``label`` column contains only 0 and 1.
    """
    # NOTE(review): Spark's CSV reader treats every newline as a record
    # boundary unless multiLine=True is passed. If tweets can contain quoted
    # line breaks, records are split and end up discarded by the label filter
    # below -- TODO confirm against the raw files.
    raw = spark.read.csv(path, header=True, inferSchema=True)

    label_as_number = (
        when(col("label") == "real", 1)
        .when(col("label") == "fake", 0)
    )

    return (
        raw
        .fillna({"tweet": "empty"})
        .dropna(subset=["label"])
        .withColumn("label", label_as_number)
        .filter(col("label").isNotNull())
    )


data_train = _load_labelled_tweets(file_paths["train"])
data_test = _load_labelled_tweets(file_paths["test"])

# Verify label transformation and null removal
data_train.select("label").distinct().show()
data_test.select("label").distinct().show()

# Step 4: Define Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import CountVectorizer, StopWordsRemover, Tokenizer

# Feature stages, chained through their column names:
#   tweet -> words -> filtered_words -> features
tokenizer = Tokenizer(outputCol="words", inputCol="tweet")
remover = StopWordsRemover(outputCol="filtered_words", inputCol="words")
vectorizer = CountVectorizer(outputCol="features", inputCol="filtered_words")

# Classifier trained on the vectorized text against the 0/1 label column
lr = LogisticRegression(labelCol="label", featuresCol="features")

pipeline_stages = [tokenizer, remover, vectorizer, lr]
pipeline = Pipeline(stages=pipeline_stages)

# Step 5: Train Model
model = pipeline.fit(data_train)

# Step 6: Save the Model (replaces any model already stored at this path)
(
    model.write()
    .overwrite()
    .save("fake_news_detection_model")
)

# Step 7: Predict on User Input
def predict_news_label(news_text, model, spark_session):
    """Classify a single piece of news text as ``"real"`` or ``"fake"``.

    Args:
        news_text: Text to classify. Must not be ``None``.
        model: Fitted pipeline model. Its ``transform`` output must expose a
            ``prediction`` column.
        spark_session: Session used to build the one-row input DataFrame.

    Returns:
        ``"real"`` when the prediction is 1, ``"fake"`` when it is 0, and
        ``"unknown"`` for any other value or when the model returns no row.

    Raises:
        ValueError: If ``news_text`` is ``None``.
    """
    if news_text is None:
        # Fail with a clear message rather than an opaque schema-inference error.
        raise ValueError("news_text must not be None")

    # One-row frame with the "tweet" column the pipeline was trained on; built
    # the same way as in predict_news, so no function-scope import is needed.
    input_data = spark_session.createDataFrame([(news_text,)], ["tweet"])
    predictions = model.transform(input_data)

    first_row = predictions.select("prediction").first()
    if first_row is None:
        return "unknown"

    # The prediction comes back as a float (e.g. 1.0); normalise it to an int key.
    predicted_label_index = int(first_row[0])
    label_mapping = {0: "fake", 1: "real"}
    return label_mapping.get(predicted_label_index, "unknown")

if __name__ == "__main__":
    from pyspark.ml.pipeline import PipelineModel

    # Load the trained model
    model = PipelineModel.load("fake_news_detection_model")

    # User Input Prediction Loop: read text until the user types 'exit'.
    while True:
        try:
            user_input = input("Enter a news text (or type 'exit' to quit): ")
        except (EOFError, KeyboardInterrupt):
            # stdin was closed or the user interrupted: leave the loop cleanly
            # instead of ending the session with a traceback.
            print("Exiting...")
            break

        command = user_input.strip()
        if not command:
            # Nothing was typed; ask again rather than classifying a blank line.
            continue
        if command.lower() == "exit":
            print("Exiting...")
            break

        try:
            # Pass the text exactly as typed so tokenization is unaffected.
            prediction = predict_news_label(user_input, model, spark)
            print(f"Prediction: {prediction}")
        except Exception as e:
            # Report the failure and keep the loop alive for the next input.
            print(f"Error: {e}")



+-----+
|label|
+-----+
|    1|
|    0|
+-----+

+-----+
|label|
+-----+
|    1|
|    0|
+-----+

Enter a news text (or type 'exit' to quit): Enter the news headline or content to predict (type 'exit' to quit): News: Chinese converting to Islam after realising that no muslim was affected by #Coronavirus #COVD19 in the country Prediction: The news is likely 'Unknown (could not process the input)'
Prediction: fake
Enter a news text (or type 'exit' to quit): exit
Exiting...


In [2]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Step 1: Make Predictions on the Test Set
# Cached because each metric below is a separate evaluation pass over this
# DataFrame; without caching the whole pipeline transform is recomputed each time.
predictions = model.transform(data_test).cache()


def _evaluator_for(metric_name):
    """Return an evaluator that scores ``prediction`` against ``label``.

    Args:
        metric_name: Metric name understood by
            ``MulticlassClassificationEvaluator`` (e.g. ``"accuracy"``).

    Returns:
        A configured ``MulticlassClassificationEvaluator``.
    """
    return MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )


# Step 2: Initialize the Evaluator
evaluator = _evaluator_for("accuracy")

# Step 3: Calculate Accuracy
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Step 4: Calculate Precision (class-weighted)
precision_evaluator = _evaluator_for("weightedPrecision")
precision = precision_evaluator.evaluate(predictions)
print(f"Precision: {precision}")

# Step 5: Calculate Recall (class-weighted)
recall_evaluator = _evaluator_for("weightedRecall")
recall = recall_evaluator.evaluate(predictions)
print(f"Recall: {recall}")

# Step 6: Calculate F1-Score
f1_evaluator = _evaluator_for("f1")
f1_score = f1_evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score}")


Accuracy: 0.9182481751824818
Precision: 0.9242072432415607
Recall: 0.9182481751824818
F1 Score: 0.9184006217467591


In [3]:
pip install gradio


Collecting gradio
  Downloading gradio-5.9.0-py3-none-any.whl.metadata (16 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.6-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.4.0-py3-none-any.whl.metadata (2.9 kB)
Collecting gradio-client==1.5.2 (from gradio)
  Downloading gradio_client-1.5.2-py3-none-any.whl.metadata (7.1 kB)
Collecting markupsafe~=2.0 (from gradio)
  Downloading MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.19-py3-none-any.whl.metadata (1.8 kB)
Collecting ruff>=0.2.2 (from gradio)
  Downloading ruff-0.8.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metad

In [4]:
import gradio as gr
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("Fake News Detection")
    .master("local[*]")
    .getOrCreate()
)

# Restore the fitted pipeline that the training step saved under this path
model = PipelineModel.load("fake_news_detection_model")

# Function to predict if the news is true or fake
def predict_news(text):
    """Classify news text submitted through the Gradio interface.

    Args:
        text: Text entered by the user.

    Returns:
        ``"Real News"`` when the model predicts label 1, ``"Fake News"`` for
        any other prediction, or a short prompt when ``text`` is ``None``,
        empty, or whitespace only.
    """
    # A blank submission contains nothing to judge, so ask for input instead of
    # presenting a model verdict for it.
    if text is None or not text.strip():
        return "Please enter some news text."

    # Prepare the input as a one-row DataFrame with the "tweet" column
    input_data = spark.createDataFrame([(text,)], ["tweet"])

    # Run the full pipeline (feature extraction + classifier) on that row
    predictions = model.transform(input_data)

    # Get the prediction (binary label 0 or 1)
    predicted_label = predictions.select("prediction").head()[0]

    # 1 was assigned to "real" and 0 to "fake" when the labels were encoded
    return "Real News" if predicted_label == 1 else "Fake News"

# Create the Gradio Interface: one two-line text box in, plain text out
news_textbox = gr.Textbox(
    label="Enter News Text",
    lines=2,
    placeholder="Type news here...",
)

iface = gr.Interface(
    fn=predict_news,
    inputs=news_textbox,
    outputs="text",
    title="Fake News Detection",
    description="Enter a news article and the model will predict whether it's Real or Fake.",
)

# Launch the interface
iface.launch()


Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://b507a091a7e25f243e.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




In [None]:
# prompt: give me the shape

from pyspark.sql import SparkSession

# Initialize Spark Session (if not already initialized)
spark = (
    SparkSession.builder
    .appName("Check Data Shape")
    .getOrCreate()
)

# Load the data
file_path = "Constraint_Train.csv"  # Replace with your actual file path
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Print the shape (number of rows and columns)
row_total = data.count()
print(f"Number of rows: {row_total}")
column_total = len(data.columns)
print(f"Number of columns: {column_total}")

Number of rows: 7022
Number of columns: 3
