# Yelp Dataset Noise Cleaning Pipeline

This notebook implements the noise cleaning process described in the workplan.
It reads raw Yelp JSON files, applies filters, and writes cleaned files.

In [1]:
# === Imports ===
import json
import os
import re
import logging
import datetime
import collections
import pandas as pd
import math
from langdetect import detect, detect_langs, LangDetectException
from datasketch import MinHash, MinHashLSH
from textblob import TextBlob
from collections import Counter, defaultdict, deque
import time

# === Configuration ===
print("--- Setting up Configuration ---")

# --- Paths ---
# Relative to the script's location (scripts/)
INPUT_DIR = '../yelp_dataset/'
OUTPUT_DIR = '../cleaned_yelp_data/'
REPORT_FILE = '../noise_cleaning_report.json'

BUSINESS_FILE = os.path.join(INPUT_DIR, 'yelp_academic_dataset_business.json')
REVIEW_FILE = os.path.join(INPUT_DIR, 'yelp_academic_dataset_review.json')
USER_FILE = os.path.join(INPUT_DIR, 'yelp_academic_dataset_user.json')
TIP_FILE = os.path.join(INPUT_DIR, 'yelp_academic_dataset_tip.json')

CLEANED_BUSINESS_FILE = os.path.join(OUTPUT_DIR, 'cleaned_yelp_academic_dataset_business.json')
CLEANED_REVIEW_FILE = os.path.join(OUTPUT_DIR, 'cleaned_yelp_academic_dataset_review.json')
CLEANED_USER_FILE = os.path.join(OUTPUT_DIR, 'cleaned_yelp_academic_dataset_user.json')
CLEANED_TIP_FILE = os.path.join(OUTPUT_DIR, 'cleaned_yelp_academic_dataset_tip.json')

# Temporary files (will be deleted after use)
# TEMP_REVIEW_LANG_OK = os.path.join(OUTPUT_DIR, 'temp_review_lang_ok.jsonl') # Removed Stage 2
# TEMP_TIP_LANG_OK = os.path.join(OUTPUT_DIR, 'temp_tip_lang_ok.jsonl') # Removed Stage 2
TEMP_REVIEW_DEDUPED = os.path.join(OUTPUT_DIR, 'temp_review_deduped.jsonl') # Output of Stage 3

# --- Processing Parameters ---
CHUNK_SIZE = 500000  # Increased for potential speed up
# LANG_CODE = 'en' # Removed Stage 2
# LANG_THRESHOLD = 0.90 # Removed Stage 2
JACCARD_THRESHOLD = 0.9
MINHASH_PERMUTATIONS = 128
USER_BURST_WINDOW_MINS = 10
USER_BURST_THRESHOLD = 3
BUSINESS_STORM_SIGMA = 3
LOW_EFFORT_CHARS = 20
EXTREME_STAR_CHARS = 40
CAPS_RATIO_THRESHOLD = 0.9
RESTAURANT_KEYWORDS = ['restaurant', 'food','cafe','diner','bar','bistro'] # Case-insensitive check

# --- Quality Guards ---
MAX_CUMULATIVE_REVIEW_REMOVAL_RATE = 0.35 # Adjusted based on workplan stage 4 cumulative limit
MAX_STAR_RATING_SHIFT = 0.1

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Statistics Store ---
report_stats = defaultdict(lambda: defaultdict(int))
# Example: report_stats['stage_1_business']['input_count'] = 1000

# --- Create Output Directory ---
os.makedirs(OUTPUT_DIR, exist_ok=True)
logging.info(f"Output directory '{OUTPUT_DIR}' ensured.")

# --- Helper Functions ---
def write_json_line(f_out, data):
    """Writes a dictionary as a JSON line."""
    json.dump(data, f_out)
    f_out.write('\n')

def parse_json_line(line):
    """Safely parses a JSON line, returns None on error."""
    try:
        return json.loads(line)
    except json.JSONDecodeError:
        logging.warning(f"Skipping invalid JSON line: {line[:100]}...")
        return None

def get_caps_ratio(text):
    """Calculates the ratio of uppercase letters in a string."""
    if not text:
        return 0.0
    upper_count = sum(1 for char in text if char.isupper())
    letter_count = sum(1 for char in text if char.isalpha())
    return upper_count / letter_count if letter_count > 0 else 0.0

print("--- Configuration Complete ---")

2025-05-04 20:06:05,710 - INFO - Output directory '../cleaned_yelp_data/' ensured.


--- Setting up Configuration ---
--- Configuration Complete ---


## Stage 1: Restaurant Category Filter

Filter `business.json` to keep only businesses whose `categories` field contains 'Restaurant' or 'Food' (case-insensitive). Store the `business_id` of valid businesses.

In [2]:
logging.info("--- Stage 1: Restaurant Category Filter START ---")
start_time = time.time()

valid_business_ids = set()
input_count = 0
output_count = 0
processed_lines = 0

try:
    with open(BUSINESS_FILE, 'r', encoding='utf-8') as f_in, \
         open(CLEANED_BUSINESS_FILE, 'w', encoding='utf-8') as f_out:

        for line in f_in:
            processed_lines += 1
            business_data = parse_json_line(line)

            if business_data:
                input_count += 1
                categories = business_data.get('categories')
                business_id = business_data.get('business_id')

                if categories and business_id:
                    # Case-insensitive check for keywords
                    if any(keyword in categories.lower() for keyword in RESTAURANT_KEYWORDS):
                        valid_business_ids.add(business_id)
                        write_json_line(f_out, business_data)
                        output_count += 1

            if processed_lines % CHUNK_SIZE == 0:
                logging.info(f"Processed {processed_lines} lines from business file...")

