# Additional: Handling large datasets - Practical techniques

This additional notebook demonstrates various approaches for working with large datasets when computational resources are limited.

## Techniques covered:
1. Selective file loading
2. Strategic sampling methods
3. Data preprocessing and compression
4. Memory-efficient data handling
5. Creating manageable subsets for training

In [None]:
# Install required packages (uncomment if needed)
# !pip install pandas pyarrow zstandard tqdm

In [1]:
import json
import gzip
import random
import pandas as pd
from pathlib import Path
from datetime import datetime
from typing import Iterator, Dict, Any
import sys
from tqdm import tqdm

## 1. Selective File Loading

If your downloaded dataset (for example, a torrent) contains multiple files, or the data is organized chronologically, you can inspect what is available first and load only the files you need.

In [None]:
def list_dataset_files(data_dir: str) -> list:
    """
    Report the data files found directly inside a directory, largest first.

    Prints every matching file name with its size in GB so you can decide
    what to load before committing to loading everything.

    Args:
        data_dir: Directory to scan (not searched recursively)

    Returns:
        List of matching paths ordered from largest to smallest;
        an empty list (after printing a notice) if the directory is missing
    """
    root = Path(data_dir)
    if not root.exists():
        print(f"Directory {data_dir} not found")
        return []

    bytes_per_gb = 1024**3

    # Common Reddit data file patterns, scanned one pattern at a time
    sized = [
        (match, match.stat().st_size / bytes_per_gb)
        for pattern in ('*.json', '*.json.gz', '*.jsonl', '*.jsonl.gz', '*.zst')
        for match in root.glob(pattern)
    ]

    # Biggest files first so the expensive ones stand out
    sized = sorted(sized, key=lambda pair: pair[1], reverse=True)

    print("Available files:")
    for file, size_gb in sized:
        print(f"  {file.name}: {size_gb:.2f} GB")

    return [file for file, _ in sized]

# Example usage
# files = list_dataset_files('/path/to/reddit/data')

## 2. Memory-Efficient Iterator for Large Files

Instead of loading entire files into memory, process them line-by-line.

In [None]:
def iterate_reddit_posts(file_path: str, max_posts: int = None) -> Iterator[Dict[str, Any]]:
    """
    Memory-efficient iterator for Reddit data files (one JSON object per line).
    Handles both gzip-compressed (.gz) and uncompressed files; the file is
    read line by line, so it is never loaded into memory as a whole.

    Lines that are not valid JSON (including blank lines) are skipped silently.
    A missing file prints a notice and yields nothing.
    Note: .zst files are not decompressed by this function.

    Args:
        file_path: Path to the Reddit data file (a str or a pathlib.Path)
        max_posts: Maximum number of posts to yield (None = all, 0 = none)

    Yields:
        One parsed JSON value per valid line
    """
    # Normalise to str so Path objects (e.g. those returned by
    # list_dataset_files) work with the suffix check below.
    path_str = str(file_path)

    # Determine if file is compressed
    open_func = gzip.open if path_str.endswith('.gz') else open

    count = 0

    try:
        with open_func(path_str, 'rt', encoding='utf-8') as f:
            for line in f:
                # Explicit None check: max_posts=0 must mean "yield nothing",
                # not "no limit".
                if max_posts is not None and count >= max_posts:
                    break

                # Keep the try body minimal so only parsing errors are
                # swallowed, never something raised around the yield.
                try:
                    post = json.loads(line.strip())
                except json.JSONDecodeError:
                    continue  # Skip malformed lines

                count += 1
                yield post

    except FileNotFoundError:
        print(f"File not found: {file_path}")
        return

# Example: Preview first 5 posts without loading entire file
# for i, post in enumerate(iterate_reddit_posts('reddit_data.json', max_posts=5)):
#     print(f"Post {i+1}: {post.get('title', 'No title')[:50]}...")

## 3. Strategic Sampling Methods

Create representative subsets of your data using various sampling strategies.

In [None]:
def random_sample_posts(file_path: str, sample_rate: float = 0.1, output_path: str = None) -> list:
    """
    Keep a random fraction of the posts in a large file.

    Each post is kept independently with probability ``sample_rate``, so the
    number of posts returned varies from run to run.

    Args:
        file_path: Path to source file
        sample_rate: Proportion of posts to keep (0.1 = 10%)
        output_path: Optional path; when given, the sample is also written
            there as one JSON object per line

    Returns:
        List of sampled posts
    """
    print(f"Sampling {sample_rate*100:.1f}% of posts from {Path(file_path).name}...")

    # One random draw per post; the post survives when the draw is below the rate
    sampled_posts = [
        post
        for post in tqdm(iterate_reddit_posts(file_path))
        if random.random() < sample_rate
    ]

    print(f"Sampled {len(sampled_posts):,} posts")

    if not output_path:
        return sampled_posts

    # Persist the sample as JSON lines
    with open(output_path, 'w') as f:
        f.writelines(json.dumps(post) + '\n' for post in sampled_posts)
    print(f"Saved to {output_path}")

    return sampled_posts

# Example usage
# sampled_data = random_sample_posts('large_reddit_file.json', sample_rate=0.1, 
#                                     output_path='sampled_10percent.jsonl')

In [None]:
def time_stratified_sample(file_path: str, 
                          start_date: str = None, 
                          end_date: str = None,
                          output_path: str = None) -> list:
    """
    Sample posts from specific time periods (e.g., election cycles, major events).

    Dates are interpreted as UTC calendar days, and both ends are inclusive:
    a post is kept when its 'created_utc' falls anywhere from 00:00 UTC on
    start_date up to (but not including) 00:00 UTC on the day after end_date.

    Args:
        file_path: Path to source file
        start_date: Start date in format 'YYYY-MM-DD' (None = no start limit)
        end_date: End date in format 'YYYY-MM-DD' (None = no end limit)
        output_path: Optional path to save filtered data (JSON lines)

    Returns:
        List of posts within the date range

    Raises:
        ValueError: If a supplied date does not match 'YYYY-MM-DD'
    """
    # Local import: the file's import cell only brings in `datetime` itself.
    from datetime import timedelta, timezone

    def _utc_midnight(date_str):
        # Pin the parsed date to UTC. A naive datetime's .timestamp() is
        # interpreted in the machine's local timezone, which would shift the
        # window by the local UTC offset.
        return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)

    start_ts = _utc_midnight(start_date).timestamp() if start_date else 0
    # Exclusive upper bound at the start of the following day, so posts made
    # during end_date itself are included.
    if end_date:
        end_ts = (_utc_midnight(end_date) + timedelta(days=1)).timestamp()
    else:
        end_ts = float('inf')

    filtered_posts = []

    print(f"Filtering posts between {start_date or 'beginning'} and {end_date or 'end'}...")

    for post in tqdm(iterate_reddit_posts(file_path)):
        # Reddit timestamps are usually in 'created_utc' field.
        # NOTE(review): some exports may store it as a numeric string, so
        # coerce to float; posts with an unusable value are skipped.
        try:
            post_time = float(post.get('created_utc', 0))
        except (TypeError, ValueError):
            continue

        if start_ts <= post_time < end_ts:
            filtered_posts.append(post)

    print(f"Found {len(filtered_posts):,} posts in date range")

    if output_path:
        with open(output_path, 'w') as f:
            for post in filtered_posts:
                f.write(json.dumps(post) + '\n')
        print(f"Saved to {output_path}")

    return filtered_posts

# Example: Get posts from 2024 election period
# election_posts = time_stratified_sample('reddit_data.json', 
#                                         start_date='2024-01-01', 
#                                         end_date='2024-11-30',
#                                         output_path='election_2024_posts.jsonl')

In [None]:
def engagement_filtered_sample(file_path: str, 
                               min_score: int = 10,
                               min_comments: int = 5,
                               output_path: str = None) -> list:
    """
    Keep only posts that meet minimum engagement thresholds.

    A post is kept when both its 'score' and its 'num_comments' reach the
    given minimums; a post missing either field is treated as having 0.

    Args:
        file_path: Path to source file
        min_score: Minimum upvote score
        min_comments: Minimum number of comments
        output_path: Optional path to save filtered data (JSON lines)

    Returns:
        List of high-engagement posts
    """
    def _meets_thresholds(post):
        # Both conditions must hold; absent fields count as zero
        return (post.get('score', 0) >= min_score
                and post.get('num_comments', 0) >= min_comments)

    print(f"Filtering posts with score >= {min_score} and comments >= {min_comments}...")

    filtered_posts = [
        post
        for post in tqdm(iterate_reddit_posts(file_path))
        if _meets_thresholds(post)
    ]

    print(f"Found {len(filtered_posts):,} high-engagement posts")

    if not output_path:
        return filtered_posts

    # Persist the matches as JSON lines
    with open(output_path, 'w') as f:
        f.writelines(json.dumps(post) + '\n' for post in filtered_posts)
    print(f"Saved to {output_path}")

    return filtered_posts

# Example usage
# popular_posts = engagement_filtered_sample('reddit_data.json', 
#                                            min_score=50, 
#                                            min_comments=10,
#                                            output_path='popular_posts.jsonl')

## 4. Data Preprocessing and Compression

Extract only essential fields and save in efficient formats.

In [None]:
def extract_essential_fields(file_path: str, 
                            fields: list = None,
                            output_path: str = None) -> list:
    """
    Extract only the fields you need for training, reducing data size.

    Every output post contains exactly the requested fields; a field missing
    from a source post is filled with an empty string.

    Args:
        file_path: Path to source file
        fields: List of field names to keep (default: common fields for misinformation analysis)
        output_path: Optional path to save processed data (JSON lines)

    Returns:
        List of processed posts with only essential fields
    """
    if fields is None:
        # Default fields for political misinformation analysis
        fields = [
            'id',
            'title',
            'selftext',
            'author',
            'created_utc',
            'score',
            'num_comments',
            'subreddit',
            'url',
            'domain'
        ]

    print(f"Extracting fields: {', '.join(fields)}")

    # Extract only specified fields
    processed_posts = [
        {field: post.get(field, '') for field in fields}
        for post in tqdm(iterate_reddit_posts(file_path))
    ]

    print(f"Processed {len(processed_posts):,} posts")

    if output_path:
        with open(output_path, 'w') as f:
            f.writelines(json.dumps(post) + '\n' for post in processed_posts)
        print(f"Saved to {output_path}")

        # Show size reduction. Skip the report when the source is missing or
        # empty: the ratio is undefined there and would otherwise raise
        # after the output has already been written.
        source = Path(file_path)
        original_bytes = source.stat().st_size if source.exists() else 0
        if original_bytes > 0:
            original_size = original_bytes / (1024**2)  # MB
            new_size = Path(output_path).stat().st_size / (1024**2)  # MB
            reduction = (1 - new_size/original_size) * 100
            print(f"Size reduction: {original_size:.1f} MB → {new_size:.1f} MB ({reduction:.1f}% smaller)")

    return processed_posts

# Example usage
# essential_data = extract_essential_fields('large_reddit_file.json',
#                                           output_path='essential_fields_only.jsonl')

In [None]:
def convert_to_parquet(file_path: str, output_path: str = None):
    """
    Convert JSON/JSONL to Parquet format for better compression and faster loading.
    Parquet is a columnar format that's much more efficient than JSON.

    Note: the whole dataset is still assembled in memory before it is
    written, so sample or filter very large files first.

    Args:
        file_path: Path to source JSON/JSONL file
        output_path: Path for output Parquet file (default: the source path
            with its last suffix replaced by '.parquet')

    Returns:
        The path the Parquet file was written to

    Raises:
        ValueError: If no valid posts could be read from file_path
    """
    if output_path is None:
        output_path = Path(file_path).with_suffix('.parquet')

    print(f"Converting {file_path} to Parquet format...")

    # Convert the raw dicts to DataFrames in batches so only one batch of
    # Python dicts is held at a time; the batches are combined below.
    chunk_size = 10000
    chunks = []

    current_chunk = []
    for post in tqdm(iterate_reddit_posts(file_path)):
        current_chunk.append(post)

        if len(current_chunk) >= chunk_size:
            chunks.append(pd.DataFrame(current_chunk))
            current_chunk = []

    # Add remaining posts
    if current_chunk:
        chunks.append(pd.DataFrame(current_chunk))

    # pd.concat([]) raises an opaque "No objects to concatenate"; fail with
    # the same exception type but say what actually went wrong.
    if not chunks:
        raise ValueError(f"No valid posts found in {file_path}; nothing to convert")

    # Combine and save
    df = pd.concat(chunks, ignore_index=True)
    df.to_parquet(output_path, compression='snappy', index=False)

    # Show size comparison (the source is non-empty here, since posts were read)
    original_size = Path(file_path).stat().st_size / (1024**2)  # MB
    new_size = Path(output_path).stat().st_size / (1024**2)  # MB
    reduction = (1 - new_size/original_size) * 100

    print(f"\nConversion complete!")
    print(f"Original: {original_size:.1f} MB")
    print(f"Parquet: {new_size:.1f} MB ({reduction:.1f}% smaller)")
    print(f"Saved to: {output_path}")

    return output_path

# Example usage
# parquet_file = convert_to_parquet('sampled_data.jsonl')

## 5. Complete Pipeline: From Large File to Training-Ready Dataset

In [None]:
def create_training_dataset(source_file: str,
                           output_dir: str = './processed_data',
                           sample_rate: float = 0.1,
                           min_score: int = 10,
                           start_date: str = None,
                           end_date: str = None,
                           fields: list = None) -> str:
    """
    Complete pipeline to create a manageable training dataset from a large Reddit file.

    Steps:
    1. Filter by date range (if specified)
    2. Filter by engagement metrics
    3. Random sampling
    4. Extract essential fields
    5. Convert to Parquet format

    Dates are interpreted as UTC calendar days and both ends are inclusive
    (posts made during end_date itself are kept).

    Args:
        source_file: Path to large source file
        output_dir: Directory to save processed files (created if missing)
        sample_rate: Proportion of posts to keep after filtering
        min_score: Minimum upvote score for engagement filter
        start_date: Start date for time filter (YYYY-MM-DD)
        end_date: End date for time filter (YYYY-MM-DD)
        fields: Fields to extract (None = defaults)

    Returns:
        Path to final Parquet file

    Raises:
        ValueError: If a supplied date does not match 'YYYY-MM-DD'
    """
    # Local import: the file's import cell only brings in `datetime` itself.
    from datetime import timedelta, timezone

    def _utc_midnight(date_str):
        # Pin the parsed date to UTC. A naive datetime's .timestamp() is
        # interpreted in the machine's local timezone, which would shift the
        # window by the local UTC offset.
        return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)

    # Create output directory
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True, parents=True)

    print("="*60)
    print("CREATING TRAINING DATASET")
    print("="*60)
    print(f"Source: {source_file}")
    print(f"Output directory: {output_dir}")
    print()

    # Step 1: Apply all filters
    print("Step 1: Applying filters...")
    filtered_posts = []

    start_ts = _utc_midnight(start_date).timestamp() if start_date else 0
    # Exclusive upper bound at the start of the following day, so the whole
    # of end_date is included.
    if end_date:
        end_ts = (_utc_midnight(end_date) + timedelta(days=1)).timestamp()
    else:
        end_ts = float('inf')
    use_time_filter = bool(start_date or end_date)

    for post in tqdm(iterate_reddit_posts(source_file)):
        # Time filter
        if use_time_filter:
            # NOTE(review): some exports may store created_utc as a numeric
            # string, so coerce to float; unusable values are skipped.
            try:
                post_time = float(post.get('created_utc', 0))
            except (TypeError, ValueError):
                continue
            if not (start_ts <= post_time < end_ts):
                continue

        # Engagement filter
        if post.get('score', 0) < min_score:
            continue

        # Random sampling
        if random.random() < sample_rate:
            filtered_posts.append(post)

    print(f"✓ Filtered to {len(filtered_posts):,} posts\n")

    # Step 2: Extract essential fields
    print("Step 2: Extracting essential fields...")
    if fields is None:
        fields = ['id', 'title', 'selftext', 'author', 'created_utc', 
                 'score', 'num_comments', 'subreddit', 'url', 'domain']

    processed_posts = [{field: post.get(field, '') for field in fields} 
                      for post in filtered_posts]
    print(f"✓ Extracted {len(fields)} fields\n")

    # Step 3: Save as Parquet
    print("Step 3: Converting to Parquet format...")
    df = pd.DataFrame(processed_posts)

    output_file = output_path / 'training_data.parquet'
    df.to_parquet(output_file, compression='snappy', index=False)

    file_size = output_file.stat().st_size / (1024**2)  # MB
    print(f"✓ Saved {len(df):,} posts to {output_file}")
    print(f"✓ Final size: {file_size:.1f} MB\n")

    # Summary statistics. Each line is printed only when its column exists:
    # a custom `fields` list may omit it, and an empty result has no columns.
    print("="*60)
    print("DATASET SUMMARY")
    print("="*60)
    print(f"Total posts: {len(df):,}")
    if df.empty:
        print("No posts matched the filters; skipping summary statistics.")
    else:
        if 'created_utc' in df.columns:
            # Coerce so '' placeholders for missing values do not break min/max
            created = pd.to_numeric(df['created_utc'], errors='coerce')
            print(f"Date range: {created.min()} to {created.max()}")
        if 'subreddit' in df.columns:
            print(f"Subreddits: {df['subreddit'].nunique()}")
        if 'score' in df.columns:
            scores = pd.to_numeric(df['score'], errors='coerce')
            print(f"Average score: {scores.mean():.1f}")
        if 'num_comments' in df.columns:
            comments = pd.to_numeric(df['num_comments'], errors='coerce')
            print(f"Average comments: {comments.mean():.1f}")
    print(f"File size: {file_size:.1f} MB")
    print("="*60)

    return str(output_file)

# Example: Create a training dataset from a large file
# training_file = create_training_dataset(
#     source_file='large_reddit_politics.json',
#     output_dir='./training_data',
#     sample_rate=0.15,  # Keep 15% after filtering
#     min_score=20,      # Posts with at least 20 upvotes
#     start_date='2023-01-01',
#     end_date='2024-12-31'
# )

## 6. Loading and Working with the Processed Data

In [None]:
def load_training_data(parquet_file: str) -> pd.DataFrame:
    """
    Read a processed Parquet dataset into a DataFrame.

    Prints the file being loaded and the number of rows read.

    Args:
        parquet_file: Path to the Parquet file

    Returns:
        DataFrame with the file's contents
    """
    print(f"Loading {parquet_file}...")
    frame = pd.read_parquet(parquet_file)
    row_count = len(frame)
    print(f"Loaded {row_count:,} posts")
    return frame

# Example usage
# df = load_training_data('./processed_data/training_data.parquet')
# print(df.head())
# print(df.info())

## 7. Analyzing Data Distribution

In [None]:
def analyze_dataset(df: pd.DataFrame):
    """
    Quick analysis of your dataset to ensure it's representative.

    Prints the post count, date range, top subreddits, engagement statistics
    and text-length statistics. Each section is reported only when its column
    is present. The DataFrame passed in is not modified.

    Args:
        df: Posts to summarise; the columns used, when present, are
            'created_utc', 'subreddit', 'score', 'num_comments', 'selftext'
    """
    print("DATASET ANALYSIS")
    print("="*60)

    print(f"\nTotal posts: {len(df):,}")
    print(f"\nDate range:")
    if 'created_utc' in df.columns:
        # Work on a local Series rather than adding a 'date' column to the
        # caller's frame. Coerce first so non-numeric placeholders (such as
        # '') become NaT instead of raising.
        created = pd.to_numeric(df['created_utc'], errors='coerce')
        dates = pd.to_datetime(created, unit='s')
        print(f"  From: {dates.min()}")
        print(f"  To: {dates.max()}")

    print(f"\nTop 10 subreddits by post count:")
    if 'subreddit' in df.columns:
        print(df['subreddit'].value_counts().head(10))

    print(f"\nEngagement statistics:")
    if 'score' in df.columns:
        scores = pd.to_numeric(df['score'], errors='coerce')
        print(f"  Score - Mean: {scores.mean():.1f}, Median: {scores.median():.1f}")
    if 'num_comments' in df.columns:
        comments = pd.to_numeric(df['num_comments'], errors='coerce')
        print(f"  Comments - Mean: {comments.mean():.1f}, Median: {comments.median():.1f}")

    print(f"\nText length statistics:")
    if 'selftext' in df.columns:
        # Local Series again: no 'text_length' column is written back
        text_length = df['selftext'].str.len()
        print(f"  Mean: {text_length.mean():.0f} characters")
        print(f"  Median: {text_length.median():.0f} characters")

    print("="*60)

# Example usage
# analyze_dataset(df)

## Summary

### Recommended Workflow:

1. **Explore your data structure** using `list_dataset_files()` to see what you're working with
2. **Choose your strategy**:
   - If data is split by time: Use selective loading
   - If you need speed: Use random sampling with a lower rate
   - If you need quality: Use engagement-based filtering
   - If focused on events: Use time-stratified sampling
3. **Run the complete pipeline** with `create_training_dataset()`
4. **Validate your dataset** with `analyze_dataset()`

### Expected Results:
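- **10% random sample** of a 25 GB file → ~2.5 GB
- **Essential fields only** → roughly a further 50-70% reduction
- **Parquet format** → roughly a further 30-50% reduction
- **Final size**: Typically 500 MB - 1 GB (manageable for most systems)

These figures are rough estimates; the actual reduction depends on your data and on which fields you keep.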