In [None]:
# Standard imports
import pandas as pd
import requests
import json
import time
import os
import re
from tqdm import tqdm
from sqlalchemy import create_engine, text

# Configuration
OMDB_API_KEY = "36dc668c"
MOVIES_CSV = "movies.csv"
RATINGS_CSV = "ratings.csv"
CACHE_FILE = "omdb_cache.json"
OUTPUT_DB = "movie_data.db"
API_SLEEP = 0.25
CACHE_SAVE_INTERVAL = 50


# Load CSVs
movies = pd.read_csv(r"C:\Users\shash\OneDrive\Desktop\etl\movies.csv")
ratings = pd.read_csv(r"C:\Users\shash\OneDrive\Desktop\etl\ratings.csv")

print("Movies columns:", movies.columns.tolist())
print(movies.head(3))
print("Ratings columns:", ratings.columns.tolist())
print(ratings.head(3))

# Load or initialize cache(api extraction)
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, 'r', encoding='utf-8') as f:
        cache = json.load(f)
else:
    cache = {}

def cache_key(title, year):
    k = title.strip().lower()
    return f"{k}_{year}" if year is not None else k

def fetch_omdb(title, year=None, api_key=OMDB_API_KEY):
    key = cache_key(title, year)
    if key in cache:
        return cache[key]

    params = {"t": title, "apikey": api_key}
    if year:
        params['y'] = str(year)

    try:
        r = requests.get('http://www.omdbapi.com/', params=params, timeout=8)
        data = r.json()

        if data.get('Response') == 'False' and year:
            params.pop('y', None)
            r = requests.get('http://www.omdbapi.com/', params=params, timeout=8)
            data = r.json()
    except requests.exceptions.RequestException as e:
        data = {"Response": "False", "Error": str(e)}

    cache[key] = data

    if len(cache) % CACHE_SAVE_INTERVAL == 0:
        with open(CACHE_FILE, 'w', encoding='utf-8') as f:
            json.dump(cache, f, ensure_ascii=False, indent=2)

    time.sleep(API_SLEEP)
    return data

# Enrich movies
results = []
for _, row in tqdm(movies.iterrows(), total=len(movies)):
    title = row['title_clean']
    year = int(row['year']) if pd.notna(row['year']) else None
    data = fetch_omdb(title, year)

    results.append({
        'movieId': int(row['movieId']),
        'title': row['title'],
        'title_clean': title,
        'year': year,
        'director': data.get('Director'),
        'plot': data.get('Plot'),
        'box_office': data.get('BoxOffice'),
        'runtime': data.get('Runtime'),
        'released': data.get('Released'),
        'language': data.get('Language'),
        'actors': data.get('Actors'),
        'imdb_id': data.get('imdbID'),
        'genre_api': data.get('Genre')
    })

with open(CACHE_FILE, 'w', encoding='utf-8') as f:
    json.dump(cache, f, ensure_ascii=False, indent=2)

enriched = pd.DataFrame(results)
print('Enriched rows:', enriched.shape)
print(enriched.head(3))

# Transformation and Merging
def parse_runtime(rt):
    if pd.isna(rt):
        return None
    m = re.search(r"(\d+)", str(rt))
    return int(m.group(1)) if m else None

movies_enriched = movies.merge(enriched.drop(columns=['title']), on='movieId', how='left')
movies_enriched['genres_final'] = movies_enriched['genre_api'].fillna(movies_enriched['genres'])
movies_enriched['runtime_mins'] = movies_enriched['runtime'].apply(parse_runtime)

movies_table = movies_enriched[[
    'movieId', 'title', 'title_clean', 'year', 'director', 'plot', 'box_office',
    'runtime', 'runtime_mins', 'released', 'language', 'actors', 'imdb_id', 'genres_final'
]].rename(columns={'genres_final': 'genres'})

ratings_table = ratings[['userId', 'movieId', 'rating', 'timestamp', 'timestamp_converted']].copy()

print('Movies table shape:', movies_table.shape)
print('Ratings table shape:', ratings_table.shape)

# Load to SQLite
engine = create_engine(f"sqlite:///{OUTPUT_DB}")

movies_table.to_sql('movies', con=engine, if_exists='replace', index=False)
ratings_table.to_sql('ratings', con=engine, if_exists='replace', index=False)

print('Data loaded successfully into', OUTPUT_DB)

# Validate the SQLite Database
with engine.connect() as conn:
    print('Movies count:', conn.execute(text('SELECT COUNT(*) FROM movies')).scalar())
    print('Ratings count:', conn.execute(text('SELECT COUNT(*) FROM ratings')).scalar())
    display(pd.read_sql('SELECT * FROM movies LIMIT 5', conn))
