# OpenAI, Consumer based sentiment Analysis

-- To Be Written --

## Data Collection

In [3]:
import os
import asyncio
import pandas as pd
import json
import datetime
from asyncpraw import Reddit
from asyncpraw.models import MoreComments

# Load Reddit API credentials from environment variables
USERNAME = os.getenv('USER')
if not USERNAME:
    raise ValueError("USER environment variable not set")

PASSWORD = os.getenv('PASSWORD')
if not PASSWORD:
    raise ValueError("PASSWORD environment variable not set")

CLIENT_ID = os.getenv('CLIENT_ID')
if not CLIENT_ID:
    raise ValueError("CLIENT_ID environment variable not set")

CLIENT_SECRET = os.getenv('CLIENT_SECRET')
if not CLIENT_SECRET:
    raise ValueError("CLIENT_SECRET environment variable not set")


async def create_reddit_instance():
    reddit = Reddit(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        user_agent="my user agent",
        username=USERNAME,
        password=PASSWORD,
    )
    
    # Enable rate limit handling
    reddit.requestor.rate_limit_sleep = True  # Auto-handles rate-limiting
    return reddit


def load_csv_data(file_path):
    """
    Reads the CSV file and groups the queries by post_id.
    Ensures that all query values are strings before joining.
    Returns a dictionary mapping post_id -> query string.
    """
    df = pd.read_csv(file_path)

    # Convert queries to strings and handle NaN values
    df["query"] = df["query"].fillna("").astype(str)

    # Group by post_id and join unique queries with a semicolon.
    grouped = df.groupby("post_id")["query"].apply(lambda x: ";".join(set(x))).reset_index()
    mapping = dict(zip(grouped["post_id"], grouped["query"]))

    return mapping


async def fetch_post_and_comments(reddit, post_id):
    """
    Fetches the submission and its top 10 first-level comments.
    Returns a list of dictionaries containing post and comment details.
    In case of an error, returns one dictionary with error details.
    """
    rows = []
    try:
        submission = await reddit.submission(id=post_id)  # Await here
        
        # Load submission details
        await submission.load()
        
        # Fetch top-level comments (limit: 10)
        await submission.comments.replace_more(limit=10)
        top_comments = submission.comments[:10]

        submission_details = {
            "post_id": submission.id,
            "subreddit": submission.subreddit.display_name,
            "post_title": submission.title,
            "post_body": submission.selftext,
            "number_of_comments": submission.num_comments,
            "readable_datetime": datetime.datetime.fromtimestamp(submission.created_utc).strftime("%Y-%m-%d %H:%M:%S"),
            "post_author": submission.author.name if submission.author else None,
        }

        if top_comments:
            for comment in top_comments:
                if isinstance(comment, MoreComments):  # Skip "load more" placeholders
                    continue
                row = submission_details.copy()
                row.update({
                    "comment_id": comment.id,
                    "comment_body": comment.body,
                    "number_of_upvotes": comment.score,
                    "comment_author": comment.author.name if comment.author else None,
                })
                rows.append(row)
        else:
            # No comments found; create a row with only post details
            row = submission_details.copy()
            row.update({
                "comment_id": None,
                "comment_body": None,
                "number_of_upvotes": None,
                "comment_author": None,
            })
            rows.append(row)

    except Exception as e:
        print(f"Error fetching data for post_id {post_id}: {e}")
        # Return an error row where key details (like 'subreddit') are None.
        rows.append({
            "post_id": post_id,
            "subreddit": None,
            "post_title": None,
            "post_body": None,
            "number_of_comments": None,
            "readable_datetime": None,
            "post_author": None,
            "comment_id": None,
            "comment_body": None,
            "number_of_upvotes": None,
            "comment_author": None,
        })
    
    return rows


def save_csv(data, file_name):
    """Writes the provided data (a list of dictionaries) to a CSV file."""
    df = pd.DataFrame(data)
    df.to_csv(file_name, index=False)
    print(f"CSV file saved as {file_name}")


def save_json(data, file_name):
    """Writes the provided data (a list of dictionaries) to a JSON file."""
    with open(file_name, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, default=str)
    print(f"JSON file saved as {file_name}")


async def countdown(seconds):
    """Displays a countdown timer for the given number of seconds."""
    for remaining in range(seconds, 0, -1):
        print(f"Resuming in {remaining:2d} seconds...", end="\r", flush=True)
        await asyncio.sleep(1)
    print("")  # Move to a new line after countdown


