# Melbourne Social Media Data Collection for Air Quality Analysis
# Research Notebook for Environmental and Social Sensing

## Overview
This notebook is a critical component of the "Proactive Air Quality Forecasting and Health Warning System in Melbourne" project. It focuses on collecting data from various social media platforms (Twitter, Reddit, Facebook) to derive a **User Observations Index (UOI)**. This index captures real-time, community-reported observations of smoke and air pollution, providing a valuable data source to complement official sensor data.

### Key Features:
- **Multi-Platform Collection**: Gathers data from Twitter and Reddit, and provides a placeholder framework for Facebook.
- **Secure API Key Management**: Loads credentials from a `.env` file instead of hard-coding them in the notebook.
- **Robust & Resilient**: Per-platform error handling and logging, so a failure on one platform (or one subreddit) does not halt the rest of the run.
- **Targeted Search**: Combines air-quality keywords with Melbourne-specific search terms and subreddits.
- **Structured Output**: Saves raw records as timestamped JSON files, ready for preprocessing and analysis.

## 1. Environment Setup and Dependencies

```python
# =============================================================================
# CELL 1: SETUP, IMPORTS, AND LOGGER CONFIGURATION
# =============================================================================
"""
Melbourne Social Media Data Collection System for UOI
=====================================================

This notebook collects social media posts related to air quality in Melbourne
from Twitter, Reddit, and Facebook.

Author: Research Team (Lê Nguyễn Gia Hưng, Hoàng Phạm Gia Bảo, Võ Tấn Phát)
Date: 2025
Purpose: Collect data for User Observations Index (UOI) calculation.
"""

import os
import sys
import json
import logging
from datetime import datetime, timezone
from typing import Dict, List, Any

import pandas as pd
import praw
import tweepy
import facebook
from dotenv import load_dotenv
from tqdm.notebook import tqdm

# --- 1. UTF-8 Aware Logger Setup (Consistent with Project Standards) ---
def setup_logger(log_file='logs/03_social_media_data_collection.log', level=logging.INFO):
    """Configures the root logger with a console handler and a UTF-8 encoded file handler."""
    # Create the log directory if it doesn't exist (skipped when log_file has no directory part)
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    logger = logging.getLogger()
    logger.setLevel(level)

    # Remove handlers left over from earlier runs of this cell so messages are not duplicated
    if logger.hasHandlers():
        logger.handlers.clear()

    console_handler = logging.StreamHandler(sys.stdout)
    console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(console_formatter)

    file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
    file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger

# --- 2. Initialize Environment ---
load_dotenv()  # Reads variables from a local .env file into os.environ
logger = setup_logger()

logger.info("📱 Melbourne Social Media Data Collection System")
logger.info("=" * 50)
logger.info("🔬 Research Environment Initialized")
logger.info("📋 Dependencies loaded and logger configured.")
```

```text
2025-06-21 01:36:58,866 - INFO - 📱 Melbourne Social Media Data Collection System
2025-06-21 01:36:58,869 - INFO - 🔬 Research Environment Initialized
2025-06-21 01:36:58,870 - INFO - 📋 Dependencies loaded and logger configured.
```


## 2. Configuration and Research Parameters

```python
# =============================================================================
# CELL 2: RESEARCH CONFIGURATION
# =============================================================================

class SocialMediaConfig:
    """Configuration class for social media data collection."""

    def __init__(self):
        # --- API Credentials (read from environment variables loaded from .env) ---
        self.TWITTER_BEARER_TOKEN = os.getenv('TWITTER_BEARER_TOKEN')
        self.REDDIT_CLIENT_ID = os.getenv('REDDIT_CLIENT_ID')
        self.REDDIT_CLIENT_SECRET = os.getenv('REDDIT_CLIENT_SECRET')
        self.REDDIT_USER_AGENT = os.getenv('REDDIT_USER_AGENT')
        self.FACEBOOK_ACCESS_TOKEN = os.getenv('FACEBOOK_ACCESS_TOKEN')

        # --- Search Parameters ---
        # Shared keywords for all platforms
        self.KEYWORDS = ["smoke", "air quality", "haze", "bushfire smoke", "air pollution", "PM2.5"]
        # NOTE: the keywords are joined unquoted, so the Twitter API treats e.g.
        # `air quality` as `air AND quality` rather than as an exact phrase.
        self.TWITTER_QUERY = f'("melbourne" OR "#melbweather") ({" OR ".join(self.KEYWORDS)}) -is:retweet'
        self.REDDIT_SUBREDDITS = ['melbourne', 'australia']
        self.FACEBOOK_GROUP_IDS = ['MelbourneWeatherWatch']  # Example only; replace with an actual group ID

        # --- Data Collection Parameters ---
        self.SEARCH_LIMIT_PER_PLATFORM = 500  # Max items to fetch per platform per run
        self.REQUEST_TIMEOUT = 30  # seconds (defined, but not currently passed to any client)

        # --- Output Configuration ---
        self.OUTPUT_DIR = "../../data/raw/social_media/"
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        self.TIMESTAMP = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.OUTPUT_PATHS = {
            "twitter": os.path.join(self.OUTPUT_DIR, f"twitter_data_{self.TIMESTAMP}.json"),
            "reddit": os.path.join(self.OUTPUT_DIR, f"reddit_data_{self.TIMESTAMP}.json"),
            "facebook": os.path.join(self.OUTPUT_DIR, f"facebook_data_{self.TIMESTAMP}.json")
        }

    def validate_credentials(self):
        """Logs which platforms have the credentials they need; platforms without them are skipped."""
        if not self.TWITTER_BEARER_TOKEN:
            logger.warning("⚠️ Twitter API Bearer Token not found. Twitter collection will be skipped.")
        else:
            logger.info("🔑 Twitter credentials configured: ✅")

        if not all([self.REDDIT_CLIENT_ID, self.REDDIT_CLIENT_SECRET, self.REDDIT_USER_AGENT]):
            logger.warning("⚠️ Reddit API credentials not found. Reddit collection will be skipped.")
        else:
            logger.info("🔑 Reddit credentials configured: ✅")

        if not self.FACEBOOK_ACCESS_TOKEN:
            logger.warning("⚠️ Facebook Access Token not found. Facebook collection will be skipped.")
        else:
            logger.info("🔑 Facebook credentials configured: ✅")

# --- Initialize configuration ---
try:
    config = SocialMediaConfig()
    config.validate_credentials()
    logger.info("🎯 Research Configuration Loaded")
    logger.info(f"💾 Output directory: {config.OUTPUT_DIR}")
except Exception as e:
    logger.critical(f"❌ Could not initialize configuration: {e}")
```

```text
2025-06-21 01:36:58,896 - INFO - 🔑 Twitter credentials configured: ✅
2025-06-21 01:36:58,897 - INFO - 🔑 Reddit credentials configured: ✅
2025-06-21 01:36:58,898 - INFO - 🔑 Facebook credentials configured: ✅
2025-06-21 01:36:58,899 - INFO - 🎯 Research Configuration Loaded
2025-06-21 01:36:58,899 - INFO - 💾 Output directory: ../../data/raw/social_media/
```

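In Twitter API v2 query syntax, terms separated only by whitespace are combined with AND, AND is evaluated before OR, and exact phrases must be wrapped in double quotes. Because the keywords are joined unquoted (as the logged query in Section 4 shows), `air quality` matches any tweet containing both words anywhere rather than the phrase. A minimal sketch of a quoting helper; `build_or_clause` is a hypothetical name, not part of the notebook:

```python
from typing import Iterable


def build_or_clause(keywords: Iterable[str]) -> str:
    """Join keywords with OR, double-quoting multi-word keywords so the
    search API treats them as exact phrases rather than ANDed words."""
    terms = [f'"{kw}"' if " " in kw else kw for kw in keywords]
    return "(" + " OR ".join(terms) + ")"


KEYWORDS = ["smoke", "air quality", "haze", "bushfire smoke", "air pollution", "PM2.5"]
twitter_query = f'("melbourne" OR "#melbweather") {build_or_clause(KEYWORDS)} -is:retweet'
print(twitter_query)
# ("melbourne" OR "#melbweather") (smoke OR "air quality" OR haze OR "bushfire smoke" OR "air pollution" OR PM2.5) -is:retweet
```

The same clause could probably be reused for the Reddit search string as well, but how Reddit's search treats quoted phrases is an assumption worth checking before relying on it.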

## 3. Data Collection Functions

```python
# =============================================================================
# CELL 3: CORE DATA COLLECTION CLASS (REVISED)
# =============================================================================
# This revised cell incorporates feedback to improve robustness.
# Key Changes in `fetch_from_twitter`:
#   - Uses `tweepy.Paginator` to correctly handle pagination and fetch up to the desired limit.
#   - Adds specific error handling for `tweepy.errors.TooManyRequests` (HTTP 429).
#   - Shows a `tqdm` progress bar sized to the configured collection limit.
# =============================================================================

class SocialMediaCollector:
    """Handles the logic for fetching data from social media APIs."""

    def __init__(self, config: SocialMediaConfig):
        self.config = config
        self.api_clients = {}
        self._initialize_clients()

    def _initialize_clients(self):
        """Initializes API clients for the platforms whose credentials are available."""
        # NOTE: one try block covers all clients, so an error here also skips
        # any clients that have not been initialized yet.
        try:
            if self.config.TWITTER_BEARER_TOKEN:
                # wait_on_rate_limit=True makes Tweepy sleep until the rate-limit window
                # resets and then retry, instead of raising TooManyRequests on HTTP 429
                self.api_clients['twitter'] = tweepy.Client(
                    self.config.TWITTER_BEARER_TOKEN,
                    wait_on_rate_limit=True
                )
            if all([self.config.REDDIT_CLIENT_ID, self.config.REDDIT_CLIENT_SECRET, self.config.REDDIT_USER_AGENT]):
                self.api_clients['reddit'] = praw.Reddit(
                    client_id=self.config.REDDIT_CLIENT_ID,
                    client_secret=self.config.REDDIT_CLIENT_SECRET,
                    user_agent=self.config.REDDIT_USER_AGENT
                )
            if self.config.FACEBOOK_ACCESS_TOKEN:
                self.api_clients['facebook'] = facebook.GraphAPI(self.config.FACEBOOK_ACCESS_TOKEN)
            logger.info("API clients initialized successfully.")
        except Exception as e:
            logger.error(f"❌ Failed to initialize an API client: {e}")

    def _save_to_json(self, data: List[Dict], platform: str):
        """Saves collected data to a JSON file; nothing is written when `data` is empty."""
        if not data:
            logger.info(f"No data to save for {platform}.")
            return

        path = self.config.OUTPUT_PATHS[platform]
        try:
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
            logger.info(f"💾 Successfully saved {len(data)} records for {platform} to {path}")
        except IOError as e:
            logger.error(f"❌ Could not write to file {path}: {e}")

    def fetch_from_twitter(self) -> List[Dict]:
        """Fetches tweets using the Twitter API v2, with pagination and rate limit handling."""
        if 'twitter' not in self.api_clients:
            logger.warning("Skipping Twitter: client not initialized.")
            return []

        client = self.api_clients['twitter']
        collected_tweets = []
        logger.info(f"🐦 Fetching up to {self.config.SEARCH_LIMIT_PER_PLATFORM} tweets with query: {self.config.TWITTER_QUERY}")

        try:
            # Paginator requests successive result pages; flatten() yields individual
            # tweets and stops once `limit` tweets have been produced
            tweets = tweepy.Paginator(
                client.search_recent_tweets,
                query=self.config.TWITTER_QUERY,
                # public_metrics and geo are requested but not yet stored in the records below
                tweet_fields=["created_at", "author_id", "public_metrics", "geo"],
                max_results=100  # Max per page (the API accepts 10-100); Paginator handles the rest
            ).flatten(limit=self.config.SEARCH_LIMIT_PER_PLATFORM)

            # `total` is only an upper bound; the bar stops early if fewer tweets match
            for tweet in tqdm(tweets, desc="Processing Tweets", unit="tweet", total=self.config.SEARCH_LIMIT_PER_PLATFORM):
                collected_tweets.append({
                    'platform': 'twitter',
                    'id': tweet.id,
                    'text': tweet.text,
                    'created_at': tweet.created_at.isoformat(),
                    'author_id': tweet.author_id
                })

            if not collected_tweets:
                logger.info("No tweets found for the given query.")

        except tweepy.errors.TooManyRequests:
            # Normally only reached when wait_on_rate_limit is disabled
            logger.error("❌ Twitter API rate limit exceeded. Collection for Twitter has stopped. Please wait and try again later.")
        except Exception as e:
            logger.error(f"❌ An unexpected error occurred during Twitter fetch: {e}")

        self._save_to_json(collected_tweets, 'twitter')
        return collected_tweets

    def fetch_from_reddit(self) -> List[Dict]:
        """
        Fetches posts from Reddit with improved error handling.
        This version isolates failures to a single subreddit, allowing the
        collection to continue with other subreddits if one fails.
        """
        if 'reddit' not in self.api_clients:
            logger.warning("Skipping Reddit: client not initialized.")
            return []

        reddit = self.api_clients['reddit']
        collected_posts = []
        # Same unquoted OR-join as the Twitter query: multi-word keywords are not quoted as phrases
        query = ' OR '.join(self.config.KEYWORDS)
        logger.info(f"🤖 Fetching from Reddit subreddits: {self.config.REDDIT_SUBREDDITS}")

        # Split the overall limit evenly across subreddits, fetching at least 1 post per subreddit
        limit_per_sub = max(1, self.config.SEARCH_LIMIT_PER_PLATFORM // len(self.config.REDDIT_SUBREDDITS))

        for subreddit_name in self.config.REDDIT_SUBREDDITS:
            try:
                logger.info(f"-> Searching r/{subreddit_name}...")
                subreddit = reddit.subreddit(subreddit_name)
                search_results = subreddit.search(query, sort='new', limit=limit_per_sub)

                # Materialize the lazy listing into a list: a subreddit's posts are kept only
                # if the whole search succeeds, and a mid-stream error discards them
                subreddit_posts = list(tqdm(search_results, desc=f"r/{subreddit_name}", unit="post"))

                for submission in subreddit_posts:
                    collected_posts.append({
                        'platform': 'reddit',
                        'id': submission.id,
                        'title': submission.title,
                        'text': submission.selftext,
                        'created_utc': datetime.fromtimestamp(submission.created_utc, tz=timezone.utc).isoformat(),
                        'subreddit': subreddit_name,
                        'score': submission.score,
                        'url': submission.url
                    })
                logger.info(f"-> Found {len(subreddit_posts)} posts in r/{subreddit_name}.")

            except Exception as e:
                # Catching errors per subreddit logs the failure and lets the loop
                # continue with the next subreddit
                logger.error(f"❌ An error occurred while fetching from r/{subreddit_name}: {e}")
                logger.warning("--> Skipping to the next subreddit.")

        self._save_to_json(collected_posts, 'reddit')
        return collected_posts

    def fetch_from_facebook(self) -> List[Dict]:
        """
        Placeholder for fetching data from Facebook Groups/Pages.
        NOTE: Requires a valid User or Page Access Token with appropriate permissions.
        Access to public group data is highly restricted. This is a basic framework.
        """
        if 'facebook' not in self.api_clients:
            logger.warning("Skipping Facebook: client not initialized.")
            return []

        logger.warning("🔵 Facebook collection is a placeholder. API access is restricted and this function will not collect data.")
        # The following code is a template and will likely fail without a reviewed app and proper permissions.
        # graph = self.api_clients['facebook']
        # collected_posts = []
        # try:
        #     for group_id in self.config.FACEBOOK_GROUP_IDS:
        #         feed = graph.get_connections(id=group_id, connection_name='feed',
        #                                    fields='message,created_time,id,permalink_url')
        #         # Process feed data here...
        #     self._save_to_json(collected_posts, 'facebook')
        # except facebook.GraphAPIError as e:
        #     logger.error(f"❌ Facebook Graph API Error: {e}")
        return []

# --- Initialize the collector ---
try:
    collector = SocialMediaCollector(config)
    logger.info("🔧 Social media collector instance created.")
except Exception as e:
    logger.critical(f"❌ Failed to instantiate SocialMediaCollector: {e}")
```

```text
2025-06-21 01:36:58,936 - INFO - API clients initialized successfully.
2025-06-21 01:36:58,938 - INFO - 🔧 Social media collector instance created.
```

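`tweepy.Paginator(...).flatten(limit=...)` hides the page-token loop behind a single iterator. The sketch below is a hypothetical, library-free approximation of that behaviour, assuming each page returns its items plus an optional next-page token (the v2 search endpoint reports this as `next_token`). It is not Tweepy's actual implementation; `flatten_pages` and the fake endpoint are illustrative names only.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple

# A page is (items on the page, token for the next page or None when exhausted).
Page = Tuple[List[Any], Optional[str]]


def flatten_pages(fetch_page: Callable[[Optional[str]], Page], limit: int) -> Iterator[Any]:
    """Yield items page by page until `limit` items have been produced or no
    next-page token is returned. Only as many pages as needed are requested."""
    token: Optional[str] = None
    produced = 0
    while produced < limit:
        items, token = fetch_page(token)
        for item in items:
            yield item
            produced += 1
            if produced >= limit:
                return
        if token is None:
            return


# Hypothetical fake endpoint: three pages holding 100, 100 and 12 results.
FAKE_PAGES: Dict[Optional[str], Page] = {
    None: (list(range(0, 100)), "page2"),
    "page2": (list(range(100, 200)), "page3"),
    "page3": (list(range(200, 212)), None),
}
calls: List[Optional[str]] = []


def fake_fetch(token: Optional[str]) -> Page:
    calls.append(token)  # record which page tokens were requested
    return FAKE_PAGES[token]


first_150 = list(flatten_pages(fake_fetch, limit=150))
print(len(first_150), calls)  # 150 [None, 'page2']

calls.clear()
everything = list(flatten_pages(fake_fetch, limit=500))
print(len(everything), calls)  # 212 [None, 'page2', 'page3']
```

The limit caps items rather than pages, and no further page is requested once it is reached; this matters because every page request counts against the API's rate limit.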

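Cell 3 divides the Reddit budget across subreddits with floor division. That is exact for the two configured subreddits (250 each), but when the list length does not divide the limit evenly the remainder is silently dropped (three subreddits would get 166 each, 498 in total). A hypothetical helper, `split_limit`, that hands the remainder out instead:

```python
from typing import Dict, List


def split_limit(total: int, names: List[str]) -> Dict[str, int]:
    """Spread `total` across `names` as evenly as possible. The remainder goes
    to the first names instead of being dropped; each name gets at least 1."""
    if not names:
        return {}
    base, remainder = divmod(total, len(names))
    return {
        name: max(1, base + (1 if i < remainder else 0))
        for i, name in enumerate(names)
    }


print(split_limit(500, ["melbourne", "australia"]))
# {'melbourne': 250, 'australia': 250}
print(split_limit(500, ["melbourne", "australia", "example_sub"]))  # "example_sub" is hypothetical
# {'melbourne': 167, 'australia': 167, 'example_sub': 166}
```

Like the original guard that forces at least one post per subreddit, the minimum of 1 means the allocations can sum to more than `total` when there are more subreddits than the limit.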
## 4. Execute Data Collection

```python
# =============================================================================
# CELL 4: DATA COLLECTION ORCHESTRATOR
# =============================================================================

def run_collection_pipeline():
    """Orchestrates the entire social media data collection process."""
    logger.info("🚀 Starting social media data collection pipeline...")
    start_time = datetime.now()

    # --- Execute collection for each platform ---
    twitter_data = collector.fetch_from_twitter()
    reddit_data = collector.fetch_from_reddit()
    facebook_data = collector.fetch_from_facebook() # Placeholder call

    # --- Summarize Results ---
    end_time = datetime.now()
    duration = end_time - start_time
    logger.info("=" * 60)
    logger.info("✅ SOCIAL MEDIA DATA COLLECTION COMPLETED!")
    logger.info(f"⏱️  Total duration: {duration}")
    logger.info(f"📊 Records collected:")
    logger.info(f"   - Twitter: {len(twitter_data)} records saved to {config.OUTPUT_PATHS['twitter']}")
    logger.info(f"   - Reddit: {len(reddit_data)} records saved to {config.OUTPUT_PATHS['reddit']}")
    logger.info(f"   - Facebook: {len(facebook_data)} records (placeholder)")
    logger.info("=" * 60)
    print("\n🎉 Collection complete. Check logs and output files for details.")
    print("➡️ Proceed to the next cell for preliminary analysis.")

# --- Execute the Pipeline ---
run_collection_pipeline()
```

```text
2025-06-21 01:36:58,980 - INFO - 🚀 Starting social media data collection pipeline...
2025-06-21 01:36:58,980 - INFO - 🐦 Fetching up to 500 tweets with query: ("melbourne" OR "#melbweather") (smoke OR air quality OR haze OR bushfire smoke OR air pollution OR PM2.5) -is:retweet
2025-06-21 01:36:59,464 - INFO - 💾 Successfully saved 12 records for twitter to ../../data/raw/social_media/twitter_data_20250621_013658.json
2025-06-21 01:36:59,466 - INFO - 🤖 Fetching from Reddit subreddits: ['melbourne', 'australia']
2025-06-21 01:37:03,380 - ERROR - ❌ An error occurred during Reddit fetch: error with request ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))
2025-06-21 01:37:03,385 - INFO - ✅ SOCIAL MEDIA DATA COLLECTION COMPLETED!
2025-06-21 01:37:03,386 - INFO - ⏱️  Total duration: 0:00:04.402947
2025-06-21 01:37:03,387 - INFO - 📊 Records collected:
2025-06-21 01:37:03,388 - INFO -    - Twitter: 12 records saved to ../../data/raw/social_media/twitter_data_20250621_013658.json
2025-06-21 01:37:03,390 - INFO -    - Reddit: 0 records saved to ../../data/raw/social_media/reddit_data_20250621_013658.json
2025-06-21 01:37:03,391 - INFO -    - Facebook: 0 records (placeholder)

🎉 Collection complete. Check logs and output files for details.
➡️ Proceed to the next cell for preliminary analysis.
```

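The Reddit request in this run was aborted by a `ConnectionResetError`, a transient network failure that a short pause and retry may get past. A minimal retry-with-exponential-backoff sketch; `with_retries` is a hypothetical helper, not part of PRAW or of the notebook:

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(); on an exception listed in `retry_on`, wait and try again with
    exponentially growing delays (base_delay, 2*base_delay, 4*base_delay, ...).
    The last failure is re-raised; other exceptions propagate immediately."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Demo: a fetch that fails twice with the error type seen in the log, then succeeds.
failures = [ConnectionResetError("forcibly closed"), ConnectionResetError("forcibly closed")]
delays = []


def flaky_fetch():
    if failures:
        raise failures.pop(0)
    return ["post-1", "post-2"]


result = with_retries(flaky_fetch, sleep=delays.append)  # record delays instead of sleeping
print(result, delays)  # ['post-1', 'post-2'] [1.0, 2.0]
```

The "error with request ..." wording in the log appears to come from prawcore's `RequestException`, which wraps the underlying connection error and is not a subclass of Python's built-in `ConnectionError`; with PRAW you would therefore probably pass `retry_on=(prawcore.exceptions.RequestException,)` (worth verifying against the installed prawcore version). Because PRAW listings are fetched lazily, the retried callable should include the iteration, e.g. `lambda: list(subreddit.search(query, sort='new', limit=limit_per_sub))`.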

## 5. Data Loading and Preliminary Analysis

```python
# =============================================================================
# CELL 5: DATA LOADING AND PRELIMINARY ANALYSIS
# =============================================================================
# Jupyter provides `display` automatically; the explicit import makes the dependency visible
from IPython.display import display

def load_and_analyze_data():
    """Loads the collected JSON data into Pandas DataFrames and shows a summary."""
    logger.info("🔎 Loading and analyzing collected data...")
    dataframes = {}

    print("\n--- Data Analysis Report ---")

    for platform, path in config.OUTPUT_PATHS.items():
        if os.path.exists(path) and os.path.getsize(path) > 0:
            try:
                df = pd.read_json(path)
                dataframes[platform] = df
                print(f"\n📈 {platform.title()} Data Overview:")
                print(f"   - Total records: {len(df):,}")
                print(f"   - Columns: {list(df.columns)}")
                print("\n   Sample Data:")
                display(df.head(3))
            except Exception as e:
                logger.error(f"❌ Failed to load and analyze {platform} data from {path}: {e}")
        else:
            print(f"\n🟡 {platform.title()} data not found or file is empty. Skipped.")

    print("--- End of Report ---")

    if not dataframes:
        print("\nNo data was loaded. Please check the collection logs.")
        return None

    print("\n✅ DataFrames are loaded and returned. You can access them via the 'loaded_dfs' variable.")
    return dataframes

# --- Run the analysis on the generated files ---
# You can re-run this cell anytime after the collection is complete.
loaded_dfs = load_and_analyze_data()
```

```text
2025-06-21 01:37:25,117 - INFO - 🔎 Loading and analyzing collected data...

--- Data Analysis Report ---

📈 Twitter Data Overview:
   - Total records: 12
   - Columns: ['platform', 'id', 'text', 'created_at', 'author_id']

   Sample Data:
```

| | platform | id | text | created_at | author_id |
|---|---|---|---|---|---|
| 0 | twitter | 1936013144666894505 | Saw a lot of very questionably dressed people ... | 2025-06-20 10:47:57+00:00 | 995284287288033284 |
| 1 | twitter | 1935786917443826001 | @JodiK46062719 Its melbourne. I doubt its the ... | 2025-06-19 19:49:00+00:00 | 1843659927027372032 |
| 2 | twitter | 1935660223513215226 | Last night in the big smoke (hopefully for a l... | 2025-06-19 11:25:34+00:00 | 198704132 |

```text
🟡 Reddit data not found or file is empty. Skipped.

🟡 Facebook data not found or file is empty. Skipped.
--- End of Report ---

✅ DataFrames are loaded and returned. You can access them via the 'loaded_dfs' variable.
```

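As a first step toward scoring posts for the UOI, it can help to see which keywords each record actually matched. The sketch below assumes case-insensitive substring matching (the notebook does not define a matching rule); `add_keyword_flags` is a hypothetical helper that adds one boolean column per keyword plus a hit count.

```python
import pandas as pd

KEYWORDS = ["smoke", "air quality", "haze", "bushfire smoke", "air pollution", "PM2.5"]


def add_keyword_flags(df: pd.DataFrame, text_col: str = "text") -> pd.DataFrame:
    """Return a copy of df with a boolean `kw_<keyword>` column per keyword
    (case-insensitive substring match) and a `keyword_hits` count."""
    out = df.copy()
    text = out[text_col].fillna("").astype(str)
    flag_cols = []
    for kw in KEYWORDS:
        col = f"kw_{kw}"
        out[col] = text.str.contains(kw, case=False, regex=False)
        flag_cols.append(col)
    out["keyword_hits"] = out[flag_cols].sum(axis=1)
    return out


# Hypothetical example posts (not from the collected data).
sample = pd.DataFrame({"text": [
    "Thick bushfire smoke over the CBD this morning",
    "Air quality app says PM2.5 is high today",
    "Last night in the big smoke",
]})
flagged = add_keyword_flags(sample)
print(flagged["keyword_hits"].tolist())  # [2, 2, 1]
```

On the collected data this would be applied as, e.g., `add_keyword_flags(loaded_dfs['twitter'])`. Substring matching is crude: "smoke" also matches "smoker", and the Australian slang "the big smoke" (meaning the city) counts as a hit, as the third sample tweet above appears to show. Word-boundary regexes or a short exclusion list would be natural refinements.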