In [17]:
import os
import pickle
import datetime
import re
import html
import unicodedata
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from tqdm import tqdm
from fileStreams import getFileJsonStream
from collections import defaultdict

# Pre-download required NLTK resources once at the module level
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)

# Initialize global resources once
STOP_WORDS = set(stopwords.words('english'))
LEMMATIZER = WordNetLemmatizer()

# POS tag cache to avoid redundant tagging
POS_CACHE = {}
# Lemma cache to avoid redundant lemmatization
LEMMA_CACHE = {}

def get_wordnet_pos(tag):
    """Convert NLTK POS tag to WordNet POS tag"""
    if tag.startswith('J'):
        return 'a'  # adjective
    elif tag.startswith('V'):
        return 'v'  # verb
    elif tag.startswith('N'):
        return 'n'  # noun
    elif tag.startswith('R'):
        return 'r'  # adverb
    else:
        return 'n'  # default as noun

def preprocess_text(text, lemmatize=True, without_stopwords=True):
    """Preprocess Reddit text content with optimized NLTK operations"""
    # Handle HTML entities
    text = html.unescape(text)
    
    # Unicode normalization
    text = unicodedata.normalize('NFKD', text)
    
    # Remove URLs and Markdown formatting
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'!\[.*?\]\(.*?\)', '', text)
    text = re.sub(r'\[(.*?)\]\(.*?\)', r'\1', text)
    text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)
    text = re.sub(r'\*(.*?)\*', r'\1', text)
    
    # Remove subreddit and user references
    text = re.sub(r'/r/\w+', '', text)
    text = re.sub(r'r/\w+', '', text)
    text = re.sub(r'/u/\w+', '', text)
    text = re.sub(r'u/\w+', '', text)
    
    # Basic text cleaning
    text = re.sub("[^A-Za-z]+", ' ', text).lower()
    
    # Remove single letters (except 'i')
    text = re.sub(r'\b([a-hj-z])\b', '', text)
    
    words = text.split()
    if not words:
        return []
        
    # Lemmatization
    if lemmatize:
        stop_words = STOP_WORDS if without_stopwords else set()
        
        # Process words in a single batch for better performance
        words_to_tag = [word for word in words if not (without_stopwords and word in stop_words)]
        
        if not words_to_tag:
            return []
            
        # First check our caches
        uncached_words = [word for word in words_to_tag if word not in POS_CACHE]
        
        # Only perform POS tagging on words not in cache
        if uncached_words:
            tagged_uncached = nltk.pos_tag(uncached_words)
            # Update cache with new tags
            for word, tag in tagged_uncached:
                POS_CACHE[word] = tag
        
        processed_words = []
        
        # Process each word with cached information
        for word in words_to_tag:
            tag = POS_CACHE[word]
            wordnet_pos = get_wordnet_pos(tag)
            
            # Check lemma cache first
            lemma_key = (word, wordnet_pos)
            if lemma_key in LEMMA_CACHE:
                lemma = LEMMA_CACHE[lemma_key]
            else:
                lemma = LEMMATIZER.lemmatize(word, pos=wordnet_pos)
                LEMMA_CACHE[lemma_key] = lemma
                
            processed_words.append(lemma)
                
        return processed_words
    
    return words

