<a href="https://colab.research.google.com/github/Shahanas2003/Real-Time-News-Sentiment-Classification/blob/main/Real_Time_News_Sentiment_Dashboard.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import requests

# -----------------------------
# NewsData.io API Configuration
# -----------------------------
API_KEY = "pub_7c7f72f816dc47c28a889f0c0a5b371f"
BASE_URL = "https://newsdata.io/api/1/news"

# Allowed categories
VALID_CATEGORIES = ["business", "entertainment", "environment", "food", "health",
                    "politics", "science", "sports", "technology", "top"]

def fetch_news(api_key, language="en", category="top"):
    if category not in VALID_CATEGORIES:
        raise ValueError(f"Invalid category '{category}'. Valid options: {VALID_CATEGORIES}")

    params = {
        "apikey": api_key,
        "language": language,
        "category": category
    }
    response = requests.get(BASE_URL, params=params)
    if response.status_code == 200:
        return response.json().get("results", [])
    else:
        print(f"Error fetching news: {response.status_code}, {response.text}")
        return []

# -----------------------------
# Fetch global top news
# -----------------------------
articles = fetch_news(API_KEY, category="top")

# -----------------------------
# Display fetched news
# -----------------------------
if articles:
    for idx, article in enumerate(articles, start=1):
        print(f"{idx}. {article.get('title')}")
        print(f"   Description: {article.get('description')}")
        print(f"   Link: {article.get('link')}")
        print(f"   Published: {article.get('pubDate')}")
        print(f"   Source: {article.get('source_id')}")
        print("-" * 80)
else:
    print("No articles fetched.")


1. Malta Guinness Partners Iri Ji Festival in Lagos
   Description: Malta Guinness has announced its partnership with this year’s Iri Ji Festival, scheduled for Sunday, September 28, at FHA Ground, Festac. The event will bring together families and community members
   Link: https://www.thisdaylive.com/2025/09/27/malta-guinness-partners-iri-ji-festival-in-lagos/
   Published: 2025-09-27 21:07:00
   Source: thisdaylive
--------------------------------------------------------------------------------
2. Could the United Nations be dying?
   Description: The UN tried to reach for the heavens. But now we must wonder whether it has sunk to the pits...
   Link: https://nation.africa/kenya/blogs-opinion/opinion/could-the-united-nations-be-dying--5208482
   Published: 2025-09-27 21:07:00
   Source: nation
--------------------------------------------------------------------------------
3. Cyclist dies in crash with vehicle north of Longmont
   Description: The cyclist was pronounced dead at the 

In [15]:
# -----------------------------
# Step 2: News Sentiment Classification
# -----------------------------

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Initialize Spark
spark = SparkSession.builder.appName("NewsSentimentClassification").getOrCreate()

# -----------------------------
# 1. Labeled Training Data
# 0 = Negative, 1 = Positive
# -----------------------------
data = [
    ("Stock markets crash amid economic fears", 0),
    ("Local community celebrates festival with joy", 1),
    ("Earthquake kills hundreds in city", 0),
    ("New vaccine brings hope to millions", 1),
    ("Government faces backlash over corruption scandal", 0),
    ("Breakthrough in clean energy technology announced", 1)
]

columns = ["title", "label"]
df = spark.createDataFrame(data, columns)

# -----------------------------
# 2. Build Pipeline
# -----------------------------
# Clean text for training data
df_clean = df.withColumn("clean_title", lower(regexp_replace(col("title"), "[^a-zA-Z0-9\\s]", "")))