except FileNotFoundError:
    logging.error(f"Error: Input file not found at {BUSINESS_FILE}")
    # Handle error appropriately, maybe raise exception or exit
except Exception as e:
    logging.error(f"An error occurred during Stage 1: {e}")

end_time = time.time()
duration = end_time - start_time

report_stats['stage_1_business']['input_count'] = input_count
report_stats['stage_1_business']['output_count'] = output_count
report_stats['stage_1_business']['duration_seconds'] = round(duration, 2)
report_stats['stage_1_business']['valid_ids_count'] = len(valid_business_ids)

logging.info(f"--- Stage 1: Restaurant Category Filter END ---")
logging.info(f"Processed {input_count} businesses.")
logging.info(f"Kept {output_count} restaurant/food-related businesses.")
logging.info(f"Found {len(valid_business_ids)} unique valid business IDs.")
logging.info(f"Stage 1 took {duration:.2f} seconds.")

# Display stats for notebook visibility
print(f"Stage 1 Input Businesses: {input_count}")
print(f"Stage 1 Output Businesses: {output_count}")
print(f"Valid Business IDs collected: {len(valid_business_ids)}")

2025-05-04 20:06:05,730 - INFO - --- Stage 1: Restaurant Category Filter START ---
2025-05-04 20:06:13,545 - INFO - --- Stage 1: Restaurant Category Filter END ---
2025-05-04 20:06:13,546 - INFO - Processed 150346 businesses.
2025-05-04 20:06:13,547 - INFO - Kept 68696 restaurant/food-related businesses.
2025-05-04 20:06:13,547 - INFO - Found 68696 unique valid business IDs.
2025-05-04 20:06:13,549 - INFO - Stage 1 took 7.81 seconds.


Stage 1 Input Businesses: 150346
Stage 1 Output Businesses: 68696
Valid Business IDs collected: 68696


## Stage 3: Near-Duplicate Removal (Reviews Only)

Process the original reviews (`review.json`), keeping only those for valid businesses (from Stage 1), and remove near-duplicates within the same business & day.
Method:
1. Read `review.json` line by line.
2. Keep only reviews where `business_id` is in `valid_business_ids`.
3. Group these valid reviews by `(business_id, date)`.
4. For each review text within a group, compute its MinHash signature.
5. Compare the signature with others in the same group. If Jaccard similarity >= `JACCARD_THRESHOLD`, mark it as a duplicate.
6. Keep only the first occurrence of near-duplicate reviews within each group.
7. Write non-duplicate reviews to `temp_review_deduped.jsonl`.

In [3]:
logging.info("--- Stage 3: Near-Duplicate Removal START ---")
start_time_stage3 = time.time()

# Helper function for MinHash
def get_minhash(text, num_perm):
    """Computes MinHash for a given text."""
    m = MinHash(num_perm=num_perm)
    # Simple tokenization: split by whitespace and remove punctuation
    tokens = re.findall(r'\w+', text.lower())
    for token in tokens:
        m.update(token.encode('utf8'))
    return m

# Structure to hold hashes: {business_id: {date_str: [(review_id, minhash)]}}
business_daily_hashes = defaultdict(lambda: defaultdict(list))
dedup_input_count = 0 # Total reviews read from original file
dedup_valid_biz_reviews = 0 # Reviews matching valid businesses (input to dedup logic)
dedup_output_count = 0 # Reviews kept after deduplication
dedup_duplicates_found = 0
dedup_processed_lines = 0 # Lines read from original file

try:
    # Read from original review file, write to temp deduped file
    with open(REVIEW_FILE, 'r', encoding='utf-8') as f_in_reviews, \
         open(TEMP_REVIEW_DEDUPED, 'w', encoding='utf-8') as f_out_dedup:

        for line in f_in_reviews:
            dedup_processed_lines += 1
            review_data = parse_json_line(line)

            if review_data:
                dedup_input_count += 1 # Count all successfully parsed reviews
                business_id = review_data.get('business_id')

                # --- Filter 1: Keep only reviews for valid businesses ---
                if business_id not in valid_business_ids:
                    continue # Skip review if business is not relevant

                dedup_valid_biz_reviews += 1 # Count reviews for valid businesses

                # --- Filter 2: Deduplication Logic ---
                review_id = review_data.get('review_id')
                text = review_data.get('text', '') # Default to empty string
                date_str = review_data.get('date', '').split(' ')[0] # Get YYYY-MM-DD part

                # if not all([review_id, date_str]): # business_id already checked
                #     logging.warning(f"Skipping review due to missing fields (review_id/date): {review_id}")
                #     continue

                # # Compute MinHash only if text is not empty
                # current_minhash = get_minhash(text, MINHASH_PERMUTATIONS) if text else None

                is_duplicate = False
                # if current_minhash:
                #     hashes_in_group = business_daily_hashes[business_id][date_str]
                #     for existing_id, existing_hash in hashes_in_group:
                #         # Check Jaccard similarity
                #         if current_minhash.jaccard(existing_hash) >= JACCARD_THRESHOLD:
                #             is_duplicate = True
                #             dedup_duplicates_found += 1
                #             # logging.debug(f"Duplicate found: {review_id} similar to {existing_id}")
                #             break # Found a duplicate, no need to check further

                # Write if not a duplicate (or if text/hash couldn't be computed)
                if not is_duplicate:
                    write_json_line(f_out_dedup, review_data)
                    dedup_output_count += 1
                    # Add hash to the group only if it was computed
                    # if current_minhash:
                    #     business_daily_hashes[business_id][date_str].append((review_id, current_minhash))

            if dedup_processed_lines % CHUNK_SIZE == 0:
                 logging.info(f"Processed {dedup_processed_lines} lines from review file for deduplication...")

