# Clean Data

In [None]:
import pandas as pd
import numpy as np
import re
import requests
import ast
from bs4 import BeautifulSoup
from tqdm import tqdm
import time

from scripts.utilites import column_stats

# Render every column of a DataFrame, but cap rendered rows at 100.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

In [None]:
# Load the merged dataset from disk (path is relative to the working directory).
df = pd.read_csv('merged_data/merged.csv')

# Spot-check: rows whose title contains "Star Wars" (case-insensitive; missing titles excluded).
display(df[df["title"].str.contains("Star Wars", case=False, na=False)])


### Limit to Labeled Data

In [None]:
# Show the output of the project helper column_stats for the freshly loaded frame.
display(column_stats(df))

## Combine Duplicate Columns

### Release Date
The release date is only used to adjust the box office figures, so the year alone is sufficient.

In [None]:
# The three columns that each carry some form of release date/year.
date_cols = ["release_date_theaters", "release_year", "release_date"]

display(df[date_cols])

In [None]:
# A row is only discarded when every date column AND the wiki_page link are
# missing, i.e. there is nothing to fill the date from later on.
df = (
    df.dropna(subset=[*date_cols, 'wiki_page'], how='all')
      .reset_index(drop=True)
)

In [None]:
# Inspect the date columns after dropping rows that had no date information.
display(df[date_cols])

In [None]:
# Reduce the two full-date columns to a numeric year.
# errors="coerce" turns unparseable dates into NaT, which becomes NaN after .dt.year.
for date_col in ("release_date_theaters", "release_date"):
    df[date_col] = pd.to_datetime(df[date_col], errors="coerce").dt.year

In [None]:
# Inspect the date columns now that they hold years only.
display(df[date_cols])

In [None]:
# Count the distinct non-missing years per row; more than one means the
# sources disagree, and such rows are removed.
year_sources = df[["release_date_theaters", "release_year", "release_date"]]
distinct_years = year_sources.nunique(axis=1, dropna=True)
mismatch_mask = distinct_years.gt(1)

df = df[~mismatch_mask].reset_index(drop=True)

In [None]:
# Fill release_year from the theatrical year first, then from release_date,
# and discard the two helper columns once they have been folded in.
fallback_year = df["release_date_theaters"].fillna(df["release_date"])
df["release_year"] = df["release_year"].fillna(fallback_year)
df = df.drop(columns=["release_date_theaters", "release_date"])

display(df)
display(column_stats(df))

### Box Office

In [None]:
# Inspect the two columns that both carry a box-office figure.
display(df[["box_office", "revenue"]])

In [None]:
money_cols = ["box_office", "revenue"]

# A value of exactly 0.0 is treated as "missing".
df[money_cols] = df[money_cols].replace(0.0, np.nan)

# Drop rows only when both figures AND the wiki_page link are missing,
# since those cannot be filled by the later Wikipedia mining.
df = (
    df.dropna(subset=[*money_cols, 'wiki_page'], how='all')
      .reset_index(drop=True)
)

display(df[money_cols])

In [None]:
def convert_revenue(value):
    """Convert a box-office figure such as ``"$1.2M"`` into a plain float.

    Dollar signs, thousands separators and surrounding whitespace are ignored.
    A trailing ``K``, ``M`` or ``B`` (any case) scales the number by a
    thousand, a million or a billion respectively.

    Args:
        value: A string like ``"$1,234"`` / ``"3.5M"``, a number, or a
            missing value (``None`` / NaN).

    Returns:
        The amount as a float, or NaN for missing or blank input.

    Raises:
        ValueError: If the remaining text is not a valid number.
    """
    # Missing values stay missing instead of going through string parsing
    # (str(None) would otherwise be fed to float() and raise).
    if value is None or (isinstance(value, float) and value != value):
        return float("nan")

    text = str(value).replace('$', '').replace(',', '').strip()
    if not text:
        return float("nan")

    multipliers = {'K': 1_000, 'M': 1_000_000, 'B': 1_000_000_000}
    # Only a *trailing* letter is a scale suffix; compare case-insensitively
    # so "2k" and "2K" behave the same.
    suffix = text[-1].upper()
    if suffix in multipliers:
        return float(text[:-1]) * multipliers[suffix]
    return float(text)

