In [10]:
from sentence_transformers import SentenceTransformer
from sklearn.cluster import KMeans
from keybert import KeyBERT
import numpy as np
from prefect import flow, task
from datetime import datetime
import pandas as pd
import requests
import time
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import nltk
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sqlalchemy import create_engine

nltk.download("vader_lexicon")

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     C:\Users\manal\AppData\Roaming\nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [None]:
# 2. Fetch Data from Reddit
@task
def fetch_reddit_posts(subreddit="technology", limit=50):
    url = f"https://api.pushshift.io/reddit/search/submission/?subreddit={subreddit}&size={limit}&sort=desc"
    res = requests.get(url)
    data = res.json().get("data", [])

    if not data:
        print(" No posts fetched from Pushshift.")
        return pd.DataFrame(columns=["title", "selftext", "created_utc", "subreddit", "score"])

    posts = []
    for d in data:
        title = d.get("title") or d.get("selftext") or "(no title)"
        posts.append({
            "id": d.get("id"),
            "title": str(title).strip(),
            "selftext": str(d.get("selftext", "")),
            "created_utc": datetime.utcfromtimestamp(d.get("created_utc", 0)),
            "subreddit": d.get("subreddit", subreddit),
            "score": d.get("score", 0)
        })

    df = pd.DataFrame(posts)
    if "title" not in df.columns:
        df["title"] = "(missing)"
    print("Columns fetched:", df.columns.tolist())
    return df

In [None]:
#3. Sentiment Analysis
@task
def analyze_sentiment(df):
    if "title" not in df.columns:
        print("No title column, creating empty one.")
        df["title"] = ""

    def get_sentiment(text):
        try:
            return TextBlob(text).sentiment.polarity
        except Exception:
            return 0.0

    df["sentiment_score"] = df["title"].apply(get_sentiment)
    df["sentiment_label"] = df["sentiment_score"].apply(
        lambda x: "positive" if x > 0.1 else ("negative" if x < -0.1 else "neutral")
    )
    return df



In [None]:
#topic clustering
@task
def cluster_topics(df, num_clusters=5):
    model = SentenceTransformer('all-MiniLM-L6-v2')
    kw_model = KeyBERT(model='all-MiniLM-L6-v2')

    embeddings = model.encode(df["title"].tolist(), show_progress_bar=False)
    kmeans = KMeans(n_clusters=num_clusters, random_state=42)
    df["cluster"] = kmeans.fit_predict(embeddings)

    df["topic_keywords"] = df.groupby("cluster")["title"].transform(
        lambda x: ", ".join([kw[0] for kw in kw_model.extract_keywords(" ".join(x), top_n=3)])
    )
    print(" Topic clustering complete.")
    return df


In [None]:
#store csv
@task
def store_to_csv(df):
    filename = f"reddit_sentiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df.to_csv(filename, index=False)
    print(f" Saved results to {filename}")


In [None]:
#store to sqlite
from sqlalchemy import create_engine, text

@task
def store_to_sqlite(df):
    """Store the final DataFrame in a local SQLite database."""
    engine = create_engine(DB_PATH)
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE IF EXISTS reddit_sentiment"))
        print("Dropped old table to refresh schema.")
    df.to_sql("reddit_sentiment", engine, if_exists="replace", index=False)
    print("Saved results to SQLite database (table: reddit_sentiment).")


In [None]:
from datetime import datetime
import time

print(datetime.now()) 
print(datetime.utcfromtimestamp(time.time())) 

2025-10-19 20:05:36.949545
2025-10-20 00:05:36.952499


  print(datetime.utcfromtimestamp(time.time()))  # ✅ should print UTC version


In [None]:
#define flow
import requests

@task
def get_hot_subreddits(limit=5):
    """Fetch the top trending subreddits automatically."""
    url = "https://www.reddit.com/subreddits/popular.json?limit=50"
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        res = requests.get(url, headers=headers, timeout=15)
        res.raise_for_status()
        data = res.json()["data"]["children"]
        sorted_subs = sorted(data, key=lambda x: x["data"]["subscribers"], reverse=True)
        top_subs = [x["data"]["display_name"].lower() for x in sorted_subs[:limit]]
        print(f"Top {limit} subreddits detected: {top_subs}")
        return top_subs
    except Exception as e:
        print("Error fetching trending subreddits:", e)
        # fallback default list
        return ["technology", "ai", "worldnews", "science", "gaming"]


@flow(name="Reddit Sentiment Tracker - Topic Aware")
def reddit_sentiment_flow(limit=50):
    """Main Prefect flow to process top trending subreddits."""
    subreddits = get_hot_subreddits()
    all_data = []

    for sub in subreddits:
        df = fetch_reddit_posts(subreddit=sub, limit=limit)
        df = analyze_sentiment(df)
        df = cluster_topics(df)
        all_data.append(df)

    if all_data:
        combined_df = pd.concat(all_data, ignore_index=True)
        store_to_csv(combined_df)
        store_to_sqlite(combined_df)
        print(f"Processed {len(combined_df)} total posts from {len(subreddits)} subreddits.")
    else:
        print("No data fetched from any subreddit.")


In [None]:
#run flow
if __name__ == "__main__":
    reddit_sentiment_flow(limit=50)

In [None]:
engine = create_engine("sqlite:///reddit_sentiment.db")
df = pd.read_sql("SELECT * FROM reddit_sentiment", engine)
df.head()