In [7]:
!pip install pyspark requests pyarrow streamlit nltk vaderSentiment -q


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/126.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m126.0/126.0 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [8]:
import requests
import pandas as pd
from datetime import datetime
import nltk
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NewsSentimentColab").getOrCreate()

print("Spark version:", spark.version)


Spark version: 3.5.1


NewsAPI fetch function

In [9]:
# Replace with your API key
NEWSAPI_KEY = "56835162ca1a40828d0773a213c0d26b"

def fetch_news_newsapi(q="technology OR ai OR climate", page_size=20):
    url = "https://newsapi.org/v2/everything"
    params = {
        "q": q,
        "pageSize": page_size,
        "language": "en",
        "sortBy": "publishedAt",
        "apiKey": NEWSAPI_KEY
    }
    r = requests.get(url, params=params, timeout=20)
    r.raise_for_status()
    items = []
    for a in r.json().get("articles", []):
        items.append({
            "title": a.get("title"),
            "description": a.get("description"),
            "source": a.get("source", {}).get("name"),
            "publishedAt": a.get("publishedAt"),
            "url": a.get("url"),
            "fetchedAt": datetime.utcnow().isoformat()
        })
    return items

# Quick test
fetched = fetch_news_newsapi(q="technology", page_size=5)
pd.DataFrame(fetched)


  "fetchedAt": datetime.utcnow().isoformat()


Unnamed: 0,title,description,source,publishedAt,url,fetchedAt
0,"BSNL Launches Indigenous 4G, Eyes 5G",BSNL Launches 4G Network: A Boost for Digital ...,Nep123.com,2025-09-28T14:49:20Z,https://nep123.com/bsnl-launches-indigenous-4g...,2025-09-29T14:53:42.150755
1,Wattbike Atom,Very solid smart bike with good power accuracy...,road.cc,2025-09-28T14:45:00Z,https://road.cc/content/review/wattbike-atom-3...,2025-09-29T14:53:42.150783
2,"Swipes, Screenshots, & Success: AI InnoVision ...",The “Queen of AI” is opening up about how a de...,Bossip,2025-09-28T14:42:28Z,https://bossip.com/3915318/alicia-lyttle-queen...,2025-09-29T14:53:42.150791
3,"Business leaders call for tax relief, access t...",Business leaders have a wide-ranging wish list...,CBC News,2025-09-28T14:40:30Z,https://www.cbc.ca/news/canada/toronto/sunday-...,2025-09-29T14:53:42.150797
4,Canada Joins The Global Push For Sovereign AI ...,TELUS launched Canada’s first fully sovereign ...,Forbes,2025-09-28T14:40:10Z,https://www.forbes.com/sites/ronschmelzer/2025...,2025-09-29T14:53:42.150803


Weak-label dataset with VADER

In [10]:
nltk.download("vader_lexicon")
analyzer = SentimentIntensityAnalyzer()

def label_with_vader(text):
    if not text:
        return None
    s = analyzer.polarity_scores(str(text))
    comp = s["compound"]
    return 1 if comp >= 0.05 else 0  # 1 = positive, 0 = negative

# Fetch multiple queries to build dataset
queries = ["technology", "ai", "climate", "economy"]
all_records = []
for q in queries:
    all_records.extend(fetch_news_newsapi(q=q, page_size=20))

df = pd.DataFrame(all_records).dropna(subset=["title"]).reset_index(drop=True)
df["label"] = df["title"].apply(label_with_vader)
df = df.rename(columns={"title":"text"})
df = df[["text","label","description","source","publishedAt","url","fetchedAt"]]

df.to_csv("labeled_headlines.csv", index=False)
df.head()


