In [None]:
# Webscraping Rotten Tomatoes User Reviews (Python 3 + Selenium)
# Updated 2026 to work with RT's current JavaScript-rendered site.
#
# Strategy:
#   1. Use Selenium to load the user reviews page and click "Load More"
#   2. Intercept internal API (XHR/fetch) responses for clean JSON data
#   3. Fall back to HTML parsing if API interception doesn't work
#
# Requirements:
#   pip install selenium beautifulsoup4 pandas lxml webdriver-manager

import base64
import json
import os
import random
import re
import time

import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.common.exceptions import (
    ElementClickInterceptedException,
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
)
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

In [None]:
# ── Configuration ──────────────────────────────────────────────
# Change this slug to scrape a different movie.
# The slug is the part after /m/ in the RT URL, e.g.:
#   https://www.rottentomatoes.com/m/the_hurt_locker  →  "the_hurt_locker"

MOVIE_SLUG = "the_hurt_locker"

# Where to save the output CSV. Notebooks do not define __file__, so the
# kernel's current working directory is used (normally the folder the
# notebook was opened from). Replace with any directory path to override.
OUTPUT_DIR = os.getcwd()

# Set to False to run Chrome visibly (useful for debugging selectors)
HEADLESS = True

In [None]:
# ── Helper: create a Selenium Chrome driver ───────────────────

def make_driver(headless=True):
    """Build a Chrome WebDriver with performance (network) logging switched on.

    Args:
        headless: When truthy, Chrome is started with ``--headless=new``.

    Returns:
        A ``webdriver.Chrome`` instance whose chromedriver binary is resolved
        through ``ChromeDriverManager().install()``.
    """
    user_agent_flag = (
        "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    )

    # The headless switch, when requested, goes first; the rest always apply.
    chrome_flags = ["--headless=new"] if headless else []
    chrome_flags += [
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        user_agent_flag,
    ]

    chrome_options = Options()
    for flag in chrome_flags:
        chrome_options.add_argument(flag)

    # Request the "performance" log so network events can be read via get_log().
    chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})

    chrome_service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=chrome_service, options=chrome_options)

In [None]:
# ── Step 1: Load the user reviews page and discover its structure ──
# The user reviews URL uses /reviews?type=user (or sometimes /reviews/user).
# Run this cell to see what the rendered page looks like and what API calls RT makes.
# This cell is diagnostic: it prints what it finds and leaves `driver` open
# for the following cells.

url = f"https://www.rottentomatoes.com/m/{MOVIE_SLUG}/reviews?type=user"
print(f"Loading: {url}")

driver = make_driver(headless=HEADLESS)
driver.get(url)
# Fixed pause after navigation — presumably to let client-side rendering
# finish before the logs and DOM are read.
time.sleep(5)

# ── Inspect network logs for API endpoints ──
# Each performance-log entry wraps a DevTools event as a JSON string under
# "message"; only Network.requestWillBeSent events are examined here.
# NOTE(review): Selenium's get_log() is understood to return *and clear* the
# buffered entries, so requests listed here would not be visible to a later
# get_log("performance") call — confirm against the Selenium docs.
print("=" * 60)
print("NETWORK REQUESTS (looking for review/user-related API calls):")
print("=" * 60)
logs = driver.get_log("performance")
api_urls = set()
for entry in logs:
    try:
        msg = json.loads(entry["message"])["message"]
        if msg["method"] == "Network.requestWillBeSent":
            req_url = msg["params"]["request"]["url"]
            # Substring match on the lower-cased URL; every matching request
            # is printed, while api_urls keeps the distinct set.
            if any(kw in req_url.lower() for kw in ["review", "napi", "user", "audience"]):
                api_urls.add(req_url)
                print(f"  → {req_url}")
    except (KeyError, json.JSONDecodeError):
        # Entries that are malformed or lack the expected keys are skipped.
        pass
if not api_urls:
    print("  (no review-related API calls detected yet)")

# ── Inspect the rendered DOM ──
print("\n" + "=" * 60)
print("DOM INSPECTION:")
print("=" * 60)
soup = BeautifulSoup(driver.page_source, "lxml")