async def main():
    reddit = await create_reddit_instance()
    post_query_mapping = load_csv_data("combined_data.csv")
    all_post_ids = list(post_query_mapping.keys())
    
    # Check for an existing checkpoint (processed post_ids)
    checkpoint_file = "checkpoint.json"
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, "r", encoding="utf-8") as f:
            processed_ids = json.load(f)
        print(f"Resuming from checkpoint: {len(processed_ids)} posts already processed.")
    else:
        processed_ids = []

    # Only process posts that have not yet been processed.
    to_process_all = [post_id for post_id in all_post_ids if post_id not in processed_ids]
    print(f"Total posts to process: {len(to_process_all)}")

    # Maintain a retry counter for each post.
    retries = {post_id: 0 for post_id in to_process_all}
    max_retries = 3  # Maximum attempts per post
    batch_size = 30  # Number of posts to process per mini-batch

    # Define output file names.
    output_csv = "new_combined_dataset.csv"
    output_json = "new_combined_dataset.json"

    # If output files already exist, load their data; otherwise, start with an empty list.
    if os.path.exists(output_csv):
        existing_df = pd.read_csv(output_csv)
        output_data = existing_df.to_dict(orient="records")
    else:
        output_data = []

    while to_process_all:
        print(f"\nStarting a round with {len(to_process_all)} posts to process.")
        next_to_process = []  # To hold posts that need to be retried

        # Process posts in mini-batches.
        for i in range(0, len(to_process_all), batch_size):
            batch_ids = to_process_all[i:i+batch_size]
            print(f"\nProcessing mini-batch {i // batch_size + 1} "
                  f"of {((len(to_process_all)-1) // batch_size) + 1} (posts: {batch_ids})")
            
            tasks = [fetch_post_and_comments(reddit, post_id) for post_id in batch_ids]
            results = await asyncio.gather(*tasks)
            
            for post_id, rows in zip(batch_ids, results):
                # If the first row's "subreddit" is None, we assume an error occurred.
                if rows and rows[0]["subreddit"] is None:
                    retries[post_id] += 1
                    if retries[post_id] < max_retries:
                        print(f"Error for post_id {post_id}; retrying (attempt {retries[post_id]}/{max_retries}).")
                        next_to_process.append(post_id)
                    else:
                        print(f"Error for post_id {post_id} after {max_retries} attempts; logging error.")
                        for row in rows:
                            row["query"] = post_query_mapping.get(post_id, "")
                        output_data.extend(rows)
                        processed_ids.append(post_id)
                else:
                    # Successful result.
                    for row in rows:
                        row["query"] = post_query_mapping.get(post_id, "")
                    output_data.extend(rows)
                    processed_ids.append(post_id)
            
            # Incrementally save output after each mini-batch.
            save_csv(output_data, output_csv)
            save_json(output_data, output_json)
            # Update the checkpoint.
            with open(checkpoint_file, "w", encoding="utf-8") as f:
                json.dump(processed_ids, f, indent=4)
            
            print("Mini-batch processed. Waiting 60 seconds before the next mini-batch...")
            await countdown(60)

        if next_to_process:
            print(f"\nRetrying {len(next_to_process)} posts in the next round...\n")
            to_process_all = next_to_process
        else:
            break

    print("All posts processed.")

In [None]:
import nest_asyncio
nest_asyncio.apply()

async def run_async():
    await main()

await run_async()

  df = pd.read_csv(file_path)


Total posts to process: 9575

Starting a round with 9575 posts to process.

Processing mini-batch 1 of 320 (posts: [' the next era is the niche content producer who produces high quality video content', '1002dom', '1007cpq', '100ayoe', '100ye6s', '101chgd', '101melg', '101ms83', '101o6zx', '101p00n', '102ci8x', '102jcse', '102l28b', '102lbp8', '102lrwi', '102xqim', '1030pti', '1030xji', '1031mz4', '1031yi2', '10346f5', '1035gzm', '103ahhi', '103gran', '103qc9j', '103vj6v', '103w7m4', '103wsie', '103yg7r', '1041tuw'])
Error fetching data for post_id  the next era is the niche content producer who produces high quality video content: received 404 HTTP response
Error fetching data for post_id 1030xji: received 429 HTTP response
Error for post_id  the next era is the niche content producer who produces high quality video content; retrying (attempt 1/3).
Error for post_id 1030xji; retrying (attempt 1/3).
CSV file saved as new_combined_dataset.csv
JSON file saved as new_combined_dataset.json

## Data Preprocessing

In [None]:
# Todo

## Tf-idf Vector

In [None]:
# Todo

## Embedding

In [None]:
# Todo

## Labeling

In [None]:
# Todo

## Pipeline (Part - C)