[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
  "fetchedAt": datetime.utcnow().isoformat()


Unnamed: 0,text,label,description,source,publishedAt,url,fetchedAt
0,"BSNL Launches Indigenous 4G, Eyes 5G",0,BSNL Launches 4G Network: A Boost for Digital ...,Nep123.com,2025-09-28T14:49:20Z,https://nep123.com/bsnl-launches-indigenous-4g...,2025-09-29T14:55:29.411599
1,Wattbike Atom,0,Very solid smart bike with good power accuracy...,road.cc,2025-09-28T14:45:00Z,https://road.cc/content/review/wattbike-atom-3...,2025-09-29T14:55:29.411637
2,"Swipes, Screenshots, & Success: AI InnoVision ...",1,The “Queen of AI” is opening up about how a de...,Bossip,2025-09-28T14:42:28Z,https://bossip.com/3915318/alicia-lyttle-queen...,2025-09-29T14:55:29.411647
3,"Business leaders call for tax relief, access t...",1,Business leaders have a wide-ranging wish list...,CBC News,2025-09-28T14:40:30Z,https://www.cbc.ca/news/canada/toronto/sunday-...,2025-09-29T14:55:29.411654
4,Canada Joins The Global Push For Sovereign AI ...,0,TELUS launched Canada’s first fully sovereign ...,Forbes,2025-09-28T14:40:10Z,https://www.forbes.com/sites/ronschmelzer/2025...,2025-09-29T14:55:29.411661


Train PySpark ML pipeline

In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col, isnan, when, sum

# -----------------------------
# Load labeled dataset into Spark
# -----------------------------
labeled_df = spark.read.csv("labeled_headlines.csv", header=True, inferSchema=True)

# Drop rows with null or NaN labels from the original DataFrame
labeled_df = labeled_df.filter(col("label").isNotNull() & ~isnan(col("label")))

# Convert 'label' column to integer (required by PySpark ML)
labeled_df = labeled_df.withColumn("label", col("label").cast("integer"))

print("Data types after cast:")
labeled_df.printSchema()
print("Total rows after dropping null and NaN labels:", labeled_df.count())

# -----------------------------
# Split train/test
# -----------------------------
train, test = labeled_df.randomSplit([0.8,0.2], seed=42)
print("Train count before dropping null/NaN labels:", train.count())
print("Test count:", test.count())

# Explicitly drop rows with null or NaN labels from the training DataFrame
train = train.filter(col("label").isNotNull() & ~isnan(col("label")))
print("Train count after dropping null/NaN labels:", train.count())


# Add this to check for nulls/NaNs in the training data before fitting
print("Checking train DataFrame for null/NaN labels before training:")
train.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in train.columns]).show()
train.select([sum(when(isnan(col(c)), 1).otherwise(0)).alias(c) for c in train.columns]).show()


# -----------------------------
# Define ML pipeline
# -----------------------------
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hash_tf = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20)

pipeline = Pipeline(stages=[tokenizer, remover, hash_tf, idf, lr])

# -----------------------------
# Train the model
# -----------------------------
model = pipeline.fit(train)

# -----------------------------
# Evaluate on test set
# -----------------------------
preds = model.transform(test)

# Drop rows with null labels from predictions before evaluation
preds = preds.filter(col("label").isNotNull())

evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(preds)
print("Test AUC:", auc)

# -----------------------------
# Save the trained pipeline
# -----------------------------
MODEL_DIR = "sentiment_model"
model.write().overwrite().save(MODEL_DIR)
print("Saved trained pipeline to", MODEL_DIR)

Data types after cast:
root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- source: string (nullable = true)
 |-- publishedAt: string (nullable = true)
 |-- url: string (nullable = true)
 |-- fetchedAt: string (nullable = true)

Total rows after dropping null and NaN labels: 77
Train count before dropping null/NaN labels: 61
Test count: 16
Train count after dropping null/NaN labels: 56
Checking train DataFrame for null/NaN labels before training:
+----+-----+-----------+------+-----------+---+---------+
|text|label|description|source|publishedAt|url|fetchedAt|
+----+-----+-----------+------+-----------+---+---------+
|   0|    0|          1|     6|          6|  6|        7|
+----+-----+-----------+------+-----------+---+---------+