# Normalise every box_office entry to a float via convert_revenue.
df['box_office'] = df['box_office'].apply(convert_revenue)

# Show both box-office sources side by side after the conversion.
display(df[["box_office", "revenue"]])

In [None]:
# revenue takes precedence; box_office is only used where revenue is missing.
# The now-redundant revenue column is removed in the same step.
df = df.assign(
    box_office=df["revenue"].fillna(df["box_office"])
).drop(columns="revenue")

In [None]:
# Renumber rows from 0 and inspect the frame after merging the box-office columns.
df.reset_index(drop=True, inplace=True)
display(df)
display(column_stats(df))

### Runtime

In [None]:
# Relative gap between the two runtime sources, measured against "runtime".
both_present = df["runtime_minutes"].notna() & df["runtime"].notna()
relative_gap = (df["runtime_minutes"] - df["runtime"]).abs().div(df["runtime"])

# Rows where both values exist but differ by more than 10%.
mask_mismatch_runtime = both_present & relative_gap.gt(0.10)

# Shown for manual inspection only; the rows are not removed.
display(df.loc[mask_mismatch_runtime, ["runtime_minutes", "runtime", "title"]])

# Keep "runtime" where present, otherwise take "runtime_minutes",
# then discard the secondary column.
df["runtime"] = df["runtime"].fillna(df["runtime_minutes"])
df = df.drop(columns=["runtime_minutes"])

display(df)
display(column_stats(df))

In [None]:
# Spot-check: rows whose title contains "Star Wars" after the runtime merge.
display(df[df["title"].str.contains("Star Wars", case=False, na=False)])

### Genre

In [None]:
genre_sources = ["genre_x", "genre_y", "genres"]

# Rows with no genre information in any of the three sources are removed.
df = df.dropna(subset=genre_sources, how='all')

display(df[genre_sources])

In [None]:
def combine_genres(row):
    """Merge the three genre sources of a row into one comma-separated string.

    ``genre_x`` and ``genre_y`` are read as comma-separated text. ``genres``
    may be a list of ``{"name": ...}`` dicts (or plain strings), or the string
    representation of such a list as it comes back from a CSV round trip.

    Args:
        row: A mapping / Series with the keys ``genre_x``, ``genre_y`` and
            ``genres``.

    Returns:
        The unique, lower-cased genre names joined with ``", "`` in
        alphabetical order, or ``None`` when no usable genre was found.
    """
    invalid_values = {"unknown", "nan", ""}
    genres = set()

    def add(name):
        cleaned = str(name).strip().lower()
        if cleaned not in invalid_values:
            genres.add(cleaned)

    for column in ("genre_x", "genre_y"):
        raw = row[column]
        # Missing values are floats (NaN) or None; only text is split.
        if isinstance(raw, str):
            for part in raw.split(','):
                add(part)

    structured = row["genres"]
    if isinstance(structured, str):
        # Parse the stringified list; text that is not a Python literal is
        # ignored rather than guessed at.
        try:
            structured = ast.literal_eval(structured)
        except (ValueError, SyntaxError, TypeError):
            structured = None
    if isinstance(structured, (list, tuple)):
        for entry in structured:
            if isinstance(entry, dict) and "name" in entry:
                add(entry["name"])
            elif isinstance(entry, str):
                add(entry)

    # Sorting makes the output independent of set iteration order.
    return ", ".join(sorted(genres)) if genres else None

# Build the unified "genre" column row by row, then remove the three source columns.
df["genre"] = df.apply(combine_genres, axis=1)
df.drop(columns=["genre_x", "genre_y", "genres"], inplace=True)

display(df)

### Language

In [None]:
# Print the distinct raw values of each language column to see what needs normalising.
lang_cols = ["original_language_x", "original_language_y"]
for col in lang_cols:
    print(df[col].unique())

In [None]:
# Two-letter language codes mapped to lower-case language names.
# "xx" maps to None, i.e. it is treated as "no language".
language_map = {
    "en": "english", "fr": "french", "es": "spanish", "de": "german", "it": "italian",
    "pt": "portuguese", "ru": "russian", "zh": "chinese", "ja": "japanese", "ko": "korean",
    "hi": "hindi", "ar": "arabic", "bn": "bengali", "pa": "punjabi", "ur": "urdu",
    "fa": "persian", "tr": "turkish", "pl": "polish", "nl": "dutch", "sv": "swedish",
    "fi": "finnish", "no": "norwegian", "da": "danish", "cs": "czech", "el": "greek",
    "hu": "hungarian", "ro": "romanian", "th": "thai", "vi": "vietnamese",
    "he": "hebrew", "id": "indonesian", "uk": "ukrainian", "xx": None
}

