# Hacker News Sentiment Analysis with Kafka + PySpark + MongoDB

## Project Overview

This notebook demonstrates a complete big data pipeline:

1. **Data Collection**: Stream Hacker News posts + comments via API
2. **Kafka Streaming**: Push posts to a Kafka topic. We use Kafka hosted on Confluent Cloud, as the pipeline could not be run otherwise.
3. **PySpark Processing**: 
   - Subscribe to Kafka stream
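   - Parse the sentiment-enriched messages (comments are fetched and scored by the producer before they reach Kafka)
   - Add derived columns such as engagement ratio and a controversy flag
   - Aggregate overall post sentiment by label and time window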
4. **MongoDB Integration**: Store enriched results with sentiment analysis
5. **Analytics**: Query and visualize sentiment trends

### Use Case
**Predict community reaction to HN posts** by analyzing comment sentiment in real-time.

### What This Notebook Demonstrates:

1. **Kafka Streaming** - Real-time data ingestion
2. **PySpark DataFrames** - Distributed data processing
3. **Spark SQL** - SQL queries on streaming data
4. **MongoDB Integration** - NoSQL persistence
5. **NLP/Sentiment Analysis** - TextBlob for comment analysis
6. **Windowed Aggregations** - Time-based analytics
7. **UDFs** - Custom sentiment calculation functions
8. **Pipeline Architecture** - Complete streaming + batch analytics

## 0. Confluent Cloud Configuration



In [1]:
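# Activate venv
# Note: `!` runs the command in a throwaway subshell, so this does not change the
# environment of the running kernel; select the .venv kernel or activate it before
# launching Jupyter instead.
!source .venv/bin/activate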

In [2]:
# We need to downgrade pyspark to this version
!pip install pyspark==3.5.0
!pip install kafka-python textblob pymongo




In [3]:
# Install required packages
!pip install -r requirements_streming.txt

# Download TextBlob corpora 
!python -m textblob.download_corpora

[nltk_data] Downloading package brown to
[nltk_data]     /teamspace/studios/this_studio/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     /teamspace/studios/this_studio/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /teamspace/studios/this_studio/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /teamspace/studios/this_studio/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package conll2000 to
[nltk_data]     /teamspace/studios/this_studio/nltk_data...
[nltk_data]   Package conll2000 is already up-to-date!
[nltk_data] Downloading package movie_reviews to
[nltk_data]     /teamspace/studios/this_studio/nltk_data...
[nltk_data]   Package movie_reviews is alr

In [4]:
import os
import sys


# MongoDB configuration. Passwords are replaced with the placeholder PASSWORD for security,
# since no .env file was set up for this demonstration.
MONGO_URI = "mongodb+srv://carolina:PASSWORD@cluster0.32afvrm.mongodb.net/?appName=Cluster0"
MONGO_DATABASE = "hackernews_analytics"
MONGO_COLLECTION = "post_sentiment"

# Confluent Cloud Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"  
KAFKA_TOPIC = "hackernews_posts"
KAFKA_SASL_USERNAME = "5SJO32G3QUXNEE6P" 
KAFKA_SASL_PASSWORD = "PASSWORD"  # Placeholder string; the real API secret is redacted

# Hacker News API
HN_API_BASE = "https://hacker-news.firebaseio.com/v0"


# Configure Java for PySpark
JAVA_HOME_LINUX = "/usr/lib/jvm/java-17-openjdk-amd64"
JAVA_HOME_MACOS = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"

if os.path.exists(JAVA_HOME_LINUX):
    JAVA_HOME = JAVA_HOME_LINUX
    print("✓ Detected Linux environment (Lightning AI)")
elif os.path.exists(JAVA_HOME_MACOS):
    JAVA_HOME = JAVA_HOME_MACOS
    print("✓ Detected macOS environment (local)")
else:
    print("Java not found. Installing...")
    import subprocess
    subprocess.run(["apt-get", "update"], check=False, capture_output=True)
    subprocess.run(["apt-get", "install", "-y", "openjdk-17-jdk"], check=False, capture_output=True)
    JAVA_HOME = JAVA_HOME_LINUX
    print("Java 17 installed")

os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PATH"] = f"{JAVA_HOME}/bin:{os.environ.get('PATH', '')}"

# Set environment variables
os.environ["MONGO_URI"] = MONGO_URI
os.environ["MONGO_DATABASE"] = MONGO_DATABASE
os.environ["MONGO_COLLECTION"] = MONGO_COLLECTION
os.environ["KAFKA_BOOTSTRAP_SERVERS"] = KAFKA_BOOTSTRAP_SERVERS
os.environ["KAFKA_TOPIC"] = KAFKA_TOPIC
if KAFKA_SASL_USERNAME:
    os.environ["KAFKA_SASL_USERNAME"] = KAFKA_SASL_USERNAME
if KAFKA_SASL_PASSWORD:
    os.environ["KAFKA_SASL_PASSWORD"] = KAFKA_SASL_PASSWORD

USE_KAFKA_AUTH = bool(KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD)

print("=" * 80)
print("CONFLUENT CLOUD CONFIGURATION")
print("=" * 80)
print(f"Python: {sys.version.split()[0]}")
print(f"Java: {JAVA_HOME.split('/')[-1]}")
print(f"MongoDB: {MONGO_DATABASE}.{MONGO_COLLECTION}")
print(f"Kafka Bootstrap: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"Kafka Topic: {KAFKA_TOPIC}")
print(f"Kafka Auth: {'✓ Enabled (SASL_SSL)' if USE_KAFKA_AUTH else '✗ NOT CONFIGURED - Set API Key and Secret!'}")
print("=" * 80)

# Validation
if not USE_KAFKA_AUTH:
    print("\n  WARNING: Confluent Cloud credentials not set!")
    print("")
    print("To set up Confluent Cloud:")
    print("1. Go to https://confluent.cloud")
    print("2. Sign up (free, no credit card)")
    print("3. Create 'Basic' cluster")
    print("4. Create topic: hackernews_posts")
    print("5. Generate API keys (Cluster → API Keys)")
    print("6. Get Bootstrap server (Cluster Settings)")
    print("7. Update Cell 0 above with your credentials")
    print("")
    print("See KAFKA_CLOUD_SETUP.md for detailed instructions")
    print("")
else:
    print("\n ✓ Confluent Cloud configured! Ready to connect.\n")


✓ Detected Linux environment (Lightning AI)
CONFLUENT CLOUD CONFIGURATION
Python: 3.12.11
Java: java-17-openjdk-amd64
MongoDB: hackernews_analytics.post_sentiment
Kafka Bootstrap: pkc-921jm.us-east-2.aws.confluent.cloud:9092
Kafka Topic: hackernews_posts
Kafka Auth: ✓ Enabled (SASL_SSL)

✅ Confluent Cloud configured! Ready to connect.
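The configuration cell above keeps credentials inline behind placeholders. One alternative is to read them from environment variables and fail early when one is missing. The following is a minimal sketch and not part of the original notebook: `load_settings`, `redact`, and `REQUIRED_KEYS` are hypothetical names, and the variable names simply mirror the ones exported above.

```python
import os

# Assumption: these are the settings the rest of the notebook cannot run without.
REQUIRED_KEYS = (
    "MONGO_URI",
    "KAFKA_BOOTSTRAP_SERVERS",
    "KAFKA_SASL_USERNAME",
    "KAFKA_SASL_PASSWORD",
)


def load_settings(env=None):
    """Return the required settings from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError("Missing settings: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}


def redact(value, keep=4):
    """Mask all but the first `keep` characters so a secret can be printed safely."""
    if len(value) <= keep:
        return "*" * len(value)
    return value[:keep] + "*" * (len(value) - keep)


# Demo with a stand-in mapping (hypothetical values, not real credentials)
demo_env = {
    "MONGO_URI": "mongodb://localhost:27017",
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
    "KAFKA_SASL_USERNAME": "demo-key",
    "KAFKA_SASL_PASSWORD": "demo-secret",
}
settings = load_settings(demo_env)
print("Kafka secret:", redact(settings["KAFKA_SASL_PASSWORD"]))
```

Passing a plain dict instead of `os.environ` makes the helper easy to try without touching the real environment.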



In [5]:
# Import libraries
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructType, StringType, IntegerType, StructField, 
    TimestampType, DoubleType, ArrayType
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, col, avg, count, lit

import requests
import json
import time
from datetime import datetime
from kafka import KafkaProducer
from textblob import TextBlob
import pymongo

import re
from html import unescape

# Configuration
KAFKA_TOPIC = "hackernews_posts"
HN_API_BASE = "https://hacker-news.firebaseio.com/v0"

print("All libraries imported successfully!")

✓ All libraries imported successfully!


## 1. MongoDB Setup

Configure MongoDB connection for storing sentiment analysis results.

In [6]:
# MongoDB Configuration
MONGO_URI = "mongodb+srv://carolina:PASSWORD@cluster0.32afvrm.mongodb.net/?appName=Cluster0"  # Password redacted, as in the configuration cell
MONGO_DATABASE = "hackernews_analytics"
MONGO_COLLECTION = "post_sentiment"

print("MongoDB configuration set")

MongoDB configuration set


In [7]:
# Test MongoDB connection
try:
    client = pymongo.MongoClient(MONGO_URI)
    # Test connection
    client.admin.command('ping')
    print("✓ Successfully connected to MongoDB")
    
    # Create database and collection
    db = client[MONGO_DATABASE]
    collection = db[MONGO_COLLECTION]
    
    print(f"✓ Database: {MONGO_DATABASE}")
    print(f"✓ Collection: {MONGO_COLLECTION}")
    
    client.close()
except Exception as e:
    print(f"✗ MongoDB connection failed: {e}")
    print("Make sure to update your credentials above!")

✓ Successfully connected to MongoDB
✓ Database: hackernews_analytics
✓ Collection: post_sentiment


## 2. Initialize Spark Session with Kafka + MongoDB Support

In [8]:
# Initialize Spark with both Kafka and MongoDB connectors
spark = SparkSession \
    .builder \
    .appName("HackerNews_Sentiment_MongoDB") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages',
            'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,'
            'org.mongodb.spark:mongo-spark-connector_2.12:10.4.0') \
    .config("spark.mongodb.read.connection.uri", MONGO_URI) \
    .config("spark.mongodb.write.connection.uri", MONGO_URI) \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
print(f"✓ Spark {spark.version} initialized with Kafka + MongoDB connectors")

:: loading settings :: url = jar:file:/system/conda/miniconda3/envs/cloudspace/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/zeus/.ivy2/cache
The jars for the packages stored in: /home/zeus/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-430aba07-b642-4355-968f-4e7c14086bb2;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central

✓ Spark 3.5.0 initialized with Kafka + MongoDB connectors


## 3. Hacker News API Functions

Functions to fetch posts and comments from the HN API and to score comment sentiment.

In [9]:
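def fetch_item(item_id):
    """
    Fetch a single item (post or comment) from HN API.
    Returns the parsed JSON dict, or None on any request or decoding failure.
    """
    try:
        # A timeout keeps one slow request from stalling the whole producer loop
        response = requests.get(f"{HN_API_BASE}/item/{item_id}.json", timeout=10)
        return response.json() if response.status_code == 200 else None
    except (requests.RequestException, ValueError):
        return None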


def fetch_comments(comment_ids, max_comments=20):
    """
    Fetch multiple comments by their IDs.
    Returns list of comment texts.
    """
    if not comment_ids:
        return []
    
    comments = []
    for cid in comment_ids[:max_comments]:  # Limit to avoid rate limits
        comment = fetch_item(cid)
        if comment and comment.get('text'):
            comments.append(comment['text'])
        time.sleep(0.05)  # Rate limiting
    
    return comments


def calculate_comment_sentiment(comments):
    """
    Calculate average sentiment from list of comments.
    Returns: (avg_sentiment, sentiment_label, comment_count)
    """
    
    
    if not comments:
        return 0.0, "neutral", 0
    
    sentiments = []
    for comment_text in comments:
        try:
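            # Strip HTML tags first, then decode entities, so that escaped text
            # such as &lt;T&gt; is kept instead of being removed as a tag
            cleaned = re.sub(r'<[^>]+>', ' ', comment_text)  # Tags become spaces so words across <p> stay apart
            cleaned = unescape(cleaned)  # Convert &lt; to <, etc.
            cleaned = cleaned.strip()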
            
            if not cleaned:
                continue
            
            blob = TextBlob(cleaned)
            sentiments.append(blob.sentiment.polarity)
        except Exception as e:
            # Print error for debugging
            print(f"Warning: Failed to analyze comment: {str(e)[:50]}")
            continue
    
    if not sentiments:
        return 0.0, "neutral", 0
    
    avg_sentiment = sum(sentiments) / len(sentiments)
    
    # Classify sentiment
    if avg_sentiment > 0.1:
        label = "positive"
    elif avg_sentiment < -0.1:
        label = "negative"
    else:
        label = "neutral"
    
    return avg_sentiment, label, len(sentiments)


print("✓ HN API functions defined")

✓ HN API functions defined
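The cleaning and labelling steps inside `calculate_comment_sentiment` can be tried in isolation, without TextBlob or network access. This is a small sketch under assumptions: `clean_comment` and `label_sentiment` are hypothetical helper names, the ±0.1 thresholds are the ones used above, and tags are removed before entities are decoded so that escaped angle brackets survive.

```python
import re
from html import unescape

TAG_RE = re.compile(r"<[^>]+>")


def clean_comment(raw_html):
    """Remove HTML tags, decode entities, and collapse runs of whitespace."""
    text = TAG_RE.sub(" ", raw_html)   # tags first, replaced by a space
    text = unescape(text)              # then &amp; -> &, &lt; -> <, ...
    return " ".join(text.split())


def label_sentiment(score, threshold=0.1):
    """Map a polarity score in [-1, 1] to positive / negative / neutral."""
    if score > threshold:
        return "positive"
    if score < -threshold:
        return "negative"
    return "neutral"


# Scores taken from the producer output shown in this notebook
print(label_sentiment(0.270))
print(label_sentiment(0.033))
print(clean_comment('I agree.<p>See <a href="https://example.com">this</a> &amp; that'))
```

Keeping these two steps as separate functions also makes it easier to experiment with a different threshold without touching the API code.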


## 4. Kafka Producer - Stream HN Posts with Comment Sentiment

In [10]:
def hn_sentiment_producer(num_posts=50, delay=3):
    """
    Fetch HN posts, analyze comment sentiment, and stream to Kafka.
    
    For each post:
    1. Fetch post metadata
    2. Fetch top comments
    3. Calculate sentiment from comments
    4. Stream enriched data to Kafka
    """
    # Configure Kafka producer
    producer_config = {
        'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
        'value_serializer': lambda v: json.dumps(v).encode('utf-8')
    }
    
    # Add SASL authentication if configured
    if USE_KAFKA_AUTH:
        producer_config.update({
            'security_protocol': 'SASL_SSL',
            'sasl_mechanism': 'PLAIN',
            'sasl_plain_username': KAFKA_SASL_USERNAME,
            'sasl_plain_password': KAFKA_SASL_PASSWORD
        })
    
    producer = KafkaProducer(**producer_config)
    
    print("Fetching latest HN stories...")
    story_ids = requests.get(f"{HN_API_BASE}/topstories.json").json()[:num_posts]
    
    print(f"Processing {len(story_ids)} posts with sentiment analysis...")
    print("-" * 80)
    
    successful = 0
    
    for idx, story_id in enumerate(story_ids, 1):
        try:
            # Fetch post
            story = fetch_item(story_id)
            
            if not story or story.get('type') != 'story':
                continue
            
            # Extract post metadata
            title = story.get('title', '')
            url = story.get('url', '')
            score = story.get('score', 0)
            by = story.get('by', 'unknown')
            comment_ids = story.get('kids', [])
            comment_count = len(comment_ids)
            
            # Fetch and analyze comments
            print(f"[{idx}/{len(story_ids)}] Analyzing: {title[:50]}...")
            comments = fetch_comments(comment_ids, max_comments=20)
            
            avg_sentiment, sentiment_label, analyzed_comments = calculate_comment_sentiment(comments)
            
            # Create enriched message
            message = {
                "post_id": story_id,
                "title": title,
                "url": url,
                "score": score,
                "author": by,
                "total_comments": comment_count,
                "analyzed_comments": analyzed_comments,
                "avg_sentiment_score": round(avg_sentiment, 4),
                "sentiment_label": sentiment_label,
                "timestamp": datetime.now().isoformat(),
                "sample_comments": comments[:3]  # Store sample for reference
            }
            
            # Send to Kafka
            producer.send(KAFKA_TOPIC, message)
            successful += 1
            
            print(f"  → Score: {score} | Comments: {analyzed_comments}/{comment_count} | "
                  f"Sentiment: {sentiment_label} ({avg_sentiment:.3f})")
            
            if idx % 10 == 0:
                print(f"\n Progress: {idx}/{len(story_ids)} ({successful} successful)\n")
            
            time.sleep(delay)
            
        except Exception as e:
            print(f"  ✗ Error processing story {story_id}: {e}")
            continue
    
    producer.flush()
    producer.close()
    
    print("-" * 80)
    print(f"\n Finished! Streamed {successful} posts with sentiment analysis to Kafka")


print("✓ Kafka producer function defined")

✓ Kafka producer function defined


In [11]:
# Test Kafka connection
try:
    # Configure producer with authentication if needed
    kafka_config = {
        'bootstrap_servers': KAFKA_BOOTSTRAP_SERVERS,
        'request_timeout_ms': 5000,
        'api_version_auto_timeout_ms': 5000
    }
    
    # Add SASL authentication if using cloud Kafka
    if USE_KAFKA_AUTH:
        kafka_config.update({
            'security_protocol': 'SASL_SSL',
            'sasl_mechanism': 'PLAIN',
            'sasl_plain_username': KAFKA_SASL_USERNAME,
            'sasl_plain_password': KAFKA_SASL_PASSWORD
        })
    
    test_producer = KafkaProducer(**kafka_config)
    test_producer.close()
    print("✓ Successfully connected to Kafka")
    print(f"  Bootstrap Servers: {KAFKA_BOOTSTRAP_SERVERS}")
    print(f"  Authentication: {'Enabled (SASL_SSL)' if USE_KAFKA_AUTH else 'Disabled'}")
except Exception as e:
    print(f"✗ Could not connect to Kafka: {e}")

✓ Successfully connected to Kafka
  Bootstrap Servers: pkc-921jm.us-east-2.aws.confluent.cloud:9092
  Authentication: Enabled (SASL_SSL)


---
## STEP 1: PRODUCE DATA FIRST

**Now we'll produce data to Kafka BEFORE setting up consumers.**

This follows the logical flow:
1. Set up (MongoDB, Spark, API functions)
2. **Produce data** - Stream HN posts with sentiment to Kafka
3. Then consume - Set up PySpark to read from Kafka

**What this does:**
- Fetches 30 Hacker News posts from the API
- Analyzes comment sentiment for each post
- Streams enriched data to Kafka topic `hackernews_posts`

---

In [12]:
# Start streaming posts with sentiment analysis

hn_sentiment_producer(num_posts=30, delay=3)

Fetching latest HN stories...
Processing 30 posts with sentiment analysis...
--------------------------------------------------------------------------------
[1/30] Analyzing: Size of Life...
  → Score: 1138 | Comments: 20/79 | Sentiment: positive (0.270)
[2/30] Analyzing: Australia begins enforcing world-first teen social...
  → Score: 434 | Comments: 20/85 | Sentiment: neutral (0.033)
[3/30] Analyzing: Getting a Gemini API key is an exercise in frustra...
  → Score: 88 | Comments: 17/19 | Sentiment: positive (0.136)
[4/30] Analyzing: Super Mario 64 for the PS1...
  → Score: 123 | Comments: 9/9 | Sentiment: neutral (0.051)
[5/30] Analyzing: Auto-grading decade-old Hacker News discussions wi...
  → Score: 223 | Comments: 19/39 | Sentiment: positive (0.181)
[6/30] Analyzing: When would you ever want bubblesort? (2023)...
  → Score: 29 | Comments: 10/10 | Sentiment: neutral (0.006)
[7/30] Analyzing: How Google Maps allocates survival across London's...
  → Score: 61 | Comments: 7/7 | Sen

## 5. PySpark Streaming - Read from Kafka

In [13]:
# Define schema for Kafka messages
hn_sentiment_schema = StructType([
    StructField("post_id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("url", StringType(), True),
    StructField("score", IntegerType(), True),
    StructField("author", StringType(), True),
    StructField("total_comments", IntegerType(), True),
    StructField("analyzed_comments", IntegerType(), True),
    StructField("avg_sentiment_score", DoubleType(), True),
    StructField("sentiment_label", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("sample_comments", ArrayType(StringType()), True)
])

print("✓ Schema defined")

✓ Schema defined
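Because the producer and the Spark schema are defined in separate cells, it can help to check a message against the expected fields before it is sent. The sketch below is an illustration only: `EXPECTED_FIELDS`, `validate_message`, and `serialize` are hypothetical names, the type mapping is my reading of `hn_sentiment_schema`, and the `post_id`, `url`, `author`, and `timestamp` values in the demo are made up.

```python
import json

# Field name -> accepted Python type(s), mirroring hn_sentiment_schema
EXPECTED_FIELDS = {
    "post_id": int,
    "title": str,
    "url": str,
    "score": int,
    "author": str,
    "total_comments": int,
    "analyzed_comments": int,
    "avg_sentiment_score": (int, float),
    "sentiment_label": str,
    "timestamp": str,
    "sample_comments": list,
}


def validate_message(message):
    """Return a list of problems; an empty list means the message fits the schema."""
    problems = []
    for name, expected in EXPECTED_FIELDS.items():
        if name not in message:
            problems.append(f"missing field: {name}")
            continue
        value = message[name]
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"wrong type for {name}: {type(value).__name__}")
    return problems


def serialize(message):
    """Same encoding as the producer's value_serializer: JSON text as UTF-8 bytes."""
    return json.dumps(message).encode("utf-8")


message = {
    "post_id": 1,                        # hypothetical id
    "title": "Size of Life",
    "url": "https://example.com",        # hypothetical
    "score": 1138,
    "author": "example_user",            # hypothetical
    "total_comments": 79,
    "analyzed_comments": 20,
    "avg_sentiment_score": 0.2704,
    "sentiment_label": "positive",
    "timestamp": "2025-12-10T23:07:39",  # hypothetical
    "sample_comments": [],
}
print(validate_message(message))
print(len(serialize(message)), "bytes")
```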


In [14]:
# Configure Kafka options for Spark
kafka_options = {
    "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "subscribe": KAFKA_TOPIC,
    "startingOffsets": "earliest"
}

# Add SASL configuration if using authenticated Kafka
if USE_KAFKA_AUTH:
    kafka_options.update({
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{KAFKA_SASL_USERNAME}" password="{KAFKA_SASL_PASSWORD}";'
    })

# Subscribe to Kafka topic
streaming_df = spark.readStream.format("kafka") \
    .options(**kafka_options) \
    .load()

print("✓ Connected to Kafka stream")

✓ Connected to Kafka stream


In [15]:
# Parse JSON from Kafka
parsed_df = streaming_df.select(
    F.from_json(F.col("value").cast("string"), hn_sentiment_schema).alias("data")
).select("data.*")

# Convert timestamp
parsed_df = parsed_df.withColumn(
    "timestamp",
    F.to_timestamp(F.col("timestamp"))
)

# Add derived columns
enriched_df = parsed_df \
    .withColumn("engagement_ratio", 
                F.col("total_comments") / (F.col("score") + 1)) \
    .withColumn("is_controversial", 
                (F.col("total_comments") > 50) & (F.col("avg_sentiment_score") < 0)) \
    .withColumn("hour_of_day", F.hour("timestamp"))

print("✓ Parsing and enrichment configured")

✓ Parsing and enrichment configured
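The three derived columns are plain arithmetic on each row, so they can be checked by hand. Here is a pure-Python sketch of the same rules for a single record; `enrich` is a hypothetical helper, the first record uses figures from the producer output above, and its timestamp is made up.

```python
from datetime import datetime


def enrich(post):
    """Apply the same derived-column rules as the Spark cell to one dict."""
    enriched = dict(post)
    # +1 in the denominator avoids division by zero for posts with score 0
    enriched["engagement_ratio"] = post["total_comments"] / (post["score"] + 1)
    enriched["is_controversial"] = (
        post["total_comments"] > 50 and post["avg_sentiment_score"] < 0
    )
    enriched["hour_of_day"] = datetime.fromisoformat(post["timestamp"]).hour
    return enriched


post = {
    "title": "Australia begins enforcing world-first teen social...",
    "score": 434,
    "total_comments": 85,
    "avg_sentiment_score": 0.033,
    "timestamp": "2025-12-10T23:07:39",  # hypothetical timestamp
}
result = enrich(post)
print(round(result["engagement_ratio"], 4), result["is_controversial"], result["hour_of_day"])
```

With 85 comments but a slightly positive score, this post is not flagged: both conditions must hold for `is_controversial` to be true.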


## 6. Write Stream to Console (Testing)

In [None]:
# Console output for debugging
console_query = enriched_df \
    .select(
        "title",
        "sentiment_label",
        "avg_sentiment_score",
        "score",
        "total_comments",
        "is_controversial"
    ) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

print("✓ Console query started")

25/12/10 23:07:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-07563b93-cf14-41e0-a965-a58b3eaa6142. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/12/10 23:07:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


✓ Console query started


25/12/10 23:07:40 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [17]:
# Stop console query when done viewing
console_query.stop()

## 7. Write Stream to MongoDB

Store sentiment analysis results in MongoDB for persistence and historical analysis.

In [18]:
def write_to_mongodb(batch_df, batch_id):
    """
    Write each micro-batch to MongoDB.
    This function is called for each batch of streaming data.
    """
    if batch_df.isEmpty():
        return
    
    # Cache the batch so the write and the count below do not each recompute it
    batch_df.persist()
    try:
        # Write to MongoDB using the connector
        batch_df.write \
            .format("mongodb") \
            .mode("append") \
            .option("database", MONGO_DATABASE) \
            .option("collection", MONGO_COLLECTION) \
            .save()
        
        # Named record_count to avoid shadowing the count function imported from pyspark
        record_count = batch_df.count()
        print(f"✓ Batch {batch_id}: Wrote {record_count} records to MongoDB")
        
    except Exception as e:
        print(f"✗ Error writing batch {batch_id} to MongoDB: {e}")
    finally:
        batch_df.unpersist()


print("✓ MongoDB write function defined")

✓ MongoDB write function defined


In [19]:
# Start streaming to MongoDB
mongodb_query = enriched_df \
    .writeStream \
    .foreachBatch(write_to_mongodb) \
    .outputMode("append") \
    .option("checkpointLocation", "checkpoints/hn_mongodb") \
    .trigger(processingTime='10 seconds') \
    .start()

print("✓ MongoDB streaming query started")
print("Data will be written to MongoDB every 10 seconds")

✓ MongoDB streaming query started
Data will be written to MongoDB every 10 seconds


25/12/10 23:07:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


25/12/10 23:07:57 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/12/10 23:08:08 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 10983 milliseconds


✓ Batch 1: Wrote 29 records to MongoDB


## 8. Memory Tables for Real-time SQL Analytics

In [None]:
# Write to memory table for SQL queries
memory_query = enriched_df \
    .writeStream \
    .outputMode("append") \
    .queryName("hn_sentiment_live") \
    .format("memory") \
    .start()

print("✓ Memory table 'hn_sentiment_live' created")

✓ Memory table 'hn_sentiment_live' created


25/12/10 23:09:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9819ddad-7d49-460a-9ee1-7ba6e7191183. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/12/10 23:09:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


25/12/10 23:09:13 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

In [21]:
# Query sentiment distribution
spark.sql("""
    SELECT 
        sentiment_label,
        COUNT(*) as count,
        ROUND(AVG(avg_sentiment_score), 3) as avg_score,
        ROUND(AVG(score), 1) as avg_upvotes,
        ROUND(AVG(total_comments), 1) as avg_comments
    FROM hn_sentiment_live
    GROUP BY sentiment_label
    ORDER BY count DESC
""").show()

+---------------+-----+---------+-----------+------------+
|sentiment_label|count|avg_score|avg_upvotes|avg_comments|
+---------------+-----+---------+-----------+------------+
|        neutral|   33|     0.04|      116.4|        16.4|
|       positive|   30|    0.189|      156.3|        21.5|
+---------------+-----+---------+-----------+------------+



In [None]:
# Find controversial posts (negative sentiment + high engagement)
spark.sql("""
    SELECT 
        title,
        sentiment_label,
        avg_sentiment_score,
        score,
        total_comments
    FROM hn_sentiment_live
    WHERE is_controversial = true
    ORDER BY total_comments DESC
    LIMIT 10
""").show(truncate=60)

# No rows returned: no post in this run had both more than 50 comments and a negative average sentiment score

+-----+---------------+-------------------+-----+--------------+
|title|sentiment_label|avg_sentiment_score|score|total_comments|
+-----+---------------+-------------------+-----+--------------+
+-----+---------------+-------------------+-----+--------------+



In [23]:
# Top positive posts
spark.sql("""
    SELECT 
        title,
        avg_sentiment_score,
        score,
        total_comments
    FROM hn_sentiment_live
    WHERE sentiment_label = 'positive'
    ORDER BY avg_sentiment_score DESC
    LIMIT 10
""").show(truncate=60)

+------------------------------------------------------------+-------------------+-----+--------------+
|                                                       title|avg_sentiment_score|score|total_comments|
+------------------------------------------------------------+-------------------+-----+--------------+
|I got an Nvidia GH200 server for €7.5k on Reddit and conv...|             0.3027|  132|            17|
|                                                Size of Life|             0.2838| 1101|            77|
|                                                Size of Life|             0.2704| 1138|            79|
|Gundam is just the same as Jane Austen but happens to inc...|             0.2484|  114|            20|
|Gundam is just the same as Jane Austen but happens to inc...|              0.236|  118|            20|
|                                  Factor 0.101 now available|             0.2345|   95|             7|
|                                  Factor 0.101 now available|  
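Several titles appear twice in the result above with slightly different scores, most likely because the stream reads the topic from the earliest offset and the topic holds messages from more than one producer run. If only the most recent snapshot per post is wanted, one option is to keep the record with the latest timestamp for each `post_id`. The sketch below shows the idea in plain Python; `latest_per_post` is a hypothetical helper, and the ids and timestamps in the demo are invented for illustration.

```python
def latest_per_post(records):
    """Keep one record per post_id: the one with the greatest timestamp.

    Assumes timestamps are ISO-8601 strings in a single format, which sort
    chronologically when compared as text.
    """
    latest = {}
    for record in records:
        current = latest.get(record["post_id"])
        if current is None or record["timestamp"] > current["timestamp"]:
            latest[record["post_id"]] = record
    return list(latest.values())


snapshots = [
    # post_id and timestamp values are hypothetical; scores are from the table above
    {"post_id": 1, "title": "Size of Life", "score": 1101, "timestamp": "2025-12-10T22:48:00"},
    {"post_id": 1, "title": "Size of Life", "score": 1138, "timestamp": "2025-12-10T23:07:00"},
    {"post_id": 2, "title": "Factor 0.101 now available", "score": 95, "timestamp": "2025-12-10T23:08:00"},
]
for row in latest_per_post(snapshots):
    print(row["title"], row["score"])
```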

## 9. Windowed Sentiment Analytics

In [24]:
# Windowed aggregations - sentiment trends over time
windowed_sentiment = enriched_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        F.window("timestamp", "10 minutes", "5 minutes"),
        "sentiment_label"
    ) \
    .agg(
        F.count("*").alias("post_count"),
        F.avg("avg_sentiment_score").alias("avg_sentiment"),
        F.avg("score").alias("avg_score"),
        F.avg("total_comments").alias("avg_comments"),
        F.sum(F.when(F.col("is_controversial"), 1).otherwise(0)).alias("controversial_count")
    )

In [None]:
# Write windowed results to memory
windowed_query = windowed_sentiment \
    .writeStream \
    .outputMode("complete") \
    .queryName("sentiment_trends") \
    .format("memory") \
    .start()

print("✓ Windowed analytics started")

✓ Windowed analytics started


25/12/10 23:09:57 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6e4ede63-0de0-431f-b396-92b3494fcd34. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/12/10 23:09:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


25/12/10 23:09:57 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

In [26]:
# Query windowed results
spark.sql("""
    SELECT 
        window.start,
        window.end,
        sentiment_label,
        post_count,
        ROUND(avg_sentiment, 3) as avg_sentiment,
        ROUND(avg_score, 1) as avg_score,
        controversial_count
    FROM sentiment_trends
    ORDER BY window.start DESC, post_count DESC
""").show(truncate=False)

+-------------------+-------------------+---------------+----------+-------------+---------+-------------------+
|start              |end                |sentiment_label|post_count|avg_sentiment|avg_score|controversial_count|
+-------------------+-------------------+---------------+----------+-------------+---------+-------------------+
|2025-12-10 23:05:00|2025-12-10 23:15:00|neutral        |15        |0.036        |125.0    |0                  |
|2025-12-10 23:05:00|2025-12-10 23:15:00|positive       |14        |0.181        |159.9    |0                  |
|2025-12-10 23:00:00|2025-12-10 23:10:00|neutral        |15        |0.036        |125.0    |0                  |
|2025-12-10 23:00:00|2025-12-10 23:10:00|positive       |14        |0.181        |159.9    |0                  |
|2025-12-10 22:45:00|2025-12-10 22:55:00|neutral        |16        |0.044        |113.8    |0                  |
|2025-12-10 22:45:00|2025-12-10 22:55:00|positive       |12        |0.195        |84.8     |0   
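With a 10-minute window that slides every 5 minutes, each event belongs to two overlapping windows, which is consistent with the 23:00 and 23:05 windows above reporting identical counts. The following sketch computes the windows for one timestamp in plain Python. `sliding_windows` is a hypothetical helper, and it assumes window boundaries aligned to the Unix epoch, which matches the boundaries in the output above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(ts, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return every [start, end) window that contains ts, earliest first."""
    # Start of the most recent slide boundary at or before ts
    last_start = ts - ((ts - EPOCH) % slide)
    windows = []
    start = last_start
    # Walk backwards while the window starting at `start` still covers ts
    while start > ts - window:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event_time = datetime(2025, 12, 10, 23, 7, 39)
for start, end in sliding_windows(event_time):
    print(start, "->", end)
```

Because every event is counted once per overlapping window, summing `post_count` across windows would double count posts.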

## 10. Read from MongoDB - Analytics on Stored Data

In [29]:
# Read data back from MongoDB
mongo_df = spark.read \
    .format("mongodb") \
    .option("database", MONGO_DATABASE) \
    .option("collection", MONGO_COLLECTION) \
    .load()

print(f"Records in MongoDB: {mongo_df.count()}")
mongo_df.printSchema()

Records in MongoDB: 92
root
 |-- _id: string (nullable = true)
 |-- analyzed_comments: integer (nullable = true)
 |-- author: string (nullable = true)
 |-- avg_sentiment_score: double (nullable = true)
 |-- engagement_ratio: double (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- is_controversial: boolean (nullable = true)
 |-- post_id: integer (nullable = true)
 |-- sample_comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: integer (nullable = true)
 |-- sentiment_label: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- total_comments: integer (nullable = true)
 |-- url: string (nullable = true)



In [30]:
# Show sample data from MongoDB
mongo_df.select(
    "title",
    "sentiment_label",
    "avg_sentiment_score",
    "score",
    "total_comments"
).show(10, truncate=60)

+------------------------------------------------------------+---------------+-------------------+-----+--------------+
|                                                       title|sentiment_label|avg_sentiment_score|score|total_comments|
+------------------------------------------------------------+---------------+-------------------+-----+--------------+
|                                        Frank Gehry has died|       positive|             0.1013|   46|            10|
|                              Netflix to Acquire Warner Bros|        neutral|              0.098| 1310|           168|
|                       Cloudflare outage on December 5, 2025|        neutral|             0.0689|  472|            70|
|                     Gemini 3 Pro: the frontier of vision AI|       positive|             0.1513|  258|            27|
|A $20 drug in Europe requires a prescription and $800 in ...|        neutral|             0.0894|   86|            17|
|                Idempotency Keys for Ex

In [31]:
# Sentiment distribution from MongoDB
mongo_df.groupBy("sentiment_label") \
    .agg(
        F.count("*").alias("count"),
        F.round(F.avg("avg_sentiment_score"), 3).alias("avg_sentiment"),
        F.round(F.avg("score"), 1).alias("avg_upvotes")
    ) \
    .orderBy(F.desc("count")) \
    .show()

+---------------+-----+-------------+-----------+
|sentiment_label|count|avg_sentiment|avg_upvotes|
+---------------+-----+-------------+-----------+
|       positive|   47|        0.174|      152.5|
|        neutral|   45|        0.045|      151.3|
+---------------+-----+-------------+-----------+



## 11. MongoDB Aggregation Queries with Spark

This section demonstrates aggregation pipeline pushdown from Spark to MongoDB.

Passing a pipeline through the connector's `aggregation.pipeline` read option lets Spark:
- Push down complex queries to MongoDB (reducing data transfer)
- Perform server-side filtering and aggregation
- Utilize MongoDB's powerful aggregation operators
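
The `aggregation.pipeline` option is transmitted as a string, so one hedged way to prepare a pipeline on the Python side is to build the stages as plain dictionaries and serialize them with `json.dumps`. The sketch below assumes nothing about the notebook's data; the helper name `to_pipeline_option` and the sample stages are illustrative, not part of the original code.

```python
import json


def to_pipeline_option(stages):
    """Serialize aggregation stages to a JSON string (hypothetical helper).

    Performs a light shape check only: each stage must be a one-key dict
    whose key is an operator name starting with "$".
    """
    if not isinstance(stages, list):
        raise TypeError("pipeline must be a list of stage dictionaries")
    for stage in stages:
        if not (isinstance(stage, dict) and len(stage) == 1):
            raise ValueError(f"invalid stage: {stage!r}")
        (operator,) = stage  # the single key of the stage
        if not operator.startswith("$"):
            raise ValueError(f"stage operator must start with '$': {operator!r}")
    return json.dumps(stages)


# Illustrative stages only; field names follow the schema printed above.
sample_pipeline = [
    {"$match": {"sentiment_label": "positive"}},
    {"$limit": 5},
]
pipeline_json = to_pipeline_option(sample_pipeline)
print(pipeline_json)
```

The resulting string could then be passed as `.option("aggregation.pipeline", pipeline_json)` on a `spark.read.format("mongodb")` reader.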

### 11.1 Simple Filter Query - Find Positive Posts

Use MongoDB's `$match` operator to filter documents before loading into Spark. The pipeline then deduplicates by title and keeps the ten posts with the highest sentiment score.
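
The `$sort` followed by `$group` with `$first` in the pipeline below is what keeps a single document per title. As a rough illustration of that logic, here is a plain-Python analogue run on made-up documents; the function name `top_unique_titles` and the sample data are assumptions for the sketch, and the real work still happens inside MongoDB.

```python
from operator import itemgetter


def top_unique_titles(docs, min_score=0.15, limit=10):
    """Plain-Python analogue of $match -> $sort -> $group/$first -> $sort -> $limit."""
    # $match: positive label and sentiment above the threshold
    matched = [
        d for d in docs
        if d["sentiment_label"] == "positive" and d["avg_sentiment_score"] > min_score
    ]
    # $sort: highest sentiment first
    matched.sort(key=itemgetter("avg_sentiment_score"), reverse=True)
    # $group with $first: the first document seen for each title is kept
    first_per_title = {}
    for d in matched:
        first_per_title.setdefault(d["title"], d)
    # Second $sort, then $limit
    ranked = sorted(
        first_per_title.values(),
        key=itemgetter("avg_sentiment_score"),
        reverse=True,
    )
    return ranked[:limit]


# Made-up documents for illustration only
sample_docs = [
    {"title": "Post A", "sentiment_label": "positive", "avg_sentiment_score": 0.20},
    {"title": "Post A", "sentiment_label": "positive", "avg_sentiment_score": 0.30},
    {"title": "Post B", "sentiment_label": "positive", "avg_sentiment_score": 0.25},
    {"title": "Post C", "sentiment_label": "neutral", "avg_sentiment_score": 0.05},
    {"title": "Post D", "sentiment_label": "positive", "avg_sentiment_score": 0.10},
]
result = top_unique_titles(sample_docs)
for doc in result:
    print(doc["title"], doc["avg_sentiment_score"])
```

In this sample, the duplicate "Post A" collapses to its higher-scoring copy, while "Post C" and "Post D" are dropped by the filter.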

In [36]:
# Example 1: Filter positive sentiment posts using MongoDB aggregation pipeline
# This demonstrates query pushdown - filtering happens in MongoDB, not Spark

mongo_db = "hackernews_analytics"
mongo_coll = "post_sentiment"

# Define aggregation pipeline as a list of dictionaries
positive_pipeline = [
    {
        # STAGE 1: Match only positive sentiment posts
        "$match": {
            "sentiment_label": "positive",
            "avg_sentiment_score": {"$gt": 0.15}  # Very positive posts
        }
    },
    {
        # STAGE 2: Sort by sentiment score descending (before grouping)
        "$sort": {"avg_sentiment_score": -1}
    },
    {
        # STAGE 3: Group by title to get unique titles
        # Keep the document with highest sentiment score for each title
        "$group": {
            "_id": "$title",
            "title": {"$first": "$title"},
            "sentiment_label": {"$first": "$sentiment_label"},
            "avg_sentiment_score": {"$first": "$avg_sentiment_score"},
            "score": {"$first": "$score"},
            "total_comments": {"$first": "$total_comments"}
        }
    },
    {
        # STAGE 4: Project only needed fields (remove _id from group)
        "$project": {
            "_id": 0,
            "title": 1,
            "sentiment_label": 1,
            "avg_sentiment_score": 1,
            "score": 1,
            "total_comments": 1
        }
    },
    {
        # STAGE 5: Sort again, because $group does not guarantee output order
        "$sort": {"avg_sentiment_score": -1}
    },
    {
        # STAGE 6: Limit to top 10
        "$limit": 10
    }
]

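# Read from MongoDB with aggregation pipeline.
# Option values are sent as strings, so serialize the pipeline to JSON explicitly
# instead of relying on Python's str() of the list.
import json

positive_df = (spark.read.format("mongodb")
    .option("spark.mongodb.read.connection.uri", MONGO_URI)
    .option("database", mongo_db)
    .option("collection", mongo_coll)
    .option("aggregation.pipeline", json.dumps(positive_pipeline))  # Pipeline as JSON string
    .load())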

print("✓ Top 10 most positive posts (unique titles):")
positive_df.show(10, truncate=60)

✓ Top 10 most positive posts (unique titles):
+-------------------+-----+---------------+------------------------------------------------------------+--------------+
|avg_sentiment_score|score|sentiment_label|                                                       title|total_comments|
+-------------------+-----+---------------+------------------------------------------------------------+--------------+
|             0.3027|  132|       positive|I got an Nvidia GH200 server for €7.5k on Reddit and conv...|            17|
|             0.2838| 1101|       positive|                                                Size of Life|            77|
|             0.2484|  114|       positive|Gundam is just the same as Jane Austen but happens to inc...|            20|
|             0.2362|  149|       positive|Synadia and TigerBeetle Pledge $512k to the Zig Software ...|            12|
|             0.2345|   91|       positive|                                  Factor 0.101 now available|          

### 11.2 Engagement Analysis

Analyze engagement patterns across different sentiment categories.
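
The bucketing in the pipeline below relies on `$switch`, which evaluates its branches in order and uses the first case that matches. A small Python sketch of the same boundaries may make the cut-offs easier to check; the function name `score_bucket` is hypothetical, while the labels and thresholds are copied from the pipeline. Note that, given the `score > 0` filter, "Low (0-50)" effectively covers 1 to 49 and "Medium (50-200)" covers 50 to 199.

```python
def score_bucket(score):
    """Mirror the $switch branches: the first matching case wins."""
    if score < 50:
        return "Low (0-50)"
    if score < 200:
        return "Medium (50-200)"
    if score >= 200:
        return "High (200+)"
    # In this Python version only values that fail every comparison
    # (for example NaN) fall through to the default.
    return "Unknown"


for sample_score in (1, 49, 50, 199, 200, 1310):
    print(sample_score, "->", score_bucket(sample_score))
```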

In [33]:
# Example 2: Engagement analysis by sentiment and score buckets
engagement_pipeline = [
    {
        # STAGE 1: Keep positive/neutral posts with at least one upvote
        "$match": {
            "sentiment_label": {"$in": ["positive", "neutral"]},
            "score": {"$gt": 0}
        }
    },
    {
        # STAGE 2: Add computed fields
        "$addFields": {
            "score_bucket": {
                "$switch": {
                    "branches": [
                        {"case": {"$lt": ["$score", 50]}, "then": "Low (0-50)"},
                        {"case": {"$lt": ["$score", 200]}, "then": "Medium (50-200)"},
                        {"case": {"$gte": ["$score", 200]}, "then": "High (200+)"}
                    ],
                    "default": "Unknown"
                }
            }
        }
    },
    {
        # STAGE 3: Group by sentiment and score bucket
        "$group": {
            "_id": {
                "sentiment": "$sentiment_label",
                "bucket": "$score_bucket"
            },
            "post_count": {"$sum": 1},
            "avg_sentiment_score": {"$avg": "$avg_sentiment_score"},
            "avg_comments": {"$avg": "$total_comments"},
            "avg_upvotes": {"$avg": "$score"}
        }
    },
    {
        # STAGE 4: Sort by sentiment, then by bucket label (alphabetical)
        "$sort": {"_id.sentiment": 1, "_id.bucket": 1}
    },
    {
        # STAGE 5: Flatten _id into columns and round the averages
        "$project": {
            "_id": 0,
            "sentiment_label": "$_id.sentiment",
            "score_bucket": "$_id.bucket",
            "post_count": 1,
            "avg_sentiment_score": {"$round": ["$avg_sentiment_score", 3]},
            "avg_comments": {"$round": ["$avg_comments", 1]},
            "avg_upvotes": {"$round": ["$avg_upvotes", 1]}
        }
    }
]

# Serialize the pipeline to JSON explicitly; option values are sent as strings
import json

engagement_df = (spark.read.format("mongodb")
    .option("spark.mongodb.read.connection.uri", MONGO_URI)
    .option("database", mongo_db)
    .option("collection", mongo_coll)
    .option("aggregation.pipeline", json.dumps(engagement_pipeline))
    .load())

print("✓ Engagement analysis by sentiment and score buckets:")
engagement_df.show(20, truncate=False)

✓ Engagement analysis by sentiment and score buckets:
+------------+-------------------+-----------+----------+---------------+---------------+
|avg_comments|avg_sentiment_score|avg_upvotes|post_count|score_bucket   |sentiment_label|
+------------+-------------------+-----------+----------+---------------+---------------+
|63.8        |0.064              |468.0      |10        |High (200+)    |neutral        |
|6.6         |0.034              |22.0       |17        |Low (0-50)     |neutral        |
|12.7        |0.044              |97.6       |18        |Medium (50-200)|neutral        |
|42.8        |0.182              |467.1      |9         |High (200+)    |positive       |
|6.3         |0.165              |28.7       |9         |Low (0-50)     |positive       |
|16.2        |0.174              |93.3       |29        |Medium (50-200)|positive       |
+------------+-------------------+-----------+----------+---------------+---------------+



## 12. Cleanup

In [37]:
# Check active streaming queries
for query in spark.streams.active:
    print(f"Active query: {query.name} - Status: {query.status}")

Active query: None - Status: {'message': 'Waiting for next trigger', 'isDataAvailable': False, 'isTriggerActive': False}
Active query: hn_sentiment_live - Status: {'message': 'Getting offsets from KafkaV2[Subscribe[hackernews_posts]]', 'isDataAvailable': False, 'isTriggerActive': True}
Active query: sentiment_trends - Status: {'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}


In [38]:
# Stop all streaming queries
for query in spark.streams.active:
    print(f"Stopping query: {query.name}")
    query.stop()

print("All streaming queries stopped.")

Stopping query: None
Stopping query: hn_sentiment_live
Stopping query: sentiment_trends
All streaming queries stopped.
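
The output above shows one query listed as `None` because it was started without a name. A possible refinement, sketched here under the assumption that each query object exposes `name`, `id`, and `stop()` as Spark's `StreamingQuery` does, is to fall back to the query id when reporting. The helper `stop_queries` and the `_FakeQuery` stand-in are hypothetical and exist only so the sketch runs without a Spark session.

```python
def stop_queries(queries):
    """Stop each streaming query and return a label for every one stopped."""
    stopped = []
    for query in queries:
        # Unnamed queries report name=None, so fall back to the query id.
        label = query.name or f"<unnamed {query.id}>"
        query.stop()
        stopped.append(label)
    return stopped


class _FakeQuery:
    """Stand-in for a StreamingQuery, used only to exercise the helper."""

    def __init__(self, name, query_id):
        self.name = name
        self.id = query_id
        self.stopped = False

    def stop(self):
        self.stopped = True


fake_queries = [_FakeQuery(None, "q-1"), _FakeQuery("hn_sentiment_live", "q-2")]
stopped_labels = stop_queries(fake_queries)
print(stopped_labels)
```

In the notebook itself the call would be `stop_queries(spark.streams.active)`.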


In [39]:
# Stop Spark session
spark.stop()