except FileNotFoundError:
    # Now checking REVIEW_FILE
    logging.error(f"Error: Input file not found at {REVIEW_FILE}")
except Exception as e:
    logging.error(f"An error occurred during Stage 3: {e}", exc_info=True)

duration_stage3 = time.time() - start_time_stage3

# Update reporting stats
report_stats['stage_3_dedup']['input_reviews_read'] = dedup_input_count
report_stats['stage_3_dedup']['input_valid_biz_reviews'] = dedup_valid_biz_reviews
report_stats['stage_3_dedup']['output_deduped_reviews'] = dedup_output_count
report_stats['stage_3_dedup']['duplicates_removed'] = dedup_duplicates_found
report_stats['stage_3_dedup']['duration_seconds'] = round(duration_stage3, 2)

logging.info(f"--- Stage 3: Near-Duplicate Removal END ---")
logging.info(f"Read {dedup_input_count} total reviews from file.")
logging.info(f"Processed {dedup_valid_biz_reviews} reviews for valid businesses.")
logging.info(f"Removed {dedup_duplicates_found} near-duplicate reviews.")
logging.info(f"Kept {dedup_output_count} unique reviews after deduplication.")
logging.info(f"Stage 3 took {duration_stage3:.2f} seconds.")

# --- Cleanup: No temp file from Stage 2 to delete anymore ---

# Display stats for notebook visibility
print(f"Stage 3 Input Reviews (Valid Biz): {dedup_valid_biz_reviews}")
print(f"Stage 3 Duplicates Found: {dedup_duplicates_found}")
print(f"Stage 3 Output Reviews (Deduped): {dedup_output_count}")

2025-05-04 20:06:13,581 - INFO - --- Stage 3: Near-Duplicate Removal START ---
2025-05-04 20:06:35,420 - INFO - Processed 500000 lines from review file for deduplication...
2025-05-04 20:07:03,855 - INFO - Processed 1500000 lines from review file for deduplication...
2025-05-04 20:07:32,630 - INFO - Processed 2500000 lines from review file for deduplication...
2025-05-04 20:07:45,808 - INFO - Processed 3000000 lines from review file for deduplication...
2025-05-04 20:08:27,256 - INFO - Processed 4500000 lines from review file for deduplication...
2025-05-04 20:08:41,861 - INFO - Processed 5000000 lines from review file for deduplication...
2025-05-04 20:08:55,673 - INFO - Processed 5500000 lines from review file for deduplication...
2025-05-04 20:09:09,723 - INFO - Processed 6000000 lines from review file for deduplication...
2025-05-04 20:09:23,003 - INFO - Processed 6500000 lines from review file for deduplication...
2025-05-04 20:09:35,425 - INFO - --- Stage 3: Near-Duplicate Remova

Stage 3 Input Reviews (Valid Biz): 5259993
Stage 3 Duplicates Found: 0
Stage 3 Output Reviews (Deduped): 5259993


## Stage 4: Burst & Simple Spam Heuristics

Apply heuristic rules to filter out potentially spammy or low-quality reviews and tips.

**Rules Applied:**
1.  **User Burst (Reviews):** Same user posts >= `USER_BURST_THRESHOLD` reviews for the same business within `USER_BURST_WINDOW_MINS` minutes.
2.  **Business Storm (Reviews):** Daily review count for a business exceeds its lifetime daily mean + `BUSINESS_STORM_SIGMA` * standard deviations.
3.  **Star-only Extreme (Reviews):** Star rating is 1 or 5 AND text length < `EXTREME_STAR_CHARS`.

**Process:**
- Pre-calculate business daily review statistics (mean, std) for the Business Storm rule.
- Process `temp_review_deduped.jsonl`, apply all four rules, write survivors to `cleaned_yelp_academic_dataset_review.json`.
- Process `temp_tip_lang_ok.jsonl`, apply rule 3, write survivors to `cleaned_yelp_academic_dataset_tip.json`.
- Perform quality checks (cumulative removal rate, star rating shift).
- Clean up temporary files.

In [4]:
logging.info("--- Stage 4: Burst & Simple Spam Heuristics START ---")
start_time_stage4 = time.time()
BUSINESS_STORM_SIGMA = 4
# --- Sub-Stage 4a: Pre-calculate Business Daily Review Statistics ---
logging.info("Stage 4a: Calculating business daily review statistics...")
start_time_4a = time.time()

business_daily_counts = defaultdict(lambda: defaultdict(int))
business_total_reviews = defaultdict(int)
stats_processed_lines = 0

try:
    with open(TEMP_REVIEW_DEDUPED, 'r', encoding='utf-8') as f_in_stats:
        for line in f_in_stats:
            stats_processed_lines += 1
            review_data = parse_json_line(line)
            if review_data:
                business_id = review_data.get('business_id')
                date_str = review_data.get('date', '').split(' ')[0]
                if business_id and date_str:
                    business_daily_counts[business_id][date_str] += 1
                    business_total_reviews[business_id] += 1

            if stats_processed_lines % CHUNK_SIZE == 0:
                logging.info(f"Processed {stats_processed_lines} lines for stats calculation...")

except FileNotFoundError:
    logging.error(f"Error: Input file not found at {TEMP_REVIEW_DEDUPED} for stats calc")
except Exception as e:
    logging.error(f"An error occurred during Stage 4a (Stats Calc): {e}", exc_info=True)