# Check for common review container patterns: count the elements whose class
# value contains each candidate substring and report only the non-zero ones.
for cls in ["review-row", "audience-reviews__review", "review_table_row",
            "audience-review-row", "review-card"]:
    found = soup.find_all(attrs={"class": lambda c: c and cls in str(c)})
    if found:
        print(f"  .{cls}: {len(found)} elements")

# data-qa attributes: list the distinct values present on the page.
data_qa_elements = soup.find_all(attrs={"data-qa": True})
qa_values = sorted(set(el["data-qa"] for el in data_qa_elements))
print(f"  data-qa attributes: {qa_values}")

# Custom RT web components: every distinct tag name starting with "rt-".
all_tags = set(tag.name for tag in soup.find_all(True))
rt_tags = sorted(t for t in all_tags if t and t.startswith("rt-"))
print(f"  Custom rt-* components: {rt_tags}")

# Classes containing 'review' or 'audience' (case-insensitive), de-duplicated.
# Note: this loop rebinds the `cls` name used by the container probe above.
review_classes = set()
for el in soup.find_all(attrs={"class": True}):
    for cls in el.get("class", []):
        if "review" in cls.lower() or "audience" in cls.lower():
            review_classes.add(cls)
print(f"  Classes with 'review'/'audience': {sorted(review_classes)}")

# Print visible text: the first 3000 characters of the <body> text, one
# stripped text node per line.
print(f"\n--- First 3000 chars of visible text ---")
body = soup.find("body")
if body:
    text = body.get_text(separator="\n", strip=True)
    print(text[:3000])

In [None]:
# ── Step 2: Click "Load More" and capture network responses ───

def extract_review_json_from_logs(driver):
    """Collect JSON bodies of review-related responses from Chrome's performance log.

    Reads ``driver.get_log("performance")``, keeps the
    ``Network.responseReceived`` events whose URL contains one of the review
    keywords, fetches each body through the ``Network.getResponseBody`` CDP
    command, and JSON-decodes it.

    Args:
        driver: A WebDriver exposing ``get_log`` and ``execute_cdp_cmd``
            (the driver must have been created with performance logging on).

    Returns:
        A list of ``{"url": <response URL>, "data": <decoded JSON>}`` dicts,
        in log order. Responses whose body cannot be fetched or is not valid
        JSON are skipped.
    """
    keywords = ("review", "/napi/", "user", "audience")
    results = []

    for entry in driver.get_log("performance"):
        # Unwrap the DevTools event; skip entries that are malformed or that
        # lack the fields of a responseReceived event.
        try:
            msg = json.loads(entry["message"])["message"]
            if msg["method"] != "Network.responseReceived":
                continue
            resp_url = msg["params"]["response"]["url"]
            request_id = msg["params"]["requestId"]
        except (KeyError, TypeError, ValueError):
            continue

        if not any(kw in resp_url.lower() for kw in keywords):
            continue

        # Best-effort fetch: the browser may no longer hold the body for this
        # request, in which case the CDP call fails and the response is skipped.
        try:
            body = driver.execute_cdp_cmd(
                "Network.getResponseBody", {"requestId": request_id}
            )
        except Exception:
            continue

        # CDP flags bodies it had to base64-encode; decode those before parsing
        # so they are not discarded as "invalid JSON".
        try:
            raw = body.get("body", "{}")
            if body.get("base64Encoded"):
                raw = base64.b64decode(raw).decode("utf-8")
            data = json.loads(raw)
        except (AttributeError, TypeError, ValueError):
            # Not JSON (HTML, script, image, ...) or undecodable: not review data.
            continue

        results.append({"url": resp_url, "data": data})

    return results

