<a href="https://colab.research.google.com/github/Shahanas2003/Capstone_project/blob/main/Real_Time_News_Sentiment_Classification_and_Dashboard_using_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
import getpass
import json
import os
import re
import time
import uuid
from datetime import datetime

import requests

# newsdata.io credentials come from the environment (or an interactive prompt) rather than
# being hard-coded here, where they would be committed and shared with the notebook.
# NOTE(review): the key previously hard-coded in this cell should be treated as leaked and rotated.
API_KEY = os.environ.get("NEWSDATA_API_KEY") or getpass.getpass("newsdata.io API key: ")
NEWS_API_URL = f"https://newsdata.io/api/1/news?apikey={API_KEY}&language=en&country=us"

# Landing directory for fetched headlines; the Spark file stream reads JSON files from here.
SAVE_DIR = "/content/news_stream"
os.makedirs(SAVE_DIR, exist_ok=True)

def fetch_and_save_news(url=None, save_dir=None, timeout=10):
    """Fetch the latest headlines and write them as one JSON-lines file for the stream.

    Each call that returns articles creates a new ``<uuid>.json`` file in ``save_dir``
    containing one ``{"title": ..., "pubDate": ...}`` object per line.

    Args:
        url: API endpoint to query; defaults to ``NEWS_API_URL``.
        save_dir: Landing directory for the file; defaults to ``SAVE_DIR``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The number of articles written; 0 when the API returned nothing or an
        error occurred. Errors are printed rather than raised so the function can
        be called repeatedly (e.g. from a polling loop) without stopping it.
    """
    url = NEWS_API_URL if url is None else url
    save_dir = SAVE_DIR if save_dir is None else save_dir
    try:
        # Without a timeout a stalled connection would block the caller forever.
        resp = requests.get(url, timeout=timeout)
        resp.raise_for_status()
        news = resp.json().get("results") or []
        if not isinstance(news, list):
            # Guard against a non-list payload (e.g. an error object): iterating it
            # would write garbage rows into the stream.
            raise ValueError(f"unexpected 'results' payload of type {type(news).__name__}")
        if not news:
            print(f"No articles returned at {datetime.now()}")
            return 0

        file_id = uuid.uuid4()
        # Write under a dot-prefixed name (Spark's file source skips hidden files), then
        # rename into place so the stream never reads a half-written file.
        tmp_path = os.path.join(save_dir, f".{file_id}.json.tmp")
        final_path = os.path.join(save_dir, f"{file_id}.json")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                for article in news:
                    json.dump({
                        "title": article.get("title", ""),
                        "pubDate": article.get("pubDate", "")
                    }, f)
                    f.write("\n")
            os.replace(tmp_path, final_path)
        finally:
            # After a successful rename tmp_path no longer exists; otherwise drop the partial file.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        print(f"Saved {len(news)} articles at {datetime.now()}")
        return len(news)
    except Exception as e:
        # Exception text from requests can contain the request URL, which embeds the API key.
        print("Error:", re.sub(r"(apikey=)[^&\s'\"]+", r"\1***", str(e)))
        return 0

# Smoke test: run a single fetch now to check the API call and seed the stream directory.
fetch_and_save_news();


Saved 10 articles at 2025-09-28 06:40:57.356807


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

spark = SparkSession.builder.appName("NewsSentiment").getOrCreate()

# Where the fitted pipeline is persisted; the streaming cell loads it from this path.
MODEL_PATH = "/content/news_sentiment_model"

# Sample labeled training data.
# TODO: four hand-written headlines are far too few for a meaningful classifier;
# replace with a real labeled corpus before trusting the predictions.
train_data = [
    ("Stock market hits all-time high", "positive"),
    ("War breaks out in eastern region", "negative"),
    ("Company reports strong quarterly earnings", "positive"),
    ("Earthquake leaves thousands homeless", "negative"),
]

schema = StructType([
    StructField("text", StringType(), True),
    StructField("label", StringType(), True),
])

df_train = spark.createDataFrame(train_data, schema)

# ML pipeline: index labels -> tokenize -> drop stop words -> hashed term frequencies -> IDF
# -> logistic regression.
# alphabetAsc pins the label encoding to negative -> 0.0, positive -> 1.0 regardless of class
# frequencies; the dashboard interprets prediction == 1.0 as "Positive".
label_indexer = StringIndexer(inputCol="label", outputCol="labelIndex", stringOrderType="alphabetAsc")
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tf = HashingTF(inputCol="filtered", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="labelIndex")

pipeline = Pipeline(stages=[label_indexer, tokenizer, remover, tf, idf, lr])
model = pipeline.fit(df_train)

# Save trained model. overwrite() keeps the cell re-runnable; a plain save() fails once the
# path already exists.
model.write().overwrite().save(MODEL_PATH)


In [14]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml import PipelineModel

# Load model
# Load the pipeline fitted and saved by the training cell.
model = PipelineModel.load("/content/news_sentiment_model")

# Schema for streaming data: mirrors the fields written by fetch_and_save_news().
stream_schema = StructType([
    StructField("title", StringType(), True),
    StructField("pubDate", StringType(), True)
])

# Read stream of JSON-lines files dropped into the landing directory.
raw_stream_df = spark.readStream.schema(stream_schema).json("/content/news_stream")

# Rename to the column the pipeline was trained on, and drop rows with a missing or blank
# title: Tokenizer fails on null input, and one bad row would fail the whole streaming query.
stream_df = (
    raw_stream_df
    .withColumnRenamed("title", "text")
    .filter("text IS NOT NULL AND trim(text) != ''")
)

# Apply model
predictions = model.transform(stream_df)

# Select only useful columns
result = predictions.select("text", "prediction", "pubDate")

# Re-running this cell would otherwise try to start a second query on the same checkpoint,
# which Spark rejects while the first one is still active.
for active_query in spark.streams.active:
    if active_query.name == "news_predictions":
        active_query.stop()

# Write predictions to folder
query = (
    result.writeStream
    .queryName("news_predictions")
    .outputMode("append")
    .format("json")
    .option("path", "/content/predictions")
    .option("checkpointLocation", "/content/checkpoint")
    .start()
)


In [22]:
%%writefile app.py
import glob
import json
import os

import pandas as pd
import streamlit as st

PREDICTIONS_GLOB = "/content/predictions/*.json"
DISPLAY_COLUMNS = ["text", "pubDate", "sentiment"]

st.set_page_config(layout="wide")
st.title("📰 Real-Time News Sentiment Dashboard")


def load_predictions(pattern=PREDICTIONS_GLOB):
    """Read every JSON-lines prediction file matching ``pattern`` into a list of dicts.

    Files are visited oldest-first by modification time, so the end of the list holds
    the most recently written predictions. Blank lines and lines that fail to parse
    (e.g. a part file still being written by the sink) are skipped instead of
    crashing the dashboard.
    """
    records = []
    for path in sorted(glob.glob(pattern), key=os.path.getmtime):
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    records.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
    return records


data = load_predictions()

if not data:
    st.warning("Waiting for predictions...")
else:
    df = pd.DataFrame(data)
    # NOTE(review): assumes the training StringIndexer mapped "positive" to 1.0 -- keep in sync.
    df["sentiment"] = df["prediction"].eq(1.0).map({True: "Positive", False: "Negative"})

    st.write("### Latest Predictions")
    # reindex tolerates a missing column: Spark's JSON sink omits null fields such as pubDate.
    st.table(df.reindex(columns=DISPLAY_COLUMNS).tail(10))

    st.write("### Sentiment Distribution")
    st.bar_chart(df["sentiment"].value_counts())


Writing app.py