def process_and_save_comments(path, subreddit, without_stopwords=True, batch_size=1000000):
    """Process comments and save in batches"""
    print(f"Processing file: {path}")
    
    # Batch processing counters
    batch_count = 0
    batch_number = 1
    total_count = 0
    
    # Create output directory
    output_dir = f"processed_comments/{subreddit}"
    os.makedirs(output_dir, exist_ok=True)
    
    # Create data structure for comments without author filtering
    comments_batch = []
    
    print(f"Starting to process {subreddit} comments...")
    with open(path, "rb") as f:
        jsonStream = getFileJsonStream(path, f)
        if jsonStream is None:
            print(f"Unable to read file {path}")
            return
        
        for row in tqdm(jsonStream, desc=f"Processing {subreddit} comments"):
            if "body" not in row or "created_utc" not in row or "author" not in row or "id" not in row:
                continue
                
            author = row["author"]
            if author in {"AutoModerator", "election_info_bot"}:
                continue
            
            comment_id = row["id"]
            text = row["body"]
            created_timestamp = row["created_utc"]
            date = datetime.datetime.fromtimestamp(int(created_timestamp))
            
            # Process text with optimized functions
            processed_words = preprocess_text(text, lemmatize=True, without_stopwords=without_stopwords)
            
            if processed_words:
                # Save processed comment with metadata
                comment_data = {
                    "comment_id": comment_id,
                    "author": author,
                    "date": date.strftime("%Y-%m-%d"),
                    "timestamp": created_timestamp,
                    "processed_text": processed_words,  # Original order preserved
                    "original": text
                }
                
                comments_batch.append(comment_data)
                batch_count += 1
                
            # Check if we need to save the current batch
            if batch_count >= batch_size:
                print(f"\nReached {batch_size} comments, saving batch {batch_number}...")
                
                # Save batch directly without filtering
                save_path = f"{output_dir}/{subreddit}_batch{batch_number}.pkl"
                with open(save_path, "wb") as out_file:
                    pickle.dump(comments_batch, out_file)
                
                print(f"Saved {len(comments_batch)} comments to {save_path}")
                
                # Reset batch data
                comments_batch = []
                batch_count = 0
                batch_number += 1
                total_count += batch_size
                
                # Report cache stats
                print(f"POS tag cache size: {len(POS_CACHE)} words")
                print(f"Lemma cache size: {len(LEMMA_CACHE)} word-POS pairs")
    
    # Process any remaining comments
    if batch_count > 0:
        print(f"\nSaving remaining {batch_count} comments...")
        
        # Save batch
        save_path = f"{output_dir}/{subreddit}_batch{batch_number}.pkl"
        with open(save_path, "wb") as out_file:
            pickle.dump(comments_batch, out_file)
        
        print(f"Saved {len(comments_batch)} comments to {save_path}")
        total_count += batch_count
    
    print(f"\nCompleted processing {subreddit} comments!")
    print(f"Total comments saved: {total_count}")
    print(f"Final POS tag cache size: {len(POS_CACHE)} words")
    print(f"Final lemma cache size: {len(LEMMA_CACHE)} word-POS pairs")

def main():
    """Main function"""
    # Define data file paths
    files = {
        "democrats": r"datasets/democrats_comments.zst",
        "republican": r"datasets/Republican_comments.zst",
        "conservative": r"datasets/Conservative_comments.zst",
        "liberal": r"datasets/Liberal_comments.zst",
        "backpacking": r"datasets/backpacking_comments.zst",
        "vagabond": r"datasets/vagabond_comments.zst"
    }
    
    # Choose subreddit to process
    subreddit_to_process = "conservative"  # Change this to process other subreddits
    
    # Ensure output directory exists
    os.makedirs(f"processed_comments/{subreddit_to_process}", exist_ok=True)

    if subreddit_to_process in files:
        process_and_save_comments(  
            files[subreddit_to_process],
            subreddit_to_process,
            without_stopwords=True,
            batch_size=1000000
        )
    else:
        print(f"Subreddit not found: {subreddit_to_process}")
        print(f"Available subreddits: {', '.join(files.keys())}")
        
if __name__ == "__main__":
    main()

Processing file: datasets/Conservative_comments.zst
Starting to process conservative comments...


Processing conservative comments: 1008333it [00:36, 35491.45it/s]


Reached 1000000 comments, saving batch 1...


