In [1]:
import praw
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import re
import string
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from pyspark.sql import SparkSession
import os

# Initialize NLTK and VADER
nltk.download('vader_lexicon')
nltk.download('stopwords')
nltk.download('punkt')
analyzer = SentimentIntensityAnalyzer()
stop_words = set(stopwords.words('english'))
stemmer = PorterStemmer()

# Reddit API setup
reddit = praw.Reddit(
    client_id = '-_LwwR2GAkBuc-mvHUVSEQ',
    client_secret = '5X25s7-NfqlEnlstXzdvHwWEKyyMfw',
    user_agent = 'Social Media Sentiment Analysis v1.0 by /u/Massive_Strategy75'
)

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [2]:
# Setup Spark environment
os.environ["SPARK_HOME"] = "C:\\spark-3.3.2-bin-hadoop3"
os.environ["HADOOP_HOME"] = "C:\\hadoop-3.4.1"
os.environ["JAVA_HOME"] = "C:\\jdk-11.0.26.4-hotspot"
os.environ["PATH"] += f";{os.environ['SPARK_HOME']}\\bin;{os.environ['HADOOP_HOME']}\\bin"

# Create Spark session
spark = SparkSession.builder \
    .master("spark://hadoop-master-node:7077") \
    .appName("RedditDemo") \
    .config("spark.driver.memory", "512M") \
    .config("spark.executor.memory", "512M") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.host", "192.168.1.1") \
    .getOrCreate()

print("✅ Spark Session Created Successfully!")


✅ Spark Session Created Successfully!


In [3]:
def clean_text(text):
    """Clean and preprocess text"""
    text = text.lower()
    text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)
    text = re.sub(r'\@\w+|\#','', text)
    text = re.sub(r'\d+', '', text)
    text = text.translate(str.maketrans('', '', string.punctuation))
    tokens = nltk.word_tokenize(text)
    tokens = [stemmer.stem(word) for word in tokens if word not in stop_words]
    return ' '.join(tokens)

In [9]:
def collect_posts(limit=100):
    """Collect newest posts from r/WorldNews"""
    print("📥 Collecting latest posts from r/WorldNews...")
    
    # Get new posts
    posts = reddit.subreddit('worldnews').new(limit=limit)
    data = []
    
    for post in posts:
        # Only get posts from last 24 hours
        post_time = datetime.fromtimestamp(post.created_utc)
        if datetime.now() - post_time > timedelta(days=1):
            continue
            
        cleaned_text = clean_text(post.title)
        sentiment = analyzer.polarity_scores(cleaned_text)
        
        data.append({
            "title": post.title,
            "cleaned_text": cleaned_text,
            "score": post.score,
            "created_at": post_time,
            "num_comments": post.num_comments,
            "sentiment_score": sentiment['compound'],
            "sentiment_label": 'positive' if sentiment['compound'] >= 0.05 
                             else 'negative' if sentiment['compound'] <= -0.05 
                             else 'neutral',
            "collected_at": datetime.now()
        })
    
    df = pd.DataFrame(data)
    print(f"✅ Collected {len(df)} new posts!")
    return df

# Collect new posts
new_data = collect_posts(limit=100)

📥 Collecting latest posts from r/WorldNews...
✅ Collected 100 new posts!


In [13]:
def collect_posts(limit=100):
    """Collect newest posts from multiple active subreddits"""
    print("📥 Collecting latest posts from multiple subreddits...")
    
    # List of active subreddits to collect from
    subreddits = [
        'news',          # General news
        'worldnews',     # World news
        'politics',      # Political news
        'technology',    # Tech news
        'business'       # Business news
    ]
    
    # Combine them into a multi-subreddit
    subreddit = reddit.subreddit('+'.join(subreddits))
    
    # Get new posts
    posts = subreddit.new(limit=limit)
    data = []
    
    # Try to read existing post IDs
    try:
        existing_df = spark.read.parquet("hdfs://hadoop-master-node:9000/sentiment/reddit_processed")
        existing_ids = set(existing_df.select("post_id").toPandas()["post_id"])
        print(f"Found {len(existing_ids)} existing posts to check against")
    except:
        existing_ids = set()
        print("No existing posts found in storage")
    
    current_time = datetime.now()
    cutoff_time = current_time - timedelta(hours=1)  # Reduced to 1 hour for more frequent updates
    
    for post in posts:
        post_time = datetime.fromtimestamp(post.created_utc)
        
        # Skip if post is too old
        if post_time < cutoff_time:
            continue
            
        # Skip if we already have this post
        if post.id in existing_ids:
            continue
            
        cleaned_text = clean_text(post.title)
        sentiment = analyzer.polarity_scores(cleaned_text)
        
        data.append({
            "title": post.title,
            "cleaned_text": cleaned_text,
            "score": post.score,
            "created_at": post_time,
            "num_comments": post.num_comments,
            "post_id": post.id,
            "subreddit": post.subreddit.display_name,  # Track which subreddit
            "sentiment_score": sentiment['compound'],
            "sentiment": 'positive' if sentiment['compound'] >= 0.05 
                        else 'negative' if sentiment['compound'] <= -0.05 
                        else 'neutral',
            "collected_at": current_time
        })
    
    if not data:
        print("No new posts found!")
        return pd.DataFrame()
        
    df = pd.DataFrame(data)
    
    print(f"""
✅ Collection Summary:
- Total new posts collected: {len(df)}
- Posts by subreddit: {df['subreddit'].value_counts().to_dict()}
- Time range: {df['created_at'].min()} to {df['created_at'].max()}
- Sentiment distribution: {df['sentiment'].value_counts().to_dict()}
    """)
    
    return df

