# Data Fetch Demo

This notebook demonstrates the data fetching functionality for collecting CTA transit-related posts and comments from Reddit and Bluesky.

## Overview

The data fetch module (`cta_pipeline/data_fetch.py`) provides shared utilities:
- **RateLimiter**: Sliding window rate limiting for API calls
- **RetryConfig / with_retry**: Retry logic with exponential backoff
- **Anonymizer**: Consistent anonymization of user data while preserving relationships
- **Filter lists**: Blocked users, news accounts, keywords
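`RetryConfig` and `with_retry` are not exercised anywhere in this notebook. The sketch below shows what a retry decorator with exponential backoff generally looks like. `RetryConfigSketch`, `with_retry_sketch`, `backoff_delay` and all of their fields are hypothetical names for illustration, not the actual `cta_pipeline` API. The `sleep` parameter is injectable only so the example can run without waiting.

```python
import functools
import random
import time
from dataclasses import dataclass
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class RetryConfigSketch:
    """Hypothetical retry settings (not the cta_pipeline RetryConfig)."""
    max_attempts: int = 3        # total tries, including the first call
    base_delay: float = 1.0      # seconds to wait before the first retry
    max_delay: float = 30.0      # upper bound on any single wait
    jitter: float = 0.1          # +/- fraction of random jitter added to each wait
    retry_on: Tuple[Type[BaseException], ...] = (Exception,)


def backoff_delay(attempt: int, config: RetryConfigSketch) -> float:
    """Wait before retry number `attempt` (1-based): base * 2**(attempt - 1), capped."""
    return min(config.base_delay * (2 ** (attempt - 1)), config.max_delay)


def with_retry_sketch(config: RetryConfigSketch,
                      sleep: Callable[[float], None] = time.sleep):
    """Decorator that retries the wrapped function with exponential backoff."""
    if config.max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(1, config.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except config.retry_on:
                    if attempt == config.max_attempts:
                        raise  # out of attempts: surface the last error
                    delay = backoff_delay(attempt, config)
                    # Jitter keeps many clients from retrying in lockstep.
                    delay *= 1 + random.uniform(-config.jitter, config.jitter)
                    sleep(delay)
            raise AssertionError("unreachable")
        return wrapper
    return decorator


# Usage: a function that fails twice with a transient error, then succeeds.
_attempts = {"n": 0}


@with_retry_sketch(RetryConfigSketch(max_attempts=3, base_delay=0.5), sleep=lambda s: None)
def flaky_request() -> str:
    _attempts["n"] += 1
    if _attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


print(flaky_request(), "after", _attempts["n"], "attempts")
```

Exceptions that are not listed in `retry_on` propagate immediately, so permanent errors such as a 404 are not retried pointlessly.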

In [2]:
import sys
sys.path.insert(0, '..')

from cta_pipeline.data_fetch import (
    Anonymizer,
    RateLimiter,
    RetryConfig,
    with_retry,
    fetch_json,
    is_blocked_user,
    contains_blocked_keywords,
    REDDIT_HEADERS,
)
from cta_pipeline.constants import REDDIT_SUBREDDITS, BLUESKY_QUERIES

print("Imports successful!")

Imports successful!


## 1. Anonymizer Demo

The Anonymizer generates consistent anonymous IDs while preserving relationships between posts and comments.

In [5]:
# Create an anonymizer instance
anonymizer = Anonymizer()

# Sample data
sample_users = ["john_doe", "jane_smith", "john_doe", "transit_lover"]
sample_post_ids = ["abc123", "def456", "abc123"]

print("=== User Anonymization ===")
for user in sample_users:
    anon = anonymizer.anonymize_author(user)
    print(f"  {user:20} -> {anon}")

print("\n=== Post ID Anonymization ===")
for post_id in sample_post_ids:
    anon = anonymizer.anonymize_post_id(post_id)
    print(f"  {post_id:20} -> {anon}")

print("\n=== Relationship Preservation ===")
print("Notice that 'john_doe' and 'abc123' produce the same hash each time.")
print("This is done to preserve comment --> post and reply --> parent relationships while maintaining privacy.")

=== User Anonymization ===
  john_doe             -> user_58d20899d485
  jane_smith           -> user_28b45eefca23
  john_doe             -> user_58d20899d485
  transit_lover        -> user_11b228375d03

=== Post ID Anonymization ===
  abc123               -> post_c395eef34d82
  def456               -> post_f50c6d85e137
  abc123               -> post_c395eef34d82

=== Relationship Preservation ===
Notice that 'john_doe' and 'abc123' produce the same hash each time.
This is done to preserve comment --> post and reply --> parent relationships while maintaining privacy.
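The IDs above have the form of a fixed prefix plus 12 hex characters. One common way to get stable pseudonyms like these is a keyed hash (HMAC-SHA256) truncated to a short hex digest. The sketch below assumes that approach. `AnonymizerSketch`, the secret key, and the 12-character truncation are illustrative choices, and the sketch will not reproduce the exact IDs printed above.

```python
import hashlib
import hmac


class AnonymizerSketch:
    """Hypothetical keyed-hash anonymizer (not the cta_pipeline implementation)."""

    def __init__(self, secret_key: bytes, digest_chars: int = 12):
        self._key = secret_key
        self._digest_chars = digest_chars

    def anonymize(self, value: str, prefix: str = "") -> str:
        # Same key + same input -> same digest, so post/comment links survive.
        digest = hmac.new(self._key, value.encode("utf-8"), hashlib.sha256).hexdigest()
        return f"{prefix}{digest[: self._digest_chars]}"

    def anonymize_author(self, author: str) -> str:
        return self.anonymize(author, prefix="user_")

    def anonymize_post_id(self, post_id: str) -> str:
        return self.anonymize(post_id, prefix="post_")


# In practice the key would come from configuration, never from source code.
anon = AnonymizerSketch(secret_key=b"replace-with-a-secret-from-env")
print(anon.anonymize_author("john_doe"))   # "user_" followed by 12 hex characters
print(anon.anonymize_author("john_doe"))   # identical to the line above
```

Using a secret key rather than a plain hash matters here. Reddit and Bluesky usernames are public, so an unkeyed hash could be reversed by hashing candidate usernames and comparing the results.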


## 2. Filter Functions Demo

The filter functions help exclude bots, news accounts, and content about non-CTA transit (Metra, Amtrak, etc.).

In [6]:
# Test blocked user detection
print("=== Blocked User Detection ===")
test_users = [
    ("AutoModerator", "reddit"),
    ("regular_user", "reddit"),
    ("chicagotribune.com", "bluesky"),
    ("transit_fan.bsky.social", "bluesky"),
]

for user, platform in test_users:
    blocked = is_blocked_user(user, platform)
    status = "BLOCKED" if blocked else "allowed"
    print(f"  {user:30} ({platform:8}) -> {status}")

print("\n=== Blocked Keywords Detection ===")
test_texts = [
    "The CTA Red Line was delayed today",
    "Taking Metra to work is great",
    "Amtrak prices are too high",
    "Love the Blue Line express",
]

for text in test_texts:
    blocked = contains_blocked_keywords(text)
    status = "BLOCKED (non-CTA)" if blocked else "allowed (CTA-related)"
    print(f"  \"{text[:40]}...\" -> {status}")

=== Blocked User Detection ===
  AutoModerator                  (reddit  ) -> BLOCKED
  regular_user                   (reddit  ) -> allowed
  chicagotribune.com             (bluesky ) -> BLOCKED
  transit_fan.bsky.social        (bluesky ) -> allowed

=== Blocked Keywords Detection ===
  "The CTA Red Line was delayed today..." -> allowed (CTA-related)
  "Taking Metra to work is great..." -> BLOCKED (non-CTA)
  "Amtrak prices are too high..." -> BLOCKED (non-CTA)
  "Love the Blue Line express..." -> allowed (CTA-related)
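The demo shows `contains_blocked_keywords` catching "Metra" and "Amtrak" regardless of case. One plausible way to implement such a check is a single case-insensitive regex with word boundaries, so that a keyword is not matched inside an unrelated longer word. The sketch below assumes that approach. `build_keyword_pattern`, `contains_blocked_keywords_sketch`, and the keyword list are hypothetical and are not the project's actual filter list.

```python
import re
from typing import Iterable


def build_keyword_pattern(keywords: Iterable[str]) -> re.Pattern:
    """Compile one case-insensitive, word-bounded alternation of the keywords."""
    # Longest first, so a multi-word phrase is matched whole rather than by a shorter keyword.
    escaped = sorted((re.escape(k) for k in keywords), key=len, reverse=True)
    return re.compile(r"\b(?:" + "|".join(escaped) + r")\b", re.IGNORECASE)


# Hypothetical example list; the real lists live in cta_pipeline.data_fetch.
NON_CTA_PATTERN = build_keyword_pattern(["metra", "amtrak", "south shore line"])


def contains_blocked_keywords_sketch(text: str) -> bool:
    return NON_CTA_PATTERN.search(text) is not None


for text in ["The CTA Red Line was delayed today",
             "Taking Metra to work is great",
             "Amtrak prices are too high"]:
    print(f"{text!r}: {contains_blocked_keywords_sketch(text)}")
```

Compiling the pattern once and reusing it keeps the per-post cost low when filtering tens of thousands of comments.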


## 3. Rate Limiter Demo

The RateLimiter caps the number of calls made within a sliding time window so that we stay under each API's rate limit. It can be configured per service to match that service's published limits. The full fetch scripts aim slightly below those limits to leave a safety margin and avoid failed requests.

In [7]:
import time

# Create a rate limiter (5 calls per 10 seconds for demo)
rate_limiter = RateLimiter(max_calls=5, window_seconds=10)

print("=== Rate Limiter Demo ===")
print(f"Configured: {rate_limiter.max_calls} calls per {rate_limiter.window_seconds} seconds")
print("\nSimulating API calls...")

for i in range(6):
    rate_limiter.check()  # Will sleep if at limit
    rate_limiter.increment()
    print(f"  Call {i+1} completed at {time.strftime('%H:%M:%S')}")

print("\nNote: In production, limits are 2800/5min for Bluesky, 30/min for Reddit")

=== Rate Limiter Demo ===
Configured: 5 calls per 10 seconds

Simulating API calls...
  Call 1 completed at 08:48:32
  Call 2 completed at 08:48:32
  Call 3 completed at 08:48:32
  Call 4 completed at 08:48:32
  Call 5 completed at 08:48:32
{"sleep_seconds": 10.0, "calls_made": 5, "event": "rate_limit_sleep", "level": "info", "timestamp": "2025-12-22T13:48:32.893910Z", "func_name": "check", "filename": "data_fetch.py", "lineno": 45}
  Call 6 completed at 08:48:42

Note: In production, limits are 2800/5min for Bluesky, 30/min for Reddit
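The log line above shows `check()` sleeping once five calls had been recorded inside the 10-second window. A sliding-window limiter can be built from a deque of call timestamps: drop any timestamp older than the window, and if the window is still full, sleep until the oldest call expires. The sketch below assumes that design and mirrors the demo's `check()` / `increment()` split. `SlidingWindowLimiterSketch` is a hypothetical name, and its injectable `clock` and `sleep` parameters exist only to make it testable.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiterSketch:
    """Hypothetical limiter: at most `max_calls` calls in any `window_seconds` span."""

    def __init__(self, max_calls: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._clock = clock
        self._sleep = sleep
        self._calls: Deque[float] = deque()  # timestamps of recent calls, oldest first

    def _evict_expired(self, now: float) -> None:
        # Drop calls that have slid out of the window.
        while self._calls and now - self._calls[0] >= self.window_seconds:
            self._calls.popleft()

    def check(self) -> None:
        """Block until one more call fits inside the window."""
        while True:
            now = self._clock()
            self._evict_expired(now)
            if len(self._calls) < self.max_calls:
                return
            # Sleep exactly until the oldest recorded call leaves the window.
            self._sleep(self.window_seconds - (now - self._calls[0]))

    def increment(self) -> None:
        """Record a call that was just made."""
        self._calls.append(self._clock())
```

Re-checking in a loop after sleeping, rather than assuming a single sleep was enough, keeps the limiter correct even if the clock advances unevenly.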


## 4. Reddit Data Fetch Sample

Let's fetch a small sample of posts from one subreddit to demonstrate the functionality.

In [8]:
print("=== Configured Subreddits ===")
for sub in REDDIT_SUBREDDITS:
    print(f"  r/{sub}")

=== Configured Subreddits ===
  r/Chicago
  r/AskChicago
  r/CarFreeChicago
  r/AskCHI
  r/cta
  r/ChicagoUrbanism
  r/WindyCity
  r/ChicagoNWSide
  r/greatNWSide


In [9]:
import pandas as pd

def fetch_sample_reddit_posts(subreddit: str, limit: int = 5):
    """Fetch a small sample of posts from a subreddit."""
    url = f"https://www.reddit.com/r/{subreddit}/search.json"
    params = {
        "q": "cta OR train OR bus",
        "restrict_sr": 1,
        "limit": limit,
        "sort": "new",
    }
    
    data = fetch_json(url, params=params, headers=REDDIT_HEADERS)
    
    if data is None:
        print(f"Failed to fetch from r/{subreddit}")
        return []
    
    posts = []
    for child in data.get("data", {}).get("children", []):
        if child.get("kind") != "t3":
            continue
        post_data = child.get("data", {})
        posts.append({
            "subreddit": subreddit,
            "title": post_data.get("title", "")[:60] + "...",
            "author": post_data.get("author", ""),
            "score": post_data.get("score", 0),
            "num_comments": post_data.get("num_comments", 0),
        })
    
    return posts

# Fetch sample from r/cta
print("=== Fetching Sample Posts from r/cta ===")
sample_posts = fetch_sample_reddit_posts("cta", limit=5)

if sample_posts:
    df = pd.DataFrame(sample_posts)
    display(df)
else:
    print("No posts fetched (rate limited or network issue)")

=== Fetching Sample Posts from r/cta ===


| | subreddit | title | author | score | num_comments |
|---|---|---|---|---|---|
| 0 | cta | Full sized CTA map... | Previous_Mastodon212 | 9 | 6 |
| 1 | cta | Does the Santa car have an assigned number?... | HinsdaleCounty | 9 | 5 |
| 2 | cta | CPD On Cta... | Queasy-History4464 | 7 | 6 |
| 3 | cta | Red Line Seats... | avec_n | 3 | 6 |
| 4 | cta | Most CTA drivers are assholes and it gets wors... | Pristine-Angle3100 | 0 | 4 |
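The request above returns a single page of results. Reddit's JSON listings include a `data.after` cursor that can be sent back as the `after` query parameter to fetch the next page, and it is `null` on the last page. The loop below is a minimal sketch of that pattern. `iter_reddit_posts` is a hypothetical helper, `fetch_page` is injected so the loop can be tested offline, and `max_pages` is an assumed safety cap.

```python
from typing import Any, Callable, Dict, Iterator, Optional

JsonDict = Dict[str, Any]


def iter_reddit_posts(
    fetch_page: Callable[[Optional[str]], Optional[JsonDict]],
    max_pages: int = 10,
) -> Iterator[JsonDict]:
    """Yield post dicts (kind == "t3") across pages by following the `after` cursor."""
    after: Optional[str] = None
    for _ in range(max_pages):
        listing = fetch_page(after)
        if listing is None:          # request failed: stop instead of retrying forever
            return
        data = listing.get("data", {})
        for child in data.get("children", []):
            if child.get("kind") == "t3":
                yield child.get("data", {})
        after = data.get("after")
        if not after:                # None or missing means this was the last page
            return


# Possible wiring to the notebook's helpers (not executed here):
# def fetch_page(after):
#     params = {"q": "cta OR train OR bus", "restrict_sr": 1, "limit": 100, "sort": "new"}
#     if after:
#         params["after"] = after
#     rate_limiter.check()
#     data = fetch_json(url, params=params, headers=REDDIT_HEADERS)
#     rate_limiter.increment()
#     return data
```

Routing every page request through the rate limiter keeps a long pagination run within Reddit's limits.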


In [10]:
# Demonstrate anonymization of the fetched posts
if sample_posts:
    print("=== Anonymized Version ===")
    anonymizer = Anonymizer()
    
    anon_posts = []
    for post in sample_posts:
        anon_posts.append({
            "subreddit": post["subreddit"],
            "title": post["title"],
            "author": anonymizer.anonymize_author(post["author"]),
            "score": post["score"],
            "num_comments": post["num_comments"],
        })
    
    df_anon = pd.DataFrame(anon_posts)
    display(df_anon)
    
    print("\nNote: Author names are now anonymized but consistent (same author = same hash)")

=== Anonymized Version ===


| | subreddit | title | author | score | num_comments |
|---|---|---|---|---|---|
| 0 | cta | Full sized CTA map... | user_482ebcbbfc2b | 9 | 6 |
| 1 | cta | Does the Santa car have an assigned number?... | user_367ac42061f7 | 9 | 5 |
| 2 | cta | CPD On Cta... | user_19146637be4d | 7 | 6 |
| 3 | cta | Red Line Seats... | user_00db62e69e47 | 3 | 6 |
| 4 | cta | Most CTA drivers are assholes and it gets wors... | user_c414a18ad943 | 0 | 4 |



Note: Author names are now anonymized but consistent (same author = same hash)


## 5. Bluesky Configuration

Bluesky requires authentication. Set these environment variables before running the full fetch:

```bash
export BSKY_USERNAME=your-username.bsky.social
export BSKY_PASSWORD=your-app-password
```

Or create a `.env` file (copy from `.env.example`).

In [14]:
import os
from dotenv import load_dotenv
load_dotenv()

print("=== Bluesky Configuration ===")
print(f"Search queries configured: {len(BLUESKY_QUERIES)}")
for query in BLUESKY_QUERIES:
    print(f"  - {query}")

print("\n=== Environment Variables ===")
username = os.environ.get("BSKY_USERNAME")
password = os.environ.get("BSKY_PASSWORD")

if username:
    print(f"  BSKY_USERNAME: {username}")
    print(f"  BSKY_PASSWORD: {'*' * 8 if password else 'NOT SET'}")
else:
    print("  BSKY_USERNAME: NOT SET")
    print("  BSKY_PASSWORD: NOT SET")
    print("\n  To configure, run:")
    print("    export BSKY_USERNAME=your-username.bsky.social")
    print("    export BSKY_PASSWORD=your-app-password")

=== Bluesky Configuration ===
Search queries configured: 6
  - cta AND train
  - cta AND bus
  - cta AND line
  - chicago AND train
  - chicago AND bus
  - chicago AND line

=== Environment Variables ===
  BSKY_USERNAME: acoffeerunner.bsky.social
  BSKY_PASSWORD: ********


In [15]:
# Test Bluesky connection if credentials are available
if username and password:
    try:
        from atproto import Client
        
        print("=== Testing Bluesky Connection ===")
        client = Client()
        client.login(username, password)
        print(f"  Successfully authenticated as: {username}")
        
        # Fetch a small sample
        print("\n=== Fetching Sample Bluesky Posts ===")
        results = client.app.bsky.feed.search_posts(
            params={"q": "cta AND train", "limit": 3}
        )
        
        anonymizer = Anonymizer()
        for post in results.posts[:3]:
            text = getattr(post.record, "text", "")[:60]
            author = anonymizer.anonymize_author(post.author.handle)
            print(f"  Author: {author}")
            print(f"  Text: {text}...")
            print()
            
    except Exception as e:
        print(f"  Error: {e}")
else:
    print("Skipping Bluesky test (credentials not configured)")

=== Testing Bluesky Connection ===
  Successfully authenticated as: acoffeerunner.bsky.social

=== Fetching Sample Bluesky Posts ===
  Author: user_0288c7a08a23
  Text: "Chicago police have been more visible on the CTA Downtown i...

  Author: user_0576e6c7bd53
  Text: TWU president John Samuelsen is being very disingenuous. MBT...

  Author: user_27ff040a7f82
  Text: A 52-year-old man was arrested and charged after he set hims...
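The sample above requests only three posts. Bluesky's `app.bsky.feed.searchPosts` endpoint returns a `cursor` alongside the results, and that cursor can be passed back to fetch the next page. The generic loop below is a minimal sketch under that assumption. `collect_with_cursor` and `search_page` are hypothetical names; `search_page` returns `(items, next_cursor)` so the loop can be tested without network access. The comment shows how it might wrap the `atproto` call used above.

```python
from typing import Any, Callable, List, Optional, Sequence, Tuple

# A page fetcher takes the current cursor (None for the first page) and
# returns (items_on_this_page, next_cursor_or_None).
PageFetcher = Callable[[Optional[str]], Tuple[Sequence[Any], Optional[str]]]


def collect_with_cursor(search_page: PageFetcher, max_items: int) -> List[Any]:
    """Follow cursors until results run out or `max_items` items have been collected."""
    items: List[Any] = []
    cursor: Optional[str] = None
    while len(items) < max_items:
        page, cursor = search_page(cursor)
        items.extend(page[: max_items - len(items)])
        if not cursor or not page:   # no next page, or an empty page: stop
            break
    return items


# Possible wiring to the atproto client from the cell above (not executed here):
# def search_page(cursor):
#     params = {"q": "cta AND train", "limit": 100}
#     if cursor:
#         params["cursor"] = cursor
#     resp = client.app.bsky.feed.search_posts(params=params)
#     return resp.posts, resp.cursor
```

Stopping on an empty page as well as on a missing cursor guards against a server that keeps returning a cursor with no further results.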



## 6. Running Full Data Fetch

To run the full data fetch scripts:

### Reddit
```bash
python reddit_data_fetch.py
```

Output files:
- `data/posts/reddit/reddit_posts.csv`
- `data/posts/reddit/reddit_comments.csv`

### Bluesky
```bash
# Set credentials first
export BSKY_USERNAME=your-username.bsky.social
export BSKY_PASSWORD=your-app-password

python bsky_data_fetch.py
```

Output files:
- `data/posts/bsky/bsky_posts.csv`
- `data/posts/bsky/bsky_comments.csv`

In [17]:
# Check if output files already exist
from pathlib import Path

print("=== Existing Data Files ===")

data_files = [
    "data/posts/reddit/reddit_posts.csv",
    "data/posts/reddit/reddit_comments.csv",
    "data/posts/bsky/bsky_posts.csv",
    "data/posts/bsky/bsky_comments.csv",
]

for file_path in data_files:
    path = Path("..") / file_path
    if path.exists():
        size = path.stat().st_size / 1024
        print(f"  {file_path}: {size:.1f} KB")
        
        # Show sample if CSV
        if path.suffix == ".csv":
            try:
                df = pd.read_csv(path, nrows=3)
                print(f"    Columns: {list(df.columns)}")
                print(f"    Sample rows: {len(df)}")
            except Exception as e:
                print(f"    Error reading: {e}")
    else:
        print(f"  {file_path}: NOT FOUND")

=== Existing Data Files ===
  data/posts/reddit/reddit_posts.csv: 876.6 KB
    Columns: ['post_id', 'subreddit', 'index', 'timestamp', 'text', 'num_comments', 'permalink']
    Sample rows: 3
  data/posts/reddit/reddit_comments.csv: 8733.6 KB
    Columns: ['post_id', 'comment_id', 'parent_id', 'timestamp', 'body', 'author', 'score', 'is_post']
    Sample rows: 3
  data/posts/bsky/bsky_posts.csv: 4285.7 KB
    Columns: ['post_id', 'parent_id', 'author', 'text', 'timestamp']
    Sample rows: 3
  data/posts/bsky/bsky_comments.csv: 10850.4 KB
    Columns: ['post_id', 'comment_id', 'parent_comment_id', 'author', 'text', 'timestamp']
    Sample rows: 3
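The column listings show that the two comment files do not share a schema. Reddit stores comment text in `body` and Bluesky in `text`, and each file has platform-specific columns (`parent_id`, `score`, `is_post` versus `parent_comment_id`). Before combining them, it can help to fail fast when a file is missing an expected column. This is a minimal sketch; `to_common_schema` and the `RENAMES` map are hypothetical names, with the renames taken from the listings above.

```python
from typing import Dict, Iterable, Mapping

import pandas as pd

COMMON_COLUMNS = ["post_id", "comment_id", "author", "text", "timestamp"]

# Per-source renames needed to reach the common schema (from the column listings above).
RENAMES: Dict[str, Dict[str, str]] = {
    "reddit": {"body": "text"},
    "bluesky": {},
}


def to_common_schema(df: pd.DataFrame, source: str,
                     renames: Mapping[str, Mapping[str, str]] = RENAMES,
                     required: Iterable[str] = COMMON_COLUMNS) -> pd.DataFrame:
    """Rename, verify required columns exist, and keep only those plus a `source` column."""
    out = df.rename(columns=dict(renames.get(source, {})))
    cols = list(required)
    missing = [c for c in cols if c not in out.columns]
    if missing:
        raise ValueError(f"{source}: missing columns {missing}")
    out = out[cols].copy()
    out["source"] = source
    return out
```

A clear `ValueError` naming the missing columns is easier to act on than the `KeyError` pandas raises when a column selection fails.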


## 7. Creating a Combined Sample Dataset

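Let's create a sample dataset of roughly 2,000 comments combining Reddit and Bluesky data. Sampling is stratified by source, so each platform keeps about the same share it has in the full dataset. This demonstrates how to merge data from multiple sources for analysis.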
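Computing each source's sample size as `int(SAMPLE_SIZE * proportion)` rounds every share down, so the per-source counts can sum to less than `SAMPLE_SIZE`. With 33,020 Reddit and 29,801 Bluesky comments, the exact quotas are about 1,051.24 and 948.76, which truncate to 1,051 + 948 = 1,999. If an exact total matters, a largest-remainder allocation gives the leftover rows to the sources with the biggest fractional parts. The helper below sketches that alternative. `largest_remainder_allocation` is a hypothetical name, and this is not what the notebook cell uses.

```python
from typing import Dict


def largest_remainder_allocation(counts: Dict[str, int], total: int) -> Dict[str, int]:
    """Split `total` across groups in proportion to `counts`, summing exactly to `total`.

    Each group first gets the floor of its exact quota; the rows still unassigned
    go, one each, to the groups with the largest fractional remainders.
    """
    population = sum(counts.values())
    if population == 0:
        return {k: 0 for k in counts}
    # Integer arithmetic avoids floating-point rounding surprises.
    floors = {k: (c * total) // population for k, c in counts.items()}
    remainders = {k: (c * total) % population for k, c in counts.items()}
    leftover = total - sum(floors.values())
    for k in sorted(counts, key=lambda k: remainders[k], reverse=True)[:leftover]:
        floors[k] += 1
    return floors


print(largest_remainder_allocation({"reddit": 33_020, "bluesky": 29_801}, 2000))
# {'reddit': 1051, 'bluesky': 949}
```

In the sampling loop, `n_samples = allocation[source]` would then replace the `int(...)` computation.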

In [21]:
from pathlib import Path
import pandas as pd

SAMPLE_SIZE = 2000

# Load comments from both sources
reddit_path = Path("..") / "data/posts/reddit/reddit_comments.csv"
bsky_path = Path("..") / "data/posts/bsky/bsky_comments.csv"

dfs = []

# Load Reddit comments
if reddit_path.exists():
    reddit_df = pd.read_csv(reddit_path)
    # Normalize columns to common schema
    reddit_df = reddit_df.rename(columns={"body": "text"})
    reddit_df["source"] = "reddit"
    # Select common columns
    reddit_df = reddit_df[["post_id", "comment_id", "author", "text", "timestamp", "source"]]
    dfs.append(reddit_df)
    print(f"Loaded {len(reddit_df):,} Reddit comments")
else:
    print("Reddit comments file not found")

# Load Bluesky comments
if bsky_path.exists():
    bsky_df = pd.read_csv(bsky_path)
    bsky_df["source"] = "bluesky"
    # Select common columns
    bsky_df = bsky_df[["post_id", "comment_id", "author", "text", "timestamp", "source"]]
    dfs.append(bsky_df)
    print(f"Loaded {len(bsky_df):,} Bluesky comments")
else:
    print("Bluesky comments file not found")

if dfs:
    # Combine all sources
    combined = pd.concat(dfs, ignore_index=True)
    print(f"\nTotal combined: {len(combined):,} comments")
    
    # Sample proportionally from each source
    source_counts = combined["source"].value_counts()
    print(f"\nSource distribution:")
    for source, count in source_counts.items():
        pct = count / len(combined) * 100
        print(f"  {source}: {count:,} ({pct:.1f}%)")
    
    # Stratified sampling to maintain source proportions
    sample_dfs = []
    for source in combined["source"].unique():
        source_df = combined[combined["source"] == source]
        # Calculate proportional sample size
        proportion = len(source_df) / len(combined)
        n_samples = int(SAMPLE_SIZE * proportion)
        # Sample (or take all if fewer than n_samples)
        n_samples = min(n_samples, len(source_df))
        sampled = source_df.sample(n=n_samples, random_state=42)
        sample_dfs.append(sampled)
    
    sample_df = pd.concat(sample_dfs, ignore_index=True)
    
    # Shuffle the combined sample
    sample_df = sample_df.sample(frac=1, random_state=42).reset_index(drop=True)
    
    # Anonymize author, post_id, and comment_id
    print("\nAnonymizing user and post data:")
    anonymizer = Anonymizer()
    sample_df["author"] = sample_df["author"].apply(anonymizer.anonymize_author)
    sample_df["post_id"] = sample_df["post_id"].apply(anonymizer.anonymize_post_id)
    sample_df["comment_id"] = sample_df["comment_id"].apply(
        lambda x: anonymizer.anonymize(x, prefix="comment_")
    )
    print(f"Anonymized {len(sample_df):,} rows")
    
    print(f"\nSample dataset created")
    print(f"Total samples: {len(sample_df):,}")
    print(f"\nSample source distribution:")
    for source, count in sample_df["source"].value_counts().items():
        pct = count / len(sample_df) * 100
        print(f"  {source}: {count:,} ({pct:.1f}%)")
    
    # Save sample dataset
    output_path = Path("..") / "data/sample_comments_2000.csv"
    sample_df.to_csv(output_path, index=False)
    print(f"\nSaved to: {output_path}")
    
    # Display sample
    print("\nSample preview (anonymized):")
    display(sample_df.head(10))
else:
    print("No data files found. Run the fetch scripts first.")

Loaded 33,020 Reddit comments
Loaded 29,801 Bluesky comments

Total combined: 62,821 comments

Source distribution:
  reddit: 33,020 (52.6%)
  bluesky: 29,801 (47.4%)

Anonymizing user and post data:
Anonymized 1,999 rows

Sample dataset created
Total samples: 1,999

Sample source distribution:
  reddit: 1,051 (52.6%)
  bluesky: 948 (47.4%)

Saved to: ../data/sample_comments_2000.csv

Sample preview (anonymized):


| | post_id | comment_id | author | text | timestamp | source |
|---|---|---|---|---|---|---|
| 0 | post_e110fbc897b2 | comment_ac38807efd27 | user_39467c09ce27 | MODS!!!!! | 1763339511.0 | reddit |
| 1 | post_0b2e57dfc198 | comment_6a6d00e38b6c | user_ad6530f147f1 | Looked myself. Couldn't find anything on what ... | 1709238607.0 | reddit |
| 2 | post_3e01a205280c | comment_4f820fa3b794 | user_753b0bb42975 | Among other things that piece of shit in n.v t... | 1759987350.0 | reddit |
| 3 | post_a341235980e8 | comment_3806052aef0c | user_8fc27ae13848 | happy birthday! | 1761585157.0 | reddit |
| 4 | post_f66d41d3711f | comment_b2e0aa3929fb | user_34f2c9cb63cb | ICE have been ignoring many laws.\nIt would be... | 2025-08-31T19:12:39.395Z | bluesky |
| 5 | post_8284caaa558d | comment_3bac8c0bab36 | user_caeba23aadfe | www.youtube.com/playlist?lis...\n\nI wish like... | 2025-12-07T13:51:41.661Z | bluesky |
| 6 | post_a8a81d6a7a2c | comment_4d2b82efd2e6 | user_4ec2a4e8ee1d | \nEveryone needs to vent sometimes. Here's a f... | 1763003021.0 | reddit |
| 7 | post_fb760a1e5285 | comment_4e2c249e4ac8 | user_5c59e53795b1 | now the very entertaining stuff! budget direct... | 2025-09-18T22:55:58.305Z | bluesky |
| 8 | post_d359c67403aa | comment_3e26878b1aa6 | user_a589b1e1f9da | Hotels dot com. Search Soldier Field and filte... | 1759443308.0 | reddit |
| 9 | post_f445c85aae96 | comment_1ff3be1c6464 | user_82c3d53ddba9 | The Second Tunnel is absolutely Essential the ... | 2025-07-15T08:37:52.703Z | bluesky |
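The preview shows that the combined `timestamp` column mixes two formats. Reddit rows hold Unix epoch seconds (e.g. `1763339511.0`), while Bluesky rows hold ISO 8601 strings (e.g. `2025-08-31T19:12:39.395Z`). Time-based analysis needs a single representation. The sketch below converts each source separately into timezone-aware UTC datetimes, assuming those two formats are the only ones present. `normalize_timestamps` is a hypothetical helper.

```python
import pandas as pd


def normalize_timestamps(df: pd.DataFrame) -> pd.Series:
    """Return a UTC datetime Series, parsing each source's timestamp format separately.

    Assumes Reddit rows hold Unix epoch seconds and Bluesky rows hold ISO 8601
    strings; values that fail to parse become NaT.
    """
    result = pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns, UTC]")

    # Reddit: epoch seconds (floats in memory, strings after a CSV round trip).
    is_reddit = df["source"] == "reddit"
    epoch = pd.to_numeric(df.loc[is_reddit, "timestamp"], errors="coerce")
    result.loc[is_reddit] = pd.to_datetime(epoch, unit="s", utc=True)

    # Bluesky: ISO 8601 strings with a trailing "Z".
    is_bsky = df["source"] == "bluesky"
    result.loc[is_bsky] = pd.to_datetime(df.loc[is_bsky, "timestamp"], utc=True, errors="coerce")

    return result


# Usage: sample_df["timestamp_utc"] = normalize_timestamps(sample_df)
```

If the Bluesky strings vary in precision (for example, some without milliseconds), pandas 2.x can parse them consistently with `format="ISO8601"` instead of relying on format inference.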


## Summary

This notebook demonstrated:

1. **Anonymizer**: Deterministic hashing of usernames and IDs that preserves post/comment relationships
2. **Filter Functions**: Blocking bots, news accounts, and non-CTA transit content
3. **Rate Limiter**: Preventing API rate limit violations
4. **Reddit Fetching**: Sample post retrieval with anonymization
5. **Bluesky Configuration**: Environment variable setup for authentication
6. **Combined Dataset**: Creating a stratified sample from multiple sources

The full fetch scripts (see Section 6) use these utilities to collect comprehensive datasets while respecting API limits and user privacy.