In [None]:
# Google Cloud BigQuery
from google.cloud import bigquery
from google.api_core.exceptions import NotFound

# Reddit API
import praw
import requests

# Data Manipulation & Exploration
import pandas as pd
from datetime import datetime, timedelta
import time

# Access Credentials
import json
import os

In [None]:
# Month-scoped table suffix such as "2025_jan", derived from the local clock at run time.
now = datetime.now()
year, month = now.year, now.strftime("%b").lower()
table_suffix = "{}_{}".format(year, month)

In [None]:
# Initialize Client Object
# Shared BigQuery client: the cells below use it for queries (client.query),
# DataFrame loads (load_table_from_dataframe) and table deletion (delete_table).
# No project or credentials are passed, so the library's defaults apply.
# NOTE(review): presumably Application Default Credentials in the run environment — confirm.
client = bigquery.Client()

In [None]:
# Path to the Reddit API credentials JSON file. It must provide the keys read by the
# PRAW client cell: client_id, client_secret, user_agent, redirect_uri, refresh_token.
# Set the REDDIT_CREDENTIALS environment variable to keep the secrets file outside
# the notebook directory; the default path is unchanged.
credentials = os.environ.get("REDDIT_CREDENTIALS", "client_secrets.json")

# Read Credentials from JSON file
with open(credentials, encoding="utf-8") as f:
    creds = json.load(f)

In [None]:
# Python Reddit API Wrapper: authenticate with the fields of the `creds` dict
# loaded from the credentials JSON file (a missing key raises KeyError).
reddit = praw.Reddit(**{
    field: creds[field]
    for field in (
        "client_id",
        "client_secret",
        "user_agent",
        "redirect_uri",
        "refresh_token",
    )
})

In [None]:
# Scrape each subreddit listing ("genre") and collect one row per post.
# Rows are gathered in a plain list and converted to a DataFrame once at the end
# (growing a DataFrame with pd.concat inside a loop is quadratic). The frame always
# carries the full column set, so downstream cells such as the drop_duplicates on
# 'id' still work when the listings return no posts.

# Provide List of Different Genres (each name is a listing method on the subreddit,
# called as getattr(subreddit, genre)(limit=...))
genres = ['rising']

# Provide List of East African Subreddits
subreddits = ["Kenya"]

# Maximum number of posts requested per listing
POST_LIMIT = 1000

POST_COLUMNS = [
    'genre', 'title', 'flair', 'score', 'id', 'subreddit', 'url',
    'num_comments', 'body', 'upvote_ratio', 'awards', 'author',
    'over_18', 'spoiler', 'is_self', 'created', 'today_date',
    'subscribers', 'engagement_rate', 'engagement_intensity',
]

# One scrape timestamp for the whole run, so every row of this run shares it
today_date = datetime.today()

rows = []
for sub in subreddits:
    search = reddit.subreddit(sub)
    print(f"📥 Extracting posts from r/{sub}...")
    subscribers = search.subscribers  # looked up once per subreddit, reused for every post

    for genre in genres:
        submissions = getattr(search, genre)(limit=POST_LIMIT)

        for post in submissions:
            upvote_ratio = getattr(post, 'upvote_ratio', None)
            awards = getattr(post, 'total_awards_received', 0) or 0

            # Score + comments per 1,000 subscribers; NaN when the subscriber count
            # is 0/None instead of raising ZeroDivisionError/TypeError
            engagement_rate = (
                (post.score + post.num_comments) / (subscribers / 1000)
                if subscribers else float('nan')
            )
            # Score weighted by upvote ratio (1 when unavailable) plus comments and awards
            engagement_intensity = (
                post.score * (upvote_ratio if upvote_ratio is not None else 1)
                + post.num_comments
                + awards
            )

            rows.append([
                genre,
                post.title,
                getattr(post, 'link_flair_text', None),  # flair (topic/category)
                post.score,
                post.id,
                str(post.subreddit),
                post.url,
                post.num_comments,
                post.selftext,
                upvote_ratio,
                awards,
                str(post.author),
                getattr(post, 'over_18', False),
                getattr(post, 'spoiler', False),
                getattr(post, 'is_self', None),
                datetime.fromtimestamp(post.created),
                today_date,
                subscribers,
                engagement_rate,
                engagement_intensity,
            ])