def clean_language(lang):
    """Normalise a raw language value to a lower-case language name.

    The value is trimmed and lower-cased, any parenthesised suffix is removed
    ("English (US)" -> "english"), and two-letter codes found in
    ``language_map`` are expanded.

    Args:
        lang: Raw language value; anything that is not a string (NaN, None,
            numbers) is treated as missing.

    Returns:
        The normalised language name, or ``None`` for missing values and the
        placeholders "unknown language", "nan" and "".
    """
    if not isinstance(lang, str):
        return None

    # Normalise first so padded or annotated placeholders are recognised too.
    lang = lang.strip().lower()
    lang = re.split(r'\s*\(', lang)[0].strip()

    if lang in {"unknown language", "nan", ""}:
        return None

    return language_map.get(lang, lang)

# Normalise both language sources; the "_x" value wins and "_y" only fills gaps.
cleaned_x = df["original_language_x"].apply(clean_language)
cleaned_y = df["original_language_y"].apply(clean_language)
df["language"] = cleaned_x.fillna(cleaned_y)

# The raw language columns (and spoken_languages) are not needed any more.
df = df.drop(columns=["original_language_x", "original_language_y", "spoken_languages"])

display(df)

### Production Companies

In [None]:
# The production_companies column is discarded without being merged into anything.
df.drop("production_companies", inplace=True, axis=1)

## Drop non-English and pre-1977 movies

In [None]:
# Spot-check a known title before filtering.
display(df[df["title"].str.contains("Star Wars", case=False, na=False)])

# Keep films released in 1977 or later whose language mentions "English"
# (case-insensitive; rows with a missing language are dropped).
is_recent = df["release_year"] >= 1977
is_english = df["language"].str.contains("English", case=False, na=False)

# .copy() makes the result an independent frame, so columns assigned to it
# later do not trigger pandas' chained-assignment (SettingWithCopy) warning.
df = df[is_recent & is_english].copy()

# Same spot-check after filtering.
display(df[df["title"].str.contains("Star Wars", case=False, na=False)])

display(column_stats(df))

## Wikipedia Data Mining

In [None]:
# Show the output of column_stats before any Wikipedia data is mined.
display(column_stats(df))

In [None]:
def fetch_wikipedia_page(url, session, timeout=10):
    """Fetch the HTML of a page through an existing HTTP session.

    Args:
        url: Page URL. Anything that is not a string starting with "http"
            is skipped without issuing a request.
        session: Object exposing ``get(url, headers=..., timeout=...)``
            (e.g. a ``requests.Session``).
        timeout: Seconds to wait before the request is abandoned, so a
            stalled connection cannot block the whole scrape.

    Returns:
        ``(html, elapsed_seconds)``. ``html`` is ``None`` when the URL is
        invalid, the request failed, or the status code is not 200.
        ``elapsed_seconds`` is ``None`` when no response was received.
    """
    if not isinstance(url, str) or not url.startswith("http"):
        return None, None

    try:
        start_time = time.perf_counter()
        response = session.get(
            url, headers={"User-Agent": "Mozilla/5.0"}, timeout=timeout
        )
        elapsed_time = time.perf_counter() - start_time
    except Exception as e:
        # Best effort: one failing page must not abort a long scrape.
        print(f"Error fetching {url}: {e}")
        return None, None

    if response.status_code != 200:
        return None, elapsed_time  # Timing is still useful for the ETA

    return response.text, elapsed_time


