In [1]:
import re
import pandas as pd
from tqdm import tqdm
from time import sleep
from bs4 import BeautifulSoup as bs
from playwright.sync_api import sync_playwright

In [2]:
def anime_season(month: str) -> str:
    """
    This function converts a given month (as a string) into its corresponding season.

    Parameters:
    - month (str): A string representing the month in the format 'MM'. The valid values are '01' to '12'.

    Returns:
    - str: A string representing the season. The possible values are 'Winter', 'Spring', 'Summer', 'Fall', or 'Unspecified' if the input month is not within the range of 1 to 12.
    """
    month_num = int(month)
    seasons = ["Winter", "Spring", "Summer", "Fall"]
    return seasons[(month_num - 1) // 3] if 1 <= month_num <= 12 else "Unspecified"

In [3]:
def scrape_anime_data(anime_item) -> dict[str, str]:
    """
    Scrape anime data from the HTML content.
    
    Args:
    - anime_item (bs4.element.Tag): The BeautifulSoup object representing an anime item.
    
    Returns:
    - dict: A dictionary containing scraped data about the anime.
    """
    
    title = anime_item.find('a', class_='link-title').text.strip()
    
    voters_text = anime_item.find('div', class_='scormem-item member')
    voters = int(voters_text.text.strip().replace(',', '')) if voters_text else 'N/A'
    
    avg_score_text = anime_item.find('div', title='Score')
    avg_score = float(avg_score_text.text.strip()) if avg_score_text else 'N/A'
    
    start_date_text = anime_item.find('span', class_='item')
    start_date = start_date_text.text.strip() if start_date_text else 'N/A'
    
    status_text = anime_item.find('span', class_='item finished') or anime_item.find('span', class_='item airing')
    status = status_text.text.strip() if status_text else 'N/A'
    
    studio_text = anime_item.find('span', class_='producer')
    studio = studio_text.text.strip() if studio_text else 'N/A'
    
    genres = ', '.join([genre.text.strip() for genre in anime_item.find_all('span', class_='genre')]) if anime_item.find_all('span', class_='genre') else 'N/A'
    
    media_text = anime_item.find('span', class_='type')
    media = media_text.text.strip() if media_text else 'N/A'

    status_text = anime_item.find('span', class_='status')
    status = status_text.text.strip() if status_text else 'N/A'

    eps_text = anime_item.find('span', class_='eps')
    eps = eps_text.text.strip().split()[0] if eps_text else 'N/A'

    duration_text = anime_item.find('span', class_='duration')
    duration = duration_text.text.strip().split()[0] if duration_text else 'N/A'
    
    synopsis_text = anime_item.find('p', class_='preline')
    synopsis = synopsis_text.text.strip() if synopsis_text else 'N/A'
    
    return {
        'Title': title,
        'Voters': voters,
        'Avg Score': avg_score,
        'Start Date': start_date,
        'Status': status,
        'Studio': studio,
        'Genres': genres,
        'Media': media,
        'Status': status,
        'Eps': eps,
        'Duration': duration,
        'Synopsis': synopsis
    }

In [7]:
def playwright_scraper(url: str, last: int) -> list[dict[str, str]]:
    """
    This function uses Playwright to scrape anime data from a specified URL and its subsequent pages.
    
    -----
    Parameters:
    - url (str): The URL of the anime list to scrape.
    - last (int): The last page number to scrape.
    ----
    Returns:
    - list: A list of dictionaries, where each dictionary represents the scraped data of an anime.
    ----
    The function first launches a Chromium browser using Playwright and navigates to the specified URL. It then iterates through the pages from 1 to the specified last page, scraping anime data from each page. The scraped data is appended to a container list. Finally, the function closes the browser and returns the container list.
    """
    container = []

    with sync_playwright() as p:
        try:
            browser = p.chromium.launch(headless=True)  # Run in headless mode for efficiency
            page = browser.new_page()
            page.goto(url)

            data_name = page.inner_text('.h1').split()[0]
            print(f'Scraping data from {data_name}...')

            # Use tqdm to display progress bar for page processing
            for page_num in tqdm(range(1, last + 1), desc='Processing Pages', unit='page'):
                page_url = f'{url}?page={page_num}'
                
                try:
                    page.goto(page_url, wait_until='networkidle')
                    if page.query_selector('.error404'):
                        print(f'Page {page_num} of {data_name} does not exist.')
                        break

                    anime_list = page.query_selector_all('.js-anime-category-producer')
                    for anime_item in anime_list:
                        container.append(scrape_anime_data(anime_item))
                
                except Exception as e:
                    print(f'Error processing page {page_num}: {e}')
                    break

        except Exception as e:
            print(f'Error initializing Playwright: {e}')
        
        finally:
            browser.close()

    return container

In [5]:
def modeler(date: str, data: list[dict[str, str]]) -> None:
    """
    Processes and saves anime data to a CSV file.

    -----
    Parameters:
    - date (str): The date string used to name the CSV file.
    - data (List[Dict[str, str]]): A list of dictionaries containing anime data.

    -----
    The function converts the list of dictionaries to a DataFrame, removes duplicate entries,
    and saves the DataFrame to a CSV file in the 'data/processed' directory.
    """
    df = pd.DataFrame(data)
    df.drop_duplicates(inplace=True)
    file_path = f'data/processed/AnimeData_{date}.csv'
    df.to_csv(file_path, index=False)
    print(f'Data saved to {file_path}')

In [9]:
from datetime import datetime

# Preparation
date = datetime.now().strftime('%d%m%y')

# EXTRACT AND TRANSFORM
url_list = [   
    'https://myanimelist.net/anime/genre/1/',  # Action
    'https://myanimelist.net/anime/genre/2/',  # Adventure
    'https://myanimelist.net/anime/genre/5/',  # Avant Garde
    'https://myanimelist.net/anime/genre/4/',  # Comedy
    'https://myanimelist.net/anime/genre/8/',  # Drama
    'https://myanimelist.net/anime/genre/10/', # Fantasy
    'https://myanimelist.net/anime/genre/47/', # Gourmet
    'https://myanimelist.net/anime/genre/14/', # Horror
    'https://myanimelist.net/anime/genre/7/',  # Mystery
    'https://myanimelist.net/anime/genre/22/', # Romance
    'https://myanimelist.net/anime/genre/24/', # Sci-fi
    'https://myanimelist.net/anime/genre/36/', # Slice-of-life
    'https://myanimelist.net/anime/genre/30/', # Sport
    'https://myanimelist.net/anime/genre/37/', # Supernatural
    'https://myanimelist.net/anime/genre/41/'  # Suspense
]

if __name__ == '__main__':
    all_data = []
    for url in url_list:
        all_data.extend(playwright_scraper(url, 100))
    
    modeler(date, all_data)

Error: It looks like you are using Playwright Sync API inside the asyncio loop.
Please use the Async API instead.