In [None]:
import pandas as pd
import numpy as np
import json
import ast

def load_data(filepath):
    """Read the raw movie metadata CSV and configure pandas display options.

    Args:
        filepath: Path or file-like object accepted by ``pandas.read_csv``.

    Returns:
        The DataFrame exactly as parsed from the CSV (no cleaning applied).

    Note:
        As a side effect this changes two process-wide pandas display
        options: floats are rendered with thousands separators and two
        decimals, and the column display limit is removed.
    """
    raw_movies = pd.read_csv(filepath, low_memory=False)

    # Global display tweaks; applied only once the file was read successfully.
    display_options = {
        'display.float_format': lambda x: f"{x:,.2f}",
        'display.max_columns': None,
    }
    for option_name, option_value in display_options.items():
        pd.set_option(option_name, option_value)

    return raw_movies

def select_columns(df):
    """Return an independent copy of ``df`` restricted to the analysis columns.

    Args:
        df: Raw movie metadata; must contain every column listed below.

    Returns:
        A new DataFrame holding only the selected columns, in the listed
        order.

    Raises:
        KeyError: If any of the expected columns is missing from ``df``.
    """
    columns_to_keep = [
        'imdb_id', 'id', 'title', 'budget', 'revenue', 'release_date',
        'status', 'runtime', 'genres', 'original_language', 'overview',
        'vote_average', 'vote_count', 'popularity', 'production_companies',
        'production_countries',
    ]
    # Explicit copy: the selection is assigned to column-by-column further
    # down the pipeline, and pandas would otherwise treat it as a slice of
    # the raw frame and emit SettingWithCopyWarning.
    return df[columns_to_keep].copy()

def clean_data_types(df):
    """Coerce numeric and date columns to proper dtypes, in place.

    ``budget``, ``id`` and ``revenue`` become numeric and ``release_date``
    becomes datetime (expected as ``YYYY-MM-DD``); values that cannot be
    converted turn into NaN / NaT. Missing ids are then replaced with 0 so
    the ``id`` column can be stored as plain integers.

    Args:
        df: DataFrame containing the four columns above. It is modified
            in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    for numeric_column in ('budget', 'id', 'revenue'):
        df[numeric_column] = pd.to_numeric(df[numeric_column], errors='coerce')

    # Dates that do not match the ISO layout are coerced to NaT.
    parsed_dates = pd.to_datetime(
        df['release_date'], format='%Y-%m-%d', errors='coerce'
    )
    df['release_date'] = parsed_dates

    # NaN cannot live in an int column, so unknown ids are stored as 0.
    ids_without_gaps = df['id'].fillna(0)
    df['id'] = ids_without_gaps.astype(int)

    return df

def filter_data(df):
    """Keep released movies with plausible budget and revenue figures.

    A row survives when all of the following hold:
      * ``status`` equals ``'Released'``;
      * ``budget`` is at least 10,000 and ``revenue`` at least 1,000;
      * ``revenue`` is no more than ten times ``budget``.
    Rows with NaN in ``budget`` or ``revenue`` fail the comparisons and are
    dropped.

    Args:
        df: DataFrame with ``status``, ``budget`` and ``revenue`` columns.
            It is not modified.

    Returns:
        A new, independent DataFrame with the surviving rows; the original
        index labels are preserved.
    """
    # Only keep released movies
    released = df[df['status'] == 'Released']

    # Apply financial filters
    funded = released[(released['budget'] >= 10000) & (released['revenue'] >= 1000)]
    plausible = funded[funded['revenue'] <= (funded['budget'] * 10)]

    # Return a real copy: later pipeline steps add and overwrite columns on
    # the result, which must not be treated as a slice of the input frame.
    return plausible.copy()

def process_genres(df):
    """Parse the serialized ``genres`` column into lists of genre names.

    Adds two columns to ``df`` (in place):
      * ``genres_parsed`` - list of genre names for each movie;
      * ``num_genres`` - length of that list.
    Missing values in ``genres`` are replaced with the string ``'[]'``.

    Each cell is expected to hold a serialized list of dicts with a
    ``'name'`` key, either as a Python literal (single quotes) or as JSON.
    Cells that cannot be parsed, or that do not contain a list, yield an
    empty list; list entries without a ``'name'`` key are skipped.

    Args:
        df: DataFrame with a ``genres`` column. Modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    # First clean the genres column - replace NaN with an empty list literal
    df['genres'] = df['genres'].fillna('[]')

    def parse_genres(x):
        if not isinstance(x, str):
            return []
        # Try the Python-literal form first. Blindly swapping ' for " before
        # json.loads corrupts any value that contains an apostrophe, so the
        # text is never rewritten; JSON is only a fallback (e.g. for
        # payloads using null/true/false).
        try:
            parsed = ast.literal_eval(x)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            try:
                parsed = json.loads(x)
            except (json.JSONDecodeError, TypeError):
                return []
        if not isinstance(parsed, list):
            return []
        return [g['name'] for g in parsed if isinstance(g, dict) and 'name' in g]

    df['genres_parsed'] = df['genres'].apply(parse_genres)

    # Add genre count column
    df['num_genres'] = df['genres_parsed'].apply(len)

    return df