def scrape_wikipedia_pages(df, url_column, output_column):
    """Download the page behind every URL whose output cell is still missing.

    Args:
        df: DataFrame holding the URLs; it is modified in place.
        url_column: Name of the column containing the page URLs.
        output_column: Name of the column that receives the HTML. Only rows
            where this column is missing are fetched.

    Returns:
        The same DataFrame, with ``output_column`` filled where a page could
        be fetched (``None`` is stored for failed fetches).
    """
    from collections import deque

    missing_mask = df[output_column].isna()
    urls_to_scrape = df.loc[missing_mask, url_column]
    total = len(urls_to_scrape)

    # Rolling window over the last 10 request times, used for the ETA.
    recent_times = deque(maxlen=10)

    print(f"Scraping {total} pages...")

    # The context manager closes the session's connections when done,
    # including when the loop is interrupted.
    with requests.Session() as session:
        progress_bar = tqdm(
            urls_to_scrape.items(), total=total, desc="Fetching pages", unit="page"
        )
        for position, (row_label, url) in enumerate(progress_bar):
            html, elapsed_time = fetch_wikipedia_page(url, session)

            if elapsed_time:
                recent_times.append(elapsed_time)

            # Estimate the time remaining from the rolling average.
            avg_time = sum(recent_times) / len(recent_times) if recent_times else 0
            remaining_time = avg_time * (total - position - 1)
            eta = time.strftime("%H:%M:%S", time.gmtime(remaining_time))

            # Update the bar's postfix instead of printing a new line per page.
            progress_bar.set_postfix({"ETA": eta})

            df.loc[row_label, output_column] = html

    return df

# Start with an all-missing HTML column so every row counts as pending, then fetch the pages.
df["wiki_page_html"] = pd.NA
df = scrape_wikipedia_pages(df, "wiki_page", "wiki_page_html")

In [None]:
# Write the scraped frame to disk and read it straight back, so later cells
# work on exactly what the CSV round trip produces.
df.to_csv("wiki_scraped_data.csv", index=False)
df = pd.read_csv("wiki_scraped_data.csv")

In [None]:
# Show the output of column_stats after the scraped HTML has been reloaded.
display(column_stats(df))

In [None]:
def extract_raw_box_office_text(html):
    """Return the raw "Box office" text from a page's infobox.

    Args:
        html: Page HTML as a string, or a missing value.

    Returns:
        The stripped text of the first infobox row whose header contains
        "Box office" and that has a value cell, or ``None`` when the HTML is
        missing, there is no infobox, no such row exists, or parsing fails.
    """
    if pd.isna(html):
        return None  # Skip missing HTML

    try:
        soup = BeautifulSoup(html, "html.parser")

        infobox = soup.find("table", class_="infobox")
        if not infobox:
            return None  # No infobox found

        for row in infobox.find_all("tr"):
            header = row.find("th")
            if not header or "Box office" not in header.text:
                continue

            value_cell = row.find("td")
            if value_cell is None:
                # Header-only row: keep scanning instead of failing on .text
                continue

            return value_cell.text.strip()  # Raw text, no numeric conversion

    except Exception as e:
        print(f"Error extracting box office data: {e}")

    return None  # No usable box office row was found

def extract_box_office_single_threaded(df, html_column, output_column):
    """Fill ``output_column`` with raw box office text taken from ``html_column``.

    Only rows whose ``output_column`` is currently missing are processed, one
    after the other, with a progress bar. The DataFrame is modified in place
    and also returned.
    """
    pending = df[output_column].isna()
    pages = df.loc[pending, html_column]

    progress = tqdm(pages, total=len(pages), desc="Extracting Box Office", unit="page")
    df.loc[pending, output_column] = [
        extract_raw_box_office_text(page) for page in progress
    ]
    return df

# Run the single-threaded extraction; the output column starts all-missing,
# so every row is processed.
df["box_office_wiki_mined"] = pd.NA
df = extract_box_office_single_threaded(df, "wiki_page_html", "box_office_wiki_mined")

In [None]:
# Compiled once at definition time instead of on every call.
# - (?<![A-Za-z])(?:US)?\$ : a dollar sign that is either bare or prefixed by
#       "US"; other currency prefixes ("A$", "CA$", "HK$", ...) are rejected
#       so foreign-dollar figures are not mistaken for US dollars.
# - number : a comma-grouped number ("13,747,138") or a plain/decimal one ("6.3").
# - number2: optional upper end of a range ("25.1-29.6"); only `number` is used.
# - (?=[^\d,\.]|$) : stop before digits/commas/periods of a concatenated number.
# - scale  : optional "million" / "billion" (the typo "millon" is accepted).
_BOX_OFFICE_PATTERN = re.compile(
    r"(?<![A-Za-z])(?:US)?\$"
    r"(?P<number>(?:\d{1,3}(?:,\d{3})+|\d+(?:\.\d+)?))"
    r"(?:\s*[–-]\s*(?P<number2>(?:\d{1,3}(?:,\d{3})+|\d+(?:\.\d+)?)))?"
    r"(?=[^\d,\.]|$)"
    r"(?:\s*(?P<scale>million|millon|billion))?",
    re.IGNORECASE,
)