# Single DataFrame build; keeps all columns even when `rows` is empty
bigdata = pd.DataFrame(rows, columns=POST_COLUMNS)

print("✅ Reddit weekly data extraction complete")

In [None]:
# A post can surface in several listings/subreddits within one run;
# keep only the first row seen for each post id.
bigdata = bigdata.drop_duplicates(subset="id", keep="first")

In [None]:
# Fully-qualified BigQuery destination (project.dataset.table); the table name
# carries the year_month suffix, so each month gets its own table.
table_id = "data-storage-485106.reddit.trending_now_" + table_suffix

In [None]:
def append_to_bigquery(frame: pd.DataFrame, destination: str) -> None:
    """Append `frame` to the BigQuery table `destination` and block until the load finishes.

    Errors from the load job propagate (``job.result()`` raises), so a failed load stops
    the notebook instead of being reported as a success.
    """
    job = client.load_table_from_dataframe(
        frame,
        destination,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_APPEND"),
    )
    job.result()


def table_has_month_data(destination: str, when: datetime) -> bool:
    """Return True if `destination` already holds rows scraped in the month of `when`.

    Returns False when the table does not exist yet (the query raises NotFound).
    Filters on `today_date`, the scrape timestamp the extraction cell writes.
    NOTE(review): an earlier version filtered on `start_date`, which this notebook
    never writes — confirm no other writer depends on that column.
    """
    check_sql = f"""
        SELECT COUNT(*) AS cnt
        FROM `{destination}`
        WHERE EXTRACT(MONTH FROM CAST(today_date AS DATETIME)) = {when.month}
          AND EXTRACT(YEAR FROM CAST(today_date AS DATETIME)) = {when.year}
    """
    try:
        check_df = client.query(check_sql).to_dataframe()
    except NotFound:
        return False  # Table doesn't exist yet
    return bool(check_df.loc[0, "cnt"] > 0)


# On the 1st of the month, the first load into the new month's table also carries over
# the previous month's rows. Every other run (including later runs on the 1st) just
# appends the freshly scraped rows, so no scrape is ever dropped.
if now.day == 1 and not table_has_month_data(table_id, now):
    prev_month_date = now.replace(day=1) - timedelta(days=1)
    prev_table_suffix = f"{prev_month_date.year}_{prev_month_date.strftime('%b').lower()}"
    prev_table_id = f"data-storage-485106.reddit.trending_now_{prev_table_suffix}"

    try:
        # Row order is irrelevant for the load, so no ORDER BY is needed
        prev_data = client.query(f"SELECT * FROM `{prev_table_id}`").to_dataframe()
        bigdata = pd.concat([prev_data, bigdata], ignore_index=True)
        print(f"Appended {len(prev_data)} rows from previous month table.")
    except NotFound:
        print("No previous month table found, skipping append.")

    append_to_bigquery(bigdata, table_id)
    print(f"All data loaded into {table_id}, total rows: {len(bigdata)}")
else:
    append_to_bigquery(bigdata, table_id)
    print(f"Normal load completed into {table_id}, rows: {len(bigdata)}")

In [None]:
# Re-read the whole monthly table so duplicates accumulated across runs can be removed.
# A query without ORDER BY returns rows in an unspecified order, which would make the
# next cell's drop_duplicates(keep='first') keep an arbitrary copy of each post.
# Ordering by the scrape timestamp makes it keep the earliest-scraped row.
sql = (f"""
        SELECT *
        FROM `{table_id}`
        ORDER BY today_date;
       """)

# Run SQL Query
data = client.query(sql).to_dataframe()

In [None]:
# Keep one row per post id (its first occurrence in `data`), then drop the BigQuery
# table so the deduplicated frame can be uploaded in its place by the next cell.
# NOTE(review): delete-then-reload is not atomic — if the upload fails, the table is
# gone and only the in-memory `data` frame still holds the rows.
data = data.drop_duplicates(subset="id", keep="first")
client.delete_table(table_id)

In [None]:
# Upload the deduplicated frame as the table's full contents.
# WRITE_TRUNCATE replaces any existing rows, so re-running this cell cannot append a
# second copy. job.result() blocks until the job finishes and raises if the load failed;
# polling job.state alone would also reach DONE for a failed job and hide the error.
job = client.load_table_from_dataframe(
    data,
    table_id,
    job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE"),
)
job.result()
print(job.state)