def create_genre_tables(df):
    """Build the genre lookup table and the movie-genre link table.

    Each cell of ``df['genres']`` is expected to hold a serialized list of
    dicts (Python-literal or JSON form), typically with ``'id'`` and
    ``'name'`` keys. Cells that are missing, unparsable, or not a list
    contribute nothing; list entries that are not dicts are ignored.

    Args:
        df: DataFrame with ``imdb_id`` and ``genres`` columns. Not modified.

    Returns:
        A tuple ``(genre_df, genre_film)``:
          * ``genre_df`` - distinct genres, with ``id``/``name`` renamed to
            ``genre_id``/``genre_name``;
          * ``genre_film`` - distinct ``(imdb_id, genre_id)`` pairs, one for
            every genre dict that has an ``'id'`` key.
        Both frames carry their column names even when no genre was found.
    """
    def extract_genres(genre_value):
        if isinstance(genre_value, list):
            return genre_value
        if not isinstance(genre_value, str):
            return []
        # Parse without rewriting quotes: replacing ' with " breaks values
        # that contain an apostrophe. JSON is tried only as a fallback.
        try:
            parsed = ast.literal_eval(genre_value)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            try:
                parsed = json.loads(genre_value)
            except (json.JSONDecodeError, TypeError):
                return []
        return parsed if isinstance(parsed, list) else []

    all_genres = []
    movie_genre_pairs = []

    # Single pass: every row is parsed once and feeds both tables.
    for imdb_id, raw_genres in zip(df['imdb_id'], df['genres']):
        for genre in extract_genres(raw_genres):
            if not isinstance(genre, dict):
                continue
            all_genres.append(genre)
            if 'id' in genre:
                movie_genre_pairs.append({
                    'imdb_id': imdb_id,
                    'genre_id': genre['id'],
                })

    # An empty list would give a frame with no columns at all; fall back to
    # an explicit schema so the output always has the expected headers.
    if all_genres:
        genre_df = pd.DataFrame(all_genres)
    else:
        genre_df = pd.DataFrame(columns=['id', 'name'])
    genre_df = genre_df.drop_duplicates()
    genre_df = genre_df.rename(columns={'id': 'genre_id', 'name': 'genre_name'})

    genre_film = pd.DataFrame(
        movie_genre_pairs, columns=['imdb_id', 'genre_id']
    ).drop_duplicates()

    return genre_df, genre_film