_BOX_OFFICE_SCALE_FACTORS = {
    "million": 1_000_000,
    "millon": 1_000_000,
    "billion": 1_000_000_000,
}


def clean_raw_box_office_text(text):
    """Parse raw box office text into a US-dollar amount.

    Args:
        text: Raw text such as ``"$6.3 million"``, ``"$13,747,138"`` or
            ``"$25.1–29.6 million"``.

    Returns:
        The first US-dollar amount as a float (the lower bound of a range),
        scaled by "million"/"billion" when present. NaN is returned for
        non-string input and for text without a US-dollar amount.
    """
    if not isinstance(text, str):
        return np.nan

    # Normalise whitespace so the number and its scale word are at most one
    # whitespace run apart.
    text = re.sub(r"\s+", " ", text.replace("<br>", " "))

    match = _BOX_OFFICE_PATTERN.search(text)
    if not match:
        return np.nan

    # The pattern guarantees digits with optional commas and one decimal
    # point, so the conversion cannot fail once commas are removed.
    amount = float(match.group("number").replace(',', ''))
    scale = (match.group("scale") or "").lower()
    return amount * _BOX_OFFICE_SCALE_FACTORS.get(scale, 1)

# Parse the raw infobox text into numbers.
df["cleaned_box_office_wiki_mined"] = df["box_office_wiki_mined"].apply(clean_raw_box_office_text)

# Where parsing produced a number, it replaces the existing box_office value;
# otherwise the existing value is kept.
df["box_office"] = df["cleaned_box_office_wiki_mined"].fillna(df["box_office"])

In [None]:
# Cap rendered rows at 100 (same value as set at the top of the notebook).
pd.set_option('display.max_rows', 100)

In [None]:
# Compare the final value, the raw mined text and the parsed mined number side by side.
box_office_cols = ["box_office", "box_office_wiki_mined", "cleaned_box_office_wiki_mined"]

df[box_office_cols]

## Drop rows where box office is still missing

In [None]:
# Remove rows that still have no box_office value after mining.
df.dropna(inplace=True, subset=["box_office"])

In [None]:
# Show the output of column_stats after dropping rows without a box office value.
display(column_stats(df))

In [None]:
# Checkpoint: write the work-in-progress frame to disk and reload it from the CSV.
df.to_csv("wip_data.csv", index=False)
df = pd.read_csv("wip_data.csv")

### Writer, Director, Distributor, Sound Mix, Budget, Runtime, Language, Cast

In [None]:
def extract_primary_writer(html):
    """Return the first writer listed in the "Written by" row of the infobox.

    A linked name is preferred; without links, the cell text up to the first
    comma is used. ``None`` is returned when the HTML is missing, there is no
    infobox, no matching row with a value cell exists, or parsing fails.
    """
    if pd.isna(html):
        return None

    try:
        infobox = BeautifulSoup(html, "html.parser").find("table", class_="infobox")
        if not infobox:
            return None

        for table_row in infobox.find_all("tr"):
            heading = table_row.find("th")
            if not heading or "Written by" not in heading.text:
                continue

            cell = table_row.find("td")
            if not cell:
                continue

            # First linked name wins when the cell contains any link.
            anchor = cell.find("a")
            if anchor:
                return anchor.text.strip()

            # No links: keep only the part before the first comma.
            return cell.text.strip().split(",")[0]

    except Exception as e:
        print(f"Error extracting writer data: {e}")

    return None

def extract_primary_writer_single_threaded(df, html_column, output_column):
    """Fill ``output_column`` with the primary writer taken from ``html_column``.

    Only rows whose ``output_column`` is currently missing are processed, one
    after the other, with a progress bar. The DataFrame is modified in place
    and also returned.
    """
    pending = df[output_column].isna()
    pages = df.loc[pending, html_column]

    progress = tqdm(pages, total=len(pages), desc="Extracting Primary Writer", unit="page")
    df.loc[pending, output_column] = [
        extract_primary_writer(page) for page in progress
    ]
    return df