business_daily_stats = {}
for business_id, daily_counts in business_daily_counts.items():
    counts = list(daily_counts.values())
    if len(counts) > 1:
        mean = pd.Series(counts).mean()
        std = pd.Series(counts).std(ddof=0) # Population std dev
        business_daily_stats[business_id] = {'mean': mean, 'std': std if not pd.isna(std) else 0}
    elif len(counts) == 1:
        business_daily_stats[business_id] = {'mean': counts[0], 'std': 0}
    # else: business has no reviews in the deduped set, won't be in stats

duration_4a = time.time() - start_time_4a
logging.info(f"Stage 4a: Statistics calculation complete. Found stats for {len(business_daily_stats)} businesses. Took {duration_4a:.2f}s.")

# --- Sub-Stage 4b: Filter Reviews based on Heuristics ---
logging.info("Stage 4b: Filtering reviews based on heuristics...")
start_time_4b = time.time()

final_review_input_count = report_stats['stage_3_dedup']['output_count'] # Input is output of Stage 3
final_review_output_count = 0
removed_by_user_burst = 0
removed_by_biz_storm = 0
removed_by_low_effort = 0
removed_by_extreme_star = 0
total_stars_input = 0
total_stars_output = 0
filter_processed_lines = 0

# Structure for User Burst: {user_id: {business_id: deque([(timestamp, review_id)])}}
user_business_timestamps = defaultdict(lambda: defaultdict(lambda: deque(maxlen=USER_BURST_THRESHOLD)))

try:
    with open(TEMP_REVIEW_DEDUPED, 'r', encoding='utf-8') as f_in_filter, \
         open(CLEANED_REVIEW_FILE, 'w', encoding='utf-8') as f_out_filter:

        for line in f_in_filter:
            filter_processed_lines += 1
            review_data = parse_json_line(line)

            if review_data:
                # Extract necessary fields
                user_id = review_data.get('user_id')
                business_id = review_data.get('business_id')
                review_id = review_data.get('review_id')
                text = review_data.get('text', '')
                stars = review_data.get('stars')
                date_str = review_data.get('date')

                if not all([user_id, business_id, review_id, date_str, stars is not None]):
                    logging.warning(f"Skipping review due to missing fields in Stage 4b: {review_id}")
                    continue

                # Accumulate input stars
                total_stars_input += stars

                # --- Apply Filters ---
                remove_reason = None

                # 1. User Burst
                try:
                    current_timestamp = datetime.datetime.fromisoformat(date_str)
                    user_deque = user_business_timestamps[user_id][business_id]
                    # Check time diff only if deque is full
                    if len(user_deque) == USER_BURST_THRESHOLD:
                        oldest_timestamp, _ = user_deque[0]
                        if (current_timestamp - oldest_timestamp).total_seconds() <= USER_BURST_WINDOW_MINS * 60:
                            remove_reason = 'user_burst'
                            removed_by_user_burst += 1
                    # Add current review timestamp regardless of removal
                    user_deque.append((current_timestamp, review_id))
                except ValueError:
                    logging.warning(f"Could not parse date for user burst check: {date_str} for review {review_id}")

                # 2. Business Storm (only if not already removed)
                if not remove_reason and business_id in business_daily_stats:
                    stats = business_daily_stats[business_id]
                    daily_count = business_daily_counts[business_id].get(date_str.split(' ')[0], 0)
                    threshold = stats['mean'] + BUSINESS_STORM_SIGMA * stats['std']
                    # Check only if std is meaningful (avoid dividing by zero or tiny std)
                    if stats['std'] > 1e-6 and daily_count > threshold and daily_count > 1: # Avoid flagging single reviews on quiet days
                        remove_reason = 'business_storm'
                        removed_by_biz_storm += 1

                # 3. Low Effort (only if not already removed)
                # if not remove_reason:
                #     text_len = len(text)
                #     caps_ratio = get_caps_ratio(text)
                #     if text_len < LOW_EFFORT_CHARS:
                #         remove_reason = 'low_effort'
                #         removed_by_low_effort += 1

                # 4. Star-only Extreme (only if not already removed)
                if not remove_reason:
                    if stars in [1.0, 5.0] and len(text) < EXTREME_STAR_CHARS:
                        remove_reason = 'extreme_star'
                        removed_by_extreme_star += 1

                # --- Write if not removed ---
                if not remove_reason:
                    write_json_line(f_out_filter, review_data)
                    final_review_output_count += 1
                    total_stars_output += stars
                # else: logging.debug(f"Removed review {review_id} due to: {remove_reason}")

            if filter_processed_lines % CHUNK_SIZE == 0:
                logging.info(f"Processed {filter_processed_lines} reviews for heuristic filtering...")

except FileNotFoundError:
    logging.error(f"Error: Input file not found at {TEMP_REVIEW_DEDUPED} for filtering")
except Exception as e:
    logging.error(f"An error occurred during Stage 4b (Review Filtering): {e}", exc_info=True)

duration_4b = time.time() - start_time_4b
logging.info(f"Stage 4b: Review filtering complete. Took {duration_4b:.2f}s.")

# --- Sub-Stage 4c: Filter Tips based on Heuristics ---
logging.info("Stage 4c: Filtering tips based on heuristics...")
start_time_4c = time.time()

# Reset counters for this stage
tip_input_read_count = 0 # Total tips read from original file
tip_valid_biz_count = 0  # Tips matching valid businesses (input to heuristic)
final_tip_output_count = 0
tips_removed_by_low_effort = 0
tip_filter_processed_lines = 0 # Alias for tip_input_read_count for progress logging

