In [2]:
import json
import os

import matplotlib.pyplot as plt
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import DoubleType, FloatType, StringType
from textblob import TextBlob

In [3]:
# --------------------------
# CONFIG
# --------------------------
# The GNews API key is read from the environment instead of being stored in the
# notebook, so it is not leaked when the notebook is shared or committed.
# Set it before starting the kernel, e.g. `export GNEWS_API_KEY=...`.
# NOTE(review): the key that used to be hard-coded here is exposed in version
# history / shared copies and should be revoked and rotated.
GNEWS_API_KEY = os.environ.get("GNEWS_API_KEY", "")
QUERY = 'stock market'
RAW_JSON_PATH = "news_data.json"           # raw articles, one JSON object per line (Step 1)
CLEANED_JSON_PATH = "cleaned_news_output"  # Spark JSON output path for cleaned text (Step 2)
SENTIMENT_JSON_PATH = "sentiment_output"   # Spark JSON output path for scored articles (Step 3)

In [4]:
# --------------------------
# STEP 1: Fetch news
# --------------------------
def fetch_news(query=None, max_results=50, timeout=10):
    """Fetch articles from the GNews search API and save them as JSON Lines.

    Each article object from the response's ``articles`` list is written as one
    JSON document per line to ``RAW_JSON_PATH``, the layout ``spark.read.json``
    reads in Step 2.

    Args:
        query: Search phrase; defaults to the module-level ``QUERY``.
        max_results: Value sent as the ``max`` request parameter. The API may
            return fewer articles than requested.
        timeout: Seconds to wait for the HTTP response before ``requests``
            raises a timeout error.

    Returns:
        The list of article dicts that were saved, or an empty list when the
        API answered with a non-200 status (``RAW_JSON_PATH`` is then left
        untouched, so any previous file is still there).
    """
    print("\n[Step 1] Fetching News from GNews API...")
    params = {
        "q": QUERY if query is None else query,
        "lang": "en",
        "max": max_results,
        "token": GNEWS_API_KEY,
    }
    # Let requests URL-encode the parameters instead of splicing them into the
    # URL, and bound the wait so a stalled connection cannot hang the kernel.
    response = requests.get("https://gnews.io/api/v4/search", params=params, timeout=timeout)

    if response.status_code != 200:
        print(f"❌ Error fetching data: {response.status_code}, {response.text}")
        return []

    articles = response.json().get("articles", [])

    with open(RAW_JSON_PATH, "w", encoding="utf-8") as f:
        for article in articles:
            f.write(json.dumps(article) + "\n")

    print(f"✅ Fetched and saved {len(articles)} articles.")
    return articles

In [5]:
# --------------------------
# STEP 2: Preprocessing
# --------------------------
def preprocess_articles(spark):
    """Clean the raw GNews articles with Spark and persist them as JSON.

    Keeps ``title`` and ``description`` from ``RAW_JSON_PATH`` and adds a
    ``text`` column. That column is the description with every character other
    than ASCII letters and whitespace removed, then lower-cased. The result
    overwrites ``CLEANED_JSON_PATH``.

    NOTE(review): the recorded run failed at the ``write`` below with
    ``UnsatisfiedLinkError ... NativeIO$Windows.access0``. That error is raised
    inside Hadoop's native Windows I/O layer, not by this code. Presumably the
    machine lacks a matching winutils.exe / hadoop.dll under HADOOP_HOME;
    confirm the local Hadoop setup.

    Args:
        spark: Active SparkSession.

    Returns:
        The cleaned DataFrame.
    """
    print("\n[Step 2] Preprocessing News Data with PySpark...")
    raw_df = spark.read.json(RAW_JSON_PATH)

    letters_only = regexp_replace(col("description"), "[^a-zA-Z\\s]", "")
    cleaned_df = raw_df.select("title", "description").withColumn("text", lower(letters_only))

    cleaned_df.write.mode("overwrite").json(CLEANED_JSON_PATH)
    print(f"✅ Saved cleaned data to {CLEANED_JSON_PATH}")
    return cleaned_df

In [6]:
# --------------------------
# STEP 3: Sentiment Analysis
# --------------------------
def get_sentiment(text):
    """Return TextBlob's polarity for ``text``, or 0.0 when there is no text.

    ``None`` and the empty string both count as "no text", so null Spark
    values score as neutral without ever reaching TextBlob.
    """
    return TextBlob(text).sentiment.polarity if text else 0.0


def classify_sentiment(score, threshold=0.1):
    """Map a polarity score to a coarse sentiment label.

    Args:
        score: Polarity value, such as the output of ``get_sentiment``.
            ``None`` (e.g. a null Spark column value) is treated as neutral
            instead of raising ``TypeError`` on the comparison.
        threshold: Scores strictly above ``threshold`` are "Positive" and
            scores strictly below ``-threshold`` are "Negative". The band in
            between, including both endpoints, is "Neutral".

    Returns:
        "Positive", "Negative" or "Neutral".
    """
    if score is None:
        return "Neutral"
    if score > threshold:
        return "Positive"
    if score < -threshold:
        return "Negative"
    return "Neutral"