# Run the writer extraction; the output column starts all-missing, so every row is processed.
df["primary_writer_wiki_mined"] = pd.NA
df = extract_primary_writer_single_threaded(df, "wiki_page_html", "primary_writer_wiki_mined")

In [None]:
# Prefer the Wikipedia-mined writer; keep the existing writer where mining found nothing.
df["writer"] = df["primary_writer_wiki_mined"].fillna(df["writer"])
# df.drop(["primary_writer_wiki_mined"], inplace=True, axis=1)

In [None]:
# Show the output of column_stats after merging the mined writer.
display(column_stats(df))

In [None]:
# Matches bracketed footnote markers such as "[1]", "[a]" or "[citation needed]".
_FOOTNOTE_RE = re.compile(r"\[[^\]]*\]")


def _clean_cell_text(cell):
    """Return the cell's text with bracketed footnote markers removed."""
    return _FOOTNOTE_RE.sub("", cell.text).strip()


def _linked_names(cell):
    """Return the text of each link in the cell, skipping footnote links and empty links."""
    names = []
    for anchor in cell.find_all("a"):
        label = anchor.text.strip()
        if label and not _FOOTNOTE_RE.fullmatch(label):
            names.append(label)
    return names


def extract_movie_info(html):
    """Extract movie metadata from the infobox of a Wikipedia page.

    Args:
        html: Page HTML as a string, or a missing value.

    Returns:
        A 6-tuple ``(director, distributor, budget, runtime, language,
        top_5_cast)``. Budget and runtime are the raw cell text; distributor,
        language and cast are comma-separated names (cast limited to five).
        Any field that is absent or empty is ``None``, and all six are
        ``None`` when the HTML is missing, has no infobox, or parsing fails.
    """
    empty = (None, None, None, None, None, None)
    if pd.isna(html):
        return empty

    try:
        soup = BeautifulSoup(html, "html.parser")
        infobox = soup.find("table", class_="infobox")
        if not infobox:
            return empty

        data = {
            "director": None,
            "distributor": None,
            "budget": None,
            "runtime": None,
            "language": None,
            "top_5_cast": None
        }

        for row in infobox.find_all("tr"):
            header = row.find("th")
            if not header:
                continue

            key = header.text.strip()
            value_cell = row.find("td")
            if not value_cell:
                continue

            # `or None` below keeps empty results missing, so a later
            # fillna() can still fall back to the pre-existing value.
            if key == "Directed by":
                names = _linked_names(value_cell)  # Prefer linked names
                data["director"] = (names[0] if names else _clean_cell_text(value_cell)) or None

            elif key == "Distributed by":
                names = _linked_names(value_cell) or [_clean_cell_text(value_cell)]
                data["distributor"] = ", ".join(names) or None

            elif key == "Budget":
                data["budget"] = value_cell.text.strip()

            elif key == "Running time":
                data["runtime"] = value_cell.text.strip()

            elif key == "Language":
                names = _linked_names(value_cell) or [_clean_cell_text(value_cell)]
                data["language"] = ", ".join(names) or None

            elif key == "Starring":
                cast = _linked_names(value_cell)
                if not cast:
                    # Unlinked names are listed one per line.
                    lines = _clean_cell_text(value_cell).split("\n")
                    cast = [line.strip() for line in lines if line.strip()]
                data["top_5_cast"] = ", ".join(cast[:5]) or None

        return (data["director"], data["distributor"], data["budget"],
                data["runtime"], data["language"], data["top_5_cast"])

    except Exception as e:
        print(f"Error extracting movie data: {e}")

    return empty

def extract_movie_data(df, html_column):
    """Fill the six ``*_mined`` metadata columns from the HTML in ``html_column``.

    Rows are selected by a missing ``director_mined`` value; each selected
    page is parsed once and its six fields are written to the ``*_mined``
    columns. The DataFrame is modified in place and also returned.
    """
    mined_columns = ["director_mined", "distributor_mined", "budget_mined",
                     "runtime_mined", "language_mined", "cast_mined"]

    pending = df["director_mined"].isna()
    pages = df.loc[pending, html_column]

    progress = tqdm(pages, total=len(pages), desc="Extracting Movie Data", unit="page")
    extracted = [extract_movie_info(page) for page in progress]

    df.loc[pending, mined_columns] = extracted
    return df

