# 🦋 Bluesky Social Justice Data Collection & Analysis
## DFP F25 Social Media Blue Team

This notebook provides an interface for collecting and analyzing social justice posts from Bluesky using two collection methods, a real-time firehose and a historical search API, which can also be run together as a hybrid.

## Collection Methods

### **1. Firehose Collection (Real-time)**
- Live stream of current posts as they happen
- Best for monitoring ongoing conversations
- Duration-based collection

### **2. Search API Collection (Historical)**
- Native search with deep pagination
- Systematic historical data collection
- Date range filtering and cursor-based navigation
- Enhanced query design with exact phrases and hashtags

### **3. Hybrid Collection (Both)**
- Historical data first (search API)
- Then real-time monitoring (firehose)
- Complete coverage of past and present
- Automatic time splitting: 75% search, 25% firehose
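
The 75/25 split can be sketched as a small helper. The name `split_time` and the `search_fraction` parameter are illustrative assumptions, not part of the collector script. Unlike a version that truncates both shares, this sketch gives the firehose the remainder so the two phases always add up to the total.

```python
def split_time(total_seconds, search_fraction=0.75):
    """Split a total time budget into (search_seconds, firehose_seconds).

    Hypothetical helper: the search phase gets `search_fraction` of the
    budget (truncated to whole seconds) and the firehose gets the rest.
    """
    if total_seconds < 0:
        raise ValueError("total_seconds must be non-negative")
    if not 0 <= search_fraction <= 1:
        raise ValueError("search_fraction must be between 0 and 1")
    search_seconds = int(total_seconds * search_fraction)
    firehose_seconds = total_seconds - search_seconds
    return search_seconds, firehose_seconds


search_s, firehose_s = split_time(1800)
print(f"Search: {search_s}s, Firehose: {firehose_s}s")
```

For a 30-minute budget this yields 1350 seconds of search followed by 450 seconds of firehose.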

### **Features:**
- Dual collection methods (firehose + search API)
- Deep pagination with cursor persistence
- Date range filtering for historical collection
- Author influence metrics (follower counts, verification)
- Enhanced search queries (exact phrases, hashtags)
- Session-based organization, with cumulative all-time datasets per keyword
- Rich content analysis (hashtags, media, emotions)
- Secure authentication from external credentials file

### **Why Author Metrics Matter:**
- The firehose captures posts within seconds of publication, when engagement counts are typically still zero
- Follower counts provide immediate influence assessment
- Essential for identifying high-impact voices in social justice content
- Helps prioritize content before viral spread occurs
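
As a rough illustration of using follower counts for prioritization, the sketch below ranks authors by their highest observed follower count. The field names `author_handle` and `author_followers_count` match the ones read later in this notebook; the function name `top_authors` and the sample rows are hypothetical.

```python
def top_authors(posts, n=3):
    """Return (handle, followers, post_count) for the n most-followed authors.

    `posts` is a list of dicts as loaded from the JSONL files. For each
    author we keep the largest follower count seen and count their posts.
    """
    per_author = {}
    for post in posts:
        handle = post.get("author_handle", "unknown")
        followers = post.get("author_followers_count", 0) or 0
        best, count = per_author.get(handle, (0, 0))
        per_author[handle] = (max(best, followers), count + 1)
    ranked = sorted(per_author.items(), key=lambda item: item[1][0], reverse=True)
    return [(handle, followers, count) for handle, (followers, count) in ranked[:n]]


# Hypothetical sample data, not taken from the collected dataset
sample_posts = [
    {"author_handle": "alice.example", "author_followers_count": 5000, "text": "a"},
    {"author_handle": "bob.example", "author_followers_count": 120, "text": "b"},
    {"author_handle": "alice.example", "author_followers_count": 5010, "text": "c"},
    {"author_handle": "carol.example", "author_followers_count": 2614, "text": "d"},
]

for handle, followers, count in top_authors(sample_posts, n=2):
    print(f"@{handle}: {followers:,} followers, {count} post(s)")
```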

### **Social Justice Keywords:**
- Food insecurity
- Housing crisis  
- Homelessness
- Unemployment
- Gender inequality


## 🔧 Setup and Dependencies

First, let's import all required libraries and check our setup.


In [29]:
# Import required libraries
import json
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timezone
from collections import defaultdict, Counter
import re

# Check if atproto is available
try:
    from atproto import Client
    print("✅ atproto library available")
except ImportError:
    print("❌ atproto not found. Install with: pip install atproto")

# Set up plotting
plt.style.use('default')
sns.set_palette("husl")

print("🔧 Setup complete!")


✅ atproto library available
🔧 Setup complete!


## 📊 Load and Explore Collected Data

Let's examine the social justice data we've collected from Bluesky.


In [30]:
# Load all alltime data for analysis
keywords = ["food_insecurity", "housing", "homeless", "unemployment", "gender_inequality"]
all_data = []
keyword_counts = {}

print("📊 Loading Alltime Data:")
print("=" * 40)

for keyword in keywords:
    alltime_file = f"data/alltime/{keyword}_alltime.jsonl"
    if os.path.exists(alltime_file):
        keyword_posts = []
        with open(alltime_file, 'r', encoding='utf-8') as f:  # posts contain emoji and other non-ASCII text
            for line in f:
                try:
                    post = json.loads(line.strip())
                    all_data.append(post)
                    keyword_posts.append(post)
                except json.JSONDecodeError:
                    continue
        
        keyword_counts[keyword] = len(keyword_posts)
        print(f"✅ {keyword.replace('_', ' ').title()}: {len(keyword_posts):,} posts")
    else:
        keyword_counts[keyword] = 0
        print(f"❌ {keyword.replace('_', ' ').title()}: No data")

print(f"\n📈 Total Posts Loaded: {len(all_data):,}")

# Convert to DataFrame for analysis
if all_data:
    df = pd.DataFrame(all_data)
    print(f"✅ DataFrame created with {len(df)} rows and {len(df.columns)} columns")
    
    # Show data structure
    print(f"\n🔍 Data Structure:")
    print(f"   Post fields: uri, text, created_at, author_handle")
    print(f"   Author fields: followers_count, influence_score, verified")
    print(f"   Content fields: word_count, hashtags, emotion_score")
    print(f"   Session fields: session_name, collected_at")
else:
    df = pd.DataFrame()
    print("❌ No data available - run collection first")


📊 Loading Alltime Data:
✅ Food Insecurity: 2,152 posts
✅ Housing: 1,574 posts
✅ Homeless: 4,399 posts
✅ Unemployment: 4,329 posts
✅ Gender Inequality: 241 posts

📈 Total Posts Loaded: 12,695
✅ DataFrame created with 12695 rows and 45 columns

🔍 Data Structure:
   Post fields: uri, text, created_at, author_handle
   Author fields: followers_count, influence_score, verified
   Content fields: word_count, hashtags, emotion_score
   Session fields: session_name, collected_at


### 📋 Collection Method Details

**🔥 Firehose Collection** (Original Method)
- **How it works**: Connects to live Bluesky firehose stream
- **Data type**: Real-time posts as they happen
- **Best for**: Monitoring current conversations, trending topics
- **Parameters**: Duration in seconds/minutes
- **Coverage**: Only new posts during collection window
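
Duration-based collection can be sketched as a loop that consumes a stream until a deadline passes. This is a minimal illustration, not the collector's implementation: `collect_for_duration`, `matches`, and the injectable `clock` are hypothetical names, and the stream here is any Python iterable rather than a real firehose client.

```python
import time


def collect_for_duration(stream, duration_seconds, matches, clock=time.monotonic):
    """Keep matching posts from `stream` until `duration_seconds` have elapsed.

    The deadline is checked once per incoming post, so on a quiet stream
    the loop may overrun slightly while waiting for the next item.
    """
    deadline = clock() + duration_seconds
    kept = []
    for post in stream:
        if clock() >= deadline:
            break
        if matches(post):
            kept.append(post)
    return kept


class FakeClock:
    """Deterministic clock for the demo: advances one second per call."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        current = self.now
        self.now += 1.0
        return current


demo_stream = ["housing crisis worsens", "cat pictures", "housing again", "housing later"]
demo_kept = collect_for_duration(
    demo_stream,
    duration_seconds=4,
    matches=lambda text: "housing" in text,
    clock=FakeClock(),
)
print(demo_kept)
```

Passing the clock as a parameter keeps the time limit testable without actually waiting.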

**🔍 Search API Collection** (NEW - Option A Implementation)  
- **How it works**: Uses `GET /xrpc/app.bsky.feed.searchPosts` with cursor pagination
- **Data type**: Historical posts with deep pagination
- **Best for**: Systematic research, historical analysis, comprehensive datasets
- **Parameters**: Date ranges, maximum posts per keyword
- **Coverage**: Can collect posts from weeks/months back
- **Features**: 
  - Enhanced search queries (exact phrases, hashtags)
  - Date range filtering (`record.createdAt` checking)
  - Cursor-based pagination (each page resumes where the previous one ended)
  - Rate limiting and error handling
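
The cursor pagination and date filtering described above can be sketched as follows. This is an illustration under stated assumptions: `fetch_page` is a hypothetical callable that returns a dict with `posts` and `cursor` keys (in a real run it would wrap a `searchPosts` request), and `make_fake_fetch` stands in for the network so the example runs offline.

```python
from datetime import datetime, timezone


def parse_created_at(value):
    """Parse an ISO 8601 timestamp like '2024-09-10T12:00:00Z' as an aware datetime."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return parsed


def collect_posts(fetch_page, query, start, end, max_posts=100, page_size=25):
    """Follow cursors page by page, keeping posts created within [start, end]."""
    collected = []
    cursor = None
    while len(collected) < max_posts:
        page = fetch_page(query=query, cursor=cursor, limit=page_size)
        for post in page["posts"]:
            if start <= parse_created_at(post["created_at"]) <= end:
                collected.append(post)
                if len(collected) >= max_posts:
                    break
        cursor = page.get("cursor")
        if not cursor or not page["posts"]:
            break  # no further pages
    return collected


def make_fake_fetch(all_posts):
    """Build an offline stand-in for the search endpoint (offset used as cursor)."""
    def fetch_page(query, cursor, limit):
        offset = int(cursor) if cursor else 0
        next_offset = offset + limit
        return {
            "posts": all_posts[offset:next_offset],
            "cursor": str(next_offset) if next_offset < len(all_posts) else None,
        }
    return fetch_page


# Hypothetical posts, one per day from 2024-09-01 to 2024-09-05
sample = [
    {"uri": f"at://example/post/{day}", "created_at": f"2024-09-{day:02d}T12:00:00Z"}
    for day in range(1, 6)
]
start = datetime(2024, 9, 2, tzinfo=timezone.utc)
end = datetime(2024, 9, 4, 23, 59, 59, tzinfo=timezone.utc)
in_range = collect_posts(make_fake_fetch(sample), '"housing crisis"', start, end, page_size=2)
print([post["uri"] for post in in_range])
```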

**⚡ Hybrid Collection** (RECOMMENDED)
- **How it works**: Search API first for historical data, then firehose for real-time
- **Data type**: Complete coverage of both historical and current data
- **Best for**: Comprehensive research projects
- **Parameters**: Both date ranges and duration
- **Coverage**: Historical baseline + ongoing monitoring


### 🔍 Enhanced Search Queries (Option A)

Search collection uses exact phrases and hashtags to reduce noise and surface more relevant results:

**Food Insecurity:**
- `"food insecurity"`, `"food insecure"`, `#foodinsecurity`
- `"hunger crisis"`, `"food desert"`, `"SNAP benefits"`
- `"food bank"`, `"food pantry"`, `"EBT"`, `"WIC"`

**Housing:**
- `"housing crisis"`, `"affordable housing"`, `#housingcrisis`
- `"rent crisis"`, `"housing shortage"`, `"eviction"`
- `"housing costs"`, `"rent burden"`, `"gentrification"`

**Homelessness:**
- `"homeless"`, `"homelessness"`, `#homeless`
- `"unhoused"`, `"rough sleeping"`, `"encampment"`
- `"shelter"`, `"street sleeping"`, `"housing first"`

**Unemployment:**
- `"unemployment"`, `"unemployed"`, `#unemployment`
- `"job loss"`, `"layoffs"`, `"jobless"`
- `"unemployment benefits"`, `"fired"`, `"laid off"`

**Gender Inequality:**
- `"gender inequality"`, `"gender gap"`, `#gendergap`
- `"pay gap"`, `"wage gap"`, `"gender discrimination"`
- `"equal pay"`, `"workplace inequality"`, `"glass ceiling"`
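
One way to hold these queries in code is a dictionary keyed by the file-name keywords used elsewhere in this notebook. The sketch below lists only a subset of the terms above, and the names `SEARCH_QUERIES`, `query_to_pattern`, and `match_keywords` are illustrative assumptions rather than the collector's own identifiers. It also shows how the same terms could tag a post's text, for example when filtering a firehose stream.

```python
import re

# Abbreviated subset of the query lists above (quotes mark exact phrases)
SEARCH_QUERIES = {
    "food_insecurity": ['"food insecurity"', "#foodinsecurity", '"food bank"'],
    "housing": ['"housing crisis"', '"affordable housing"', "#housingcrisis"],
    "homeless": ['"homelessness"', '"unhoused"', "#homeless"],
    "unemployment": ['"unemployment"', '"layoffs"', '"laid off"'],
    "gender_inequality": ['"gender inequality"', '"pay gap"', "#gendergap"],
}


def query_to_pattern(query):
    """Turn a quoted phrase or hashtag into a case-insensitive whole-word regex."""
    term = query.strip('"')
    prefix = "" if term.startswith("#") else r"\b"
    return re.compile(prefix + re.escape(term) + r"\b", re.IGNORECASE)


def match_keywords(text, queries=None):
    """Return the keyword categories whose terms appear in `text`."""
    queries = SEARCH_QUERIES if queries is None else queries
    return [
        keyword
        for keyword, terms in queries.items()
        if any(query_to_pattern(term).search(text) for term in terms)
    ]


print(match_keywords("Our food bank is seeing record demand as layoffs continue"))
print(match_keywords("Rents keep rising #HousingCrisis"))
```

Whole-word matching keeps a term such as `"unhoused"` from firing inside longer words, at the cost of missing inflected forms that are not listed explicitly.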


## 🚀 Data Collection Execution

### Collection Method Selection

Choose your collection approach and set parameters below. The collector supports three methods:

1. **Firehose**: Real-time stream collection
2. **Search API**: Historical data with pagination  
3. **Both**: Historical first, then real-time (hybrid approach)


In [31]:
# 🔧 COLLECTION PARAMETERS - Edit these values

# ===== COLLECTION METHOD =====
# Choose: 'firehose', 'search', or 'both'
COLLECTION_METHOD = "both"  # Options: firehose | search | both

# ===== TIME MANAGEMENT =====
# Option 1: Single time parameter (recommended for 'both' method)
TOTAL_TIME_SECONDS = 1800  # Auto-splits: 75% search (1350s), 25% firehose (450s)

# Option 2: Separate time controls (alternative)
# DURATION_SECONDS = 75      # Firehose duration
# SEARCH_TIMEOUT = 225       # Search timeout

# ===== SEARCH API PARAMETERS (for historical collection) =====
DAYS_BACK = 14  # Days back from now (alternative to specific dates)
# START_DATE = "2024-09-01"  # Optional: specific start date (YYYY-MM-DD)
# END_DATE = "2024-09-18"    # Optional: specific end date (YYYY-MM-DD)
MAX_POSTS_PER_KEYWORD = 100  # Maximum posts per keyword for search

# ===== GENERAL PARAMETERS =====
SESSION_NAME = "notebook_collection"  # Custom session name (optional)

# Display configuration
print(f"📊 Collection Configuration:")
print(f"   Method: {COLLECTION_METHOD}")
print(f"   Session: {SESSION_NAME}")

if COLLECTION_METHOD == 'both' and 'TOTAL_TIME_SECONDS' in locals():
    total_minutes = TOTAL_TIME_SECONDS / 60
    search_time = int(TOTAL_TIME_SECONDS * 0.75)
    firehose_time = int(TOTAL_TIME_SECONDS * 0.25)
    print(f"   Total Time: {TOTAL_TIME_SECONDS} seconds ({total_minutes:.1f} minutes)")
    print(f"   Search Phase: {search_time}s (75%)")
    print(f"   Firehose Phase: {firehose_time}s (25%)")
elif COLLECTION_METHOD in ['firehose', 'both'] and 'DURATION_SECONDS' in locals():
    duration_minutes = DURATION_SECONDS / 60
    print(f"   Firehose Duration: {DURATION_SECONDS} seconds ({duration_minutes:.1f} minutes)")

if COLLECTION_METHOD in ['search', 'both']:
    print(f"   Search Days Back: {DAYS_BACK} days")
    print(f"   Max Posts/Keyword: {MAX_POSTS_PER_KEYWORD}")

print(f"   📁 Output: data/sessions/{SESSION_NAME}/ + data/alltime/")


📊 Collection Configuration:
   Method: both
   Session: notebook_collection
   Total Time: 1800 seconds (30.0 minutes)
   Search Phase: 1350s (75%)
   Firehose Phase: 450s (25%)
   Search Days Back: 14 days
   Max Posts/Keyword: 100
   📁 Output: data/sessions/notebook_collection/ + data/alltime/


### ▶️ Execute HYBRID Collection

Run the cell below to start data collection with your chosen method and parameters.

**What happens:**
- **🔍 Search method**: Collects historical data using native search API with pagination
- **🔥 Firehose method**: Collects real-time data from live stream  
- **⚡ Both method**: Historical data first, then real-time (recommended for comprehensive coverage)


In [32]:
import subprocess
import time
from datetime import datetime

# Check current alltime data BEFORE collection
print("📊 Current Alltime Data BEFORE Collection:")
print("=" * 50)

keywords = ["food_insecurity", "housing", "homeless", "unemployment", "gender_inequality"]
before_counts = {}

for keyword in keywords:
    alltime_file = f"data/alltime/{keyword}_alltime.jsonl"
    if os.path.exists(alltime_file):
        with open(alltime_file, 'r', encoding='utf-8') as f:
            count = sum(1 for line in f)
        before_counts[keyword] = count
        print(f"   {keyword.replace('_', ' ').title()}: {count} posts")
    else:
        before_counts[keyword] = 0
        print(f"   {keyword.replace('_', ' ').title()}: 0 posts")

total_before = sum(before_counts.values())
print(f"\n📈 Total Before: {total_before} posts")

print(f"\n🚀 Starting HYBRID Collection...")
print(f"   Method: {COLLECTION_METHOD}")
print(f"   Session: {SESSION_NAME}")

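# DURATION_SECONDS is optional (commented out in the parameters cell), so guard against a NameError
if COLLECTION_METHOD in ['firehose', 'both'] and 'DURATION_SECONDS' in locals():
    print(f"   🔥 Firehose Duration: {DURATION_SECONDS} seconds ({DURATION_SECONDS/60:.1f} minutes)")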
if COLLECTION_METHOD in ['search', 'both']:
    print(f"   🔍 Search Days Back: {DAYS_BACK} days")
    print(f"   🔍 Max Posts/Keyword: {MAX_POSTS_PER_KEYWORD}")

# Build command arguments
cmd_args = [
    'python', 'bluesky_social_justice_collector.py',
    '--method', COLLECTION_METHOD,
    '--session_name', SESSION_NAME
]

# Add time management arguments
if COLLECTION_METHOD == 'both' and 'TOTAL_TIME_SECONDS' in locals():
    # Use total-time parameter for automatic split
    cmd_args.extend(['--total-time', str(TOTAL_TIME_SECONDS)])
elif COLLECTION_METHOD in ['firehose', 'both'] and 'DURATION_SECONDS' in locals():
    # Use separate duration parameter
    cmd_args.extend(['--duration', str(DURATION_SECONDS)])

# Add search timeout if specified
if 'SEARCH_TIMEOUT' in locals():
    cmd_args.extend(['--search-timeout', str(SEARCH_TIMEOUT)])

# Add search parameters
if COLLECTION_METHOD in ['search', 'both']:
    if DAYS_BACK:
        cmd_args.extend(['--days-back', str(DAYS_BACK)])
    cmd_args.extend(['--max-posts', str(MAX_POSTS_PER_KEYWORD)])

print(f"   🔧 Command: {' '.join(cmd_args)}")

# Run collection
start_time = time.time()
try:
    # Set timeout based on method
    timeout_seconds = 120  # Default for search
    if COLLECTION_METHOD == 'both' and 'TOTAL_TIME_SECONDS' in locals():
        timeout_seconds = TOTAL_TIME_SECONDS + 120
    elif COLLECTION_METHOD in ['firehose', 'both'] and 'DURATION_SECONDS' in locals():
        timeout_seconds = DURATION_SECONDS + 120
    
    result = subprocess.run(cmd_args, capture_output=True, text=True, timeout=timeout_seconds)
    
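    # A zero exit code is the only signal that the collector finished cleanly
    if result.returncode == 0:
        print("✅ Collection completed!")
    else:
        print(f"❌ Collector exited with code {result.returncode}")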
    print("\n📋 Collection Output:")
    print("-" * 30)
    print(result.stdout[-1500:])  # Show last 1500 characters
    
    if result.stderr:
        print("\n⚠️ Warnings/Errors:")
        print(result.stderr[-500:])
    
except subprocess.TimeoutExpired:
    print("⏰ Collection timed out (normal for long runs)")
except Exception as e:
    print(f"❌ Collection error: {e}")

actual_duration = time.time() - start_time
print(f"\n⏱️ Actual runtime: {actual_duration/60:.1f} minutes")


📊 Current Alltime Data BEFORE Collection:
   Food Insecurity: 2152 posts
   Housing: 1574 posts
   Homeless: 4399 posts
   Unemployment: 4329 posts
   Gender Inequality: 241 posts

📈 Total Before: 12695 posts

🚀 Starting HYBRID Collection...
   Method: both
   Session: notebook_collection
   🔥 Firehose Duration: 300 seconds (5.0 minutes)
   🔍 Search Days Back: 14 days
   🔍 Max Posts/Keyword: 100
   🔧 Command: python bluesky_social_justice_collector.py --method both --session_name notebook_collection --total-time 1800 --days-back 14 --max-posts 100
✅ Collection completed!

📋 Collection Output:
------------------------------
mployment"': 36 posts
   ⏰ Search timeout reached, stopping at query '"unemployed"'
   📊 Total for 'unemployment': 36 posts

⏰ Search timeout reached (1350s), stopping collection

💾 Final save: 36 posts

📈 Search collection complete: 1524 total posts
🔗 Updating alltime files with search results...
   📊 Alltime files updated:
     food insecurity: +266 new (total: 241

### 📊 Collection Results & Output Directory

Check the results and see what data was collected.


In [33]:
# Check AFTER collection results
print("📊 Collection Results:")
print("=" * 50)

# Check alltime data AFTER collection
after_counts = {}
for keyword in keywords:
    alltime_file = f"data/alltime/{keyword}_alltime.jsonl"
    if os.path.exists(alltime_file):
        with open(alltime_file, 'r', encoding='utf-8') as f:
            count = sum(1 for line in f)
        after_counts[keyword] = count
        growth = count - before_counts.get(keyword, 0)
        print(f"   {keyword.replace('_', ' ').title()}: {count} posts (+{growth} new)")
    else:
        after_counts[keyword] = 0
        print(f"   {keyword.replace('_', ' ').title()}: 0 posts")

total_after = sum(after_counts.values())
total_growth = total_after - total_before

print(f"\n📈 Total Growth: {total_before} → {total_after} (+{total_growth} new posts)")

# Show output directories
print(f"\n📁 Output Directories:")
print(f"   Session data: data/sessions/{SESSION_NAME}/")
print(f"   Alltime data: data/alltime/")

# Check session directory
session_dir = f"data/sessions/{SESSION_NAME}"
if os.path.exists(session_dir):
    session_files = [f for f in os.listdir(session_dir) if f.endswith('.jsonl')]
    print(f"\n📂 Session Files Created:")
    for file in session_files:
        file_path = os.path.join(session_dir, file)
        with open(file_path, 'r', encoding='utf-8') as f:
            count = sum(1 for line in f)
        print(f"   {file}: {count} posts")

# Show sample of latest collected data
print(f"\n📝 Sample of Latest Data:")
print("-" * 30)

for keyword in keywords:
    alltime_file = f"data/alltime/{keyword}_alltime.jsonl"
    if os.path.exists(alltime_file) and after_counts[keyword] > before_counts.get(keyword, 0):
        try:
            # Get last post
            with open(alltime_file, 'r', encoding='utf-8') as f:
                lines = f.readlines()
            if lines:
                last_post = json.loads(lines[-1].strip())
                print(f"\n🎯 Latest {keyword.replace('_', ' ').title()} post:")
                print(f"   Author: @{last_post.get('author_handle', 'unknown')}")
                print(f"   Followers: {last_post.get('author_followers_count', 0):,}")
                print(f"   Text: {last_post.get('text', '')[:100]}...")
                print(f"   Session: {last_post.get('session_name', 'unknown')}")
                break
        except (OSError, json.JSONDecodeError):  # unreadable file or malformed last line
            continue


📊 Collection Results:
   Food Insecurity: 2418 posts (+266 new)
   Housing: 2110 posts (+536 new)
   Homeless: 5085 posts (+686 new)
   Unemployment: 4365 posts (+36 new)
   Gender Inequality: 241 posts (+0 new)

📈 Total Growth: 12695 → 14219 (+1524 new posts)

📁 Output Directories:
   Session data: data/sessions/notebook_collection/
   Alltime data: data/alltime/

📂 Session Files Created:
   homeless_posts.jsonl: 686 posts
   food_insecurity_posts.jsonl: 266 posts
   housing_posts.jsonl: 536 posts
   unemployment_posts.jsonl: 36 posts

📝 Sample of Latest Data:
------------------------------

🎯 Latest Food Insecurity post:
   Author: @edgarallandoh.bsky.social
   Followers: 2,614
   Text: 5calls.org, help in local food bank, tutor children, exchange info on what skills/resources you have...
   Session: notebook_collection
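
The counts above treat every line in a JSONL file as one post. If the same post could ever be appended twice (an assumption; the collector may already deduplicate), comparing line counts with unique `uri` values is a quick sanity check. The helper name `count_unique_posts` and the demo file are hypothetical.

```python
import json
import os
import tempfile


def count_unique_posts(path):
    """Return (non_blank_lines, unique_uris) for a JSONL file.

    Malformed lines count toward the line total but contribute no URI.
    """
    total = 0
    uris = set()
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            total += 1
            try:
                post = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(post, dict) and post.get("uri"):
                uris.add(post["uri"])
    return total, len(uris)


# Demo on a temporary file with one duplicated post
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo_alltime.jsonl")
    with open(demo_path, "w", encoding="utf-8") as f:
        f.write('{"uri": "at://example/post/1", "text": "first"}\n')
        f.write('{"uri": "at://example/post/2", "text": "second"}\n')
        f.write('{"uri": "at://example/post/1", "text": "first"}\n')
    demo_counts = count_unique_posts(demo_path)

print(f"lines={demo_counts[0]}, unique={demo_counts[1]}")
```

A gap between the two numbers would suggest duplicates worth removing before analysis.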