def find_load_more_button(driver):
    """Locate a visible "Load More" control, trying several strategies in order.

    Strategies, first hit wins:
      1. A list of known CSS selectors.
      2. An XPath match on a <button> whose text contains "load more"
         (case-folded for the letters involved).
      3. A scan of every <button> for "load more" / "show more" text.
      4. A scan of <rt-button> hosts whose shadow root contains a <button>
         with "load more" text (the host element is returned).

    Args:
        driver: The Selenium WebDriver showing the reviews page.

    Returns:
        The matching WebElement, or ``None`` when no displayed candidate exists.
    """
    css_selectors = (
        "button[data-qa='dlp-load-more-button']",
        "rt-button[data-loadmore]",
        "[data-qa='load-more-btn']",
        "button.load-more-button",
        "button.js-load-more-btn",
    )
    for selector in css_selectors:
        try:
            btn = driver.find_element(By.CSS_SELECTOR, selector)
            if btn.is_displayed():
                return btn
        except (NoSuchElementException, StaleElementReferenceException):
            continue

    # XPath fallback
    try:
        btn = driver.find_element(
            By.XPATH, "//button[contains(translate(., 'LOADMRE', 'loadmre'), 'load more')]"
        )
        if btn.is_displayed():
            return btn
    except (NoSuchElementException, StaleElementReferenceException):
        pass

    # Iterate all buttons. The page may re-render while scanning, so an
    # element that went stale is skipped instead of aborting the search.
    for btn in driver.find_elements(By.TAG_NAME, "button"):
        try:
            txt = btn.text.strip().lower()
            if ("load more" in txt or "show more" in txt) and btn.is_displayed():
                return btn
        except StaleElementReferenceException:
            continue

    # Check shadow DOM inside rt-button elements. Failures are handled per
    # element: an rt-button with no inner <button> must not stop the scan of
    # the remaining ones.
    try:
        rt_buttons = driver.find_elements(By.CSS_SELECTOR, "rt-button")
    except Exception:
        rt_buttons = []
    for rt_btn in rt_buttons:
        try:
            shadow = driver.execute_script("return arguments[0].shadowRoot", rt_btn)
            if not shadow:
                continue
            inner_btn = shadow.find_element(By.CSS_SELECTOR, "button")
            # Same visibility requirement as the other strategies, so a hidden
            # control is not reported as clickable.
            if "load more" in inner_btn.text.strip().lower() and rt_btn.is_displayed():
                return rt_btn
        except Exception:
            continue

    return None

def load_all_reviews(driver, max_clicks=500, pause_range=(1.0, 2.5)):
    """Click "Load More" repeatedly and gather the API responses each click triggers.

    Args:
        driver: The Selenium WebDriver showing the reviews page.
        max_clicks: Upper bound on the number of clicks.
        pause_range: ``(low, high)`` bounds, in seconds, of the random pause
            taken after every click before the network log is read.

    Returns:
        A tuple ``(clicks, all_api_data)``: the number of clicks performed and
        the accumulated list of ``{"url", "data"}`` dicts returned by
        ``extract_review_json_from_logs``.
    """
    all_api_data = []
    clicks = 0

    while clicks < max_clicks:
        load_more = find_load_more_button(driver)
        if load_more is None:
            print(f"No more 'Load More' button after {clicks} clicks. All reviews loaded.")
            break

        # Centre the button in the viewport first so overlays are less likely
        # to intercept the click.
        driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", load_more)
        time.sleep(0.3)
        try:
            load_more.click()
        except ElementClickInterceptedException:
            # Something is covering the button: dispatch the click from JS instead.
            driver.execute_script("arguments[0].click();", load_more)

        clicks += 1
        if clicks % 10 == 0:
            print(f"  Clicked 'Load More' {clicks} times...")

        time.sleep(random.uniform(*pause_range))
        all_api_data.extend(extract_review_json_from_logs(driver))
    else:
        # The loop ended because the click budget ran out, not because the
        # button disappeared, so the page may still hold unloaded reviews.
        print(
            f"Stopped after reaching max_clicks={max_clicks}; "
            "more reviews may remain unloaded."
        )

    return clicks, all_api_data

# Run the click loop against the driver opened in Step 1. The results stay in
# the kernel as `total_clicks` and `captured_api_data` for Step 3.
total_clicks, captured_api_data = load_all_reviews(driver)
print(f"\nDone. Clicked 'Load More' {total_clicks} times.")
print(f"Captured {len(captured_api_data)} API responses containing review data.")

# Preview at most the first five captured responses: the URL and, when the
# payload is a JSON object, up to ten of its top-level keys.
if captured_api_data:
    print("\nCaptured API endpoints:")
    for item in captured_api_data[:5]:
        print(f"  → {item['url']}")
        if isinstance(item['data'], dict):
            print(f"    Keys: {list(item['data'].keys())[:10]}")

In [None]:
# ── Step 3: Parse user reviews (API JSON → HTML fallback) ─────