# Create the six '_mined' columns as all-missing so every row is processed.
df[["director_mined", "distributor_mined", "budget_mined",
    "runtime_mined", "language_mined", "cast_mined"]] = pd.NA

# Parse each stored page once and fill the '_mined' columns.
df = extract_movie_data(df, "wiki_page_html")

In [None]:
#df.to_csv("wip_data.csv", index=False)
#df = pd.read_csv("wip_data.csv")

In [None]:
# Inspect the frame and the column_stats output after mining the metadata.
display(df)

display(column_stats(df))

In [None]:
# Prefer the mined cast; keep the existing cast where mining found nothing.
df["cast"] = df["cast_mined"].fillna(df["cast"])
# df.drop(["cast_mined"], inplace=True, axis=1)

In [None]:
# Prefer the mined language; keep the existing language where mining found nothing.
df["language"] = df["language_mined"].fillna(df["language"])
# df.drop(["language_mined"], inplace=True, axis=1)

In [None]:
# Prefer the mined director; keep the existing director where mining found nothing.
df["director"] = df["director_mined"].fillna(df["director"])
# df.drop(["director_mined"], inplace=True, axis=1)

In [None]:
# Strip numeric footnote markers like "[1]" from the mined distributor text,
# then prefer it over the existing distributor value.
df["distributor_mined"] = df["distributor_mined"].str.replace(r"\[\d+\]", "", regex=True).str.strip()
df["distributor"] = df["distributor_mined"].fillna(df["distributor"])
# df.drop(["distributor_mined"], inplace=True, axis=1)

In [None]:
# - (?<![A-Za-z])(?:US)?\$ : a bare or "US"-prefixed dollar sign; other
#       currency prefixes ("A$", "CA$", ...) are rejected.
# - amount : the first number, possibly with commas and a decimal point.
# - an optional range tail separated by a hyphen, en dash or em dash; it is
#       consumed so the scale word after it is still seen, but not used.
# - scale  : optional "million" / "billion" (the typo "millon" is accepted).
_BUDGET_PATTERN = re.compile(
    r"(?<![A-Za-z])(?:US)?\$"
    r"(?P<amount>[\d,.]+)"
    r"(?:\s?[–—-]\s?[\d,.]+)?"
    r"(?:\s*(?P<scale>million|millon|billion))?",
    re.IGNORECASE,
)

_BUDGET_SCALE_FACTORS = {
    "million": 1_000_000,
    "millon": 1_000_000,
    "billion": 1_000_000_000,
}


def clean_raw_budget_text(text):
    """Parse raw budget text into a US-dollar amount.

    Args:
        text: Raw text such as ``"$30 million[1]"``, ``"$250,000"`` or
            ``"$1.5–2.5 million"``.

    Returns:
        The first US-dollar amount as a float (the lower bound of a range,
        and the first alternative of an "... or ..." pair), scaled by
        "million"/"billion" when present. NaN is returned for non-string
        input and for text without a parseable US-dollar amount.
    """
    if not isinstance(text, str):
        return np.nan

    # Remove Wikipedia-style citations like [1], [2]
    text = re.sub(r"\[\d+\]", "", text)

    # "X or Y": keep only the first alternative.
    text = re.split(r"\s+or\s+", text, maxsplit=1)[0]

    match = _BUDGET_PATTERN.search(text)
    if not match:
        return np.nan

    try:
        amount = float(match.group("amount").replace(',', ''))
    except ValueError:
        # e.g. more than one decimal point in the captured digits
        return np.nan

    scale = (match.group("scale") or "").lower()
    return amount * _BUDGET_SCALE_FACTORS.get(scale, 1)

# Replace the raw mined budget text with its parsed numeric value (NaN where unparseable).
df["budget_mined"] = df["budget_mined"].apply(clean_raw_budget_text)
display(column_stats(df))

In [None]:
# Prefer the mined budget; keep the existing budget where mining found nothing.
df["budget"] = df["budget_mined"].fillna(df["budget"])
# df.drop(["budget_mined"], inplace=True, axis=1)