# Pipeline stages
tokenizer = Tokenizer(inputCol="clean_title", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# -----------------------------
# 3. Train the model
# -----------------------------
model = pipeline.fit(df_clean)

# -----------------------------
# 4. Test Predictions
# -----------------------------
test_data = [
    ("The economy shows signs of recovery",),
    ("Massive floods destroy farmlands",),
    ("Breakthrough in cancer treatment gives hope",)
]
test_df = spark.createDataFrame(test_data, ["title"])

# Apply the same cleaning step to the test data
test_df_clean = test_df.withColumn("clean_title", lower(regexp_replace(col("title"), "[^a-zA-Z0-9\\s]", "")))

predictions = model.transform(test_df_clean)
predictions.select("title", "prediction").show(truncate=False)

+-------------------------------------------+----------+
|title                                      |prediction|
+-------------------------------------------+----------+
|The economy shows signs of recovery        |0.0       |
|Massive floods destroy farmlands           |0.0       |
|Breakthrough in cancer treatment gives hope|1.0       |
+-------------------------------------------+----------+



In [16]:
import os
import json
from datetime import datetime

SAVE_DIR = "/content/news_stream_folder"
os.makedirs(SAVE_DIR, exist_ok=True)

def save_articles(articles):
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = os.path.join(SAVE_DIR, f"news_{ts}.json")
    with open(filename, "w", encoding="utf-8") as f:
        for article in articles:
            json.dump({"title": article.get("title", "")}, f)
            f.write("\n")
    print(f"Saved {len(articles)} articles → {filename}")


In [17]:
from pyspark.sql.types import StructType, StringType

# Define schema for streaming JSON files
schema = StructType().add("title", StringType(), True)

# Read stream
news_stream = spark.readStream.schema(schema).json(SAVE_DIR)


In [19]:
from pyspark.sql.functions import col, lower, regexp_replace

# Apply the same cleaning step to the news stream
news_stream_clean = news_stream.withColumn("clean_title", lower(regexp_replace(col("title"), "[^a-zA-Z0-9\\s]", "")))

# The trained 'model' from Step 2
predictions_stream = model.transform(news_stream_clean).select("title", "prediction")

In [24]:
# Output to console for debugging

# Stop any existing query with the same name
for q in spark.streams.active:
    if q.name == "news_predictions":
        q.stop()
        print(f"Stopped existing query: {q.name}")

query = predictions_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("news_predictions") \
    .start()

# Keep the stream running for a limited time for this example
# In a real application, you might want a different termination strategy
query.awaitTermination(timeout=60)

Stopped existing query: news_predictions


False

In [25]:
! pip install streamlit



In [31]:
# app.py
%%writefile app.py

import streamlit as st
import pandas as pd
import os
import json
import requests
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.sql.types import StructType, StringType
from pyspark.ml import PipelineModel
from streamlit_autorefresh import st_autorefresh

# -----------------------------
# 1. Initialize Spark and load model
# -----------------------------
@st.experimental_singleton
def init_spark():
    spark = SparkSession.builder.appName("NewsSentimentDashboard").getOrCreate()
    model = PipelineModel.load("/content/news_sentiment_model")  # update path if needed
    return spark, model

spark, model = init_spark()

# -----------------------------
# 2. NewsData.io API
# -----------------------------
API_KEY = "pub_7c7f72f816dc47c28a889f0c0a5b371f"
BASE_URL = "https://newsdata.io/api/1/news"
SAVE_DIR = "news_stream_folder"
os.makedirs(SAVE_DIR, exist_ok=True)

VALID_CATEGORIES = ["business","entertainment","environment","food","health",
                    "politics","science","sports","technology","top"]

def fetch_news(category="top"):
    params = {"apikey": API_KEY, "language":"en", "category":category}
    r = requests.get(BASE_URL, params=params)
    if r.status_code == 200:
        return r.json().get("results", [])
    return []

def save_articles(articles):
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = os.path.join(SAVE_DIR, f"news_{ts}.json")
    with open(filename, "w", encoding="utf-8") as f:
        for article in articles:
            json.dump({"title": article.get("title","")}, f)
            f.write("\n")

# -----------------------------
# 3. Spark Structured Streaming
# -----------------------------
schema = StructType().add("title", StringType(), True)
news_stream = spark.readStream.schema(schema).json(SAVE_DIR)
predictions_stream = model.transform(news_stream).select("title", "prediction")

# Memory sink for dashboard
query = predictions_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("news_predictions") \
    .start()

# -----------------------------
# 4. Streamlit Dashboard
# -----------------------------
st.title("Real-Time News Sentiment Dashboard")

category = st.selectbox("Select News Category:", VALID_CATEGORIES)
if st.button("Fetch Latest News"):
    articles = fetch_news(category)
    if articles:
        save_articles(articles)
        st.success(f"Fetched and saved {len(articles)} articles!")
    else:
        st.warning("No articles fetched.")

# Auto-refresh every 5 seconds
st_autorefresh(interval=5000, limit=None, key="news_refresh")

# Function to read memory table
def get_latest_predictions():
    try:
        sdf = spark.sql("SELECT * FROM news_predictions")
        return sdf.toPandas()
    except:
        return pd.DataFrame(columns=["title", "prediction"])

df = get_latest_predictions()
if not df.empty:
    df["Sentiment"] = df["prediction"].map({0:"Negative", 1:"Positive"})
    st.bar_chart(df["Sentiment"].value_counts())
    st.dataframe(df[["title","Sentiment"]].tail(20))
else:
    st.write("No news processed yet.")


Overwriting app.py
