In [1]:
import sys
from pathlib import Path
from datetime import datetime, timezone
import json
import bz2
from collections import defaultdict
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm

# Resolve the project root: the nearest directory (the cwd itself or one of its
# ancestors) that contains both `src/` and `configs/`. This keeps the notebook
# working when the kernel is started in a subfolder; if no such directory is
# found we fall back to the cwd, which was the previous behaviour.
workspace_root = next(
    (
        candidate
        for candidate in (Path.cwd(), *Path.cwd().parents)
        if (candidate / 'src').is_dir() and (candidate / 'configs').is_dir()
    ),
    Path.cwd(),
)

# Make the local `thesis_pipeline` package importable. Guard the insert so that
# re-running this cell does not keep prepending duplicate entries to sys.path.
src_dir = str(workspace_root / 'src')
if src_dir not in sys.path:
    sys.path.insert(0, src_dir)

from thesis_pipeline.io.config import load_all_configs

print(f"Reddit extraction started: {datetime.now().isoformat()}")
print(f"Workspace: {workspace_root}")

Reddit extraction started: 2025-12-18T14:52:18.325052
Workspace: /Users/stahlma/Desktop/01_Studium/11_Thesis/Data_Experiment


## 1. Load Configuration and Setup Paths

In [2]:
# Project configuration: the Reddit section drives filtering/processing, the
# global section defines the validation-run period.
configs = load_all_configs(workspace_root / 'configs')
reddit_cfg, global_cfg = configs['reddit'], configs['global']

# Raw monthly bz2 dumps are read from here; daily Parquet files are written here.
raw_reddit_dir = workspace_root / 'data/00_raw/reddit/politosphere_2016-09_2016-10'
silver_reddit_dir = workspace_root / 'data/01_silver/reddit'
silver_reddit_dir.mkdir(parents=True, exist_ok=True)

# Inclusive period bounds as 'YYYY-MM-DD' strings (parsed with that format later on).
period_start, period_end = (
    global_cfg['validation_run']['period_start'],
    global_cfg['validation_run']['period_end'],
)

print(f"Input: {raw_reddit_dir.relative_to(workspace_root)}")
print(f"Output: {silver_reddit_dir.relative_to(workspace_root)}")
print(f"Period: {period_start} to {period_end}")
print(f"Time index field: {reddit_cfg['processing']['time_index_field']}")

Input: data/00_raw/reddit/politosphere_2016-09_2016-10
Output: data/01_silver/reddit
Period: 2016-09-01 to 2016-10-31
Time index field: created_utc


## 2. Inspect Comment File Structure

First, examine the first three comments of the September dump to see which fields each raw record carries.

In [3]:
# Look at the first 3 comments of the September dump to see the raw record schema.
sample_file = raw_reddit_dir / 'comments_2016-09.bz2'

print("Sample comments from 2016-09:")
print("=" * 80)

if not sample_file.exists():
    # The processing cell below skips missing files; don't abort the notebook here either.
    print(f"⚠ Sample file not found: {sample_file.name}")
else:
    with bz2.open(sample_file, 'rt', encoding='utf-8') as f:
        # zip stops after 3 lines, or earlier at EOF, instead of handing the ''
        # that readline() returns at EOF to json.loads.
        for i, line in zip(range(3), f):
            comment = json.loads(line)

            print(f"\nComment {i+1}:")
            print(f"  Author: {comment.get('author', 'N/A')}")
            print(f"  Subreddit: {comment.get('subreddit', 'N/A')}")
            print(f"  Created UTC: {comment.get('created_utc', 'N/A')}")
            # The body key may be present with a null value; coerce before slicing.
            print(f"  Body preview: {(comment.get('body') or '')[:100]}...")

            # Show all available fields once.
            if i == 0:
                print(f"\n  Available fields: {list(comment.keys())}")

print("\n" + "=" * 80)

Sample comments from 2016-09:

Comment 1:
  Author: krb7H
  Subreddit: politics
  Created UTC: 1472688001
  Body preview: trump seems to be gaining supporters at an increasing rate. I remember when he had single-digit chan...

  Available fields: ['author', 'body', 'body_cleaned', 'controversiality', 'created_utc', 'distinguished', 'edited', 'gilded', 'id', 'language', 'link_id', 'parent_id', 'retrieved_on', 'score', 'subreddit', 'subreddit_id']

