# Yelp Data Cleaning: Noise Reduction (Chunking Refactor)

## Phase 0: Setup and Initial Load

In [None]:
import pandas as pd
import numpy as np
import json
import datetime
import re
import os
import random
import gc # Garbage Collector
from sklearn.ensemble import IsolationForest
import nltk # Uncomment if using stopwords in Phase 2
from nltk.corpus import stopwords # Uncomment if using stopwords
from nltk.tokenize import word_tokenize # Uncomment if using stopwords

# Potential plotting libraries (optional)
import matplotlib.pyplot as plt
import seaborn as sns

# Display settings: never truncate the column list when a DataFrame is printed.
pd.options.display.max_columns = None

SyntaxError: invalid syntax (1124650115.py, line 14)

In [None]:
# Input locations. The relative path is resolved against the current working directory.
input_dir = '../yelp_dataset/'
review_file, user_file, business_file = (
    os.path.join(input_dir, filename)
    for filename in (
        'yelp_academic_dataset_review.json',
        'yelp_academic_dataset_user.json',
        'yelp_academic_dataset_business.json',
    )
)
# Optional tip dataset: os.path.join(input_dir, 'yelp_academic_dataset_tip.json')

# Cleaned outputs go into a sub-folder of the input directory (created on demand).
output_dir = os.path.join(input_dir, 'cleaned_data')
os.makedirs(output_dir, exist_ok=True)
print(f"Output directory created/exists: {output_dir}")

In [None]:
# Read the two static tables (users, businesses) fully into memory; both are
# stored as JSON-lines files.
print("Loading user data...")
users_df = pd.read_json(user_file, lines=True)
print("Loading business data...")
business_df = pd.read_json(business_file, lines=True)
print("Static data loading complete.")

# Quick sanity report: shapes first ...
print("--- Initial Static Data Shapes ---")
print(f"Users: {users_df.shape}")     # Expected: (1987897, 22)
print(f"Business: {business_df.shape}") # Expected: (150346, 14)

# ... then the column index and the first two rows of each table ...
for _table_label, _table in (("User", users_df), ("Business", business_df)):
    print(f"\n--- {_table_label} Columns & Head ---")
    print(_table.columns)
    print(_table.head(2))
# Drop the loop variables so they do not keep an extra reference to the tables.
del _table_label, _table

# ... and finally the per-column count of missing values.
print("\n--- Missing Values (Static) ---")
print("\nUsers:\n", users_df.isna().sum())
print("\nBusiness:\n", business_df.isna().sum()) # Address/postal code NaNs expected

In [None]:
# Helper used by the chunk-level feature engineering step
def count_friends(friends_str):
    """Return how many friend IDs a comma-separated string contains.

    Args:
        friends_str: Raw value of the ``friends`` field. Anything that is not a
            string (``None``, ``NaN``, ...) is treated as "no friends".

    Returns:
        int: Number of non-empty, comma-separated entries. The literal
        ``'None'`` and empty/whitespace-only strings yield 0.
    """
    if not isinstance(friends_str, str):
        return 0
    cleaned = friends_str.strip()
    if not cleaned or cleaned == 'None':
        return 0
    # ''.split(',') yields [''] (length 1), so count only non-blank entries;
    # this also ignores a trailing comma.
    return sum(1 for entry in cleaned.split(',') if entry.strip())

# Regex that flags text containing a URL or an e-mail-like token:
#   - an explicit scheme (http:// or https://)
#   - a "www." prefix
#   - a token ending in one of a few common TLDs (the \b stops "good.come" from matching ".com")
#   - anything of the form something@something
# Single backslashes are required inside a raw string (r'\\S' would match a
# literal backslash followed by "S"). The group is non-capturing so that
# pandas' str.contains does not warn about unused match groups.
url_pattern = r'https?://|www\.|\S+\.(?:com|net|org|edu|gov)\b\S*|\S+@\S+'

## Chunk Processing Loop (Reviews)

In [None]:
# Chunked review processing.
# Streams the review file in fixed-size chunks, joins every chunk with the
# in-memory user table, derives per-review / per-user features, applies the
# review-level filters (A, F, 2A) and keeps only the columns later phases need.
chunk_size = 500000
processed_chunks = []
total_reviews_processed = 0
total_reviews_kept_chunk_phase = 0

# Filter thresholds do not change between chunks, so define them once.
MIN_REVIEW_LENGTH = 50     # Filter A: minimum number of characters in a review
MAX_REVIEW_AGE_YEARS = 2   # Filter F: reviews older than this count as "old"

# One reference time for the whole run keeps the age cutoff identical for every chunk.
age_cutoff_utc = pd.Timestamp.now(tz='UTC') - pd.DateOffset(years=MAX_REVIEW_AGE_YEARS)

# pd.merge only applies `suffixes` to column names present in BOTH frames.
# A review column without a namesake in the user table would therefore keep
# its bare name (e.g. 'stars' instead of 'stars_review') and be dropped by the
# column selection below. These review columns are normalised to '<name>_review'.
review_metric_cols = ('stars', 'useful', 'funny', 'cool')

# Columns needed for the Phase 1 filters and for the final output
cols_to_keep_for_phase1 = [
    'review_id', 'user_id', 'business_id', 'stars_review',
    'useful_review', 'funny_review', 'cool_review', 'text', 'date',
    'text_length', 'user_compliments_total', 'user_friends_count',
    'user_is_elite', 'review_count' # Original user review count needed for Filter E
    # 'yelping_since' # Not strictly needed after date drop
]

print(f"Starting review processing in chunks of {chunk_size}...")

for i, review_chunk in enumerate(pd.read_json(review_file, lines=True, chunksize=chunk_size)):
    print(f"\n--- Processing Chunk {i+1} --- ({len(review_chunk)} reviews)")
    total_reviews_processed += len(review_chunk)
    review_columns = set(review_chunk.columns)

    # 1. Merge with Users (inner join: reviews without a known user are dropped)
    print("  Merging with users...")
    chunk_merged = pd.merge(review_chunk, users_df, on='user_id', how='inner', suffixes=('_review', '_user'))
    print(f"  Reviews after merge: {len(chunk_merged)} (Removed {len(review_chunk) - len(chunk_merged)} due to missing users)")
    del review_chunk # Free memory
    gc.collect()

    # A column that came from the review chunk and still carries its bare name
    # did not collide with a user column, so it was not suffixed by the merge.
    unsuffixed_review_cols = {
        col: f'{col}_review'
        for col in review_metric_cols
        if col in review_columns and col in chunk_merged.columns
    }
    if unsuffixed_review_cols:
        chunk_merged.rename(columns=unsuffixed_review_cols, inplace=True)

    # 2. Feature Engineering (Chunk-level)
    print("  Performing feature engineering...")
    chunk_merged['date'] = pd.to_datetime(chunk_merged['date'], errors='coerce')
    chunk_merged['yelping_since'] = pd.to_datetime(chunk_merged['yelping_since'], errors='coerce')
    # Drop rows where date conversion failed
    initial_chunk_count = len(chunk_merged)
    chunk_merged.dropna(subset=['date', 'yelping_since'], inplace=True)
    print(f"    Dropped {initial_chunk_count - len(chunk_merged)} rows due to NaT dates.")

    chunk_merged['text_length'] = chunk_merged['text'].astype(str).str.len()
    compliment_cols = [col for col in chunk_merged.columns if col.startswith('compliment_')]
    chunk_merged['user_compliments_total'] = chunk_merged[compliment_cols].sum(axis=1)
    chunk_merged['user_friends_count'] = chunk_merged['friends'].apply(count_friends)
    if 'elite' in chunk_merged.columns:
        # Any non-empty string in 'elite' marks the user as elite
        chunk_merged['user_is_elite'] = chunk_merged['elite'].apply(lambda x: isinstance(x, str) and x != '')
    else:
        chunk_merged['user_is_elite'] = False

    # 3. Apply Review-Level Filters (A, F, 2A)
    print("  Applying review-level filters...")
    count_before_filters = len(chunk_merged)

    # Filter A: Text Length
    chunk_merged = chunk_merged[chunk_merged['text_length'] >= MIN_REVIEW_LENGTH]
    print(f"    Filter A (Text Length >= {MIN_REVIEW_LENGTH}): Kept {len(chunk_merged)} reviews")

    # Filter F: Low Usefulness (Old/Short/0-useful)
    # Align the cutoff with the timezone-awareness of the parsed dates so the
    # comparison below never mixes naive and aware timestamps.
    date_tz = chunk_merged['date'].dt.tz
    if date_tz is not None:
        cutoff_date = age_cutoff_utc.tz_convert(date_tz)
    else:
        cutoff_date = age_cutoff_utc.tz_localize(None)
    condition_f = (
        (chunk_merged['date'] < cutoff_date) &
        (chunk_merged['useful_review'] == 0) &
        (chunk_merged['text_length'] < 150)
    )
    chunk_merged = chunk_merged[~condition_f]
    print(f"    Filter F (Low Usefulness): Kept {len(chunk_merged)} reviews")

    # Filter 2A: URLs/Emails
    contains_url_mask = chunk_merged['text'].str.contains(url_pattern, regex=True, na=False)
    chunk_merged = chunk_merged[~contains_url_mask]
    print(f"    Filter 2A (URLs/Emails): Kept {len(chunk_merged)} reviews")

    reviews_removed_in_chunk = count_before_filters - len(chunk_merged)
    print(f"  Total reviews removed in this chunk by review-level filters: {reviews_removed_in_chunk}")
    total_reviews_kept_chunk_phase += len(chunk_merged)

    # 4. Append to list (select necessary columns to save memory)
    # Report missing columns instead of dropping them silently; a missing
    # column here would only surface much later as a KeyError.
    missing_cols = [col for col in cols_to_keep_for_phase1 if col not in chunk_merged.columns]
    if missing_cols:
        print(f"  WARNING: expected columns missing from chunk {i+1}: {missing_cols}")
    existing_cols_to_keep = [col for col in cols_to_keep_for_phase1 if col in chunk_merged.columns]
    processed_chunks.append(chunk_merged[existing_cols_to_keep])
    print(f"  Chunk {i+1} processing complete. Appended {len(chunk_merged)} reviews.")

    # 5. Memory Cleanup
    del chunk_merged
    gc.collect()

print(f"\nFinished processing all chunks. Total reviews processed: {total_reviews_processed}")
print(f"Total reviews kept after chunk-level processing: {total_reviews_kept_chunk_phase}")

## Combine Processed Chunks

In [None]:
# Stitch the per-chunk results into one DataFrame (or an empty one if nothing survived).
print("\nCombining processed chunks...")
if not processed_chunks:
    print("No chunks were processed or kept. Exiting.")
    # Downstream cells test `.empty`, so an empty frame is enough to make them skip.
    reviews_combined_df = pd.DataFrame() # Or raise an error
else:
    reviews_combined_df = pd.concat(processed_chunks, ignore_index=True)
    print(f"Combined DataFrame shape: {reviews_combined_df.shape}")
    # The chunk list is no longer needed once concatenated.
    del processed_chunks
    gc.collect()

# Re-assert the datetime dtype of 'date' on the combined frame.
if not reviews_combined_df.empty:
    reviews_combined_df['date'] = pd.to_datetime(
        reviews_combined_df['date'],
        errors='coerce',
    )

## Phase 1: Rule-Based Filtering (User Aggregation)

In [None]:
# Baseline Evaluation Metrics (After Chunking/Combining, Before Stage 1 Filters)
# Produces: baseline_review_count, baseline_user_ids, baseline_user_count,
# baseline_user_agg (Series of baseline averages) and reviews_stage1_df.
print("--- Baseline Metrics (After Chunking/Combining) ---")
if not reviews_combined_df.empty:
    baseline_review_count = len(reviews_combined_df)
    baseline_user_ids = reviews_combined_df['user_id'].unique()
    baseline_user_count = len(baseline_user_ids)

    print(f"Combined Review Count: {baseline_review_count}")
    print(f"Combined Unique User Count: {baseline_user_count}")

    print("Combined Star Rating Distribution (%):")
    print(reviews_combined_df['stars_review'].value_counts(normalize=True).sort_index() * 100)

    baseline_avg_text_length = reviews_combined_df['text_length'].mean()
    baseline_avg_useful_votes = reviews_combined_df['useful_review'].mean()
    print(f"Combined Avg Text Length: {baseline_avg_text_length:.2f}")
    print(f"Combined Avg Useful Votes per Review: {baseline_avg_useful_votes:.2f}")

    # User-level averages are taken over one row per user.
    # NOTE: DataFrame.agg(name=(column, func)) returns a label-by-column
    # DataFrame, so indexing it with ['avg_compliments'] raises KeyError.
    # A Series keyed by metric name supports both [...] and .get(...).
    baseline_unique_users = reviews_combined_df.drop_duplicates(subset=['user_id'])
    baseline_user_agg = pd.Series({
        'avg_compliments': baseline_unique_users['user_compliments_total'].mean(),
        'avg_friends': baseline_unique_users['user_friends_count'].mean(),
        'pct_elite': baseline_unique_users['user_is_elite'].mean(),
        # Review-level baselines, looked up later via .get(...) in the Stage 1 comparison
        'avg_text_length': baseline_avg_text_length,
        'avg_useful_votes': baseline_avg_useful_votes,
    })
    del baseline_unique_users
    print("\nBaseline Avg User Metrics (Unique Users):")
    print(f"  Avg Compliments: {baseline_user_agg['avg_compliments']:.2f}")
    print(f"  Avg Friends Count: {baseline_user_agg['avg_friends']:.2f}")
    print(f"  % Elite Users: {baseline_user_agg['pct_elite'] * 100:.2f}%")

    # Hand the frame over to Stage 1 under its new name. The old name is
    # deleted right away, so a deep copy would only double peak memory.
    reviews_stage1_df = reviews_combined_df
    del reviews_combined_df
    gc.collect()
else:
    print("Combined DataFrame is empty, skipping Phase 1.")
    reviews_stage1_df = pd.DataFrame() # Ensure it exists

### Apply User-Level Filters (Iteratively)

In [None]:
# User-level filters B, C, D and E operate on the combined review table.
if reviews_stage1_df.empty:
    print("Skipping Filter B: Input DataFrame is empty.")
else:
    # (B) Remove every user whose busiest day exceeds MAX_BURST_RATE reviews
    print("\n--- Filter B: Review Bursts ---")
    count_before = len(reviews_stage1_df)
    users_before = reviews_stage1_df['user_id'].nunique()

    # Number of reviews each user wrote on each calendar day
    reviews_per_day = (
        reviews_stage1_df
        .groupby([reviews_stage1_df['user_id'], reviews_stage1_df['date'].dt.date])
        .size()
        .reset_index(name='reviews_per_day')
    )

    # Busiest single day per user
    max_reviews_per_user = (
        reviews_per_day
        .groupby('user_id')['reviews_per_day']
        .max()
        .reset_index(name='max_reviews_per_day')
    )

    # Attach the per-user maximum to every review row
    reviews_stage1_df = reviews_stage1_df.merge(max_reviews_per_user, on='user_id', how='left')

    avg_max_burst_before = reviews_stage1_df['max_reviews_per_day'].mean()

    # Identify offending users, then drop all of their reviews
    MAX_BURST_RATE = 10 # Reviews per day
    users_to_remove_burst = reviews_stage1_df.loc[
        reviews_stage1_df['max_reviews_per_day'] > MAX_BURST_RATE, 'user_id'
    ].unique()
    reviews_stage1_df = reviews_stage1_df[~reviews_stage1_df['user_id'].isin(users_to_remove_burst)]

    count_after = len(reviews_stage1_df)
    users_after = reviews_stage1_df['user_id'].nunique()
    # The mean of an empty column would be NaN; report 0 instead
    if reviews_stage1_df.empty:
        avg_max_burst_after = 0
    else:
        avg_max_burst_after = reviews_stage1_df['max_reviews_per_day'].mean()

    print(f"Users identified with burst rate > {MAX_BURST_RATE}: {len(users_to_remove_burst)}")
    print(f"Reviews removed: {count_before - count_after}")
    print(f"Users removed: {users_before - users_after}")
    print(f"Avg max reviews/day before: {avg_max_burst_before:.2f}, after: {avg_max_burst_after:.2f}")

    # The helper column was only needed for this filter
    reviews_stage1_df = reviews_stage1_df.drop(columns=['max_reviews_per_day'])
    gc.collect()

In [None]:
if reviews_stage1_df.empty:
    print("Skipping Filter C: Input DataFrame is empty.")
else:
    # (C) Remove users who wrote several reviews inside a very short time span
    print("\n--- Filter C: Activity Window ---")
    count_before = len(reviews_stage1_df)
    users_before = reviews_stage1_df['user_id'].nunique()

    # Per user: first review date, last review date, number of remaining reviews
    user_activity = (
        reviews_stage1_df
        .groupby('user_id')
        .agg(
            min_date=('date', 'min'),
            max_date=('date', 'max'),
            user_review_count_stage1=('review_id', 'count'), # Count reviews remaining at this stage
        )
        .reset_index()
    )

    # Whole days between a user's first and last review
    user_activity['activity_window_days'] = user_activity['max_date'].sub(user_activity['min_date']).dt.days

    # Attach the two per-user values to every review row
    reviews_stage1_df = reviews_stage1_df.merge(
        user_activity[['user_id', 'activity_window_days', 'user_review_count_stage1']],
        on='user_id',
        how='left',
    )

    median_window_before = reviews_stage1_df['activity_window_days'].median()

    # A user is removed when the window is short AND the review count is high enough
    MIN_ACTIVITY_WINDOW_DAYS = 60
    MIN_REVIEWS_FOR_WINDOW_FILTER = 3

    condition = (
        reviews_stage1_df['activity_window_days'].lt(MIN_ACTIVITY_WINDOW_DAYS)
        & reviews_stage1_df['user_review_count_stage1'].ge(MIN_REVIEWS_FOR_WINDOW_FILTER)
    )
    users_to_remove_window = reviews_stage1_df.loc[condition, 'user_id'].unique()
    reviews_stage1_df = reviews_stage1_df[~reviews_stage1_df['user_id'].isin(users_to_remove_window)]

    count_after = len(reviews_stage1_df)
    users_after = reviews_stage1_df['user_id'].nunique()
    if reviews_stage1_df.empty:
        median_window_after = 0
    else:
        median_window_after = reviews_stage1_df['activity_window_days'].median()

    print(f"Users identified with window < {MIN_ACTIVITY_WINDOW_DAYS} days & >= {MIN_REVIEWS_FOR_WINDOW_FILTER} reviews: {len(users_to_remove_window)}")
    print(f"Reviews removed: {count_before - count_after}")
    print(f"Users removed: {users_before - users_after}")
    print(f"Median activity window (days) before: {median_window_before:.0f}, after: {median_window_after:.0f}")

    # Helper columns are not needed beyond this filter
    reviews_stage1_df = reviews_stage1_df.drop(columns=['activity_window_days', 'user_review_count_stage1'])
    gc.collect()

In [None]:
if reviews_stage1_df.empty:
    print("Skipping Filter D: Input DataFrame is empty.")
else:
    # (D) Remove non-elite users whose ratings stray far from each business's average
    print("\n--- Filter D: Rating Deviation ---")
    count_before = len(reviews_stage1_df)
    users_before = reviews_stage1_df['user_id'].nunique()

    # Mean star rating per business, computed from the reviews still present
    business_avg_stars = (
        reviews_stage1_df
        .groupby('business_id')['stars_review']
        .mean()
        .reset_index(name='business_avg_rating')
    )
    reviews_stage1_df = reviews_stage1_df.merge(business_avg_stars, on='business_id', how='left')

    # Absolute distance of each review's rating from its business average
    reviews_stage1_df['rating_deviation'] = (
        reviews_stage1_df['stars_review'].sub(reviews_stage1_df['business_avg_rating']).abs()
    )

    # Mean of that distance per user, attached to every review row
    user_avg_deviation = (
        reviews_stage1_df
        .groupby('user_id')['rating_deviation']
        .mean()
        .reset_index(name='user_avg_deviation')
    )
    reviews_stage1_df = reviews_stage1_df.merge(user_avg_deviation, on='user_id', how='left')

    avg_dev_before = reviews_stage1_df['user_avg_deviation'].mean()

    # Elite users are exempt from this filter
    MAX_AVG_DEVIATION = 1.8
    condition = (
        reviews_stage1_df['user_avg_deviation'].gt(MAX_AVG_DEVIATION)
        & reviews_stage1_df['user_is_elite'].eq(False)
    )
    users_to_remove_deviation = reviews_stage1_df.loc[condition, 'user_id'].unique()
    reviews_stage1_df = reviews_stage1_df[~reviews_stage1_df['user_id'].isin(users_to_remove_deviation)]

    count_after = len(reviews_stage1_df)
    users_after = reviews_stage1_df['user_id'].nunique()
    if reviews_stage1_df.empty:
        avg_dev_after = 0
    else:
        avg_dev_after = reviews_stage1_df['user_avg_deviation'].mean()

    print(f"Users identified with avg deviation > {MAX_AVG_DEVIATION} (non-elite): {len(users_to_remove_deviation)}")
    print(f"Reviews removed: {count_before - count_after}")
    print(f"Users removed: {users_before - users_after}")
    print(f"Avg user rating deviation before: {avg_dev_before:.2f}, after: {avg_dev_after:.2f}")

    # Helper columns are not needed beyond this filter
    reviews_stage1_df = reviews_stage1_df.drop(columns=['business_avg_rating', 'rating_deviation', 'user_avg_deviation'])
    gc.collect()

In [None]:
if not reviews_stage1_df.empty:
    # (E) Filter by Low Engagement (Combined Rule)
    # A user is removed when ALL of these hold: no compliments, no friends,
    # not elite, and an original review count of at most 5.
    print("\n--- Filter E: Low Engagement ---")
    count_before = len(reviews_stage1_df)
    users_before = reviews_stage1_df['user_id'].nunique()

    # 'review_count' (original user review count) was kept by the chunk processing merge.
    # If it wasn't kept, merge it back from users_df here:
    # if 'review_count' not in reviews_stage1_df.columns:
    #     reviews_stage1_df = pd.merge(reviews_stage1_df, users_df[['user_id', 'review_count']], on='user_id', how='left', suffixes=('', '_original'))

    condition = (
        (reviews_stage1_df['user_compliments_total'] == 0) &
        (reviews_stage1_df['user_friends_count'] == 0) &
        (~reviews_stage1_df['user_is_elite'].astype(bool)) &
        (reviews_stage1_df['review_count'] <= 5) # Using original review count
    )
    users_to_remove_low_engagement = reviews_stage1_df.loc[condition, 'user_id'].unique()

    # Evaluate stats for removed vs kept (before removing).
    # NOTE: DataFrame.agg(name=(column, func)) returns a label-by-column
    # DataFrame, so stats['avg_compliments'] would raise KeyError. Plain column
    # means relabelled to the metric names give the intended Series.
    in_removed_users = reviews_stage1_df['user_id'].isin(users_to_remove_low_engagement)
    metric_labels = {
        'user_compliments_total': 'avg_compliments',
        'user_friends_count': 'avg_friends',
    }
    removed_stats = reviews_stage1_df.loc[in_removed_users, list(metric_labels)].mean().rename(metric_labels)
    kept_stats = reviews_stage1_df.loc[~in_removed_users, list(metric_labels)].mean().rename(metric_labels)
    # A Series of means is never "empty" (it holds NaN instead), so track
    # explicitly whether each group has any rows.
    any_removed = bool(in_removed_users.any())
    any_kept = bool((~in_removed_users).any())

    reviews_stage1_df = reviews_stage1_df[~in_removed_users]
    del in_removed_users

    count_after = len(reviews_stage1_df)
    users_after = reviews_stage1_df['user_id'].nunique()

    print(f"Users identified as low engagement: {len(users_to_remove_low_engagement)}")
    print(f"  Removed Avg Compliments: {removed_stats['avg_compliments']:.2f}, Avg Friends: {removed_stats['avg_friends']:.2f}" if any_removed else "  Removed Stats: N/A (no users removed)")
    print(f"  Kept Avg Compliments: {kept_stats['avg_compliments']:.2f}, Avg Friends: {kept_stats['avg_friends']:.2f}" if any_kept else "  Kept Stats: N/A (no users kept)")
    print(f"Reviews removed: {count_before - count_after}")
    print(f"Users removed: {users_before - users_after}")

    # Drop temporary column if it was merged in this step
    # if 'review_count_original' in reviews_stage1_df.columns:
    #     reviews_stage1_df.drop(columns=['review_count_original'], inplace=True)
    # We keep 'review_count' as it's needed for Phase 3
    gc.collect()
else:
    print("Skipping Filter E: Input DataFrame is empty.")

### Stage 1 Evaluation & Saving

In [None]:
# Calculate final metrics after all Stage 1 filters and compare them with the
# baseline captured right after the chunks were combined.
print("\n--- Stage 1 Final Metrics ---")
if not reviews_stage1_df.empty:
    final_review_count_s1 = len(reviews_stage1_df)
    final_user_ids_s1 = reviews_stage1_df['user_id'].unique()
    final_user_count_s1 = len(final_user_ids_s1)

    print(f"Final Review Count (Stage 1): {final_review_count_s1}")
    print(f"Final Unique User Count (Stage 1): {final_user_count_s1}")

    print("Final Star Rating Distribution (% - Stage 1):")
    print(reviews_stage1_df['stars_review'].value_counts(normalize=True).sort_index() * 100)

    final_avg_text_length_s1 = reviews_stage1_df['text_length'].mean()
    final_avg_useful_votes_s1 = reviews_stage1_df['useful_review'].mean()
    print(f"Final Avg Text Length (Stage 1): {final_avg_text_length_s1:.2f}")
    print(f"Final Avg Useful Votes per Review (Stage 1): {final_avg_useful_votes_s1:.2f}")

    # User-level averages are taken over one row per user.
    # NOTE: DataFrame.agg(name=(column, func)) returns a label-by-column
    # DataFrame, so indexing it with ['avg_compliments'] raises KeyError;
    # build a Series keyed by metric name instead.
    unique_users_s1 = reviews_stage1_df.drop_duplicates(subset=['user_id'])
    final_user_agg_s1 = pd.Series({
        'avg_compliments': unique_users_s1['user_compliments_total'].mean(),
        'avg_friends': unique_users_s1['user_friends_count'].mean(),
        'pct_elite': unique_users_s1['user_is_elite'].mean(),
    })
    del unique_users_s1
    print("\nFinal Avg User Metrics (Unique Users - Stage 1):")
    print(f"  Avg Compliments: {final_user_agg_s1['avg_compliments']:.2f}")
    print(f"  Avg Friends Count: {final_user_agg_s1['avg_friends']:.2f}")
    print(f"  % Elite Users: {final_user_agg_s1['pct_elite'] * 100:.2f}%")

    # Accept the baseline in either shape: a Series keyed by metric name, or
    # the label-by-column DataFrame produced by DataFrame.agg(name=(col, func)),
    # whose rows each hold exactly one non-NaN value.
    baseline_metrics = baseline_user_agg
    if isinstance(baseline_metrics, pd.DataFrame):
        baseline_metrics = baseline_metrics.max(axis=1)

    # Review-level baselines are optional; fall back to 'N/A' when absent.
    baseline_text_length = baseline_metrics.get('avg_text_length', 'N/A')
    if not isinstance(baseline_text_length, str):
        baseline_text_length = f"{baseline_text_length:.2f}"
    baseline_useful_votes = baseline_metrics.get('avg_useful_votes', 'N/A')
    if not isinstance(baseline_useful_votes, str):
        baseline_useful_votes = f"{baseline_useful_votes:.2f}"

    print("\n--- Comparison: Baseline (Post-Chunking) vs Stage 1 ---")
    print(f"Reviews: {baseline_review_count} -> {final_review_count_s1} ({final_review_count_s1 / baseline_review_count * 100:.1f}%) - Removed: {baseline_review_count - final_review_count_s1}")
    print(f"Users: {baseline_user_count} -> {final_user_count_s1} ({final_user_count_s1 / baseline_user_count * 100:.1f}%) - Removed: {baseline_user_count - final_user_count_s1}")
    # Note: Baseline metrics are from the combined df before Stage 1 filters
    print(f"Avg Text Length: {baseline_text_length} -> {final_avg_text_length_s1:.2f}")
    print(f"Avg Useful Votes: {baseline_useful_votes} -> {final_avg_useful_votes_s1:.2f}")
    print(f"Avg User Compliments: {baseline_metrics['avg_compliments']:.2f} -> {final_user_agg_s1['avg_compliments']:.2f}")
    print(f"Avg User Friends: {baseline_metrics['avg_friends']:.2f} -> {final_user_agg_s1['avg_friends']:.2f}")
    print(f"% Elite Users: {baseline_metrics['pct_elite'] * 100:.2f}% -> {final_user_agg_s1['pct_elite'] * 100:.2f}%")
else:
    print("Stage 1 DataFrame is empty. No metrics to calculate.")
    final_review_count_s1 = 0
    final_user_count_s1 = 0

In [None]:
# Qualitative Check (Stage 1): eyeball a random sample of surviving reviews.
print("\n--- Qualitative Check (Stage 1) ---")
if reviews_stage1_df.empty:
    print("Stage 1 DataFrame is empty. No qualitative check possible.")
else:
    kept_reviews_s1 = reviews_stage1_df
    # Showing removed reviews would require the pre-filter data (reload or an
    # extra in-memory copy), so only kept reviews are sampled here.
    print(f"Total kept after Stage 1: {len(kept_reviews_s1)}")

    if len(kept_reviews_s1) < 10:
        print("\nNot enough kept reviews to sample.")
    else:
        print("\nSample Kept Reviews (Stage 1):")
        # At least 10 rows are available in this branch, so the sample size is exactly 10
        print(kept_reviews_s1.sample(10)[['review_id', 'user_id', 'stars_review', 'text']])

In [None]:
# Save Cleaned Data (Stage 1)
# Writes the surviving reviews and their users as JSON-lines files and hands
# the filtered review frame over to Stage 2 as `reviews_stage2_df`.
print("\n--- Saving Stage 1 Data ---")
if not reviews_stage1_df.empty:
    # Select relevant columns for reviews output
    review_cols_to_keep_output = ['review_id', 'user_id', 'business_id', 'stars_review', 'useful_review', 'funny_review', 'cool_review', 'text', 'date']
    reviews_stage1_cleaned_df = reviews_stage1_df[review_cols_to_keep_output]

    output_reviews_s1_path = os.path.join(output_dir, 'reviews_stage1_cleaned.json')
    reviews_stage1_cleaned_df.to_json(output_reviews_s1_path, orient='records', lines=True, date_format='iso')
    print(f"Saved Stage 1 cleaned reviews ({len(reviews_stage1_cleaned_df)}) to: {output_reviews_s1_path}")

    # Filter original users_df to keep only those remaining
    remaining_user_ids_s1 = reviews_stage1_cleaned_df['user_id'].unique()
    users_stage1_cleaned_df = users_df[users_df['user_id'].isin(remaining_user_ids_s1)].copy() # Use copy to avoid SettingWithCopyWarning

    output_users_s1_path = os.path.join(output_dir, 'users_stage1_cleaned.json')
    users_stage1_cleaned_df.to_json(output_users_s1_path, orient='records', lines=True, date_format='iso')
    print(f"Saved Stage 1 cleaned users ({len(users_stage1_cleaned_df)}) to: {output_users_s1_path}")

    # Prepare for Stage 2: rebind the filtered frame under its Stage 2 name.
    # The Stage 1 name is deleted immediately, so a deep copy would only
    # double the peak memory held by the largest frame in the pipeline.
    reviews_stage2_df = reviews_stage1_df
    del reviews_stage1_df
    del reviews_stage1_cleaned_df # Column subset was only needed for writing
    gc.collect()
else:
    print("Stage 1 DataFrame is empty. Cannot save.")
    reviews_stage2_df = pd.DataFrame() # Ensure it exists for next phase check
    users_stage1_cleaned_df = pd.DataFrame() # Ensure it exists

## Phase 2: Basic Text & Content Heuristics

### Apply Text Filters

In [None]:
# Filter 2A (URLs/Emails) runs inside the chunk processing loop, so this cell
# only logs that fact and performs no filtering itself.
print("Filter 2A (URLs/Emails) was applied during chunk processing.")

In [None]:
# (B) Filter by Repetition (Optional - requires NLTK)
# Drops reviews whose ratio of distinct to total non-stopword tokens is below
# MIN_UNIQUE_RATIO, i.e. reviews that mostly repeat the same words.
print("\n--- Filter 2B: Text Repetition (Unique Ratio) ---")

SKIP_REPETITION_FILTER = False # Set to True to skip this filter

if SKIP_REPETITION_FILTER:
    print("Skipping Filter 2B (Repetition) as configured.")
elif not reviews_stage2_df.empty:
    try:
        import nltk
        from nltk.corpus import stopwords
        from nltk.tokenize import word_tokenize

        # nltk.data.find raises LookupError when a resource is not installed.
        # (nltk.downloader has no DownloadError attribute, so naming it in an
        # except clause fails with AttributeError exactly when it is needed.)
        try:
            nltk.data.find('corpora/stopwords')
        except LookupError:
            print("Downloading NLTK stopwords...")
            nltk.download('stopwords')
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            print("Downloading NLTK punkt tokenizer...")
            nltk.download('punkt')
        # NOTE(review): newer NLTK releases appear to load the tokenizer data
        # from a resource named 'punkt_tab'; probe once and fetch it if the
        # tokenizer still reports a missing resource - confirm against the
        # installed NLTK version.
        try:
            word_tokenize("probe sentence")
        except LookupError:
            print("Downloading NLTK punkt_tab tokenizer...")
            nltk.download('punkt_tab')

        stop_words = set(stopwords.words('english'))

        def calculate_unique_ratio(text):
            """Return distinct/total count of alphabetic non-stopword tokens in ``text``.

            Non-string, empty or whitespace-only input, and text with no
            remaining words after filtering, yield 1.0 so that such rows are
            never removed by this filter.
            """
            # Type check first: it also covers None/NaN
            if not isinstance(text, str) or not text.strip():
                return 1.0
            # Lowercase, strip punctuation, tokenize, drop stopwords/non-alphabetic tokens
            text = text.lower()
            text = re.sub(r'[^\w\s]', '', text) # Keep words and spaces
            tokens = word_tokenize(text)
            words = [word for word in tokens if word.isalpha() and word not in stop_words]
            if not words: # Handle case of no words after filtering
                return 1.0
            return len(set(words)) / len(words)

        print("Calculating unique ratio (may take time)...")
        reviews_stage2_df['unique_ratio'] = reviews_stage2_df['text'].apply(calculate_unique_ratio)

        avg_ratio_before = reviews_stage2_df['unique_ratio'].mean()

        MIN_UNIQUE_RATIO = 0.4
        count_before = len(reviews_stage2_df)
        users_before = reviews_stage2_df['user_id'].nunique()

        reviews_stage2_df = reviews_stage2_df[reviews_stage2_df['unique_ratio'] >= MIN_UNIQUE_RATIO]

        count_after = len(reviews_stage2_df)
        users_after = reviews_stage2_df['user_id'].nunique()
        avg_ratio_after = reviews_stage2_df['unique_ratio'].mean() if not reviews_stage2_df.empty else 0

        print(f"Reviews removed with unique ratio < {MIN_UNIQUE_RATIO}: {count_before - count_after}")
        print(f"Users removed: {users_before - users_after}")
        print(f"Avg unique ratio before: {avg_ratio_before:.3f}, after: {avg_ratio_after:.3f}")

        # Drop temporary column (reassign rather than mutating the filtered slice in place)
        reviews_stage2_df = reviews_stage2_df.drop(columns=['unique_ratio'])
        gc.collect()

    except ImportError:
        print("Skipping Filter 2B (Repetition) because NLTK is not installed.")
        print("Please install NLTK (`pip install nltk`) and run the cell again if needed.")
else:
     print("Skipping Filter 2B: Input DataFrame is empty.")

### Stage 2 Evaluation & Saving

In [None]:
# Stage 2 summary: review/user counts and how many Stage 2 removed relative to Stage 1.
print("\n--- Stage 2 Final Metrics ---")
if reviews_stage2_df.empty:
    print("Stage 2 DataFrame is empty. No metrics to calculate.")
    final_review_count_s2 = 0
    final_user_count_s2 = 0
else:
    final_review_count_s2 = len(reviews_stage2_df)
    final_user_ids_s2 = reviews_stage2_df['user_id'].unique()
    final_user_count_s2 = len(final_user_ids_s2)

    print(f"Final Review Count (Stage 2): {final_review_count_s2}")
    print(f"Final Unique User Count (Stage 2): {final_user_count_s2}")

    # Differences against the Stage 1 totals recorded earlier
    print("\n--- Comparison: Stage 1 vs Stage 2 ---")
    print(f"Reviews: {final_review_count_s1} -> {final_review_count_s2} - Removed in Stage 2: {final_review_count_s1 - final_review_count_s2}")
    print(f"Users: {final_user_count_s1} -> {final_user_count_s2} - Removed in Stage 2: {final_user_count_s1 - final_user_count_s2}")

In [None]:
# Qualitative Check (Stage 2): sample reviews that Stage 2 removed, found by
# comparing the saved Stage 1 file with the current in-memory frame.
print("\n--- Qualitative Check (Stage 2) ---")
if reviews_stage2_df.empty:
    print("Stage 2 DataFrame is empty. No qualitative check possible.")
else:
    kept_reviews_s2 = reviews_stage2_df
    try:
        temp_reviews_s1_df = pd.read_json(os.path.join(output_dir, 'reviews_stage1_cleaned.json'), lines=True)
        # IDs present after Stage 1 but absent from the Stage 2 frame
        removed_review_ids_s2 = temp_reviews_s1_df.loc[
            ~temp_reviews_s1_df['review_id'].isin(kept_reviews_s2['review_id']), 'review_id'
        ]
        removed_reviews_s2 = temp_reviews_s1_df[temp_reviews_s1_df['review_id'].isin(removed_review_ids_s2)]
        print(f"Total kept: {len(kept_reviews_s2)}, Total removed in S2: {len(removed_reviews_s2)}")
        if len(removed_reviews_s2) < 10:
            print("\nNot enough reviews removed in Stage 2 to sample.")
        else:
            print("\nSample Removed Reviews (by Stage 2 Filters):")
            # At least 10 rows are available in this branch, so the sample size is exactly 10
            print(removed_reviews_s2.sample(10)[['review_id', 'user_id', 'stars_review', 'text']])
        # Release the reloaded Stage 1 data again
        del temp_reviews_s1_df, removed_reviews_s2
        gc.collect()
    except FileNotFoundError:
        print("Could not perform removed qualitative check: Stage 1 file not found.")
    except Exception as e:
         print(f"Error during Stage 2 qualitative check: {e}")

In [None]:
# Save Cleaned Data (Stage 2): write surviving reviews and their users as JSON lines.
print("\n--- Saving Stage 2 Data ---")
if reviews_stage2_df.empty:
    print("Stage 2 DataFrame is empty. Cannot save.")
    users_stage2_cleaned_df = pd.DataFrame() # Ensure exists for next phase
else:
    # Reviews: keep only the output columns chosen when Stage 1 was saved
    reviews_stage2_cleaned_df = reviews_stage2_df[review_cols_to_keep_output]

    output_reviews_s2_path = os.path.join(output_dir, 'reviews_stage2_cleaned.json')
    reviews_stage2_cleaned_df.to_json(output_reviews_s2_path, orient='records', lines=True, date_format='iso')
    print(f"Saved Stage 2 cleaned reviews ({len(reviews_stage2_cleaned_df)}) to: {output_reviews_s2_path}")

    # Users: restrict the Stage 1 user table to users that still have a review
    remaining_user_ids_s2 = reviews_stage2_cleaned_df['user_id'].unique()
    if users_stage1_cleaned_df.empty:
        print("Warning: Stage 1 users DataFrame was empty, cannot save Stage 2 users.")
        users_stage2_cleaned_df = pd.DataFrame() # Ensure exists
    else:
        users_stage2_cleaned_df = users_stage1_cleaned_df[users_stage1_cleaned_df['user_id'].isin(remaining_user_ids_s2)].copy()
        output_users_s2_path = os.path.join(output_dir, 'users_stage2_cleaned.json')
        users_stage2_cleaned_df.to_json(output_users_s2_path, orient='records', lines=True, date_format='iso')
        print(f"Saved Stage 2 cleaned users ({len(users_stage2_cleaned_df)}) to: {output_users_s2_path}")

    # Release the in-memory Stage 2 frames (and the Stage 1 user table, which is no longer needed)
    del reviews_stage2_df, reviews_stage2_cleaned_df, users_stage1_cleaned_df
    gc.collect()

## Phase 3: Advanced Filtering (Isolation Forest)

In [None]:
# Load Stage 2 Data
# Reads the Stage 2 review and user files back from `output_dir`. Both frames
# are pre-initialised as empty so they exist even when loading fails; any
# failure is reported with a message instead of being raised.
print("--- Loading Stage 2 Data for Phase 3 ---")
reviews_stage3_input_df = pd.DataFrame()
users_stage3_input_df = pd.DataFrame()
try:
    # The path is assigned before the read so the FileNotFoundError handler can reference it
    reviews_stage3_input_path = os.path.join(output_dir, 'reviews_stage2_cleaned.json')
    reviews_stage3_input_df = pd.read_json(reviews_stage3_input_path, lines=True)
    reviews_stage3_input_df['date'] = pd.to_datetime(reviews_stage3_input_df['date'], errors='coerce') # Ensure date is datetime
    print(f"Loaded Stage 2 reviews: {reviews_stage3_input_df.shape}")
except FileNotFoundError:
    print(f"Error: Stage 2 review file not found at {reviews_stage3_input_path}")
except Exception as e:
    # Any other failure is reported; the frame keeps whatever was assigned before the error
    print(f"Error loading Stage 2 reviews: {e}")

try:
    # Same pattern for the user table
    users_stage3_input_path = os.path.join(output_dir, 'users_stage2_cleaned.json')
    users_stage3_input_df = pd.read_json(users_stage3_input_path, lines=True)
    print(f"Loaded Stage 2 users: {users_stage3_input_df.shape}")
except FileNotFoundError:
    print(f"Error: Stage 2 user file not found at {users_stage3_input_path}")
except Exception as e:
    print(f"Error loading Stage 2 users: {e}")

In [None]:
# Prepare Features for Isolation Forest
# Produces two DataFrames for the next cell:
#   * reviews_stage3_merged_df - Stage 2 reviews inner-joined with their users,
#     plus the derived columns computed below.
#   * features_df - the subset of columns in `feature_cols`, NaNs median-filled.
# Both remain empty when the Stage 2 inputs are empty or a required column is
# missing, which the training cell treats as "skip".
print("\nPreparing features for Isolation Forest...")
features_df = pd.DataFrame()
reviews_stage3_merged_df = pd.DataFrame()

if not reviews_stage3_input_df.empty and not users_stage3_input_df.empty:
    # Merge user data back into reviews; overlapping column names get
    # the '_review' / '_user' suffixes.
    reviews_stage3_merged_df = pd.merge(reviews_stage3_input_df, users_stage3_input_df, on='user_id', how='inner', suffixes=('_review', '_user'))
    print(f"Merged Stage 2 reviews and users: {reviews_stage3_merged_df.shape}")

    # NOTE(review): the code below requires 'stars_review' (and later
    # 'useful_review') to exist after the merge - either via the suffixing of
    # overlapping columns or because the Stage 2 file already uses those
    # names. Confirm against the Stage 2 output schema.

    # Recalculate user_avg_deviation based on current (Stage 2) reviews
    print("  Recalculating business/user average ratings...")
    # 1. Calculate business avg stars from Stage 2 reviews
    business_avg_stars_s2 = reviews_stage3_merged_df.groupby('business_id')['stars_review'].mean().reset_index(name='business_avg_rating_s2')
    # 2. Merge into the merged df
    reviews_stage3_merged_df = pd.merge(reviews_stage3_merged_df, business_avg_stars_s2, on='business_id', how='left')
    # 3. Calculate review deviation (absolute distance from the business mean)
    reviews_stage3_merged_df['rating_deviation_s2'] = (reviews_stage3_merged_df['stars_review'] - reviews_stage3_merged_df['business_avg_rating_s2']).abs()
    # 4. Calculate user average deviation
    user_avg_deviation_s2 = reviews_stage3_merged_df.groupby('user_id')['rating_deviation_s2'].mean().reset_index(name='user_avg_deviation_s2')
    # 5. Merge user avg deviation back
    reviews_stage3_merged_df = pd.merge(reviews_stage3_merged_df, user_avg_deviation_s2, on='user_id', how='left')

    # Add other features from user data
    print("  Adding other features...")
    reviews_stage3_merged_df['text_length'] = reviews_stage3_merged_df['text'].astype(str).str.len()
    compliment_cols_s3 = [col for col in reviews_stage3_merged_df.columns if col.startswith('compliment_') and col != 'compliment_count'] # Exclude tip compliment if present
    reviews_stage3_merged_df['user_compliments_total'] = reviews_stage3_merged_df[compliment_cols_s3].sum(axis=1)
    reviews_stage3_merged_df['user_friends_count'] = reviews_stage3_merged_df['friends'].apply(count_friends) # Use function defined earlier

    # Select features for the model
    feature_cols = [
        'text_length',
        'useful_review',
        'stars_review',
        'review_count', # User's total review count (from users_df)
        'user_compliments_total',
        'user_friends_count',
        'user_avg_deviation_s2',
        'average_stars' # User's average rating (from users_df)
    ]

    # Ensure all feature columns exist
    missing_cols = [col for col in feature_cols if col not in reviews_stage3_merged_df.columns]
    if missing_cols:
        print(f"Warning: Missing required feature columns: {missing_cols}. Isolation Forest cannot run.")
    else:
        features_df = reviews_stage3_merged_df[feature_cols].copy()

        # Handle NaNs by filling each affected column with its median
        print("  Handling NaNs in features...")
        for col in feature_cols:
            if features_df[col].isnull().any():
                median_val = features_df[col].median()
                # Assign the filled column back rather than calling
                # fillna(inplace=True) on the selection: the in-place form is
                # chained assignment, which warns on pandas 2.x and does not
                # update features_df at all under Copy-on-Write.
                features_df[col] = features_df[col].fillna(median_val)
                print(f"    Filled NaNs in '{col}' with median ({median_val:.2f})")

        print("Feature preparation complete.")
else:
    print("Skipping feature preparation: Input DataFrames from Stage 2 are empty or failed to load.")

In [None]:
# Train & Predict with Isolation Forest
# Outcome: `reviews_stage3_filtered_df` holds the rows kept after anomaly
# removal, or every merged row when the model is skipped or raises.
reviews_stage3_filtered_df = pd.DataFrame()  # placeholder until a branch below fills it

if features_df.empty or reviews_stage3_merged_df.empty:
    print("Skipping Isolation Forest: No features prepared or input data empty.")
    # No model run: pass the merged rows through untouched, or stay empty
    reviews_stage3_filtered_df = (
        pd.DataFrame()
        if reviews_stage3_merged_df.empty
        else reviews_stage3_merged_df.copy()
    )
else:
    print("\nTraining Isolation Forest and predicting anomalies...")
    try:
        # contamination='auto' leaves the outlier share to the algorithm;
        # pass a float such as 0.01 to pin it at 1% instead.
        # n_jobs=-1 uses all available cores.
        iso_forest = IsolationForest(
            contamination='auto',
            random_state=42,
            n_jobs=-1,
        )

        # fit_predict labels each row: 1 = inlier, -1 = outlier
        reviews_stage3_merged_df['anomaly_score'] = iso_forest.fit_predict(features_df)

        n_outliers = reviews_stage3_merged_df['anomaly_score'].eq(-1).sum()
        print(f"Identified {n_outliers} potential outliers (anomaly_score = -1).")

        # Snapshot sizes, keep only inliers, then report what was dropped
        count_before = len(reviews_stage3_merged_df)
        users_before = reviews_stage3_merged_df['user_id'].nunique()

        reviews_stage3_filtered_df = reviews_stage3_merged_df.loc[
            reviews_stage3_merged_df['anomaly_score'] == 1
        ].copy()

        count_after = len(reviews_stage3_filtered_df)
        users_after = reviews_stage3_filtered_df['user_id'].nunique()

        print(f"Reviews removed by Isolation Forest: {count_before - count_after}")
        print(f"Users removed: {users_before - users_after}")

        # Optional diagnostic: mean of each model feature, outliers vs inliers
        outliers_df = reviews_stage3_merged_df.loc[
            reviews_stage3_merged_df['anomaly_score'] == -1
        ]
        if not (outliers_df.empty or reviews_stage3_filtered_df.empty):
             print("\nFeature Averages (Outliers vs Inliers):")
             print(pd.DataFrame({'Outliers': outliers_df[feature_cols].mean(), 'Inliers': reviews_stage3_filtered_df[feature_cols].mean()}))
        del outliers_df
        gc.collect()

    except Exception as e:
        print(f"Error during Isolation Forest execution: {e}")
        # Model failed: fall back to keeping every review, minus any
        # partially written label column
        reviews_stage3_filtered_df = reviews_stage3_merged_df.copy()
        if 'anomaly_score' in reviews_stage3_filtered_df.columns:
             del reviews_stage3_filtered_df['anomaly_score']

### Stage 3 Evaluation & Saving (Final)

In [None]:
# Final Evaluation after Stage 3
# Reports review/user/business counts for the Stage 3 result and compares
# them with the Stage 2 counts computed earlier in the notebook.
print("\n--- Stage 3 Final Metrics (Overall Final) ---")
if reviews_stage3_filtered_df.empty:
    print("Stage 3 DataFrame is empty. No final metrics.")
    # Zero the counters so the closing summary still has values to print
    final_review_count_s3 = final_user_count_s3 = final_business_count_s3 = 0
else:
    # The unique id arrays are kept: the save cell filters users/businesses by them
    final_user_ids_s3 = reviews_stage3_filtered_df['user_id'].unique()
    final_business_ids_s3 = reviews_stage3_filtered_df['business_id'].unique()

    final_review_count_s3 = reviews_stage3_filtered_df.shape[0]
    final_user_count_s3 = len(final_user_ids_s3)
    final_business_count_s3 = len(final_business_ids_s3)

    print(f"Final Review Count (End of Stage 3): {final_review_count_s3}")
    print(f"Final Unique User Count (End of Stage 3): {final_user_count_s3}")
    print(f"Final Unique Business Count (End of Stage 3): {final_business_count_s3}")

    print("\n--- Comparison: Stage 2 vs Stage 3 ---")
    print(f"Reviews: {final_review_count_s2} -> {final_review_count_s3} - Removed in Stage 3: {final_review_count_s2 - final_review_count_s3}")
    print(f"Users: {final_user_count_s2} -> {final_user_count_s3} - Removed in Stage 3: {final_user_count_s2 - final_user_count_s3}")

In [None]:
# Save Final Cleaned Data
# Writes the final reviews, users and businesses as JSON Lines files under
# `output_dir`, then releases the large intermediate DataFrames.
print("\n--- Saving Final Cleaned Data ---")

if reviews_stage3_filtered_df.empty:
    print("Stage 3 DataFrame is empty. Cannot save final files.")
else:
    # --- Reviews: keep only the columns listed in review_cols_to_keep_output ---
    reviews_final_cleaned_df = reviews_stage3_filtered_df[review_cols_to_keep_output].copy()
    output_reviews_final_path = os.path.join(output_dir, 'reviews_final_cleaned.json')
    reviews_final_cleaned_df.to_json(
        output_reviews_final_path,
        orient='records',
        lines=True,
        date_format='iso',
    )
    print(f"Saved final cleaned reviews ({len(reviews_final_cleaned_df)}) to: {output_reviews_final_path}")

    # --- Users: Stage 2 users restricted to ids still present in the final reviews ---
    if users_stage3_input_df.empty:
        print("Warning: Stage 2 users input was empty, cannot save final users.")
        users_final_cleaned_df = pd.DataFrame()
    else:
        users_final_cleaned_df = users_stage3_input_df[users_stage3_input_df['user_id'].isin(final_user_ids_s3)].copy()
        output_users_final_path = os.path.join(output_dir, 'users_final_cleaned.json')
        users_final_cleaned_df.to_json(
            output_users_final_path,
            orient='records',
            lines=True,
            date_format='iso',
        )
        print(f"Saved final cleaned users ({len(users_final_cleaned_df)}) to: {output_users_final_path}")

    # --- Businesses: original business_df restricted to ids in the final reviews ---
    if business_df.empty:
        print("Warning: Original business_df was empty, cannot save final businesses.")
        business_final_cleaned_df = pd.DataFrame()
    else:
        business_final_cleaned_df = business_df[business_df['business_id'].isin(final_business_ids_s3)].copy()
        output_business_final_path = os.path.join(output_dir, 'business_final_cleaned.json')
        business_final_cleaned_df.to_json(
            output_business_final_path,
            orient='records',
            lines=True,
            date_format='iso',
        )
        print(f"Saved final cleaned businesses ({len(business_final_cleaned_df)}) to: {output_business_final_path}")

    # --- Final memory cleanup (includes the original users/business tables) ---
    del (
        reviews_stage3_input_df,
        users_stage3_input_df,
        reviews_stage3_merged_df,
        reviews_stage3_filtered_df,
        features_df,
        reviews_final_cleaned_df,
        users_final_cleaned_df,
        business_final_cleaned_df,
        users_df,
        business_df,
    )
    gc.collect()

## Phase 4: Post-Cleaning Summary

**Summary of Cleaning Process (Chunking Refactor):**

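*   **Initial Data:** Started with the full user and business tables (expected 1,987,897 users and 150,346 businesses, per the Phase 0 shape check). Reviews are read and processed in chunks rather than loaded all at once.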
*   **Chunk Processing:** Reviews loaded in chunks. Merged with users, feature engineered, and filtered by text length (A), low usefulness (F), and URLs/Emails (2A) within each chunk.
*   **Combined Data (Baseline):** `baseline_review_count` reviews and `baseline_user_count` users remaining after combining chunks.
*   **After Stage 1 (User Aggregation Rules):** Reduced to `final_review_count_s1` reviews and `final_user_count_s1` users.
    *   Removed users/reviews based on burst rate (B), short activity windows (C), high rating deviation (D, non-elite), and low engagement (E).
*   **After Stage 2 (Text Heuristics):** Reduced to `final_review_count_s2` reviews and `final_user_count_s2` users.
    *   Removed reviews based on text repetition (B, optional/if NLTK present).
*   **After Stage 3 (Isolation Forest):** Reduced to `final_review_count_s3` reviews and `final_user_count_s3` users.
    *   Removed reviews identified as anomalies based on selected features.

**Final Output:**

*   `reviews_final_cleaned.json`: Contains `final_review_count_s3` cleaned reviews.
*   `users_final_cleaned.json`: Contains `final_user_count_s3` users corresponding to the final reviews.
*   `business_final_cleaned.json`: Contains `final_business_count_s3` businesses corresponding to the final reviews.

These files are written to the `../yelp_dataset/cleaned_data/` directory (the `output_dir` defined in Phase 0, relative to the notebook's location).

In [None]:
# Closing summary: a counter that was never defined (e.g. an earlier cell was
# skipped) is reported as 'N/A' instead of raising NameError.
print("\n--- Cleaning Process Complete ---")
print(f"Final review count: {locals().get('final_review_count_s3', 'N/A')}")
print(f"Final user count: {locals().get('final_user_count_s3', 'N/A')}")
print(f"Final business count: {locals().get('final_business_count_s3', 'N/A')}")
print(f"Cleaned files saved in: {output_dir}")