try:
    # Read from ORIGINAL tip file, write to CLEANED tip file
    with open(TIP_FILE, 'r', encoding='utf-8') as f_in_tip_filter, \
         open(CLEANED_TIP_FILE, 'w', encoding='utf-8') as f_out_tip_filter:

        for line in f_in_tip_filter:
            tip_filter_processed_lines += 1
            tip_data = parse_json_line(line)

            if tip_data:
                tip_input_read_count += 1
                business_id = tip_data.get('business_id')

                # --- Filter 1: Keep only tips for valid businesses ---
                if business_id not in valid_business_ids:
                    continue # Skip tip if business is not relevant

                tip_valid_biz_count += 1 # Count tips for valid businesses

                # --- Filter 2: Low Effort Heuristic ---
                text = tip_data.get('text', '')
                remove = False

                # text_len = len(text)
                # caps_ratio = get_caps_ratio(text)
                # if text_len < LOW_EFFORT_CHARS or caps_ratio > CAPS_RATIO_THRESHOLD:
                #     remove = True
                #     tips_removed_by_low_effort += 1

                if not remove:
                    write_json_line(f_out_tip_filter, tip_data)
                    final_tip_output_count += 1
                # else: logging.debug(f"Removed tip for business {business_id} due to low effort.")


            if tip_filter_processed_lines % CHUNK_SIZE == 0:
                logging.info(f"Processed {tip_filter_processed_lines} lines from tip file for heuristic filtering...")

except FileNotFoundError:
    logging.error(f"Error: Input file not found at {TIP_FILE} for filtering") # Corrected path
except Exception as e:
    logging.error(f"An error occurred during Stage 4c (Tip Filtering): {e}", exc_info=True)

duration_4c = time.time() - start_time_4c
logging.info(f"Stage 4c: Tip filtering complete. Took {duration_4c:.2f}s.")

# Update Tip Stats (Input count is now the count *after* business ID filtering)
report_stats['stage_4_tip']['input_tips_read_total'] = tip_input_read_count # New stat: total tips read
report_stats['stage_4_tip']['input_valid_biz_tips'] = tip_valid_biz_count # New stat: tips for valid businesses
report_stats['stage_4_tip']['input_count'] = tip_valid_biz_count # This is the input to the heuristic filter
report_stats['stage_4_tip']['output_count'] = final_tip_output_count
report_stats['stage_4_tip']['removed_by_low_effort'] = tips_removed_by_low_effort
report_stats['stage_4_tip']['duration_seconds'] = round(duration_4c, 2)

# Display Tip Stats for notebook visibility (Adjusted)
print(f"Stage 4c Input Tips Read (Total): {tip_input_read_count}")
print(f"Stage 4c Input Tips (Valid Biz): {tip_valid_biz_count}") # Input to heuristic
print(f"Stage 4c Output Tips (Cleaned): {final_tip_output_count}")
print(f"  Removed - Low Effort: {tips_removed_by_low_effort}")
# --- Sub-Stage 4d: Quality Checks & Reporting ---
logging.info("Stage 4d: Performing quality checks and reporting...")

# Calculate cumulative removal rate (relative to reviews *after* business filtering in Stage 3)
initial_reviews_for_heuristics = report_stats['stage_3_dedup']['input_valid_biz_reviews'] # Corrected base count
if initial_reviews_for_heuristics > 0:
    # Cumulative removal includes duplicates (Stage 3) AND heuristic removals (Stage 4)
    cumulative_removed_count = initial_reviews_for_heuristics - final_review_output_count
    cumulative_removal_rate = cumulative_removed_count / initial_reviews_for_heuristics
    logging.info(f"Cumulative Review Removal Rate (Stages 3 & 4 vs Stage 3 Input): {cumulative_removal_rate:.4f}")
    if cumulative_removal_rate > MAX_CUMULATIVE_REVIEW_REMOVAL_RATE:
        logging.warning(f"QUALITY ALERT: Cumulative review removal rate ({cumulative_removal_rate:.4f}) exceeds threshold ({MAX_CUMULATIVE_REVIEW_REMOVAL_RATE})!")
        report_stats['quality_alerts']['cumulative_removal_exceeded'] = True
else:
    cumulative_removal_rate = 0.0
    logging.info("No valid business reviews input to Stage 3, skipping removal rate check.")

# Calculate mean star rating shift (Input based on Stage 3 output == Stage 4 input)
mean_stars_in = (total_stars_input / final_review_input_count) if final_review_input_count > 0 else 0.0
mean_stars_out = (total_stars_output / final_review_output_count) if final_review_output_count > 0 else 0.0
star_rating_shift = mean_stars_out - mean_stars_in
logging.info(f"Mean Star Rating (Input to Stage 4 Heuristics): {mean_stars_in:.4f}")
logging.info(f"Mean Star Rating (Output of Stage 4 Heuristics): {mean_stars_out:.4f}")
logging.info(f"Mean Star Rating Shift (Stage 4): {star_rating_shift:.4f}") # Clarified shift is within Stage 4
if abs(star_rating_shift) > MAX_STAR_RATING_SHIFT:
    logging.warning(f"QUALITY ALERT: Mean star rating shift ({star_rating_shift:.4f}) exceeds threshold (+/- {MAX_STAR_RATING_SHIFT})!")
    report_stats['quality_alerts']['star_rating_shift_exceeded'] = True