new_data = collect_posts(limit=100)

📥 Collecting latest posts from multiple subreddits...
No existing posts found in storage

✅ Collection Summary:
- Total new posts collected: 49
- Posts by subreddit: {'politics': 30, 'worldnews': 14, 'news': 3, 'technology': 2}
- Time range: 2025-03-28 00:46:14 to 2025-03-28 01:44:13
- Sentiment distribution: {'negative': 24, 'neutral': 17, 'positive': 8}
    


In [14]:
# Save new data
print("\n💾 Saving to Hadoop...")
spark_df = spark.createDataFrame(new_data)
spark_df.write.mode("append") \
    .partitionBy("collected_at") \
    .parquet("hdfs://hadoop-master-node:9000/sentiment/results/reddit")

print("✅ Data saved successfully!")


💾 Saving to Hadoop...



iteritems is deprecated and will be removed in a future version. Use .items instead.


iteritems is deprecated and will be removed in a future version. Use .items instead.



✅ Data saved successfully!


In [15]:
# Cell for Interactive Visualizations with Adaptive Time Scaling
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta

# Read all historical data
all_data = spark.read.parquet("hdfs://hadoop-master-node:9000/sentiment/results/reddit").toPandas()

# Convert timestamps to datetime
all_data['created_at'] = pd.to_datetime(all_data['created_at'])
all_data['collected_at'] = pd.to_datetime(all_data['collected_at'])

# Calculate date range
date_range = (all_data['created_at'].max() - all_data['created_at'].min()).days

# Determine appropriate time grouping
if date_range <= 2:  # 2 days or less
    freq = 'H'
    time_unit = 'Hourly'
elif date_range <= 30:  # 1 month or less
    freq = 'D'
    time_unit = 'Daily'
elif date_range <= 365:  # 1 year or less
    freq = 'W'
    time_unit = 'Weekly'
else:  # More than a year
    freq = 'M'
    time_unit = 'Monthly'

print(f"Data spans {date_range} days. Using {time_unit} aggregation.")

# Create figure
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        f'Sentiment Trends ({time_unit})',
        'Post Engagement by Sentiment',
        'Overall Sentiment Distribution',
        f'Sentiment Pattern ({time_unit})'
    )
)

# 1. Sentiment Trend Over Time (Adaptive)
# Group by time and sentiment, then pivot
trend_data = all_data.groupby([
    pd.Grouper(key='created_at', freq=freq),
    'sentiment_label'
]).size().reset_index(name='count')
trend_pivot = trend_data.pivot(
    index='created_at',
    columns='sentiment_label',
    values='count'
).fillna(0)

# Add trend lines
for sentiment in ['positive', 'negative', 'neutral']:
    if sentiment in trend_pivot.columns:
        fig.add_trace(
            go.Scatter(
                x=trend_pivot.index,
                y=trend_pivot[sentiment],
                name=sentiment.capitalize(),
                line=dict(color={'positive': 'green', 'negative': 'red', 'neutral': 'gray'}[sentiment]),
                mode='lines+markers'
            ),
            row=1, col=1
        )

# 2. Engagement by Sentiment (Box Plot)
fig.add_trace(
    go.Box(
        x=all_data['sentiment_label'],
        y=all_data['score'],
        name='Post Scores',
        boxpoints='outliers'
    ),
    row=1, col=2
)

# 3. Overall Sentiment Distribution
sentiment_counts = all_data['sentiment_label'].value_counts()
fig.add_trace(
    go.Bar(
        x=sentiment_counts.index,
        y=sentiment_counts.values,
        name='Total Posts',
        text=sentiment_counts.values,
        textposition='auto',
    ),
    row=2, col=1
)

# 4. Time Pattern (Adaptive)
if freq == 'H':
    # Hourly pattern
    pattern_data = all_data.groupby([
        all_data['created_at'].dt.hour,
        'sentiment_label'
    ]).size().reset_index(name='count')
    x_title = 'Hour of Day'
elif freq == 'D':
    # Daily pattern
    pattern_data = all_data.groupby([
        all_data['created_at'].dt.dayofweek,
        'sentiment_label'
    ]).size().reset_index(name='count')
    x_title = 'Day of Week'
else:
    # Monthly pattern
    pattern_data = all_data.groupby([
        all_data['created_at'].dt.month,
        'sentiment_label'
    ]).size().reset_index(name='count')
    x_title = 'Month'