def _as_text(value):
    """Render a JSON scalar as stripped text, mapping JSON null (None) to ""."""
    return "" if value is None else str(value).strip()


def _collect_review_lists(data):
    """Return every list in a decoded payload that may hold review objects.

    A top-level list is used as-is. For a dict, lists found under the known
    top-level keys are taken first, then lists found one level down under the
    known nested keys.
    """
    if isinstance(data, list):
        return [data]
    if not isinstance(data, dict):
        return []

    found = [
        data[key]
        for key in ("reviews", "items", "results", "data", "rows", "userReviews")
        if isinstance(data.get(key), list)
    ]
    for val in data.values():
        if isinstance(val, dict):
            found.extend(
                val[subkey]
                for subkey in ("reviews", "items", "results")
                if isinstance(val.get(subkey), list)
            )
    return found


def parse_user_reviews_from_api(api_data_list):
    """Extract user reviews from captured API JSON responses.

    The payload schema is not fixed, so each field is looked up under several
    candidate key names and the first truthy value wins.

    Args:
        api_data_list: Iterable of ``{"url": ..., "data": <decoded JSON>}``
            dicts; only ``"data"`` is read.

    Returns:
        A list of dicts with the keys ``Username``, ``User ID``, ``Date``,
        ``User Review``, ``Rating``, ``Verified``, ``Super Reviewer`` and
        ``Has Spoilers``. Text fields are stripped strings (``""`` when
        missing or null), ``Rating`` is the raw value or ``"NA"``, and the
        flags fall back to ``False``. Entries with neither a username nor
        review text are dropped, as are exact repeats of an earlier entry
        (same username, user id, date and full review text).

    Raises:
        KeyError: If an item has no ``"data"`` key.
    """
    reviews = []
    seen = set()

    for item in api_data_list:
        for review_list in _collect_review_lists(item["data"]):
            for r in review_list:
                if not isinstance(r, dict):
                    continue

                # The author may be a nested object, a bare string, or absent.
                user_obj = r.get("user") or r.get("author") or {}

                # Username
                username = ""
                if isinstance(user_obj, dict):
                    username = (
                        user_obj.get("displayName") or user_obj.get("name")
                        or user_obj.get("username")
                    )
                elif isinstance(user_obj, str):
                    username = user_obj
                if not username:
                    username = r.get("userName") or r.get("username") or r.get("displayName")
                username = _as_text(username)

                # User ID (null must become "", never the literal text "None")
                user_id = ""
                if isinstance(user_obj, dict):
                    user_id = _as_text(user_obj.get("userId") or user_obj.get("id"))
                if not user_id:
                    user_id = _as_text(r.get("userId") or r.get("user_id"))

                # Review text
                review_text = _as_text(
                    r.get("review") or r.get("reviewText") or r.get("text")
                    or r.get("quote") or r.get("comment") or ""
                )

                # Date
                date = _as_text(
                    r.get("createDate") or r.get("creationDate") or r.get("date")
                    or r.get("reviewDate") or r.get("submittedDate") or ""
                )

                # Rating (user reviews use star ratings, typically 0.5 - 5.0);
                # it may arrive as a scalar or wrapped in an object.
                rating = r.get("rating") or r.get("score") or r.get("stars") or ""
                if isinstance(rating, dict):
                    rating = rating.get("value") or rating.get("score")
                if rating is None or rating == "":
                    rating = "NA"

                # Verified / super reviewer flags (absent or null -> False)
                is_verified = r.get("isVerified") or r.get("verified") or False
                is_super_reviewer = r.get("isSuperReviewer") or False
                has_spoilers = r.get("hasSpoilers") or r.get("isSpoiler") or False

                if not review_text and not username:
                    continue

                # De-duplicate on the whole record identity so that only true
                # repeats (the same review captured in two responses) collapse;
                # distinct reviews that merely share a name and an opening
                # sentence are kept.
                dedup_key = (username, user_id, date, review_text)
                if dedup_key in seen:
                    continue
                seen.add(dedup_key)

                reviews.append({
                    "Username": username,
                    "User ID": user_id,
                    "Date": date,
                    "User Review": review_text,
                    "Rating": rating,
                    "Verified": is_verified,
                    "Super Reviewer": is_super_reviewer,
                    "Has Spoilers": has_spoilers,
                })

    return reviews