Processing conservative comments: 1018898it [00:39, 7922.46it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch1.pkl
POS tag cache size: 125996 words
Lemma cache size: 125996 word-POS pairs


Processing conservative comments: 2024567it [01:11, 36607.09it/s]


Reached 1000000 comments, saving batch 2...
Saved 1000000 comments to processed_comments/conservative/conservative_batch2.pkl


Processing conservative comments: 2031742it [01:14, 6605.51it/s] 

POS tag cache size: 173667 words
Lemma cache size: 173667 word-POS pairs


Processing conservative comments: 3034414it [01:44, 39619.67it/s]


Reached 1000000 comments, saving batch 3...
Saved 1000000 comments to processed_comments/conservative/conservative_batch3.pkl


Processing conservative comments: 3042178it [01:46, 7506.74it/s] 

POS tag cache size: 213252 words
Lemma cache size: 213252 word-POS pairs


Processing conservative comments: 4044368it [02:16, 31628.08it/s]


Reached 1000000 comments, saving batch 4...
Saved 1000000 comments to processed_comments/conservative/conservative_batch4.pkl


Processing conservative comments: 4054791it [02:18, 8846.43it/s] 

POS tag cache size: 247173 words
Lemma cache size: 247173 word-POS pairs


Processing conservative comments: 5052754it [02:49, 36698.31it/s]


Reached 1000000 comments, saving batch 5...
Saved 1000000 comments to processed_comments/conservative/conservative_batch5.pkl


Processing conservative comments: 5063536it [02:51, 9049.56it/s] 

POS tag cache size: 277406 words
Lemma cache size: 277406 word-POS pairs


Processing conservative comments: 6067061it [03:20, 41557.34it/s]


Reached 1000000 comments, saving batch 6...


Processing conservative comments: 6075606it [03:22, 8220.65it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch6.pkl
POS tag cache size: 303053 words
Lemma cache size: 303053 word-POS pairs


Processing conservative comments: 7080552it [03:49, 59488.28it/s]


Reached 1000000 comments, saving batch 7...
Saved 1000000 comments to processed_comments/conservative/conservative_batch7.pkl


Processing conservative comments: 7099140it [03:51, 16321.05it/s]

POS tag cache size: 324278 words
Lemma cache size: 324278 word-POS pairs


Processing conservative comments: 8095393it [04:17, 38532.30it/s]


Reached 1000000 comments, saving batch 8...
Saved 1000000 comments to processed_comments/conservative/conservative_batch8.pkl


Processing conservative comments: 8102885it [04:19, 7470.08it/s] 

POS tag cache size: 340768 words
Lemma cache size: 340768 word-POS pairs


Processing conservative comments: 9103313it [04:48, 37776.78it/s]


Reached 1000000 comments, saving batch 9...


Processing conservative comments: 9110552it [04:50, 7198.98it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch9.pkl
POS tag cache size: 357637 words
Lemma cache size: 357637 word-POS pairs


Processing conservative comments: 10118194it [05:19, 35580.96it/s]


Reached 1000000 comments, saving batch 10...
Saved 1000000 comments to processed_comments/conservative/conservative_batch10.pkl


Processing conservative comments: 10125299it [05:21, 6699.31it/s] 

POS tag cache size: 376309 words
Lemma cache size: 376309 word-POS pairs


Processing conservative comments: 11127583it [05:51, 37623.66it/s]


Reached 1000000 comments, saving batch 11...


Processing conservative comments: 11135448it [05:53, 7675.05it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch11.pkl
POS tag cache size: 394619 words
Lemma cache size: 394619 word-POS pairs


Processing conservative comments: 12136369it [06:24, 32666.99it/s]


Reached 1000000 comments, saving batch 12...
Saved 1000000 comments to processed_comments/conservative/conservative_batch12.pkl


Processing conservative comments: 12143399it [06:26, 6998.12it/s] 

POS tag cache size: 412637 words
Lemma cache size: 412637 word-POS pairs


Processing conservative comments: 13147081it [06:56, 38547.93it/s]


Reached 1000000 comments, saving batch 13...
Saved 1000000 comments to processed_comments/conservative/conservative_batch13.pkl


Processing conservative comments: 13158340it [06:58, 9718.60it/s] 

POS tag cache size: 431323 words
Lemma cache size: 431323 word-POS pairs


Processing conservative comments: 14156847it [07:26, 45618.67it/s]


Reached 1000000 comments, saving batch 14...


Processing conservative comments: 14165336it [07:28, 9135.06it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch14.pkl
POS tag cache size: 445631 words
Lemma cache size: 445631 word-POS pairs


Processing conservative comments: 15166380it [07:55, 37957.20it/s]


Reached 1000000 comments, saving batch 15...
Saved 1000000 comments to processed_comments/conservative/conservative_batch15.pkl


Processing conservative comments: 15173395it [07:57, 7838.31it/s] 

POS tag cache size: 458541 words
Lemma cache size: 458541 word-POS pairs


Processing conservative comments: 16172298it [08:26, 36419.57it/s]


Reached 1000000 comments, saving batch 16...


Processing conservative comments: 16179933it [08:28, 7693.87it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch16.pkl
POS tag cache size: 472757 words
Lemma cache size: 472757 word-POS pairs


Processing conservative comments: 17182992it [08:55, 44591.10it/s]


Reached 1000000 comments, saving batch 17...
Saved 1000000 comments to processed_comments/conservative/conservative_batch17.pkl


Processing conservative comments: 17195445it [08:57, 11421.38it/s]

POS tag cache size: 486615 words
Lemma cache size: 486615 word-POS pairs


Processing conservative comments: 18186702it [09:21, 47274.56it/s]


Reached 1000000 comments, saving batch 18...


Processing conservative comments: 18195560it [09:23, 9904.60it/s] 

Saved 1000000 comments to processed_comments/conservative/conservative_batch18.pkl
POS tag cache size: 497034 words
Lemma cache size: 497034 word-POS pairs


Processing conservative comments: 18984143it [09:40, 32681.30it/s]



Saving remaining 788076 comments...
Saved 788076 comments to processed_comments/conservative/conservative_batch19.pkl

Completed processing conservative comments!
Total comments saved: 18788076
Final POS tag cache size: 502625 words
Final lemma cache size: 502625 word-POS pairs


In [12]:
import pickle

def inspect_pkl_file(file_path, num_examples=5):
    """
    Load a pickle file and print a few example records
    
    Args:
        file_path: Path to the pickle file
        num_examples: Number of examples to show (default 5)
    """
    # Load the pickle file
    with open(file_path, 'rb') as file:
        data = pickle.load(file)
    
    # Print information about the data structure
    print(f"Data type: {type(data)}")
    
    if isinstance(data, list):
        print(f"Number of items: {len(data)}")
        
        # Display examples
        print(f"\nShowing first {min(num_examples, len(data))} examples:")
        for i, item in enumerate(data[:num_examples]):
            print(f"\nExample {i+1}:")
            if isinstance(item, dict):
                for key, value in item.items():
                    # For processed text, just show a few words
                    if key == "processed_text" and isinstance(value, list) and len(value) > 10:
                        print(f"  {key}: {value[:10]} ... (total: {len(value)} words)")
                    else:
                        print(f"  {key}: {value}")
            else:
                print(item)
    else:
        print("Data is not a list. Structure:", data)

# Example usage
inspect_pkl_file("processed_comments/democrats/democrats_batch1.pkl")

Data type: <class 'list'>
Number of items: 1000000

Showing first 5 examples:

Example 1:
  comment_id: c07p2u0
  author: Garak
  date: 2009-02-16
  timestamp: 1234791099
  processed_text: ['allow', 'legend', 'grow', 'ill', 'mythical', 'proportion', 'lie', 'fund', 'acorn', 'nowhere'] ... (total: 49 words)
  original: &gt;  And they have allowed its legend to grow to ill and mythical proportions: lies about funding for ACORN, which is nowhere mentioned in the bill. Gross and unanswered misrepresentations by McCain about the "honey bee insurance" provision.

This is a great point. Democrats have this habit of letting the Republicans not only control the conversation, but reduce it to a fourth-grade level. "Honey bee insurance!" (Insurance for livestock producers in general, including honeybees because they are [very important and are have had a rough few years](http://en.wikipedia.org/wiki/Colony_Collapse_Disorder).) "$30M for mice!" (Wetlands restoration.) "Fruit fly research!" (Genetic