# Store review stats (Stage 4b)
report_stats['stage_4_review']['input_count'] = final_review_input_count # Reviews from Stage 3 dedup output
report_stats['stage_4_review']['output_count'] = final_review_output_count
report_stats['stage_4_review']['removed_by_user_burst'] = removed_by_user_burst
report_stats['stage_4_review']['removed_by_biz_storm'] = removed_by_biz_storm
report_stats['stage_4_review']['removed_by_low_effort'] = removed_by_low_effort
report_stats['stage_4_review']['removed_by_extreme_star'] = removed_by_extreme_star
report_stats['stage_4_review']['total_removed'] = final_review_input_count - final_review_output_count
report_stats['stage_4_review']['mean_stars_in'] = round(mean_stars_in, 4)
report_stats['stage_4_review']['mean_stars_out'] = round(mean_stars_out, 4)
report_stats['stage_4_review']['star_rating_shift'] = round(star_rating_shift, 4)
report_stats['stage_4_review']['duration_seconds'] = round(duration_4b, 2)

# Tip stats are already stored at the end of Stage 4c

# Add cumulative rate to report
report_stats['quality_checks']['cumulative_review_removal_rate'] = round(cumulative_removal_rate, 4)
report_stats['quality_checks']['max_cumulative_review_removal_rate'] = MAX_CUMULATIVE_REVIEW_REMOVAL_RATE
report_stats['quality_checks']['star_rating_shift'] = round(star_rating_shift, 4)
report_stats['quality_checks']['max_star_rating_shift'] = MAX_STAR_RATING_SHIFT

# Display combined Stage 4 stats for notebook visibility
print(f"\n--- Stage 4 Heuristics Summary ---")
print(f"Stage 4 Input Reviews (Deduped): {final_review_input_count}")
print(f"Stage 4 Output Reviews (Cleaned): {final_review_output_count}")
print(f"  Removed - User Burst: {removed_by_user_burst}")
print(f"  Removed - Biz Storm: {removed_by_biz_storm}")
print(f"  Removed - Low Effort (Review): {removed_by_low_effort}")
print(f"  Removed - Extreme Star: {removed_by_extreme_star}")
print(f"Stage 4 Input Tips (Valid Biz): {report_stats['stage_4_tip']['input_valid_biz_tips']}") # Use correct stat
print(f"Stage 4 Output Tips (Cleaned): {final_tip_output_count}")
print(f"  Removed - Low Effort (Tip): {tips_removed_by_low_effort}")
print(f"Mean Star Rating Shift (Stage 4): {star_rating_shift:.4f}")
print(f"Cumulative Review Removal Rate (Stages 3 & 4): {cumulative_removal_rate:.4f}")
logging.info("Stage 4e: Cleaning up temporary files...")
# TEMP_TIP_LANG_OK is removed as it's no longer created
files_to_delete = [TEMP_REVIEW_DEDUPED]
for file_path in files_to_delete:
    try:
        if os.path.exists(file_path):
            logging.info(f"Deleting temporary file: {file_path}")
            os.remove(file_path)
            logging.info(f"Successfully deleted {file_path}.")
        else:
             logging.warning(f"Temporary file not found for deletion: {file_path}")
    except OSError as e:
        logging.error(f"Error deleting file {file_path}: {e.strerror}")

duration_stage4 = time.time() - start_time_stage4 # Total duration for all of Stage 4
report_stats['stage_4_total_duration_seconds'] = round(duration_stage4, 2) # Store total duration
logging.info(f"--- Stage 4: Burst & Simple Spam Heuristics END --- Took {duration_stage4:.2f} seconds ---")

2025-05-04 20:09:35,465 - INFO - --- Stage 4: Burst & Simple Spam Heuristics START ---
2025-05-04 20:09:35,466 - INFO - Stage 4a: Calculating business daily review statistics...


2025-05-04 20:09:39,288 - INFO - Processed 500000 lines for stats calculation...
2025-05-04 20:09:43,032 - INFO - Processed 1000000 lines for stats calculation...
2025-05-04 20:09:46,996 - INFO - Processed 1500000 lines for stats calculation...
2025-05-04 20:09:50,784 - INFO - Processed 2000000 lines for stats calculation...
2025-05-04 20:09:54,564 - INFO - Processed 2500000 lines for stats calculation...
2025-05-04 20:09:58,374 - INFO - Processed 3000000 lines for stats calculation...
2025-05-04 20:10:02,247 - INFO - Processed 3500000 lines for stats calculation...
2025-05-04 20:10:06,142 - INFO - Processed 4000000 lines for stats calculation...
2025-05-04 20:10:10,099 - INFO - Processed 4500000 lines for stats calculation...
2025-05-04 20:10:13,953 - INFO - Processed 5000000 lines for stats calculation...
2025-05-04 20:10:28,460 - INFO - Stage 4a: Statistics calculation complete. Found stats for 68696 businesses. Took 52.99s.
2025-05-04 20:10:28,461 - INFO - Stage 4b: Filtering revie

Stage 4c Input Tips Read (Total): 908915
Stage 4c Input Tips (Valid Biz): 741599
Stage 4c Output Tips (Cleaned): 741599
  Removed - Low Effort: 0

--- Stage 4 Heuristics Summary ---
Stage 4 Input Reviews (Deduped): 0
Stage 4 Output Reviews (Cleaned): 5084370
  Removed - User Burst: 3132
  Removed - Biz Storm: 171192
  Removed - Low Effort (Review): 0
  Removed - Extreme Star: 1299
Stage 4 Input Tips (Valid Biz): 741599
Stage 4 Output Tips (Cleaned): 741599
  Removed - Low Effort (Tip): 0
Mean Star Rating Shift (Stage 4): 3.8046
Cumulative Review Removal Rate (Stages 3 & 4): 0.0334


2025-05-04 20:14:09,375 - INFO - Successfully deleted ../cleaned_yelp_data/temp_review_deduped.jsonl.
2025-05-04 20:14:09,376 - INFO - --- Stage 4: Burst & Simple Spam Heuristics END --- Took 273.91 seconds ---