In [None]:
# Text that *starts* with an hour count, optionally followed by minutes,
# e.g. "1 hour 45 minutes", "2h 10m", "1 hr. 30 min".
_HOURS_MINUTES_RE = re.compile(
    r"(\d+)\s*(?:hours?|hrs?|h)\b\.?"
    r"(?:\s*(?:and\s+)?(\d+)\s*(?:minutes?|mins?|m)\b)?",
    re.IGNORECASE,
)


def clean_runtime_text(text):
    """Parse raw running-time text into a number of minutes.

    Args:
        text: Raw text such as ``"120 minutes[1]"`` or ``"1 hour 45 minutes"``.

    Returns:
        The runtime in minutes as a float. Text that starts with an hour
        count is converted to minutes; otherwise the first integer in the
        text is returned as-is. NaN is returned for non-string input and for
        text without any number.
    """
    if not isinstance(text, str):
        return np.nan

    # Remove Wikipedia-style citations like [1], [2]
    text = re.sub(r"\[\d+\]", "", text).strip()

    # Only a leading hour count is interpreted; hours mentioned later in the
    # text are left alone.
    hours_minutes = _HOURS_MINUTES_RE.match(text)
    if hours_minutes:
        hours = int(hours_minutes.group(1))
        minutes = int(hours_minutes.group(2) or 0)
        return float(hours * 60 + minutes)

    first_number = re.search(r"(\d+)", text)
    if first_number:
        return float(first_number.group(1))

    return np.nan
# Replace the raw mined runtime text with its parsed numeric value.
df["runtime_mined"] = df["runtime_mined"].apply(clean_runtime_text)

# Prefer the mined runtime; keep the existing runtime where mining found nothing.
df["runtime"] = df["runtime_mined"].fillna(df["runtime"])
# df.drop(["runtime_mined"], inplace=True, axis=1)

In [None]:
# The raw HTML column is removed now that the fields above have been extracted from it.
df.drop(["wiki_page_html"], inplace=True, axis=1)
display(df)

display(column_stats(df))

In [None]:
def clean_rating_column(value):
    """Turn the string form of a list into a comma-separated string.

    Args:
        value: A cell value, e.g. ``"['Violence', 'Language']"``.

    Returns:
        ``"Violence, Language"`` for a string that parses to a list (items
        are converted with ``str``). Every other value, including strings
        that parse to something other than a list, is returned unchanged.
    """
    if not isinstance(value, str):
        return value

    try:
        # Keep the parsed result separate so the original string can still
        # be returned untouched when it is not a list.
        parsed = ast.literal_eval(value)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return value

    if isinstance(parsed, list):
        # str() guards against non-string items, which str.join rejects.
        return ", ".join(str(item) for item in parsed)
    return value

# Flatten stringified lists in rating_contents into comma-separated text.
df["rating_contents"] = df["rating_contents"].apply(clean_rating_column)
display(df)

In [None]:
# Lower-case every string in object-dtype columns. Values are mapped one by
# one so that non-string entries in a mixed column (numbers, booleans) are
# kept as they are; the .str accessor would turn them into NaN.
df = df.apply(
    lambda col: col.map(lambda v: v.lower() if isinstance(v, str) else v)
    if col.dtype == "object"
    else col
)
display(df)

In [None]:
# Keep only rows whose language mentions "English" (case-insensitive; missing
# language is dropped). The language column may now hold mined values.
df = df[df["language"].str.contains("English", case=False, na=False)]
display(df)
display(column_stats(df))

In [None]:
# Keep only rows with a release year of 1977 or later.
df = df[df["release_year"] >= 1977]
display(df)
display(column_stats(df))

In [None]:
# Columns whose name starts with "level_0" or "index" are leftovers of
# earlier index resets.
index_like = df.columns.str.match(r'level_0|index')

# Intermediate columns produced by the Wikipedia mining steps.
mined_columns = [
    "box_office_wiki_mined", "cleaned_box_office_wiki_mined",
    "primary_writer_wiki_mined", "director_mined", "distributor_mined",
    "budget_mined", "runtime_mined", "language_mined", "cast_mined",
]

# Remove both groups and renumber the rows from 0.
df = (
    df.loc[:, ~index_like]
      .drop(columns=mined_columns)
      .reset_index(drop=True)
)

# Write the final dataset without the index column.
df.to_csv("final_data.csv", index=False)