# Reddit Real-Time Analytics - Consumer (Spark Streaming)

**Team Members:** Alina Insam, Sumedh Bamane, Rafael Machado Da Rocha, Kaan Ak

**Project Description:** This consumer receives Germany-related Reddit comments over a socket and processes them with Spark Structured Streaming to extract insights, including:
- TF-IDF analysis for important words
- Reference extraction (users, subreddits, URLs)
- Sentiment analysis
- Real-time metrics and statistics

**Architecture:**
1. Receive streaming data via socket
2. Store raw data in Spark table "raw"
3. Process data for analytics
4. Store results in Spark table "metrics"
5. Save results to disk
6. Generate visualizations

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
import json
import time
from datetime import datetime, timedelta
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from textblob import TextBlob
import warnings
warnings.filterwarnings('ignore')

print("📦 All libraries imported successfully!")

In [2]:
# Step 1: obtain (or reuse) the Spark session used by this consumer
spark = (
    SparkSession.builder
    .appName("RedditStreamConsumer")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/29 09:06:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType

# Build (or reuse) the Spark session with the tuning options for this job
spark = (
    SparkSession.builder
    .appName("RedditGermanyAnalytics")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Reduce console noise, then report where the session lives
spark.sparkContext.setLogLevel("WARN")
print(f"✅ Spark Session created: {spark.version}")
print(f"🔧 Spark UI available at: {spark.sparkContext.uiWebUrl}")

# Minimal three-field schema for the incoming JSON payload
schema = (
    StructType()
    .add("text", StringType())
    .add("created_utc", DoubleType())
    .add("link", StringType())
)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

# Full schema of one incoming Reddit record; every field is nullable.
# The nested "sentiment" struct carries the four sentiment scores.
schema = (
    StructType()
    .add("id", StringType(), True)
    .add("text", StringType(), True)
    .add("created_utc", DoubleType(), True)
    .add("author", StringType(), True)
    .add("subreddit", StringType(), True)
    .add("score", IntegerType(), True)
    .add("link", StringType(), True)
    .add("user_mentions", ArrayType(StringType()), True)
    .add("subreddit_references", ArrayType(StringType()), True)
    .add("urls", ArrayType(StringType()), True)
    .add(
        "sentiment",
        StructType()
        .add("compound", DoubleType(), True)
        .add("positive", DoubleType(), True)
        .add("negative", DoubleType(), True)
        .add("neutral", DoubleType(), True),
        True,
    )
    .add("word_count", IntegerType(), True)
    .add("char_count", IntegerType(), True)
    .add("timestamp", StringType(), True)
)

print("📋 Schema defined with all fields from producer")

# Socket source pointed at the Docker host on port 9998
raw_stream = (
    spark.readStream
    .format("socket")
    .option("host", "host.docker.internal")
    .option("port", 9998)
    .load()
)


25/05/29 09:06:29 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [None]:
# Socket source on localhost:9998; includeTimestamp adds a "timestamp"
# column next to the "value" column of each received line.
raw_stream = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9998)
    .option("includeTimestamp", "true")
    .load()
)

print("🔌 Socket stream configured (localhost:9998)")
print("⚠️  Make sure the producer is running before starting the stream!")