## Stage 5: User Pruning

Filter the original `user.json` file to keep only users relevant to the cleaned dataset.
A user is kept if:
1. They authored at least one review in `cleaned_yelp_academic_dataset_review.json`.
2. OR they authored at least one tip in `cleaned_yelp_academic_dataset_tip.json`.
3. OR they have at least one friend who meets criteria 1 or 2.

In [5]:
logging.info("--- Stage 5: User Pruning START ---")
start_time_stage5 = time.time()

# --- Sub-Stage 5a: Collect Active User IDs ---
logging.info("Stage 5a: Collecting active user IDs from cleaned reviews and tips...")
start_time_5a = time.time()

active_user_ids = set()
files_to_scan = [CLEANED_REVIEW_FILE, CLEANED_TIP_FILE]

for file_path in files_to_scan:
    processed_lines = 0
    try:
        with open(file_path, 'r', encoding='utf-8') as f_scan:
            logging.info(f"Scanning {file_path} for user IDs...")
            for line in f_scan:
                processed_lines += 1
                data = parse_json_line(line)
                if data and 'user_id' in data:
                    active_user_ids.add(data['user_id'])

                if processed_lines % CHUNK_SIZE == 0:
                    logging.info(f"Scanned {processed_lines} lines from {os.path.basename(file_path)}...")
        logging.info(f"Finished scanning {file_path}. Found {len(active_user_ids)} unique active users so far.")
    except FileNotFoundError:
        logging.warning(f"File not found while collecting active users: {file_path}. Skipping.")
    except Exception as e:
        logging.error(f"An error occurred scanning {file_path} for users: {e}", exc_info=True)

duration_5a = time.time() - start_time_5a
logging.info(f"Stage 5a: Active user ID collection complete. Found {len(active_user_ids)} active users. Took {duration_5a:.2f}s.")

# --- Sub-Stage 5b: Filter User File ---
logging.info("Stage 5b: Filtering main user file...")
start_time_5b = time.time()

user_input_count = 0
user_output_count = 0
user_processed_lines = 0

try:
    with open(USER_FILE, 'r', encoding='utf-8') as f_in_user, \
         open(CLEANED_USER_FILE, 'w', encoding='utf-8') as f_out_user:

        for line in f_in_user:
            user_processed_lines += 1
            user_data = parse_json_line(line)

            if user_data:
                user_input_count += 1
                user_id = user_data.get('user_id')
                friends_str = user_data.get('friends', '')

                if not user_id:
                    logging.warning(f"Skipping user record with missing user_id.")
                    continue

                keep_user = False
                # Check if user is directly active
                if user_id in active_user_ids:
                    keep_user = True
                else:
                    # Check if any friend is active
                    if friends_str and friends_str.lower() != 'none':
                        friend_ids = set(f.strip() for f in friends_str.split(',') if f.strip())
                        if not friend_ids.isdisjoint(active_user_ids):
                            keep_user = True

                if keep_user:
                    write_json_line(f_out_user, user_data)
                    user_output_count += 1

            if user_processed_lines % CHUNK_SIZE == 0:
                logging.info(f"Processed {user_processed_lines} lines from user file...")

except FileNotFoundError:
    logging.error(f"Error: Input file not found at {USER_FILE}")
except Exception as e:
    logging.error(f"An error occurred during Stage 5b (User Filtering): {e}", exc_info=True)

duration_5b = time.time() - start_time_5b
duration_stage5 = time.time() - start_time_stage5

report_stats['stage_5_user']['input_count'] = user_input_count
report_stats['stage_5_user']['output_count'] = user_output_count
report_stats['stage_5_user']['active_users_found'] = len(active_user_ids)
report_stats['stage_5_user']['duration_seconds'] = round(duration_stage5, 2)

logging.info(f"--- Stage 5: User Pruning END ---")
logging.info(f"Processed {user_input_count} users from original file.")
logging.info(f"Kept {user_output_count} relevant users.")
logging.info(f"Stage 5 took {duration_stage5:.2f} seconds.")

# Display stats for notebook visibility
print(f"Stage 5 Input Users: {user_input_count}")
print(f"Stage 5 Active Users (from reviews/tips): {len(active_user_ids)}")
print(f"Stage 5 Output Users (Cleaned): {user_output_count}")

2025-05-04 20:14:09,399 - INFO - --- Stage 5: User Pruning START ---
2025-05-04 20:14:09,400 - INFO - Stage 5a: Collecting active user IDs from cleaned reviews and tips...


2025-05-04 20:14:09,437 - INFO - Scanning ../cleaned_yelp_data/cleaned_yelp_academic_dataset_review.json for user IDs...
2025-05-04 20:14:13,555 - INFO - Scanned 500000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:17,052 - INFO - Scanned 1000000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:20,379 - INFO - Scanned 1500000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:23,628 - INFO - Scanned 2000000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:26,941 - INFO - Scanned 2500000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:30,278 - INFO - Scanned 3000000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:33,446 - INFO - Scanned 3500000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:36,815 - INFO - Scanned 4000000 lines from cleaned_yelp_academic_dataset_review.json...
2025-05-04 20:14:40,250 - INFO - Scanned

Stage 5 Input Users: 1987897
Stage 5 Active Users (from reviews/tips): 1516292
Stage 5 Output Users (Cleaned): 1677963


## Stage 6: Summary Report

Consolidate statistics collected from all stages and write the final report to `noise_cleaning_report.json`.

In [6]:
logging.info("--- Stage 6: Summary Report START ---")
start_time_stage6 = time.time()

