In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install pyspark==3.5.1 pandas requests python-dotenv



In [2]:
import os
import pandas as pd
import requests
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("RealTimeNewsSentiment")
    .getOrCreate()
)
spark

In [3]:
import getpass

# Prefer a key supplied through the NEWSAPI_KEY environment variable so the
# notebook can run unattended (Restart & Run All); otherwise fall back to a
# hidden interactive prompt. Surrounding whitespace from copy/paste is removed.
API_KEY = (
    os.environ.get("NEWSAPI_KEY")
    or getpass.getpass("Enter your NewsAPI key (it will be hidden): ")
).strip()

# Fail here with a clear message instead of sending an empty key to the API.
if not API_KEY:
    raise ValueError("A NewsAPI key is required to fetch headlines.")

Enter your NewsAPI key (it will be hidden): ··········


In [4]:
def fetch_latest_news(api_key, page_size=50):
    """Fetch English top headlines from NewsAPI as a pandas DataFrame.

    Args:
        api_key: NewsAPI key used to authenticate the request.
        page_size: Number of articles requested from the API (``pageSize``).

    Returns:
        pandas.DataFrame with string columns ``headline`` and ``description``,
        one row per article that has a non-empty title. A missing or null
        description is returned as an empty string.

    Raises:
        requests.HTTPError: If the API answers with a non-success status code
            (raised by ``response.raise_for_status()``).
    """
    response = requests.get(
        "https://newsapi.org/v2/top-headlines",
        params={"language": "en", "pageSize": page_size},
        # Send the key as a header so it never appears in the URL, which
        # would otherwise leak into logs and exception messages.
        headers={"X-Api-Key": api_key},
        # Without a timeout a stalled connection would hang the cell forever.
        timeout=10,
    )
    # Surface auth/rate-limit failures here instead of returning an empty frame.
    response.raise_for_status()

    articles = response.json().get("articles") or []
    headlines = [
        # `.get("description", "")` still yields None when the key is present
        # with a JSON null, so normalise with `or ""`.
        (article["title"], article.get("description") or "")
        for article in articles
        if article.get("title")
    ]
    return pd.DataFrame(headlines, columns=["headline", "description"])

# Pull the current headlines with the key entered above and preview the first rows.
df_news = fetch_latest_news(api_key=API_KEY)
df_news.head(5)

Unnamed: 0,headline,description
0,Czech Republic: Billionaire populist Andrej Ba...,"However, his ANO party falls short of an overa..."
1,'Best Brazilian fighter to ever exist': Alex P...,"Make ""Poatan"" mad at your own risk."
2,Mariners can't come up with big hit late in AL...,The previous postseason game in Seattle produc...
3,Bitcoin Rises to a Record as ‘Debasement’ Trad...,Bitcoin set another all-time high as a broader...
4,Judge temporarily blocks Trump administration ...,A federal judge in Oregon has temporarily bloc...


In [5]:
# Hand the pandas frame to Spark so the ML pipeline can score it.
df_spark = spark.createDataFrame(data=df_news)
df_spark.show(n=5, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|headline                                                                                                                          |description                                                                                                                                                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------

In [6]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Hand-labelled example headlines used as the training set.
# Label 1 is shown as "Positive" and 0 as "Negative" later in the notebook.
labelled_headlines = [
    ("Stocks are soaring today after positive market news", 1),
    ("Company profits hit record highs", 1),
    ("Global markets are crashing amid inflation fears", 0),
    ("Investors are worried about economic slowdown", 0),
    ("Tech sector reports major growth", 1),
    ("Unemployment rates rise unexpectedly", 0),
]
train_data = spark.createDataFrame(labelled_headlines, schema=["headline", "label"])

# Feature stages: headline -> words -> filtered -> rawFeatures -> features.
tokenizer = Tokenizer(
    inputCol="headline",
    outputCol="words",
)
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)
hashingTF = HashingTF(
    inputCol="filtered",
    outputCol="rawFeatures",
    numFeatures=1000,
)
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)

# Classifier stage; no column names are passed, so the estimator's defaults apply.
lr = LogisticRegression(
    maxIter=10,
    regParam=0.001,
)

# Chain the stages in processing order and fit them on the labelled sample.
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])
model = pipeline.fit(train_data)


In [7]:
# Score the fetched headlines with the fitted pipeline and preview the predicted labels.
predictions = model.transform(df_spark)
predictions.select("headline", "prediction").show(n=10, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------+----------+
|headline                                                                                                                          |prediction|
+----------------------------------------------------------------------------------------------------------------------------------+----------+
|Czech Republic: Billionaire populist Andrej Babis' party wins parliamentary election - BBC                                        |0.0       |
|'Best Brazilian fighter to ever exist': Alex Pereira's unreal KO of Magmomed Ankalaev at UFC 320 sets off MMA world - Yahoo Sports|0.0       |
|Mariners can't come up with big hit late in ALDS opener vs. Tigers - The Seattle Times                                            |1.0       |
|Bitcoin Rises to a Record as ‘Debasement’ Trade Spurs Risk Rally - Bloomberg.com                                                  |1.0 

In [8]:
# Bring the predictions back into pandas and attach a readable label per class.
sentiment_labels = {1: "🟢 Positive", 0: "🔴 Negative"}
preds_pd = (
    predictions.select("headline", "prediction")
    .toPandas()
    .assign(Sentiment=lambda frame: frame["prediction"].map(sentiment_labels))
)
preds_pd.head(10)

Unnamed: 0,headline,prediction,Sentiment
0,Czech Republic: Billionaire populist Andrej Ba...,0.0,🔴 Negative
1,'Best Brazilian fighter to ever exist': Alex P...,0.0,🔴 Negative
2,Mariners can't come up with big hit late in AL...,1.0,🟢 Positive
3,Bitcoin Rises to a Record as ‘Debasement’ Trad...,1.0,🟢 Positive
4,Judge temporarily blocks Trump administration ...,1.0,🟢 Positive
5,Recruits react to FSU's loss to Miami - 247Sports,0.0,🔴 Negative
6,Federal agents knock down elderly couple durin...,0.0,🔴 Negative
7,Security video helps lead Oregon detectives to...,1.0,🟢 Positive
8,The Women Who Helped Defeat ISIS Are Fighting ...,0.0,🔴 Negative
9,"Exclusive — Trump on Gaza deal push: ""I said, ...",1.0,🟢 Positive
