In [None]:
import json
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import List, Dict, Any
from urllib.parse import quote

import numpy as np
import pandas as pd
import requests
from bs4 import BeautifulSoup

In [None]:
# CSV holding the movie titles to scrape. Later cells write the scraped
# ratings back to this same path, so the file is both input and output.
DATA_PATH = "../../../data/movies_to_scrape.csv"

# Never truncate cell contents when a DataFrame is displayed. The fully
# qualified option name is used because pandas resolves shorthand names by
# pattern matching, which can become ambiguous in later pandas versions.
pd.set_option("display.max_colwidth", None)

df = pd.read_csv(DATA_PATH)

In [None]:
class MovieRatingBatchProcessor:
    """
    Scrape IMDb ratings for the movie titles in a dataframe, batch by batch.

    The processor works on a private copy of the input dataframe and adds a
    ``Rating`` column to it (NaN until a rating has been fetched). Batches are
    dispatched to a thread pool; each worker looks titles up one at a time via
    ``get_movie_rating``.

    Attributes:
        df: Private copy of the input dataframe, including the rating column.
        movie_name_column: Name of the column holding the movie titles.
        rating_column: Name of the column the ratings are written to.
        batch_size: Number of rows handed to one worker at a time.
        delay: Seconds slept before every search request. The value is drawn
            once, when the processor is created, and then reused for every
            request made by this instance.
        user_agents: Pool of User-Agent strings to pick request headers from.
        headers: Headers generated at construction time. Requests use freshly
            generated per-call headers instead of this shared attribute.
    """

    # Seconds to wait for IMDb before giving up on a single HTTP request.
    # Without a timeout a stalled connection would block its worker forever.
    REQUEST_TIMEOUT = 30

    # CSS selectors tried, in order, when neither the data-testid lookup nor
    # the JSON-LD metadata produced a rating.
    _FALLBACK_RATING_SELECTORS = (
        'span.sc-bde20123-1',
        'span.sc-7ab21ed2-1',
        'span.ipc-rating-star--imdb',
        '[class*="RatingScore"]',
        '[class*="rating-bar__base"]',
    )

    def __init__(self, df: pd.DataFrame, movie_name_column: str = 'Movie_name'):
        """
        Initialize the batch processor for fetching movie ratings.

        Args:
            df (pd.DataFrame): Input dataframe containing movie data. It is
                copied, so the caller's dataframe is not modified.
            movie_name_column (str): Name of the column containing movie titles
        """
        self.df = df.copy()
        self.movie_name_column = movie_name_column
        self.rating_column = 'Rating'
        self.batch_size = 50  # Default batch size
        self.delay = random.uniform(3, 7)  # Longer random delay

        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'
        ]
        self.headers = self._get_random_headers()

        # Initialize Rating column
        self.df[self.rating_column] = np.nan

    def _get_random_headers(self):
        """Build request headers with a User-Agent picked at random from ``user_agents``."""
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept': 'text/html,application/xhtml+xml',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive'
        }

    def _find_movie_id(self, movie_title: str, headers: Dict[str, str]):
        """Search IMDb for ``movie_title`` and return the first result's ``tt...`` id, or None."""
        search_url = f"https://www.imdb.com/find?q={quote(movie_title)}&s=tt&ttype=ft"
        search_response = requests.get(search_url, headers=headers, timeout=self.REQUEST_TIMEOUT)

        if search_response.status_code != 200:
            print(f"Search failed for '{movie_title}' with status code: {search_response.status_code}")
            return None

        search_soup = BeautifulSoup(search_response.text, 'html.parser')

        # Try multiple selector patterns for the first result
        first_result = (
            search_soup.find('a', href=lambda x: x and '/title/tt' in x) or
            search_soup.select_one('a[href*="/title/tt"]') or
            search_soup.find('a', {'data-testid': 'result-title'})
        )

        if not first_result:
            print(f"No results found for '{movie_title}'")
            return None

        # The data-testid fallback may match an anchor without an href, so
        # read the attribute defensively instead of indexing.
        movie_id_match = re.search(r'/title/(tt\d+)', first_result.get('href') or '')
        if not movie_id_match:
            print(f"Could not extract movie ID for '{movie_title}'")
            return None

        return movie_id_match.group(1)

    @staticmethod
    def _rating_from_json_ld(movie_soup):
        """Return ``aggregateRating.ratingValue`` from the page's JSON-LD metadata, or None."""
        for script_tag in movie_soup.find_all('script', {'type': 'application/ld+json'}):
            raw_json = script_tag.string  # None when the tag has no single string child
            if not raw_json or 'aggregateRating' not in raw_json:
                continue
            try:
                json_data = json.loads(raw_json)
            except json.JSONDecodeError:
                continue
            if not isinstance(json_data, dict):
                continue
            aggregate = json_data.get('aggregateRating')
            if isinstance(aggregate, dict) and aggregate.get('ratingValue') is not None:
                return aggregate['ratingValue']
        return None

    def _extract_rating(self, movie_soup):
        """Pull the rating out of a parsed movie page; returns None if nothing usable is found."""
        rating = None

        # Method 1: Using data attributes
        rating_elem = movie_soup.find(
            ['span', 'div'],
            {'data-testid': ['hero-rating-bar__aggregate-rating__score', 'rating']}
        )

        # Method 2: Using JSON-LD data
        if rating_elem is None:
            rating = self._rating_from_json_ld(movie_soup)

        # Method 3: Using common class patterns
        if rating is None and rating_elem is None:
            for selector in self._FALLBACK_RATING_SELECTORS:
                rating_elem = movie_soup.select_one(selector)
                if rating_elem is not None:
                    break

        if rating_elem is not None:
            # The element text may carry more than the number; keep the first numeric token.
            rating_match = re.search(r'(\d+\.?\d*)', rating_elem.text.strip())
            if rating_match:
                rating = float(rating_match.group(1))

        return rating

    def get_movie_rating(self, movie_title: str) -> float:
        """
        Gets IMDb rating for a movie with improved selectors and error handling.

        Two HTTP requests are made: a title search, then the page of the first
        search hit. Any failure (bad status code, no hit, unparsable page,
        network error, timeout) is reported with ``print`` and mapped to NaN,
        so this method never raises.

        Args:
            movie_title (str): The title of the movie to search for

        Returns:
            float or np.nan: IMDb rating if found, np.nan if not found or error
        """
        try:
            # Reject missing/blank titles before sleeping or touching the network.
            if not isinstance(movie_title, str) or not movie_title.strip():
                return np.nan

            time.sleep(self.delay)
            # Headers are kept local: this method runs on several worker
            # threads, and a shared attribute would be overwritten mid-lookup.
            headers = self._get_random_headers()

            movie_id = self._find_movie_id(movie_title, headers)
            if movie_id is None:
                return np.nan

            movie_url = f"https://www.imdb.com/title/{movie_id}/"

            time.sleep(1)  # Respect rate limiting

            movie_response = requests.get(movie_url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            if movie_response.status_code != 200:
                print(f"Movie page failed for '{movie_title}' with status code: {movie_response.status_code}")
                return np.nan

            movie_soup = BeautifulSoup(movie_response.text, 'html.parser')
            rating = self._extract_rating(movie_soup)

            return float(rating) if rating is not None else np.nan

        except Exception as e:
            print(f"Error occurred for '{movie_title}': {str(e)}")
            return np.nan

    def fetch_rating_batch(self, movie_batch: pd.DataFrame) -> List[Dict[str, Any]]:
        """
        Fetch ratings for a batch of movies using IMDb scraping.

        Args:
            movie_batch (pd.DataFrame): Batch of movies to process

        Returns:
            List[Dict]: One dictionary per row with keys ``index`` (the row's
            index label), ``rating`` (float or NaN) and ``movie_name``.

        Raises:
            KeyError: If ``movie_name_column`` is not a column of ``movie_batch``.
        """
        results = []

        for _, row in movie_batch.iterrows():
            # Read the title outside the try block: a missing column is a
            # configuration error and should surface as a plain KeyError.
            movie_title = row[self.movie_name_column]
            try:
                rating = self.get_movie_rating(movie_title)
            except Exception as e:
                print(f"Error in batch processing for movie '{movie_title}': {str(e)}")
                rating = np.nan

            results.append({
                'index': row.name,
                'rating': rating,
                'movie_name': movie_title
            })

        return results

    def process_in_batches(self, batch_size: int = None, save_interval = 1000,
                           save_path: str = None) -> pd.DataFrame:
        """
        Process all movies in batches to fetch their ratings.

        Results are written into ``self.df`` as each batch completes, and the
        dataframe is checkpointed to disk every time at least ``save_interval``
        further movies have been processed.

        Args:
            batch_size (int, optional): Size of each batch to process. When
                given, it also replaces ``self.batch_size``.
            save_interval (int): Number of processed movies between two
                checkpoints. A falsy value (0 or None) disables checkpoints.
            save_path (str, optional): CSV file the checkpoints are written
                to. Defaults to the notebook-level ``DATA_PATH``.

        Returns:
            pd.DataFrame: Updated dataframe with filled rating values
        """
        if batch_size:
            self.batch_size = batch_size

        total_movies = len(self.df)
        print(f"Processing {total_movies} movies in batches of {self.batch_size}...")

        if total_movies == 0:
            # Nothing to fetch; also avoids dividing by zero in the summary below.
            print("\nProcessing complete. No movies to process.")
            return self.df

        # Split into batches. Each batch is an independent copy so worker
        # threads never read from the frame this thread is writing to.
        batches = [self.df.iloc[i:i + self.batch_size].copy()
                   for i in range(0, total_movies, self.batch_size)]

        processed = 0
        successful_fetches = 0
        last_saved = 0

        # Process batches in parallel; results come back in batch order.
        with ThreadPoolExecutor() as executor:
            for batch_results in executor.map(self.fetch_rating_batch, batches):
                # Update the dataframe as soon as a batch is done so that
                # checkpoints contain everything fetched so far.
                for result in batch_results:
                    if pd.notna(result['rating']):
                        successful_fetches += 1
                    self.df.loc[result['index'], self.rating_column] = result['rating']

                # Print progress
                processed += len(batch_results)
                print(f"Progress: {processed}/{total_movies} movies processed "
                      f"({(processed/total_movies*100):.1f}%)")

                # Save intermediate results
                if save_interval and processed - last_saved >= save_interval:
                    target_path = save_path if save_path is not None else DATA_PATH
                    self.df.to_csv(target_path, index=False)
                    print(f"Intermediate results saved to {target_path}")
                    last_saved = processed

        print(f"\nProcessing complete. Successfully fetched ratings for "
              f"{successful_fetches}/{total_movies} movies "
              f"({(successful_fetches/total_movies*100):.1f}%)")

        return self.df

    def get_processing_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the rating processing results.

        Returns:
            Dict: ``total_movies``, ``successful_fetches`` (rows with a
            rating), ``failed_fetches`` (rows whose rating is NaN) and
            ``success_rate`` in percent (0.0 for an empty dataframe).
        """
        total_rows = len(self.df)
        successful_fetches = int(self.df[self.rating_column].notna().sum())
        failed_fetches = total_rows - successful_fetches
        success_rate = (successful_fetches / total_rows * 100) if total_rows else 0.0

        return {
            'total_movies': total_rows,
            'successful_fetches': successful_fetches,
            'failed_fetches': failed_fetches,
            'success_rate': success_rate
        }

In [None]:
# Scrape ratings for every title in `df`. Batches are kept small (10 rows)
# because of rate limiting.
processor = MovieRatingBatchProcessor(df)
results_df = processor.process_in_batches(batch_size=10)
stats = processor.get_processing_stats()

# Assemble the summary first, then emit it one entry per line.
report_lines = [
    "\nProcessing Stats:",
    f"Total movies: {stats['total_movies']}",
    f"Successful fetches: {stats['successful_fetches']}",
    f"Failed fetches: {stats['failed_fetches']}",
    f"Success rate: {stats['success_rate']:.1f}%",
]
print(*report_lines, sep="\n")

Processing 62001 movies in batches of 10...


No results found for 'Otaku no Video'
Progress: 10/62001 movies processed (0.0%)
Progress: 20/62001 movies processed (0.0%)
Progress: 30/62001 movies processed (0.0%)
Progress: 40/62001 movies processed (0.1%)
Progress: 50/62001 movies processed (0.1%)
Progress: 60/62001 movies processed (0.1%)
Progress: 70/62001 movies processed (0.1%)
No results found for 'Mars Needs Women'
Progress: 80/62001 movies processed (0.1%)
Progress: 90/62001 movies processed (0.1%)
Progress: 100/62001 movies processed (0.2%)
Progress: 110/62001 movies processed (0.2%)
Progress: 120/62001 movies processed (0.2%)
Progress: 130/62001 movies processed (0.2%)
Progress: 140/62001 movies processed (0.2%)
Progress: 150/62001 movies processed (0.2%)
Progress: 160/62001 movies processed (0.3%)
Progress: 170/62001 movies processed (0.3%)
Progress: 180/62001 movies processed (0.3%)
Progress: 190/62001 movies processed (0.3%)
Progress: 200/62001 movies processed (0.3%)
Progress: 210/62001 movies processed (0.3%)
Progres

Successfully fetched ratings for 540/2000 movies (27.0%). 

For a sample of 2000 movies, time taken = 5min6s
Processing complete. Successfully fetched ratings for 678/2000 movies (33.9%)

Processing Stats:
Total movies: 2000
Successful fetches: 678
Failed fetches: 1322
Success rate: 33.9%

With the modified code (batch size = 5 and self.delay = random.uniform(3, 7), i.e. a longer random delay), a sample of 2000 movies took 23min17s:
Processing complete. Successfully fetched ratings for 1913/2000 movies (95.7%)

Processing Stats:
Total movies: 2000
Successful fetches: 1913
Failed fetches: 87
Success rate: 95.7%

With batch_size = 10: 22min3s, successfully fetched ratings for 1913/2000 movies (95.7%).

In [None]:
# Preview the first 10 rows of the scraped results (nothing is overwritten here)
results_df.head(10)

In [None]:
# Overwrite the original DataFrame with the scraped results, then write it back
# to DATA_PATH. This replaces the input CSV with a version that also contains
# the Rating column.
df = results_df
 
df.to_csv(DATA_PATH, index=False)