def _contains(*needles):
    """Build a matcher that accepts attribute values containing every needle (case-insensitive)."""
    def matcher(value):
        return bool(value) and all(needle in value.lower() for needle in needles)
    return matcher


def _first_found(candidates):
    """Return the first candidate element that was actually found, else None."""
    return next((el for el in candidates if el), None)


def _parse_html_review_row(row):
    """Turn one review-row element into a review dict (Strategy A field heuristics)."""
    # Username (and user id from a /user/<id> link, when there is one)
    user_el = _first_found([
        row.find("a", attrs={"data-qa": _contains("user")}),
        row.find("a", class_=_contains("user")),
        row.find("rt-link", attrs={"slot": _contains("user")}),
        row.find("a", href=lambda h: h and "/user/" in h),
    ])
    username, user_id = "", ""
    if user_el:
        username = user_el.get_text(strip=True)
        href = user_el.get("href", "")
        if "/user/" in href:
            user_id = href.split("/user/")[-1].strip("/")

    # Date
    date_el = _first_found([
        row.find(attrs={"data-qa": _contains("date")}),
        row.find("rt-text", attrs={"slot": _contains("date")}),
        row.find("span", class_=_contains("date")),
    ])

    # Review text
    text_el = _first_found([
        row.find(attrs={"data-qa": _contains("review", "text")}),
        row.find("p", class_=_contains("review")),
        row.find("div", class_=_contains("review", "text")),
        row.find("rt-text", attrs={"slot": _contains("review")}),
    ])

    # Rating — count filled/half star elements when a star widget is present,
    # otherwise fall back to a textual "x/y" score element.
    rating = "NA"
    if row.find(attrs={"class": _contains("star")}):
        filled = row.find_all(attrs={"class": _contains("filled")})
        half = row.find_all(attrs={"class": _contains("half")})
        if filled or half:
            rating = len(filled) + 0.5 * len(half)
    score_el = row.find(attrs={"data-qa": _contains("score")})
    if score_el and rating == "NA":
        try:
            rating = float(score_el.get_text(strip=True).split("/")[0])
        except (ValueError, IndexError):
            pass

    return {
        "Username": username,
        "User ID": user_id,
        "Date": date_el.get_text(strip=True) if date_el else "",
        "User Review": text_el.get_text(strip=True) if text_el else "",
        "Rating": rating,
        # These flags are not derived from the HTML; they are always False here.
        "Verified": False,
        "Super Reviewer": False,
        "Has Spoilers": False,
    }