# --- Add Overall Summary Stats ---
# Store overall timing using the initial start_time captured in the config cell
pipeline_start_timestamp = start_time # Should be defined very early in the script
report_stats['summary']['pipeline_start_time'] = datetime.datetime.fromtimestamp(pipeline_start_timestamp).strftime('%Y-%m-%d %H:%M:%S')
report_stats['summary']['pipeline_end_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
total_duration = (time.time() - pipeline_start_timestamp)
report_stats['summary']['total_duration_seconds'] = round(total_duration, 2)

# Calculate overall percentages using correct initial counts
initial_biz = report_stats['stage_1_business']['input_count']
final_biz = report_stats['stage_1_business']['output_count']
report_stats['summary']['business_retention_rate'] = round(final_biz / initial_biz, 4) if initial_biz > 0 else 0

# Initial reviews are those from valid businesses input to Stage 3
initial_rev = report_stats['stage_3_dedup']['input_valid_biz_reviews']
final_rev = report_stats['stage_4_review']['output_count'] # Final count after Stage 4 heuristics
report_stats['summary']['overall_review_retention_rate'] = round(final_rev / initial_rev, 4) if initial_rev > 0 else 0

# Initial tips are those from valid businesses input to Stage 4c heuristic filter
initial_tip = report_stats['stage_4_tip']['input_valid_biz_tips']
final_tip = report_stats['stage_4_tip']['output_count'] # Final count after Stage 4 heuristics
report_stats['summary']['overall_tip_retention_rate'] = round(final_tip / initial_tip, 4) if initial_tip > 0 else 0

initial_usr = report_stats['stage_5_user']['input_count']
final_usr = report_stats['stage_5_user']['output_count']
report_stats['summary']['user_retention_rate'] = round(final_usr / initial_usr, 4) if initial_usr > 0 else 0

# Add final counts to summary for easy access
report_stats['summary']['final_business_count'] = final_biz
report_stats['summary']['final_review_count'] = final_rev
report_stats['summary']['final_tip_count'] = final_tip
report_stats['summary']['final_user_count'] = final_usr

# Convert defaultdict to regular dict for JSON serialization
# Use a helper function to handle nested defaultdicts properly
def defaultdict_to_dict(d):
    if isinstance(d, defaultdict):
        d = {k: defaultdict_to_dict(v) for k, v in d.items()}
    return d

final_report = defaultdict_to_dict(report_stats)

# --- Write Report File ---
try:
    with open(REPORT_FILE, 'w', encoding='utf-8') as f_report:
        json.dump(final_report, f_report, indent=4)
    logging.info(f"Successfully wrote summary report to {REPORT_FILE}")
except Exception as e:
    logging.error(f"Failed to write summary report: {e}", exc_info=True)

duration_stage6 = time.time() - start_time_stage6
logging.info(f"--- Stage 6: Summary Report END --- Took {duration_stage6:.2f} seconds ---")

# --- Display Final Summary in Notebook ---
print("\n--- Pipeline Complete --- Summary ---")
print(f"Total Execution Time: {total_duration:.2f} seconds ({total_duration/60:.2f} minutes)")
print("\nInput Counts (Original Files):")
print(f"  Businesses: {initial_biz}")
print(f"  Reviews:    {report_stats['stage_3_dedup']['input_reviews_read']}") # Total reviews read in Stage 3
print(f"  Tips:       {report_stats['stage_4_tip']['input_tips_read_total']}") # Total tips read in Stage 4c
print(f"  Users:      {initial_usr}")

print("\nInput Counts (After Business Filtering - Base for Retention):")
print(f"  Reviews (Valid Biz): {initial_rev}")
print(f"  Tips (Valid Biz):    {initial_tip}")

print("\nFinal Output Counts (Cleaned):")
print(f"  Businesses: {final_biz} ({report_stats['summary']['business_retention_rate']:.2%})")
print(f"  Reviews:    {final_rev} ({report_stats['summary']['overall_review_retention_rate']:.2%})")
print(f"  Tips:       {final_tip} ({report_stats['summary']['overall_tip_retention_rate']:.2%})")
print(f"  Users:      {final_usr} ({report_stats['summary']['user_retention_rate']:.2%})")


# Check for the existence of the quality_alerts key before iterating
alerts_triggered = final_report.get('quality_alerts', {})
if any(alerts_triggered.values()):
    print("\nQuality Alerts Triggered:")
    for alert, triggered in alerts_triggered.items():
        if triggered:
            print(f"  - {alert}")
else:
    print("\nNo quality alerts triggered.")

print(f"\nDetailed report saved to: {REPORT_FILE}")

2025-05-04 20:16:54,863 - INFO - --- Stage 6: Summary Report START ---
2025-05-04 20:16:54,866 - INFO - Successfully wrote summary report to ../noise_cleaning_report.json
2025-05-04 20:16:54,867 - INFO - --- Stage 6: Summary Report END --- Took 0.00 seconds ---



--- Pipeline Complete --- Summary ---
Total Execution Time: 649.13 seconds (10.82 minutes)

Input Counts (Original Files):
  Businesses: 150346
  Reviews:    6990280
  Tips:       908915
  Users:      1987897

Input Counts (After Business Filtering - Base for Retention):
  Reviews (Valid Biz): 5259993
  Tips (Valid Biz):    741599

Final Output Counts (Cleaned):
  Businesses: 68696 (45.69%)
  Reviews:    5084370 (96.66%)
  Tips:       741599 (100.00%)
  Users:      1677963 (84.41%)

Quality Alerts Triggered:
  - star_rating_shift_exceeded

Detailed report saved to: ../noise_cleaning_report.json