def process_production_data(df):
    """Parse the serialized production company / country columns into lists.

    ``production_companies`` and ``production_countries`` are expected to
    hold Python-literal strings describing a list of dicts. After this call
    every cell in both columns is a list:
      * a cell that is already a list is kept unchanged, so calling the
        function twice is harmless;
      * a string is parsed with ``ast.literal_eval``; if parsing fails or
        the result is not a list (e.g. the literal ``'False'``), the cell
        becomes ``[]``;
      * anything else (NaN, None, numbers) becomes ``[]``.

    Args:
        df: DataFrame with both columns. Modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    def parse_literal_list(x):
        if isinstance(x, list):
            return x
        if not isinstance(x, str):
            return []
        try:
            parsed = ast.literal_eval(x)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return []
        # Guarantee a list so downstream code can iterate every cell safely.
        return parsed if isinstance(parsed, list) else []

    # Both columns share the same serialization, so one parser serves both.
    for column in ('production_companies', 'production_countries'):
        df[column] = df[column].apply(parse_literal_list)

    return df

def create_production_tables(df):
    """Build lookup and link tables for production companies and countries.

    Expects ``production_companies`` and ``production_countries`` to hold
    lists of dicts (company dicts with ``'id'``/``'name'``, country dicts
    with ``'iso_3166_1'``/``'name'``). Cells that are not lists and list
    entries that are not dicts are ignored.

    Args:
        df: DataFrame with ``imdb_id``, ``production_companies`` and
            ``production_countries`` columns. Not modified.

    Returns:
        A 4-tuple of de-duplicated DataFrames:
          * ``companies_df`` - ``company_id``, ``company_name`` (missing
            keys become None);
          * ``movie_companies_df`` - ``imdb_id``, ``company_id`` for every
            company dict that has an ``'id'`` key;
          * ``countries_df`` - ``country_code``, ``country_name``;
          * ``movie_countries_df`` - ``imdb_id``, ``country_code`` for every
            country dict that has an ``'iso_3166_1'`` key.
        All four frames carry their column names even when empty.
    """
    def dict_entries(cell):
        # Tolerate unparsed / missing cells instead of failing on iteration.
        if not isinstance(cell, (list, tuple)):
            return []
        return [entry for entry in cell if isinstance(entry, dict)]

    all_companies = []
    movie_companies = []
    all_countries = []
    movie_countries = []

    # One pass over the rows fills all four record lists.
    rows = zip(df['imdb_id'], df['production_companies'], df['production_countries'])
    for imdb_id, companies, countries in rows:
        for company in dict_entries(companies):
            all_companies.append({
                'company_id': company.get('id'),
                'company_name': company.get('name'),
            })
            if 'id' in company:
                movie_companies.append({
                    'imdb_id': imdb_id,
                    'company_id': company['id'],
                })
        for country in dict_entries(countries):
            all_countries.append({
                'country_code': country.get('iso_3166_1'),
                'country_name': country.get('name'),
            })
            if 'iso_3166_1' in country:
                movie_countries.append({
                    'imdb_id': imdb_id,
                    'country_code': country['iso_3166_1'],
                })

    # Explicit column lists keep the schema stable when a record list is empty.
    companies_df = pd.DataFrame(
        all_companies, columns=['company_id', 'company_name']
    ).drop_duplicates()
    movie_companies_df = pd.DataFrame(
        movie_companies, columns=['imdb_id', 'company_id']
    ).drop_duplicates()
    countries_df = pd.DataFrame(
        all_countries, columns=['country_code', 'country_name']
    ).drop_duplicates()
    movie_countries_df = pd.DataFrame(
        movie_countries, columns=['imdb_id', 'country_code']
    ).drop_duplicates()

    return companies_df, movie_companies_df, countries_df, movie_countries_df

def create_final_tables(df):
    """Derive the finance, films and film-info tables from the cleaned data.

    Args:
        df: Cleaned movie DataFrame; ``release_date`` must be a datetime
            column because it is formatted through the ``.dt`` accessor.

    Returns:
        A tuple ``(finance_df, films_df, film_info_df)`` of de-duplicated
        frames:
          * ``finance_df`` - ``imdb_id``, ``budget``, ``revenue``;
          * ``films_df`` - ``imdb_id``, ``film_title``, and ``release_date``
            rendered as ``YYYY-MM-DD`` text;
          * ``film_info_df`` - ``imdb_id``, ``id_kaggle`` (nullable
            ``Int64``), ``runtime``, ``original_language``, ``overview``.
    """
    # Finance: plain projection with duplicate rows removed.
    finance_columns = ['imdb_id', 'budget', 'revenue']
    finance_df = df[finance_columns].drop_duplicates()

    # Films: rename the title column, then turn the dates into ISO strings.
    films_df = (
        df[['imdb_id', 'title', 'release_date']]
        .drop_duplicates()
        .rename(columns={'title': 'film_title'})
    )
    iso_dates = films_df['release_date'].dt.strftime('%Y-%m-%d')
    films_df = films_df.assign(release_date=iso_dates)

    # Film info: expose the dataset id as a nullable integer column.
    film_info_df = (
        df[['imdb_id', 'id', 'runtime', 'original_language', 'overview']]
        .drop_duplicates()
        .rename(columns={'id': 'id_kaggle'})
    )
    kaggle_ids = pd.to_numeric(film_info_df['id_kaggle'], errors='coerce')
    film_info_df = film_info_df.assign(id_kaggle=kaggle_ids.astype('Int64'))

    return finance_df, films_df, film_info_df

def save_tables(tables, output_dir='./'):
    """Write each table to ``<output_dir><name>.csv`` without the index.

    Tables are matched to file names by position, in this order: genre,
    genre_film, production_company, film_company, production_country,
    film_country, finance, films, film_info. Pairing stops at the shorter
    of the two sequences.

    The ``films`` table is written with ``release_date`` as ``YYYY-MM-DD``
    text. A datetime column is formatted on a copy (the caller's frame is
    left untouched); a column that is already text is written as-is.

    Args:
        tables: Sequence of DataFrames in the order listed above.
        output_dir: String prefix placed directly in front of each file
            name, so a directory must include its trailing separator
            (e.g. ``'out/'``).
    """
    table_names = [
        'genre', 'genre_film',
        'production_company', 'film_company',
        'production_country', 'film_country',
        'finance', 'films', 'film_info'
    ]

    for name, table in zip(table_names, tables):
        # Only format real datetime columns: calling .dt on a column that
        # already holds strings would raise instead of saving the file.
        if name == 'films' and pd.api.types.is_datetime64_any_dtype(table['release_date']):
            table = table.copy()
            table['release_date'] = table['release_date'].dt.strftime('%Y-%m-%d')
        table.to_csv(f"{output_dir}{name}.csv", index=False)

def load_films_checked(csv_path):
    """Read a films CSV and return it with ``release_date`` as datetime.

    The column is first parsed by ``read_csv`` using the ``YYYY-MM-DD``
    format, then passed through ``pd.to_datetime`` with coercion so that
    any value still unparsed becomes NaT.

    Args:
        csv_path: Path or file-like object accepted by ``pandas.read_csv``;
            the file must contain a ``release_date`` column.

    Returns:
        The loaded DataFrame.

    Raises:
        ValueError: If ``release_date`` is not a datetime column after the
            conversion.
    """
    date_column = 'release_date'
    films = pd.read_csv(
        csv_path,
        parse_dates=[date_column],
        date_format='%Y-%m-%d',
    )

    # Second, lenient pass: anything read_csv left unparsed is coerced here.
    films[date_column] = pd.to_datetime(films[date_column], errors='coerce')

    # Safety net on the resulting dtype.
    if pd.api.types.is_datetime64_any_dtype(films[date_column]):
        return films
    raise ValueError("Failed to convert release_date to datetime")

def clean_movie_data(filepath, output_dir='./'):
    """Run the whole cleaning pipeline and write the normalized CSV tables.

    Steps: load the raw CSV, select columns, fix dtypes, filter rows, parse
    genres and production data, build the nine normalized tables, save them,
    and finally re-read ``films.csv`` to confirm its dates load as datetime.

    Args:
        filepath: Location of the raw movie metadata CSV.
        output_dir: String prefix placed directly in front of each output
            file name (include the trailing separator for a directory).

    Returns:
        A tuple ``(movies, all_tables)``: the cleaned movie DataFrame and
        the list of nine tables in the order genre, genre_film, companies,
        movie-companies, countries, movie-countries, finance, films,
        film_info. The films table in this list has a datetime
        ``release_date``.

    Raises:
        Exception: Any error raised while validating the saved
            ``films.csv`` is reported on stdout and then re-raised.
    """
    # Load, then push the frame through each cleaning stage in order.
    movies = load_data(filepath)
    pipeline = (
        select_columns,
        clean_data_types,
        filter_data,
        process_genres,
        process_production_data,
    )
    for stage in pipeline:
        movies = stage(movies)

    # Build the normalized tables.
    genre_tables = create_genre_tables(movies)
    production_tables = create_production_tables(movies)
    finance_df, films_df, film_info_df = create_final_tables(movies)

    # create_final_tables hands back release_date as text; restore the
    # datetime dtype before the tables are saved and returned.
    films_df['release_date'] = pd.to_datetime(films_df['release_date'], errors='coerce')

    all_tables = [
        *genre_tables,
        *production_tables,
        finance_df, films_df, film_info_df,
    ]
    save_tables(all_tables, output_dir)

    # Validation: make sure the films.csv just written loads with proper dates.
    try:
        load_films_checked(f"{output_dir}films.csv")
        print("✓ Successfully validated release_date conversion")
    except Exception as e:
        print(f"! Validation error: {str(e)}")
        raise

    return movies, all_tables


# Example usage: run the full pipeline on the raw metadata CSV. output_dir is
# left at its default ('./'), so the CSV tables are written with that prefix.
cleaned_movies, tables = clean_movie_data("/raw_data/movies_metadata.csv")


✓ Successfully validated release_date conversion