# Step 4: decode each JSON line with `schema` and flatten it into columns
json_df = (
    raw_stream
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

In [None]:
from pyspark.sql.functions import from_json, col, from_unixtime, monotonically_increasing_id

# Step 5: per-batch debug printer that stays silent for empty batches
def process_batch(df, epoch_id):
    """Print the size and full contents of a micro-batch if it has rows.

    Args:
        df: The micro-batch DataFrame handed over by ``foreachBatch``.
        epoch_id: Identifier of the micro-batch, used only in the message.
    """
    n_rows = df.count()
    if n_rows <= 0:
        return
    print(f"\n⏰ Received {n_rows} comment(s) in batch {epoch_id}")
    df.show(truncate=False)

# Decode the JSON payload and carry the socket's "timestamp" column along
# under the name "processing_time".
parsed_df = (
    raw_stream
    .select(
        from_json(col("value"), schema).alias("data"),
        col("timestamp").alias("processing_time"),
    )
    .select("data.*", "processing_time")
)

# Derived columns: creation time and processing time as timestamps, plus a
# monotonically increasing id stored under the name "batch_id".
processed_df = (
    parsed_df
    .withColumn("created_datetime", from_unixtime(col("created_utc")).cast("timestamp"))
    .withColumn("processing_datetime", col("processing_time").cast("timestamp"))
    .withColumn("batch_id", monotonically_increasing_id())
)

print("🔄 JSON parsing and timestamp processing configured")

In [None]:
# 6. Start stream
# Setup buffers and output directories before starting
# In-memory accumulators; the processing functions defined further down in
# this file declare these names as globals and mutate them.
raw_data_buffer = []
metrics_buffer = []
batch_counter = 0
# exist_ok=True makes re-running this cell safe when the folders already exist.
os.makedirs("output/raw_data", exist_ok=True)
os.makedirs("output/metrics", exist_ok=True)
os.makedirs("output/visualizations", exist_ok=True)

# Start streaming query on processed DataFrame
# NOTE(review): `process_batch` resolves to whichever definition exists when
# this statement runs. The same "./checkpoint" location is also used by the
# later query start in this file — confirm both starts are intended.
query = processed_df.writeStream \
    .foreachBatch(process_batch) \
    .outputMode("append") \
    .option("checkpointLocation", "./checkpoint") \
    .start()

# Keep the query running
# NOTE(review): awaitTermination() blocks until the query stops or fails, so
# the statements that follow in this cell do not execute until then — confirm
# this early blocking call is still wanted.
query.awaitTermination()

# Module-level accumulators shared by the processing functions below
raw_data_buffer, metrics_buffer = [], []
batch_counter = 0

# Make sure every output folder exists (no error if already present)
for _subdir in ("raw_data", "metrics", "visualizations"):
    os.makedirs(f"output/{_subdir}", exist_ok=True)

def extract_top_words_tfidf(texts, top_n=10):
    """Return the ``top_n`` words of ``texts`` ranked by summed TF-IDF weight.

    The texts are tokenized, stop words are removed, and a
    CountVectorizer + IDF pipeline is fitted on them. Each word's TF-IDF
    weight is summed over all documents; words of two characters or fewer
    are ignored.

    The previous implementation fitted a HashingTF/IDF pipeline but then
    ignored its output and returned plain word counts. Hashed features
    cannot be mapped back to words, so CountVectorizer (which exposes its
    vocabulary) is used instead.

    Args:
        texts: Sequence of comment strings. Empty or ``None`` yields ``[]``.
        top_n: Maximum number of ``(word, score)`` pairs to return.

    Returns:
        A list of ``(word, score)`` tuples sorted by descending score, or an
        empty list when there is no input or the Spark pipeline fails (the
        error is printed, not raised).
    """
    try:
        if not texts or len(texts) == 0:
            return []

        # Local import so the block does not depend on the file-level
        # import list, which only brings in HashingTF.
        from pyspark.ml.feature import CountVectorizer

        # One row per document: (position, text)
        text_df = spark.createDataFrame(list(enumerate(texts)), ["id", "text"])

        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
        # vocabSize mirrors the 1000-feature budget of the former HashingTF stage
        vectorizer = CountVectorizer(inputCol="filtered_words",
                                     outputCol="tf_features", vocabSize=1000)
        idf = IDF(inputCol="tf_features", outputCol="tfidf_features")

        model = Pipeline(stages=[tokenizer, remover, vectorizer, idf]).fit(text_df)
        vocabulary = model.stages[2].vocabulary  # feature index -> word
        scored = model.transform(text_df)

        # Sum each word's TF-IDF weight across all documents
        word_scores = {}
        for row in scored.select("tfidf_features").collect():
            vector = row.tfidf_features
            for index, weight in zip(vector.indices, vector.values):
                word = vocabulary[int(index)]
                if len(word) > 2:  # Skip short words
                    word_scores[word] = word_scores.get(word, 0.0) + float(weight)

        ranked = sorted(word_scores.items(), key=lambda item: item[1], reverse=True)
        return [(word, round(score, 4)) for word, score in ranked[:top_n]]

    except Exception as e:
        print(f"❌ TF-IDF Error: {e}")
        return []

def analyze_references(df_batch):
    """Count user mentions, subreddit references, and URL domains in a batch.

    Args:
        df_batch: Object exposing ``collect()`` that returns rows with the
            attributes ``user_mentions``, ``subreddit_references`` and
            ``urls`` (each a list of strings or ``None``).

    Returns:
        A dict with the keys ``top_user_mentions``, ``top_subreddit_refs``
        and ``top_url_domains``; each value is a list of at most ten
        ``(name, count)`` tuples ordered by descending count. On any
        unexpected error the three lists are empty and the error is printed.
    """
    # Local imports keep the block self-contained.
    from collections import Counter
    from urllib.parse import urlsplit

    try:
        user_mentions = Counter()
        subreddit_refs = Counter()
        url_domains = Counter()

        for row in df_batch.collect():
            user_mentions.update(row.user_mentions or [])
            subreddit_refs.update(row.subreddit_references or [])

            for url in row.urls or []:
                # urlsplit isolates the network location, so a query string
                # or fragment directly after the host is not counted as part
                # of the domain.
                try:
                    domain = urlsplit(url).netloc
                except (ValueError, TypeError, AttributeError):
                    continue  # malformed entry: skip it, keep the batch
                # URLs without "//host" have no domain and are ignored
                if domain:
                    url_domains[domain] += 1

        return {
            'top_user_mentions': user_mentions.most_common(10),
            'top_subreddit_refs': subreddit_refs.most_common(10),
            'top_url_domains': url_domains.most_common(10)
        }
    except Exception as e:
        print(f"❌ Reference analysis error: {e}")
        return {'top_user_mentions': [], 'top_subreddit_refs': [], 'top_url_domains': []}

def calculate_metrics(df_batch, batch_id):
    """Compute, record, and print summary metrics for one micro-batch.

    Collects the batch, appends its rows to ``raw_data_buffer`` (trimmed to
    the newest 1000 rows), derives sentiment/score/word statistics, the
    subreddit distribution, top words and reference counts, appends the
    result to ``metrics_buffer``, and calls ``save_data_to_files()`` on
    every tenth processed batch.

    Args:
        df_batch: Micro-batch DataFrame of parsed Reddit comments.
        batch_id: Identifier of the micro-batch, stored in the metrics.

    Returns:
        The metrics dict, or ``None`` when the batch is empty or an error
        occurred (the error is printed, not raised).
    """
    global raw_data_buffer, metrics_buffer, batch_counter
    from collections import Counter  # local import keeps the block self-contained

    try:
        batch_counter += 1
        current_time = datetime.now()

        # Collect batch data
        batch_data = df_batch.collect()

        if not batch_data:
            print(f"📭 Batch {batch_id}: No data received")
            return None

        print(f"\n🔄 Processing Batch {batch_id} - {len(batch_data)} comments")
        print(f"⏰ Timestamp: {current_time.strftime('%Y-%m-%d %H:%M:%S')}")

        # Add to raw data buffer, keeping only the last 1000 records
        raw_data_buffer.extend(batch_data)
        if len(raw_data_buffer) > 1000:
            raw_data_buffer = raw_data_buffer[-1000:]

        # Basic statistics. A sentiment struct may exist with a null compound
        # score; such rows are skipped so one bad record cannot abort the batch.
        sentiments = [
            row.sentiment.compound
            for row in batch_data
            if row.sentiment is not None and row.sentiment.compound is not None
        ]
        scores = [row.score for row in batch_data if row.score is not None]
        subreddit_counts = Counter(row.subreddit for row in batch_data if row.subreddit)

        avg_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0
        avg_score = sum(scores) / len(scores) if scores else 0
        total_words = sum(row.word_count for row in batch_data if row.word_count)

        # Sentiment distribution (thresholds at +/-0.1)
        positive_comments = sum(1 for s in sentiments if s > 0.1)
        negative_comments = sum(1 for s in sentiments if s < -0.1)
        neutral_comments = sum(1 for s in sentiments if -0.1 <= s <= 0.1)

        # Top words from the accumulated data (last 500 texts)
        all_texts = [row.text for row in raw_data_buffer if row.text]
        top_words_tfidf = extract_top_words_tfidf(all_texts[-500:])

        # Analyze references
        references = analyze_references(df_batch)

        # Time range covered by the batch
        created_times = [row.created_utc for row in batch_data if row.created_utc]
        earliest = min(created_times) if created_times else 0
        latest = max(created_times) if created_times else 0
        time_range = {
            'start': earliest,
            'end': latest,
            'span_minutes': (latest - earliest) / 60 if len(created_times) > 1 else 0
        }

        # Create metrics record
        metrics = {
            'batch_id': batch_id,
            'timestamp': current_time.isoformat(),
            'total_comments': len(batch_data),
            'avg_sentiment': avg_sentiment,
            'sentiment_distribution': {
                'positive': positive_comments,
                'negative': negative_comments,
                'neutral': neutral_comments
            },
            'avg_score': avg_score,
            'total_words': total_words,
            'avg_words_per_comment': total_words / len(batch_data),
            'subreddit_distribution': dict(subreddit_counts.most_common(5)),
            'top_words_tfidf': top_words_tfidf[:10],
            'references': references,
            'time_range': time_range,
            'processing_stats': {
                'total_batches_processed': batch_counter,
                'total_comments_accumulated': len(raw_data_buffer)
            }
        }

        metrics_buffer.append(metrics)

        # Print summary
        print(f"📊 Metrics Summary:")
        print(f"   💬 Comments: {len(batch_data)}")
        print(f"   😊 Avg Sentiment: {avg_sentiment:.3f}")
        print(f"   ⭐ Avg Score: {avg_score:.1f}")
        print(f"   📝 Total Words: {total_words}")
        # Ranked by count so the label "Top" is accurate (was insertion order)
        print(f"   🏷️  Top Subreddits: {[name for name, _ in subreddit_counts.most_common(3)]}")
        if top_words_tfidf:
            print(f"   🔤 Top Words: {[word for word, count in top_words_tfidf[:5]]}")
        print(f"   👥 User Mentions: {len(references['top_user_mentions'])}")
        print(f"   🔗 URLs: {len(references['top_url_domains'])}")

        # Save data every 10 batches
        if batch_counter % 10 == 0:
            save_data_to_files()

        return metrics

    except Exception as e:
        print(f"❌ Error in calculate_metrics: {e}")
        return None

def save_data_to_files():
    """Persist the accumulated raw rows and metrics under ``output/``.

    Raw rows are written as JSON through Spark and registered as the temp
    view "raw". Metrics are written as a JSON file and as a Spark text
    dataset, and registered as the temp view "metrics". All paths carry the
    current timestamp.

    The two saves run in separate ``try`` blocks so that a failure while
    saving raw rows no longer prevents the metrics from being written.
    Errors are printed, not raised.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Save raw data
    try:
        if raw_data_buffer:
            raw_df = spark.createDataFrame(raw_data_buffer)
            raw_df.write.mode('append').json(f"output/raw_data/raw_data_{timestamp}")
            raw_df.createOrReplaceTempView("raw")
            print(f"💾 Raw data saved: {len(raw_data_buffer)} records")
    except Exception as e:
        print(f"❌ Error saving data: {e}")

    # Save metrics
    try:
        if metrics_buffer:
            with open(f"output/metrics/metrics_{timestamp}.json", 'w', encoding='utf-8') as f:
                json.dump(metrics_buffer, f, indent=2)

            # One JSON string per metrics record, exposed as a single-column DataFrame
            metrics_df = spark.createDataFrame([json.dumps(m) for m in metrics_buffer], StringType())
            metrics_df.createOrReplaceTempView("metrics")
            metrics_df.write.mode('append').text(f"output/metrics/spark_metrics_{timestamp}")
            print(f"📈 Metrics saved: {len(metrics_buffer)} records")
    except Exception as e:
        print(f"❌ Error saving data: {e}")

print("🔧 Processing functions defined successfully!")


25/05/29 09:06:35 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9ad2e73a-5ea1-4906-b2a6-95983b7b41ae. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/29 09:06:35 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/29 09:06:36 ERROR MicroBatchExecution: Query [id = 0c9f0ba2-1c19-40e9-92b3-173488d4c142, runId = 8aa6425c-7b63-4f3e-bda1-4b3e513c9e3f] terminated with error
java.net.ConnectException: Connection refused
	at java.base/sun.nio.ch.Net.connect0(Native Method)
	at java.base/sun.nio.ch.Net.connect(Net.java:579)
	at java.base/sun.nio.ch.Net.connect(Net.java:568)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:593)
	at java.base/java.net.SocksSocketIm

StreamingQueryException: [STREAM_FAILED] Query [id = 0c9f0ba2-1c19-40e9-92b3-173488d4c142, runId = 8aa6425c-7b63-4f3e-bda1-4b3e513c9e3f] terminated with exception: Connection refused

In [None]:
# Main streaming processing function
def process_batch(df, epoch_id):
    """Process one micro-batch: compute metrics and print sample comments.

    The batch is counted, collected for metrics, collected again for the
    reference analysis, and sampled. It is therefore persisted for the
    duration of the call so that Spark does not recompute it for every
    action, and released afterwards.

    Args:
        df: Micro-batch DataFrame handed over by ``foreachBatch``.
        epoch_id: Identifier of the micro-batch.

    Errors are printed, not raised, so a failing batch does not stop the
    streaming query.
    """
    try:
        df.persist()
        try:
            if df.count() == 0:
                print(f"📭 Batch {epoch_id}: No data")
                return

            print(f"\n" + "="*60)
            print(f"🔄 PROCESSING BATCH {epoch_id}")
            print(f"📅 Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

            # Calculate and display metrics
            calculate_metrics(df, epoch_id)

            # Show sample data
            print(f"\n📋 Sample Comments:")
            for i, row in enumerate(df.limit(3).collect(), 1):
                # Sentiment and text are nullable in the schema; treat a
                # missing score as neutral instead of aborting the printout.
                compound = row.sentiment.compound if row.sentiment is not None else None
                if compound is None:
                    sentiment_emoji = "😐"
                    sentiment_text = "n/a"
                else:
                    sentiment_emoji = "😊" if compound > 0.1 else "😞" if compound < -0.1 else "😐"
                    sentiment_text = f"{compound:.3f}"
                text = row.text or ""
                print(f"   {i}. {sentiment_emoji} [{row.subreddit}] {text[:100]}...")
                print(f"      Score: {row.score}, Sentiment: {sentiment_text}")

            print("="*60)
        finally:
            # Always release the cached batch, including on the early return
            df.unpersist()

    except Exception as e:
        print(f"❌ Error processing batch {epoch_id}: {e}")

# Launch the streaming query
print("🚀 Starting streaming query...")
print("📡 Waiting for data from Reddit producer...")
print("🛑 Press Ctrl+C to stop\n")

# Each micro-batch is handed to process_batch; a new batch is triggered
# every 10 seconds and progress is tracked under ./checkpoint.
query = (
    processed_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", "./checkpoint")
    .trigger(processingTime='10 seconds')
    .start()
)

print("✅ Streaming query started!")
print(f"🔍 Query ID: {query.id}")
print(f"📊 Status: {query.status}")

In [None]:
#Session 6 Spark streaming intro

# Monitoring and control functions
import threading
import time
from datetime import datetime

def monitor_stream():
    """Print the streaming query status every 30 seconds while it is active.

    Reports the query's activity and status message, the number of batches
    processed, the buffer sizes, and the average compound sentiment of the
    rows with a sentiment score among the 10 most recently buffered rows.
    The loop ends when the query becomes inactive or on the first error.
    """
    while query.isActive:
        try:
            status = query.status
            print(f"\n📊 STREAM STATUS - {datetime.now().strftime('%H:%M:%S')}")
            # "Active" previously displayed isDataAvailable; report both
            # values under accurate labels.
            print(f"   🔄 Active: {query.isActive}")
            print(f"   📥 Data Available: {status['isDataAvailable']}")
            print(f"   📈 Message: {status['message']}")
            print(f"   📊 Batches: {batch_counter}")
            print(f"   💾 Raw Data Buffer: {len(raw_data_buffer)} records")
            print(f"   📈 Metrics Buffer: {len(metrics_buffer)} records")

            # Average over the rows that actually carry a score; dividing by
            # the window size skewed the result when some rows had none.
            recent_scores = [
                r.sentiment.compound
                for r in raw_data_buffer[-10:]
                if r.sentiment is not None and r.sentiment.compound is not None
            ]
            if recent_scores:
                recent_sentiment = sum(recent_scores) / len(recent_scores)
                print(f"   😊 Recent Avg Sentiment: {recent_sentiment:.3f}")

            time.sleep(30)  # Check every 30 seconds
        except Exception as e:
            print(f"❌ Monitor error: {e}")
            break

# Run the monitor on a daemon thread so it never blocks interpreter exit
monitor_thread = threading.Thread(daemon=True, target=monitor_stream)
monitor_thread.start()

print(
    "📊 Stream monitoring started in background",
    "⏰ Status updates every 30 seconds",
    sep="\n",
)

In [None]:
# Wait for stream termination. On Ctrl+C the query is stopped, buffered data
# is saved, and a final summary is printed. The Spark session is stopped in
# every case.
try:
    print("\n🎯 Stream is running! Processing Reddit comments...")
    print("📊 Check the output above for real-time metrics")
    print("💾 Data is being saved to output/ directory")
    print("🛑 Press Ctrl+C to stop gracefully\n")

    query.awaitTermination()

except KeyboardInterrupt:
    print("\n🛑 Stopping stream gracefully...")
    query.stop()

    # Final save
    save_data_to_files()

    print("\n📋 FINAL SUMMARY:")
    print(f"📊 Total Batches Processed: {batch_counter}")
    print(f"💬 Total Comments Processed: {len(raw_data_buffer)}")
    print(f"📈 Total Metrics Records: {len(metrics_buffer)}")
    print(f"💾 Data saved to output/ directory")

    if raw_data_buffer:
        sentiment_values = [
            r.sentiment.compound
            for r in raw_data_buffer
            if r.sentiment is not None and r.sentiment.compound is not None
        ]
        # Guard against buffers in which no row carries a sentiment score;
        # the unguarded division raised ZeroDivisionError during shutdown.
        if sentiment_values:
            avg_sentiment = sum(sentiment_values) / len(sentiment_values)
            print(f"😊 Overall Average Sentiment: {avg_sentiment:.3f}")

        subreddit_counts = {}
        for r in raw_data_buffer:
            if r.subreddit:
                subreddit_counts[r.subreddit] = subreddit_counts.get(r.subreddit, 0) + 1
        top_subreddits = sorted(subreddit_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        print(f"🏷️  Top Subreddits: {[sub for sub, count in top_subreddits]}")

    print("\n✅ Stream stopped successfully!")

finally:
    spark.stop()
    print("🔥 Spark session stopped")

In [None]:
# Visualization Functions
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud
import pandas as pd
from datetime import datetime
import json

# Global plotting look: seaborn's "whitegrid" theme, then matplotlib's
# 'seaborn-v0_8' style sheet.
# NOTE(review): the style sheet is applied after set_style, so it may
# override some whitegrid settings — confirm the order is intentional.
sns.set_style("whitegrid")
plt.style.use('seaborn-v0_8')

def create_visualizations():
    """Render a 3x3 dashboard from ``metrics_buffer`` and save it as PNG.

    Panels: average sentiment, comments per batch, latest sentiment
    distribution, average score, words per comment, latest top subreddits,
    latest top words, accumulated comments, and latest reference counts.
    The figure is saved under ``output/visualizations/``, shown, and then
    closed so repeated calls do not accumulate open figures. Afterwards
    ``create_wordcloud()`` is called.

    Errors are printed, not raised.
    """
    try:
        if not metrics_buffer:
            print("❌ No metrics data available for visualization")
            return

        print("📊 Creating visualizations...")

        latest = metrics_buffer[-1]
        batches = [m['batch_id'] for m in metrics_buffer]

        def _trend_panel(position, values, fmt, title, ylabel, **line_kwargs):
            """Draw one per-batch line chart in grid cell ``position``."""
            plt.subplot(3, 3, position)
            plt.plot(batches, values, fmt, **line_kwargs)
            plt.title(title, fontsize=14, fontweight='bold')
            plt.xlabel('Batch ID')
            plt.ylabel(ylabel)
            plt.grid(True, alpha=0.3)

        fig = plt.figure(figsize=(20, 15))
        try:
            # 1. Sentiment Analysis Over Time (with a zero reference line)
            _trend_panel(1, [m['avg_sentiment'] for m in metrics_buffer], 'b-o',
                         'Average Sentiment Over Time', 'Sentiment Score',
                         linewidth=2, markersize=4)
            plt.axhline(y=0, color='r', linestyle='--', alpha=0.5)

            # 2. Comments Count Over Time
            plt.subplot(3, 3, 2)
            plt.bar(batches, [m['total_comments'] for m in metrics_buffer],
                    color='skyblue', alpha=0.7)
            plt.title('Comments Per Batch', fontsize=14, fontweight='bold')
            plt.xlabel('Batch ID')
            plt.ylabel('Number of Comments')
            plt.grid(True, alpha=0.3)

            # 3. Sentiment Distribution (Latest Batch)
            plt.subplot(3, 3, 3)
            distribution = latest['sentiment_distribution']
            sizes = [distribution['positive'], distribution['negative'], distribution['neutral']]
            if sum(sizes) > 0:
                plt.pie(sizes, labels=['Positive', 'Negative', 'Neutral'],
                        colors=['#90EE90', '#FFB6C1', '#D3D3D3'],
                        autopct='%1.1f%%', startangle=90)
            else:
                # A pie cannot be normalised when every slice is zero; show a
                # placeholder instead of letting the whole dashboard fail.
                plt.text(0.5, 0.5, 'No sentiment data', ha='center', va='center')
                plt.axis('off')
            plt.title('Sentiment Distribution (Latest Batch)', fontsize=14, fontweight='bold')

            # 4. Average Score Over Time
            _trend_panel(4, [m['avg_score'] for m in metrics_buffer], 'g-o',
                         'Average Reddit Score Over Time', 'Average Score',
                         linewidth=2, markersize=4)

            # 5. Words Per Comment
            _trend_panel(5, [m['avg_words_per_comment'] for m in metrics_buffer], 'purple',
                         'Average Words Per Comment', 'Words per Comment',
                         linewidth=2, marker='s', markersize=4)

            # 6. Top Subreddits (Latest Batch)
            plt.subplot(3, 3, 6)
            subreddit_data = latest['subreddit_distribution']
            if subreddit_data:
                plt.barh(list(subreddit_data.keys())[:5], list(subreddit_data.values())[:5],
                         color='orange', alpha=0.7)
                plt.title('Top Subreddits (Latest Batch)', fontsize=14, fontweight='bold')
                plt.xlabel('Number of Comments')

            # 7. TF-IDF Top Words (Latest Batch)
            plt.subplot(3, 3, 7)
            words_data = latest['top_words_tfidf'][:10]
            if words_data:
                plt.barh([item[0] for item in words_data], [item[1] for item in words_data],
                         color='red', alpha=0.7)
                plt.title('Top Words (TF-IDF)', fontsize=14, fontweight='bold')
                plt.xlabel('Frequency')

            # 8. Processing Statistics
            _trend_panel(8,
                         [m['processing_stats']['total_comments_accumulated'] for m in metrics_buffer],
                         'brown', 'Cumulative Comments Processed', 'Total Comments',
                         linewidth=3, marker='D', markersize=4)

            # 9. Reference Analysis (Latest Batch)
            plt.subplot(3, 3, 9)
            ref_data = latest['references']
            if ref_data:
                ref_counts = [
                    len(ref_data['top_user_mentions']),
                    len(ref_data['top_subreddit_refs']),
                    len(ref_data['top_url_domains'])
                ]
                plt.bar(['User Mentions', 'Subreddit Refs', 'URL Domains'], ref_counts,
                        color=['blue', 'green', 'red'], alpha=0.7)
                plt.title('References Found (Latest Batch)', fontsize=14, fontweight='bold')
                plt.ylabel('Count')
                plt.xticks(rotation=45)

            plt.tight_layout()
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            plt.savefig(f'output/visualizations/reddit_analytics_{timestamp}.png',
                        dpi=300, bbox_inches='tight')
            plt.show()
        finally:
            # Release the figure's memory even if plotting or saving failed
            plt.close(fig)

        print(f"📊 Visualizations saved to output/visualizations/reddit_analytics_{timestamp}.png")

        # Create Word Cloud if we have text data
        create_wordcloud()

    except Exception as e:
        print(f"❌ Error creating visualizations: {e}")

def create_wordcloud():
    """Build, save, and show a word cloud of all buffered comment text.

    Does nothing when ``raw_data_buffer`` is empty, and prints a notice when
    the combined text is shorter than 100 characters. Errors are printed,
    not raised.
    """
    try:
        if not raw_data_buffer:
            return

        # Concatenate every non-empty comment into one corpus string
        corpus = " ".join(entry.text for entry in raw_data_buffer if entry.text)

        if len(corpus) < 100:
            print("❌ Not enough text for word cloud")
            return

        # Configure the generator, then feed it the corpus
        cloud_options = {
            'width': 800,
            'height': 400,
            'background_color': 'white',
            'colormap': 'viridis',
            'max_words': 100,
        }
        cloud = WordCloud(**cloud_options).generate(corpus)

        plt.figure(figsize=(12, 6))
        plt.imshow(cloud, interpolation='bilinear')
        plt.axis('off')
        plt.title('Word Cloud - Germany-related Reddit Comments',
                  fontsize=16, fontweight='bold', pad=20)

        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        plt.savefig(f'output/visualizations/wordcloud_{stamp}.png',
                    dpi=300, bbox_inches='tight')
        plt.show()

        print(f"☁️ Word cloud saved to output/visualizations/wordcloud_{stamp}.png")

    except Exception as e:
        print(f"❌ Error creating word cloud: {e}")

def generate_summary_report():
    """Write a Markdown summary of ``metrics_buffer`` to ``output/`` and print it.

    The report contains an overview, overall averages (the mean of the
    per-batch averages), the sentiment split as counts and percentages, and
    details of the most recent batch. The file is written as UTF-8 because
    the top words and names come from user-generated text.

    Errors are printed, not raised.
    """
    try:
        if not metrics_buffer:
            print("❌ No data available for report")
            return

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        batch_count = len(metrics_buffer)

        report = f"""
# Reddit Germany Analytics Report
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Overview
- Total Batches Processed: {batch_count}
- Total Comments Analyzed: {len(raw_data_buffer)}
- Analysis Period: {metrics_buffer[0]['timestamp']} to {metrics_buffer[-1]['timestamp']}

## Key Metrics
"""

        avg_sentiment_overall = sum(m['avg_sentiment'] for m in metrics_buffer) / batch_count
        avg_score_overall = sum(m['avg_score'] for m in metrics_buffer) / batch_count
        total_comments = sum(m['total_comments'] for m in metrics_buffer)

        def _percent(part):
            """Share of ``part`` in all comments; 0.0 when there are none."""
            return part / total_comments * 100 if total_comments else 0.0

        report += f"""
- Overall Average Sentiment: {avg_sentiment_overall:.3f}
- Overall Average Score: {avg_score_overall:.2f}
- Total Comments Processed: {total_comments}
- Average Comments per Batch: {total_comments / batch_count:.1f}

## Sentiment Analysis
"""

        # Sentiment summary
        total_positive = sum(m['sentiment_distribution']['positive'] for m in metrics_buffer)
        total_negative = sum(m['sentiment_distribution']['negative'] for m in metrics_buffer)
        total_neutral = sum(m['sentiment_distribution']['neutral'] for m in metrics_buffer)

        report += f"""
- Positive Comments: {total_positive} ({_percent(total_positive):.1f}%)
- Negative Comments: {total_negative} ({_percent(total_negative):.1f}%)
- Neutral Comments: {total_neutral} ({_percent(total_neutral):.1f}%)

## Most Recent Analysis
"""

        latest = metrics_buffer[-1]
        report += f"""
- Latest Batch: {latest['batch_id']}
- Comments in Latest Batch: {latest['total_comments']}
- Latest Average Sentiment: {latest['avg_sentiment']:.3f}
- Top Words (TF-IDF): {[word for word, freq in latest['top_words_tfidf'][:5]]}
"""

        if latest['references']['top_user_mentions']:
            report += f"\n- Top User Mentions: {[user for user, count in latest['references']['top_user_mentions'][:5]]}"

        if latest['references']['top_subreddit_refs']:
            report += f"\n- Top Subreddit References: {[sub for sub, count in latest['references']['top_subreddit_refs'][:5]]}"

        # Save report; make sure the target folder exists first
        os.makedirs('output', exist_ok=True)
        with open(f'output/summary_report_{timestamp}.md', 'w', encoding='utf-8') as f:
            f.write(report)

        print(f"📋 Summary report saved to output/summary_report_{timestamp}.md")
        print("\n" + report)

    except Exception as e:
        print(f"❌ Error generating report: {e}")

print("📊 Visualization functions defined successfully!")

In [None]:
# Utility Functions for Data Export and Analysis

def export_to_csv():
    """Write the buffered comments (and metrics, if any) to CSV files.

    Each row of ``raw_data_buffer`` is flattened into scalar columns, with
    the text truncated to 500 characters, and written to
    ``output/reddit_data_<timestamp>.csv``. When ``metrics_buffer`` is not
    empty it is written to ``output/metrics_<timestamp>.csv``.

    Returns:
        The path of the comments CSV, or ``None`` when there is nothing to
        export or an error occurred (the error is printed, not raised).
    """
    try:
        if not raw_data_buffer:
            print("❌ No raw data to export")
            return

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        def _flatten(record):
            """Turn one buffered row into a flat dict of CSV columns."""
            return {
                'id': record.id,
                'text': record.text[:500] if record.text else '',  # Truncate long text
                'created_utc': record.created_utc,
                'author': record.author,
                'subreddit': record.subreddit,
                'score': record.score,
                'sentiment_compound': record.sentiment.compound if record.sentiment else 0,
                'sentiment_positive': record.sentiment.positive if record.sentiment else 0,
                'sentiment_negative': record.sentiment.negative if record.sentiment else 0,
                'word_count': record.word_count,
                'char_count': record.char_count,
                'user_mentions_count': len(record.user_mentions) if record.user_mentions else 0,
                'url_count': len(record.urls) if record.urls else 0
            }

        # Build the pandas frame straight from the flattened records
        df = pd.DataFrame([_flatten(record) for record in raw_data_buffer])
        csv_path = f'output/reddit_data_{timestamp}.csv'
        df.to_csv(csv_path, index=False)

        print(f"📄 Data exported to CSV: {csv_path}")
        print(f"📊 Exported {len(df)} records")

        # Metrics go to a second file when any were recorded
        if metrics_buffer:
            metrics_csv_path = f'output/metrics_{timestamp}.csv'
            pd.DataFrame(metrics_buffer).to_csv(metrics_csv_path, index=False)
            print(f"📈 Metrics exported to CSV: {metrics_csv_path}")

        return csv_path

    except Exception as e:
        print(f"❌ Error exporting to CSV: {e}")
        return None

def analyze_peak_activity():
    """Count buffered comments per hour of day and print the busiest hours.

    Rows in the module-level ``raw_data_buffer`` whose ``created_utc`` is
    falsy are skipped. The hour is taken from ``datetime.fromtimestamp``,
    i.e. it is expressed in the local time zone of the machine.

    Returns:
        A dict mapping hour (0-23) to comment count; ``None`` when the
        buffer is empty; ``{}`` when an error occurs (the error is printed).
    """
    try:
        if not raw_data_buffer:
            print("❌ No data for peak analysis")
            return

        # Tally comments per hour of day.
        hourly_activity = {}
        stamps = (row.created_utc for row in raw_data_buffer if row.created_utc)
        for stamp in stamps:
            hour_of_day = datetime.fromtimestamp(stamp).hour
            hourly_activity.setdefault(hour_of_day, 0)
            hourly_activity[hour_of_day] += 1

        # Busiest hours first; the stable sort keeps first-seen order on ties.
        ranked = list(hourly_activity.items())
        ranked.sort(key=lambda pair: pair[1], reverse=True)

        print("\n🕐 Peak Activity Analysis:")
        print(f"📊 Total hours with activity: {len(hourly_activity)}")
        if ranked:
            peak_hour, peak_count = ranked[0]
            print(f"🔥 Peak hour: {peak_hour}:00 ({peak_count} comments)")
            print(f"📈 Top 5 active hours:")
            for hour, count in ranked[:5]:
                print(f"   {hour:2d}:00 - {count:3d} comments")

        return hourly_activity

    except Exception as e:
        print(f"❌ Error analyzing peak activity: {e}")
        return {}

def get_top_controversial():
    """Rank buffered comments by a simple controversy heuristic.

    The heuristic is ``abs(score) * (1 - abs(sentiment.compound))``: a large
    score combined with near-neutral sentiment ranks highest. Rows with a
    falsy score, sentiment or text are ignored. The five top entries are
    printed.

    Returns:
        Up to ten dicts (text, score, sentiment, controversy_score,
        subreddit, author) ordered by descending controversy score, or an
        empty list when there is no data or an error occurs.
    """
    try:
        if not raw_data_buffer:
            return []

        # Build one candidate record per usable row.
        candidates = [
            {
                'text': row.text[:200],
                'score': row.score,
                'sentiment': row.sentiment.compound,
                # High score + neutral sentiment = controversial
                'controversy_score': abs(row.score) * (1 - abs(row.sentiment.compound)),
                'subreddit': row.subreddit,
                'author': row.author,
            }
            for row in raw_data_buffer
            if row.score and row.sentiment and row.text
        ]

        # Most controversial first (stable for equal scores).
        ranked = sorted(candidates, key=lambda entry: entry['controversy_score'], reverse=True)

        print("\n🔥 Most Controversial Comments:")
        for position, entry in enumerate(ranked[:5], 1):
            print(f"{position}. Score: {entry['score']:3d}, Sentiment: {entry['sentiment']:+.3f}")
            print(f"   [{entry['subreddit']}] {entry['text'][:150]}...")
            print()

        return ranked[:10]

    except Exception as e:
        print(f"❌ Error finding controversial comments: {e}")
        return []

# Last statement of the cell: confirms that the utility functions above were defined.
print("🔧 Utility functions defined successfully!")

In [None]:
# Final Execution Section
# This cell should be run after the streaming has been stopped

def run_final_analysis():
    """Run the post-streaming analysis steps and print a final summary.

    Intended to be called after the streaming query has been stopped. Works
    on the module-level ``raw_data_buffer`` and ``metrics_buffer`` and, in
    order, creates visualizations, exports CSVs, analyzes peak activity,
    lists controversial comments, generates the summary report and prints
    aggregate statistics.

    Returns:
        None. When both buffers are empty a hint is printed and nothing
        else is executed.
    """
    print("\n" + "="*60)
    print("🔬 RUNNING FINAL COMPREHENSIVE ANALYSIS")
    print("="*60)

    if not raw_data_buffer and not metrics_buffer:
        print("❌ No data collected. Make sure to run the producer first!")
        return

    # 1. Generate visualizations
    print("\n📊 1. Creating Visualizations...")
    create_visualizations()

    # 2. Export data
    print("\n💾 2. Exporting Data...")
    export_to_csv()

    # 3. Peak activity analysis
    print("\n🕐 3. Analyzing Peak Activity...")
    analyze_peak_activity()

    # 4. Find controversial content
    print("\n🔥 4. Finding Controversial Content...")
    get_top_controversial()

    # 5. Generate summary report
    print("\n📋 5. Generating Summary Report...")
    generate_summary_report()

    # 6. Final statistics
    print("\n📊 6. Final Statistics Summary:")
    if raw_data_buffer:
        print(f"   💬 Total Comments: {len(raw_data_buffer)}")
        print(f"   📈 Total Batches: {len(metrics_buffer)}")

        # Only rows that carry a sentiment contribute to the average; guard
        # against the case where none do, which would divide by zero.
        sentiment_scores = [r.sentiment.compound for r in raw_data_buffer if r.sentiment]
        if sentiment_scores:
            avg_sentiment = sum(sentiment_scores) / len(sentiment_scores)
            print(f"   😊 Overall Sentiment: {avg_sentiment:.3f}")
        else:
            print("   😊 Overall Sentiment: n/a")

        subreddit_counts = {}
        for r in raw_data_buffer:
            if r.subreddit:
                subreddit_counts[r.subreddit] = subreddit_counts.get(r.subreddit, 0) + 1

        print(f"   🏷️  Subreddits Covered: {len(subreddit_counts)}")
        # Printed as a (name, count) tuple, or 'None' when no subreddit was seen.
        print(f"   🔝 Top Subreddit: {max(subreddit_counts.items(), key=lambda x: x[1]) if subreddit_counts else 'None'}")

        total_words = sum(r.word_count for r in raw_data_buffer if r.word_count)
        print(f"   📝 Total Words Analyzed: {total_words:,}")

        url_count = sum(len(r.urls) for r in raw_data_buffer if r.urls)
        user_mention_count = sum(len(r.user_mentions) for r in raw_data_buffer if r.user_mentions)
        print(f"   🔗 URLs Found: {url_count}")
        print(f"   👥 User Mentions: {user_mention_count}")

    print("\n✅ Final analysis complete!")
    print("📁 All results saved in output/ directory")
    print("="*60)

# Instructions for running the analysis.
# Printed when this cell is executed; run_final_analysis() itself is NOT
# called here, it has to be invoked manually afterwards.
print("""\n📝 INSTRUCTIONS FOR FINAL ANALYSIS:

After stopping the streaming query (Ctrl+C), run:
    run_final_analysis()

This will:
✅ Create comprehensive visualizations
✅ Export data to CSV format
✅ Analyze peak activity periods
✅ Find controversial comments
✅ Generate summary report
✅ Show final statistics

All results will be saved in the output/ directory.
""")

print("🎯 Ready for final analysis!")

In [None]:
# Troubleshooting and Help Section

def show_help():
    """Print the troubleshooting and usage guide for this notebook.

    The guide covers common issues, the run order of consumer and producer,
    the output file layout and the default configuration. Returns None.
    """
    # The guide is kept as one literal so it prints exactly as laid out here.
    guide = """
🆘 REDDIT ANALYTICS TROUBLESHOOTING GUIDE

❓ COMMON ISSUES:

1. "No data received" or "Batch X: No data"
   ✅ Make sure the Reddit Producer is running first
   ✅ Check that both producer and consumer use the same port (9998)
   ✅ Verify Reddit API credentials are valid
   ✅ Ensure you're connected to the internet

2. "Connection refused" errors
   ✅ Start the consumer first, then the producer
   ✅ Check if port 9998 is available
   ✅ Try changing the port in both producer and consumer

3. "TF-IDF" or "Spark" errors
   ✅ Make sure PySpark is properly installed
   ✅ Check Java version (Java 8 or 11 recommended)
   ✅ Verify Spark dependencies

4. Visualization errors
   ✅ Install required packages: matplotlib, seaborn, wordcloud
   ✅ Run: pip install matplotlib seaborn wordcloud

🚀 RUNNING THE PROJECT:

1. Start Consumer (this notebook):
   - Run all cells up to the streaming query
   - Wait for "Waiting for data from Reddit producer..." message

2. Start Producer (other notebook):
   - Make sure Reddit API credentials are set
   - Run the streaming cell
   - Look for "🔼 Sending comment to socket" messages

3. Monitor Progress:
   - Watch for batch processing messages in consumer
   - Check output/ directory for saved files
   - Stream status updates every 30 seconds

4. Stop and Analyze:
   - Press Ctrl+C to stop streaming
   - Run run_final_analysis() for comprehensive results

📊 OUTPUT FILES:
- output/raw_data/ - Raw Reddit comments
- output/metrics/ - Processed analytics
- output/visualizations/ - Charts and graphs
- output/*.csv - Data in CSV format
- output/summary_report_*.md - Summary report

🔧 CONFIGURATION:
- Port: 9998 (change in both notebooks if needed)
- Batch processing: Every 10 seconds
- Data buffer: Last 1000 comments
- Window analysis: 60 seconds

📞 SUPPORT:
If issues persist, check:
- Python version (3.7+ recommended)
- PySpark installation
- Network connectivity
- Reddit API status
"""
    print(guide)

def show_current_status():
    """Print a snapshot of the consumer's processing state.

    Shows the module-level ``batch_counter``, the sizes of
    ``raw_data_buffer`` and ``metrics_buffer``, the five most recent
    buffered comments with a sentiment emoji, and a tree listing of the
    ``output`` directory (or a notice when it does not exist yet).

    Returns:
        None.
    """
    print(f"\n📊 CURRENT STATUS - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"📈 Batches Processed: {batch_counter}")
    print(f"💾 Raw Data Buffer: {len(raw_data_buffer)} records")
    print(f"📊 Metrics Buffer: {len(metrics_buffer)} records")

    if raw_data_buffer:
        # Copy to a list first so the tail slice also works when the buffer
        # is a container that does not support slicing (e.g. a deque).
        recent_comments = list(raw_data_buffer)[-5:]
        print(f"\n🔥 Recent Comments:")
        for i, comment in enumerate(recent_comments, 1):
            # A missing sentiment counts as neutral (0), matching the CSV
            # export; a missing text is shown as empty.
            compound = comment.sentiment.compound if comment.sentiment else 0
            if compound > 0.1:
                sentiment_emoji = "😊"
            elif compound < -0.1:
                sentiment_emoji = "😞"
            else:
                sentiment_emoji = "😐"
            text = comment.text or ""
            print(f"   {i}. {sentiment_emoji} [{comment.subreddit}] {text[:80]}...")

    print(f"\n📁 Output Directory Contents:")
    # os.walk silently yields nothing for a missing directory, so the
    # "not found" case has to be detected explicitly.
    if not os.path.isdir("output"):
        print("   No output directory found yet")
        return

    for root, _dirs, files in os.walk("output"):
        # Nesting depth below "output" determines the indentation.
        level = root.replace("output", "").count(os.sep)
        indent = " " * 2 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = " " * 2 * (level + 1)
        for file in files:
            print(f"{subindent}{file}")

# Quick access functions
# Printed when the cell runs: a cheat sheet of helper functions that can be
# called manually. None of them is invoked here.
print("\n🆘 HELP FUNCTIONS AVAILABLE:")
print("   show_help() - Display troubleshooting guide")
print("   show_current_status() - Show current processing status")
print("   run_final_analysis() - Run comprehensive analysis")
print("   create_visualizations() - Create charts and graphs")
print("   export_to_csv() - Export data to CSV format")
print("\n💡 Type show_help() if you need assistance!")

In [None]:
# This cell was cleaned up - content moved to proper sections above.
#
# 🎉 REDDIT GERMANY ANALYTICS - COMPLETE SETUP!
#
# This notebook provides comprehensive real-time analytics for Germany-related
# Reddit discussions with the following features:
#
# ✅ Real-time data streaming via socket
# ✅ Comprehensive data processing with Spark
# ✅ TF-IDF analysis for important words
# ✅ Reference extraction (users, subreddits, URLs)
# ✅ Sentiment analysis with VADER
# ✅ Real-time metrics and statistics
# ✅ Data persistence (JSON, CSV, Parquet)
# ✅ Comprehensive visualizations
# ✅ Automated reporting
# ✅ Error handling and monitoring
#
# 📊 Analytics Features:
# - Sentiment analysis over time
# - Comment volume tracking
# - TF-IDF word importance
# - Peak activity analysis
# - Controversial content detection
# - Subreddit distribution
# - Reference network analysis
#
# 🎯 Ready to analyze Germany-related Reddit discussions in real-time!

# Final banner: only prints messages, it does not start the streaming query.
print("\n🎉 REDDIT ANALYTICS CONSUMER READY!")
print("📊 All features loaded successfully!")
print("🚀 Start the streaming query above to begin processing!")