+----+-----+-----------+------+-----------+---+---------+
|text|label|description|source|publishedAt|url|fetchedAt|
+----+-----+-----------+------+-----------+---+---------+
|   0|    

Predict on fresh headlines

In [19]:
from pyspark.ml import PipelineModel

# Load trained model
pipeline_model = PipelineModel.load(MODEL_DIR)

# Fetch fresh headlines
fresh = fetch_news_newsapi(q="technology OR ai OR climate", page_size=15)
fresh_df = pd.DataFrame(fresh).dropna(subset=["title"]).rename(columns={"title":"text"})

# Create Spark DataFrame
spark_fresh = spark.createDataFrame(fresh_df)

# Predict sentiment
predicted = pipeline_model.transform(spark_fresh)

# Select and format output
out = predicted.select("text","source","publishedAt","url","prediction","probability")

# Convert to Pandas
out_pd = out.toPandas()

# Add probability of positive class and sentiment label
out_pd["prob_pos"] = out_pd["probability"].apply(lambda v: float(v[1]))
out_pd["sentiment"] = out_pd["prediction"].map({0:"Negative",1:"Positive"})
out_pd = out_pd.drop(columns=["probability"])

# Save predictions for dashboard
out_pd.to_csv("predictions.csv", index=False)
out_pd.head()


  "fetchedAt": datetime.utcnow().isoformat()


Unnamed: 0,text,source,publishedAt,url,prediction,prob_pos,sentiment
0,CAPtured tech-art headpiece protects social me...,Designboom,2025-09-28T15:01:09Z,https://www.designboom.com/technology/captured...,1.0,0.501843,Positive
1,Ray-Ban Meta Gen 2 Review: Still the Best Non-...,Gizmodo.com,2025-09-28T15:00:40Z,https://gizmodo.com/ray-ban-meta-gen-2-review-...,0.0,0.000172,Negative
2,How Gen AI Could Benefit Neurodiverse Learners,Elearningindustry.com,2025-09-28T15:00:32Z,https://elearningindustry.com/how-gen-ai-could...,0.0,0.004318,Negative
3,Walmart CEO explains what the most coveted ski...,Fortune,2025-09-28T15:00:24Z,https://fortune.com/2025/09/28/walmart-ceo-dou...,0.0,0.26441,Negative
4,'The new normal': Wall Street says high stock ...,Yahoo Entertainment,2025-09-28T15:00:22Z,https://finance.yahoo.com/news/the-new-normal-...,0.0,0.000356,Negative


Check saved outputs

In [20]:
!ls -lh labeled_headlines.csv predictions.csv

import pandas as pd
pd.read_csv("predictions.csv").head()


-rw-r--r-- 1 root root  29K Sep 29 14:55 labeled_headlines.csv
-rw-r--r-- 1 root root 3.4K Sep 29 15:03 predictions.csv


Unnamed: 0,text,source,publishedAt,url,prediction,prob_pos,sentiment
0,CAPtured tech-art headpiece protects social me...,Designboom,2025-09-28T15:01:09Z,https://www.designboom.com/technology/captured...,1.0,0.501843,Positive
1,Ray-Ban Meta Gen 2 Review: Still the Best Non-...,Gizmodo.com,2025-09-28T15:00:40Z,https://gizmodo.com/ray-ban-meta-gen-2-review-...,0.0,0.000172,Negative
2,How Gen AI Could Benefit Neurodiverse Learners,Elearningindustry.com,2025-09-28T15:00:32Z,https://elearningindustry.com/how-gen-ai-could...,0.0,0.004318,Negative
3,Walmart CEO explains what the most coveted ski...,Fortune,2025-09-28T15:00:24Z,https://fortune.com/2025/09/28/walmart-ceo-dou...,0.0,0.26441,Negative
4,'The new normal': Wall Street says high stock ...,Yahoo Entertainment,2025-09-28T15:00:22Z,https://finance.yahoo.com/news/the-new-normal-...,0.0,0.000356,Negative