Comment 2:
  Author: OQcjv
  Subreddit: politics
  Created UTC: 1472688001
  Body preview: Hi `alictrmods`. Thank you for participating in /r/Politics. However, [your submission](https://www....

Comment 3:
  Author: mQu7y
  Subreddit: The_Donald
  Created UTC: 1472688002
  Body preview: The Mistakes of the Obama......



## 3. Define Processing Functions

In [4]:
def parse_comment(line: str) -> "dict | None":
    """
    Parse one JSON line from a bz2 comment dump and keep the fields used downstream.

    Args:
        line: A single line of the dump, expected to contain one JSON object.

    Returns:
        Dictionary with the selected fields plus a UTC calendar ``date``
        ('YYYY-MM-DD') derived from ``created_utc``. Returns None when the line
        is not valid JSON, is not a JSON object, or has a missing, non-numeric
        or out-of-range ``created_utc``.
    """
    try:
        comment = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(comment, dict):
        return None

    # Bucket on the comment's own creation time only.
    raw_created = comment.get('created_utc')
    if not raw_created:
        return None
    try:
        # Coerce to int so string-encoded epochs are accepted and the Parquet
        # column keeps a single integer type.
        created_utc = int(raw_created)
        timestamp = datetime.fromtimestamp(created_utc, tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        # Bad or out-of-range timestamps must not abort a multi-million-line pass.
        return None

    return {
        'comment_id': comment.get('id'),
        'author': comment.get('author'),
        'subreddit': comment.get('subreddit'),
        'body': comment.get('body'),
        'created_utc': created_utc,
        'date': timestamp.strftime('%Y-%m-%d'),
        'score': comment.get('score'),
        'link_id': comment.get('link_id'),  # Thread reference (for context, not indexing)
        'parent_id': comment.get('parent_id')
    }


def should_keep_comment(comment: dict, config: dict) -> bool:
    """
    Apply the quality filters configured under ``config['filters']``.

    Args:
        comment: Parsed comment record; ``body`` and ``author`` may be missing or None.
        config: Reddit config whose ``filters`` section provides
            ``remove_deleted``, ``min_comment_length`` and ``remove_bot_comments``.

    Returns:
        True if the comment should be kept.
    """
    filters = config['filters']
    # Keys can be present with a null value, in which case .get(key, '') would
    # still return None; coerce to '' so the string checks below cannot crash.
    body = comment.get('body') or ''
    author = comment.get('author') or ''

    # Drop deleted/removed placeholders.
    if filters['remove_deleted'] and body in ('[deleted]', '[removed]'):
        return False

    # Enforce the minimum body length.
    if len(body) < filters['min_comment_length']:
        return False

    # Bot heuristic: author name ends with 'bot' (case-insensitive).
    if filters['remove_bot_comments'] and author.lower().endswith('bot'):
        return False

    return True


print("✓ Processing functions defined")

✓ Processing functions defined


## 4. Process Comment Files

Stream both monthly bz2 dumps line by line, parse and filter each comment, and group the kept comments by their UTC creation date.

In [5]:
# Stream each monthly dump, parse + filter every line, and bucket the kept
# comments by their UTC creation date ('YYYY-MM-DD').
comments_by_date = defaultdict(list)

# Process both month files
comment_files = [
    raw_reddit_dir / 'comments_2016-09.bz2',
    raw_reddit_dir / 'comments_2016-10.bz2'
]

total_lines = 0   # raw lines read across all files
total_parsed = 0  # lines parse_comment turned into a record
total_kept = 0    # records that passed should_keep_comment

print("Processing comment files...")
print("=" * 80)

for comment_file in comment_files:
    if not comment_file.exists():
        print(f"⚠ Skipping missing file: {comment_file.name}")
        continue

    print(f"\nProcessing: {comment_file.name}")

    with bz2.open(comment_file, 'rt', encoding='utf-8') as f:
        for line in tqdm(f, desc=f"  Reading {comment_file.name}"):
            total_lines += 1

            comment = parse_comment(line)
            if comment is None:
                continue
            total_parsed += 1

            if not should_keep_comment(comment, reddit_cfg):
                continue
            total_kept += 1

            comments_by_date[comment['date']].append(comment)

# Guard the ratios: if every input file is missing (or nothing parses) the
# denominators are 0 and the summary would raise ZeroDivisionError.
parsed_pct = 100 * total_parsed / total_lines if total_lines else 0.0
kept_pct = 100 * total_kept / total_parsed if total_parsed else 0.0

print("\n" + "=" * 80)
if total_lines == 0:
    print("⚠ No comment lines were read - check raw_reddit_dir and the input file names")
print(f"Total lines read: {total_lines:,}")
print(f"Successfully parsed: {total_parsed:,} ({parsed_pct:.1f}%)")
print(f"After filtering: {total_kept:,} ({kept_pct:.1f}%)")
print(f"Unique dates: {len(comments_by_date)}")
print("=" * 80)

Processing comment files...

Processing: comments_2016-09.bz2


  Reading comments_2016-09.bz2: 4229159it [01:10, 60318.28it/s]



Processing: comments_2016-10.bz2


  Reading comments_2016-10.bz2: 5566298it [01:29, 62095.44it/s]


Total lines read: 9,795,457
Successfully parsed: 9,795,457 (100.0%)
After filtering: 8,787,694 (89.7%)
Unique dates: 61





## 5. Deduplicate and Write Daily Parquet Files

In [6]:
# For every UTC day (in date order): drop exact duplicates, then write one
# snappy-compressed Parquet file named <date>.parquet into the silver directory.
print("\nDeduplicating and writing daily Parquet files...")
print("=" * 80)

total_before_dedup = 0
total_after_dedup = 0
files_written = []

for date_str, comments in sorted(comments_by_date.items()):
    total_before_dedup += len(comments)

    # A duplicate is an exact match on (author, body, created_utc). dicts keep
    # insertion order and setdefault keeps the first occurrence, so the
    # surviving comments stay in their original order.
    first_seen = {}
    for comment in comments:
        first_seen.setdefault(
            (comment['author'], comment['body'], comment['created_utc']),
            comment,
        )
    unique_comments = list(first_seen.values())
    total_after_dedup += len(unique_comments)

    if not unique_comments:
        continue

    output_file = silver_reddit_dir / f"{date_str}.parquet"
    pd.DataFrame(unique_comments).to_parquet(output_file, compression='snappy', index=False)
    files_written.append(output_file)

    print(f"✓ {date_str}: {len(unique_comments):,} comments ({output_file.stat().st_size / 1024:.1f} KB)")

print("\n" + "=" * 80)
print(f"Before deduplication: {total_before_dedup:,} comments")
print(f"After deduplication: {total_after_dedup:,} comments")
print(f"Duplicates removed: {total_before_dedup - total_after_dedup:,}")
print(f"Files written: {len(files_written)}")
print("=" * 80)


Deduplicating and writing daily Parquet files...
✓ 2016-09-01: 109,184 comments (20014.5 KB)
✓ 2016-09-02: 98,775 comments (17807.7 KB)
✓ 2016-09-03: 74,731 comments (13677.4 KB)
✓ 2016-09-04: 80,990 comments (14837.5 KB)
✓ 2016-09-05: 85,066 comments (15418.9 KB)
✓ 2016-09-06: 110,391 comments (19796.8 KB)
✓ 2016-09-07: 105,031 comments (19094.4 KB)
✓ 2016-09-08: 123,756 comments (21187.7 KB)
✓ 2016-09-09: 98,605 comments (18197.1 KB)
✓ 2016-09-10: 87,648 comments (15302.4 KB)
✓ 2016-09-11: 138,072 comments (21245.0 KB)
✓ 2016-09-12: 153,446 comments (25045.1 KB)
✓ 2016-09-13: 142,864 comments (23227.4 KB)
✓ 2016-09-14: 137,999 comments (23159.3 KB)
✓ 2016-09-15: 132,073 comments (22930.3 KB)
✓ 2016-09-16: 143,633 comments (24518.1 KB)
✓ 2016-09-17: 111,225 comments (18485.4 KB)
✓ 2016-09-18: 110,791 comments (18184.4 KB)
✓ 2016-09-19: 148,111 comments (24955.9 KB)
✓ 2016-09-20: 154,930 comments (26133.1 KB)
✓ 2016-09-21: 138,415 comments (24035.1 KB)
✓ 2016-09-22: 144,055 comments (

## 6. Verify Date Range Coverage

In [7]:
from datetime import timedelta

# Compare the dates actually written against every calendar day of the
# configured period (both endpoints inclusive).
start_date = datetime.strptime(period_start, '%Y-%m-%d')
end_date = datetime.strptime(period_end, '%Y-%m-%d')
expected_days = (end_date - start_date).days + 1

# Empty when period_end < period_start.
expected_dates = {
    (start_date + timedelta(days=offset)).strftime('%Y-%m-%d')
    for offset in range(max(expected_days, 0))
}

written_dates = set(f.stem for f in files_written)
missing_dates = expected_dates - written_dates
# Files dated outside the period must not inflate coverage; report them separately.
extra_dates = written_dates - expected_dates
covered_days = len(expected_dates & written_dates)

print("Date coverage:")
print(f"Expected days: {expected_days}")
print(f"Files written: {len(files_written)}")
if expected_days > 0:
    print(f"Coverage: {100*covered_days/expected_days:.1f}%")
else:
    print("Coverage: n/a (period_end is before period_start)")

if missing_dates:
    print(f"\n⚠ Missing {len(missing_dates)} dates (no comments found):")
    for date in sorted(missing_dates)[:10]:  # Show first 10
        print(f"  - {date}")
    if len(missing_dates) > 10:
        print(f"  ... and {len(missing_dates) - 10} more")
elif expected_days > 0:
    print("\n✓ Full coverage: all dates have data")

if extra_dates:
    print(f"\n⚠ {len(extra_dates)} written dates fall outside {period_start}..{period_end}:")
    for date in sorted(extra_dates)[:10]:  # Show first 10
        print(f"  - {date}")
    if len(extra_dates) > 10:
        print(f"  ... and {len(extra_dates) - 10} more")

Date coverage:
Expected days: 61
Files written: 61
Coverage: 100.0%

✓ Full coverage: all dates have data


## 7. Sample Verification

Spot-check one daily file (the middle date) by reading it back and inspecting its columns, dtypes, and first rows.

In [8]:
# Read one written file back from disk, the middle one in date order, and
# inspect its schema and first rows.
if not files_written:
    print("⚠ No files to verify")
else:
    middle_position = len(files_written) // 2
    sample_file = files_written[middle_position]

    print(f"Sample verification: {sample_file.name}")
    print("=" * 80)

    df_sample = pd.read_parquet(sample_file)

    print(f"Rows: {len(df_sample):,}")
    print(f"Columns: {list(df_sample.columns)}")
    print(f"\nData types:")
    print(df_sample.dtypes)

    print(f"\nFirst 3 comments:")
    preview_rows = df_sample.head(3)
    for idx, row in preview_rows.iterrows():
        print(f"\n  {idx+1}. {row['author']} in r/{row['subreddit']}")
        print(f"     Date: {row['date']}")
        print(f"     Body: {row['body'][:100]}...")

    print("\n" + "=" * 80)

Sample verification: 2016-10-01.parquet
Rows: 102,991
Columns: ['comment_id', 'author', 'subreddit', 'body', 'created_utc', 'date', 'score', 'link_id', 'parent_id']

Data types:
comment_id     object
author         object
subreddit      object
body           object
created_utc     int64
date           object
score           int64
link_id        object
parent_id      object
dtype: object

First 3 comments:

  1. iQHgh in r/The_Donald
     Date: 2016-10-01
     Body: *Someone* gets it ..

...

  2. 5KBRe in r/politics
     Date: 2016-10-01
     Body: &gt;Dollar coins finally catch on thanks to triumphant RenFaire nerds. 

Alright, let's try to be a ...

  3. [deleted] in r/The_Donald
     Date: 2016-10-01
     Body: Something something socialism only works until......



## 8. Save Processing Metadata

In [9]:
# Record what this run did (period, inputs, row counts, outputs, filter config)
# as JSON next to the daily Parquet files. Keys are added in a fixed order,
# which json.dump preserves in the written file.
processing_metadata = {'timestamp': datetime.now().isoformat()}
processing_metadata['period'] = dict(
    start=period_start,
    end=period_end,
    days=expected_days,
)
processing_metadata['input_files'] = [path.name for path in comment_files if path.exists()]
processing_metadata['processing'] = dict(
    total_lines_read=total_lines,
    successfully_parsed=total_parsed,
    after_filtering=total_kept,
    before_deduplication=total_before_dedup,
    after_deduplication=total_after_dedup,
    duplicates_removed=total_before_dedup - total_after_dedup,
)
processing_metadata['output'] = dict(
    directory=str(silver_reddit_dir.relative_to(workspace_root)),
    files_written=len(files_written),
    dates_covered=sorted(path.stem for path in files_written),
    missing_dates=sorted(missing_dates),  # [] when nothing is missing
)
processing_metadata['config'] = dict(
    time_index_field=reddit_cfg['processing']['time_index_field'],
    filters=reddit_cfg['filters'],
    deduplication=reddit_cfg['deduplication'],
)

metadata_file = silver_reddit_dir / 'processing_metadata.json'
with open(metadata_file, 'w') as f:
    json.dump(processing_metadata, f, indent=2)

print(f"✓ Processing metadata saved: {metadata_file.relative_to(workspace_root)}")

✓ Processing metadata saved: data/01_silver/reddit/processing_metadata.json


## Summary

**Processing Complete!**

Silver layer created with:
- Daily Parquet files (one per date)
- Bucketed into UTC calendar days by `created_utc` (comment creation time)
- Filtered for quality (minimum body length, deleted/removed bodies, author names ending in "bot")
- Deduplicated by (author, body, created_utc)

**Next Step:** Proceed to notebook 12 to create thread-context pseudodocuments (gold layer).