pattern_pivot = pattern_data.pivot(
    index=pattern_data.columns[0],
    columns='sentiment_label',
    values='count'
).fillna(0)

for sentiment in ['positive', 'negative', 'neutral']:
    if sentiment in pattern_pivot.columns:
        fig.add_trace(
            go.Scatter(
                x=pattern_pivot.index,
                y=pattern_pivot[sentiment],
                name=f'{sentiment.capitalize()} ({time_unit})',
                line=dict(color={'positive': 'green', 'negative': 'red', 'neutral': 'gray'}[sentiment])
            ),
            row=2, col=2
        )

# Update layout
fig.update_layout(
    height=800,
    width=1200,
    showlegend=True,
    title=dict(
        text=f"Reddit WorldNews Sentiment Analysis Dashboard ({time_unit} View)",
        x=0.5,
        y=0.95
    ),
    legend=dict(
        yanchor="top",
        y=0.99,
        xanchor="left",
        x=1.05
    ),
    hovermode='x unified'
)

# Update axes labels
fig.update_xaxes(title_text="Time", row=1, col=1)
fig.update_xaxes(title_text="Sentiment", row=1, col=2)
fig.update_xaxes(title_text="Sentiment", row=2, col=1)
fig.update_xaxes(title_text=x_title, row=2, col=2)

fig.update_yaxes(title_text="Number of Posts", row=1, col=1)
fig.update_yaxes(title_text="Post Score", row=1, col=2)
fig.update_yaxes(title_text="Number of Posts", row=2, col=1)
fig.update_yaxes(title_text="Number of Posts", row=2, col=2)

# Add range slider for time series
fig.update_xaxes(rangeslider_visible=True, row=1, col=1)

# Show the plot
fig.show()

# Print summary statistics
print(f"\n📊 {time_unit} Summary:")
print("-" * 50)

# Group by time period
time_stats = all_data.groupby(pd.Grouper(key='created_at', freq=freq)).agg({
    'sentiment_label': lambda x: x.value_counts().to_dict(),
    'score': 'mean',
    'num_comments': 'mean'
})

print(f"\nSentiment Distribution per {time_unit}:")
print(time_stats['sentiment_label'].head())
print(f"\nAverage Engagement per {time_unit}:")
print(f"Score: {time_stats['score'].round(2)}")
print(f"Comments: {time_stats['num_comments'].round(2)}")

Data spans 0 days. Using Hourly aggregation.



Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead




📊 Hourly Summary:
--------------------------------------------------

Sentiment Distribution per Hourly:
created_at
2025-03-27 05:00:00                   {'negative': 8, 'neutral': 2}
2025-03-27 06:00:00    {'neutral': 4, 'positive': 2, 'negative': 2}
2025-03-27 07:00:00                   {'neutral': 4, 'negative': 2}
2025-03-27 08:00:00                                              {}
2025-03-27 09:00:00                   {'neutral': 8, 'negative': 4}
Freq: H, Name: sentiment_label, dtype: object

Average Engagement per Hourly:
Score: created_at
2025-03-27 05:00:00    4132.00
2025-03-27 06:00:00    1465.38
2025-03-27 07:00:00     195.67
2025-03-27 08:00:00        NaN
2025-03-27 09:00:00    4465.08
2025-03-27 10:00:00      52.25
2025-03-27 11:00:00     303.83
2025-03-27 12:00:00        NaN
2025-03-27 13:00:00    1560.00
2025-03-27 14:00:00     377.50
2025-03-27 15:00:00      34.75
2025-03-27 16:00:00     119.00
2025-03-27 17:00:00    1492.60
2025-03-27 18:00:00     112.54
2025-03-27 19

In [16]:
print("\n📈 Summary Statistics:")
print("-" * 50)
print(f"Total posts collected: {len(all_data)}")
print("\nSentiment distribution:")
print(all_data['sentiment_label'].value_counts())
print("\nAverage scores by sentiment:")
print(all_data.groupby('sentiment_label')['score'].mean())
print("\nMost discussed topics (by comments):")
print(all_data.nlargest(5, 'num_comments')[['title', 'num_comments', 'sentiment_label']])


📈 Summary Statistics:
--------------------------------------------------
Total posts collected: 249

Sentiment distribution:
negative    96
neutral     82
positive    22
Name: sentiment_label, dtype: int64

Average scores by sentiment:
sentiment_label
negative     493.343750
neutral     1930.487805
positive     197.227273
Name: score, dtype: float64

Most discussed topics (by comments):
                                                 title  num_comments  \
91   Trump says US will 'go as far as we have to' t...          1547   
213  Trump says US will 'go as far as we have to' t...          1529   
226  Trump Threatens More Tariffs If EU 'Works With...           875   
115  Trump Threatens More Tariffs If EU 'Works With...           874   
198  France announces $2 billion military aid packa...           739   

    sentiment_label  
91          neutral  
213         neutral  
226        negative  
115        negative  
198         neutral  