def parse_user_reviews_from_html(driver):
    """Fallback: parse user reviews from the rendered HTML.

    Three strategies are tried in order and the first one that finds any
    elements supplies the whole result:
      A. ``div.review-row`` (or divs with a class containing both "audience"
         and "review"), parsed field by field.
      B. Elements whose ``data-qa`` contains "review": first text node is
         taken as the username, last as the date, the rest as the review.
      C. ``innerText`` of review-like elements fetched through JavaScript:
         first line is the username, the remaining lines the review.

    Args:
        driver: The Selenium WebDriver showing the fully loaded reviews page.

    Returns:
        A list of review dicts with the keys ``Username``, ``User ID``,
        ``Date``, ``User Review``, ``Rating``, ``Verified``,
        ``Super Reviewer`` and ``Has Spoilers``; empty when nothing matched.
    """
    soup = BeautifulSoup(driver.page_source, "lxml")
    reviews = []

    # ── Strategy A: div.review-row or audience-review-row ──
    rows = soup.find_all("div", class_="review-row")
    if not rows:
        rows = soup.find_all("div", class_=lambda c: c and "audience" in c and "review" in c)
    if rows:
        print(f"  HTML Strategy A: Found {len(rows)} review row elements")
        reviews.extend(_parse_html_review_row(row) for row in rows)
        return reviews

    # ── Strategy B: data-qa based ──
    rows = soup.find_all(attrs={"data-qa": lambda v: v and "review" in v.lower()})
    if rows:
        print(f"  HTML Strategy B: Found {len(rows)} data-qa review elements")
        for row in rows:
            texts = [t.strip() for t in row.stripped_strings if t.strip()]
            reviews.append({
                "Username": texts[0] if len(texts) > 0 else "",
                "User ID": "",
                "Date": texts[-1] if len(texts) > 1 else "",
                "User Review": " ".join(texts[1:-1]) if len(texts) > 2 else "",
                "Rating": "NA",
                "Verified": False,
                "Super Reviewer": False,
                "Has Spoilers": False,
            })
        return reviews

    # ── Strategy C: JavaScript DOM access ──
    try:
        js_reviews = driver.execute_script("""
            const rows = document.querySelectorAll(
                'div[class*="review"], [data-qa*="review"], .review-row'
            );
            return Array.from(rows).map(r => r.innerText);
        """)
    except Exception as exc:
        # Still best-effort, but say why the last strategy produced nothing.
        print(f"  HTML Strategy C (JS) failed: {exc}")
        js_reviews = None

    if js_reviews:
        print(f"  HTML Strategy C (JS): Found {len(js_reviews)} text blocks")
        for text in js_reviews:
            # A null innerText is treated as an empty block, not an error.
            lines = [l.strip() for l in (text or "").split('\n') if l.strip()]
            reviews.append({
                "Username": lines[0] if len(lines) > 0 else "",
                "User ID": "",
                "Date": "",
                "User Review": " ".join(lines[1:]) if len(lines) > 1 else "",
                "Rating": "NA",
                "Verified": False,
                "Super Reviewer": False,
                "Has Spoilers": False,
            })
        return reviews

    print("WARNING: No reviews found with any strategy.")
    print("Set HEADLESS = False, re-run Step 1, and inspect the page in the browser.")
    return reviews


# ── Try API data first, fall back to HTML ──
# The API parser runs only when Step 2 captured at least one response.
reviews = []
if captured_api_data:
    reviews = parse_user_reviews_from_api(captured_api_data)
    print(f"Parsed {len(reviews)} user reviews from captured API responses.")

# The HTML fallback runs whenever `reviews` is still empty — that includes the
# case where no API data was captured at all, although the message below
# mentions API parsing either way.
if not reviews:
    print("API parsing yielded 0 reviews. Falling back to HTML parsing...")
    reviews = parse_user_reviews_from_html(driver)
    print(f"Parsed {len(reviews)} user reviews from HTML.")

# Add review IDs: a 1-based sequence number in parse order, written into each
# review dict in place.
for i, r in enumerate(reviews, start=1):
    r["Review ID"] = i

print(f"\nTotal user reviews: {len(reviews)}")
# Show every field of the first review as a quick sanity check.
if reviews:
    print(f"\nSample review:")
    for k, v in reviews[0].items():
        print(f"  {k}: {v}")

In [None]:
# ── Step 4: Save to CSV ───────────────────────────────────────
# Builds a DataFrame from the `reviews` list produced in Step 3 and writes it
# to <OUTPUT_DIR>/<MOVIE_SLUG>_user_reviews.csv.

df = pd.DataFrame(reviews)

# Reorder columns; names missing from the frame are skipped, so this also
# works when `reviews` is empty and the frame has no columns.
col_order = ["Review ID", "Username", "User ID", "Date", "User Review",
             "Rating", "Verified", "Super Reviewer", "Has Spoilers"]
df = df[[c for c in col_order if c in df.columns]]

csv_filename = f"{MOVIE_SLUG}_user_reviews.csv"
csv_path = os.path.join(OUTPUT_DIR, csv_filename)
df.to_csv(csv_path, index=False, encoding="utf-8")
print(f"Saved {len(df)} reviews to: {csv_path}")
print(f"\nColumn summary:")
# "Non-empty" means the stringified, stripped value is not "". Placeholder
# values such as "NA" and False therefore count as non-empty.
for col in df.columns:
    non_empty = df[col].astype(str).str.strip().ne("").sum()
    print(f"  {col}: {non_empty}/{len(df)} non-empty")
# Last expression of the cell: display the first ten rows.
df.head(10)

In [None]:
# ── Step 5: Clean up ──────────────────────────────────────────
# Shuts down the browser session opened in Step 1. Run this cell even when an
# earlier step failed, otherwise the Chrome process stays alive.

driver.quit()
print("Browser closed.")