def run_sentiment_analysis(spark):
    """Score each cleaned article and label it Positive / Neutral / Negative.

    Reads the cleaned JSON from ``CLEANED_JSON_PATH``. It then adds a
    ``polarity`` column (via ``get_sentiment``) and a ``sentiment`` label
    column (via ``classify_sentiment``), and drops ``description``. The result
    is written to ``SENTIMENT_JSON_PATH`` and a preview is printed.

    Args:
        spark: Active SparkSession.

    Returns:
        The scored DataFrame. It is cached, because the caller runs further
        actions on it after the write and preview done here.
    """
    print("\n[Step 3] Running Sentiment Analysis...")

    # DoubleType, not FloatType: get_sentiment returns a Python (64-bit) float.
    # Narrowing it to 32 bits turns e.g. 0.1 into 0.10000000149..., which then
    # crosses classify_sentiment's `score > 0.1` check and is mislabelled
    # "Positive" (and -0.1 likewise becomes "Negative").
    sentiment_udf = udf(get_sentiment, DoubleType())
    label_udf = udf(classify_sentiment, StringType())

    cleaned_df = spark.read.json(CLEANED_JSON_PATH)

    scored_df = (
        cleaned_df
        .withColumn("polarity", sentiment_udf(col("text")))
        .withColumn("sentiment", label_udf(col("polarity")))
        .drop("description")
        # The write, the preview and the caller's summary/plot are separate
        # Spark actions; caching avoids re-running the Python UDFs for each.
        .cache()
    )

    scored_df.write.mode("overwrite").json(SENTIMENT_JSON_PATH)
    print(f"✅ Saved sentiment results to '{SENTIMENT_JSON_PATH}'")
    scored_df.select("text", "polarity", "sentiment").show(truncate=100)

    return scored_df

In [7]:
# --------------------------
# STEP 4: Simple Summary
# --------------------------
def print_sentiment_summary(scored_df):
    """Print how many articles received each sentiment label.

    Args:
        scored_df: DataFrame with a ``sentiment`` column, as returned by
            ``run_sentiment_analysis``. Labels are printed in whatever order
            the group-by returns them.
    """
    label_rows = scored_df.groupBy("sentiment").count().collect()
    print("\n📊 Sentiment Summary:")
    for line in (f"{r['sentiment']}: {r['count']} articles" for r in label_rows):
        print(line)


In [8]:
# --------------------------
# STEP 5: Visualization
# --------------------------
def plot_sentiment_pie(scored_df):
    """Draw a pie chart of the sentiment label distribution.

    Slices appear in the fixed order Positive, Neutral, Negative, and labels
    with no articles are omitted. If none of those three labels has a
    positive count, a warning is printed instead of a chart.

    Args:
        scored_df: DataFrame with a ``sentiment`` column.
    """
    print("\n📈 [Step 5] Generating Pie Chart Visualization...")

    tally = {
        row["sentiment"]: row["count"]
        for row in scored_df.groupBy("sentiment").count().collect()
    }
    palette = {"Positive": "#66BC6A", "Neutral": "#FFA726", "Negative": "#EF5350"}

    # (label, count) pairs for the labels that actually occur, in display order.
    present = [
        (name, tally.get(name, 0))
        for name in ("Positive", "Neutral", "Negative")
        if tally.get(name, 0) > 0
    ]

    if not present:
        print("⚠️ No data to plot.")
        return

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.pie(
        [count for _, count in present],
        labels=[f"{name} ({count})" for name, count in present],
        autopct='%1.1f%%',
        startangle=140,
        colors=[palette[name] for name, _ in present],
    )
    ax.set_title("News Sentiment Distribution")
    ax.axis("equal")
    fig.tight_layout()
    plt.show()

In [9]:
# --------------------------
# MAIN
# --------------------------
if __name__ == "__main__":
    # Runs the whole pipeline: fetch -> clean -> score -> summarize -> plot.
    spark = (
        SparkSession.builder
        .appName("RealTimeNewsSentimentPipeline")
        .master("local[*]")
        .getOrCreate()
    )

    try:
        fetch_news()
        preprocess_articles(spark)
        scored_df = run_sentiment_analysis(spark)
        print_sentiment_summary(scored_df)
        plot_sentiment_pie(scored_df)
    finally:
        # Always release the session, even when a step raises (as in the run
        # recorded below). In a notebook the kernel outlives the failure, and
        # getOrCreate() would otherwise hand back this same live session.
        spark.stop()


[Step 1] Fetching News from GNews API...
✅ Fetched and saved 10 articles.

[Step 2] Preprocessing News Data with PySpark...


Py4JJavaError: An error occurred while calling o48.json.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:817)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1415)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1620)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:739)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:961)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2078)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2122)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:194)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:481)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
	at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:352)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
