In [6]:
pip install webdriver-manager

Note: you may need to restart the kernel to use updated packages.


In [13]:
import os
import re
import time
import requests
import pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service
from dotenv import load_dotenv
# Load variables from a .env file into the process environment before reading them.
load_dotenv()

# MyAnimeList API client id, read from the environment (None when MAL_CLIENT_ID is unset).
CLIENT_ID = os.getenv("MAL_CLIENT_ID")
# CSV file the scraped song rows are written to.
OUTPUT_CSV = "songs.csv"
# Number of ranking entries requested from the MAL API per batch.
BATCH_SIZE = 3
# Seconds to sleep between consecutive requests.
SLEEP_INTERVAL = 1
# Ranking offset of the first batch.
# NOTE(review): if the MAL ranking offset is zero-based, starting at 1 skips the top-ranked entry — confirm.
START_OFFSET = 1
# Batches are fetched while the current offset is below this value.
MAX_OFFSET = 4
# Offsets from START_OFFSET up to (but not including) MAX_OFFSET are processed in steps of BATCH_SIZE.
# "append" adds rows to an existing OUTPUT_CSV; any other value rewrites the file with a header.
OUTPUT_MODE = "append"


class BotDetectionException(Exception):
    """Signals that a scraped page looks like a bot-detection / ban page."""


def clean_text(text):
    """Normalise a scraped string.

    Every character that is neither a word character nor whitespace is
    replaced by a space, runs of whitespace are collapsed to a single space,
    and the result is stripped. Falsy input (None, "") is returned unchanged.
    """
    if text:
        without_punctuation = re.sub(r'[^\w\s]', ' ', text, flags=re.UNICODE)
        return re.sub(r'\s+', ' ', without_punctuation).strip()
    return text

def check_bot_detection(page_text):
    """Return True when page_text contains a known bot-detection marker.

    The comparison is case-insensitive.
    """
    # AniDB shows an "unban me" checkbox page once it has flagged the client as a bot.
    haystack = page_text.lower()
    for marker in ("unban me",):
        if marker in haystack:
            return True
    return False

def _join_names(items):
    """Join the "name" field of each dict in items with ", "; None when items is empty or missing."""
    if not items:
        return None
    return ", ".join(item["name"] for item in items)


def get_top_anime_batch(batch_size, start_offset, timeout=30):
    """Fetch one page of the MyAnimeList "all" ranking.

    Args:
        batch_size: Value sent as the API ``limit`` parameter.
        start_offset: Value sent as the API ``offset`` parameter.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        A list of dicts, one per ranking entry, keyed by the ``MAL_*`` column
        names (plus ``AniDB_Link`` set to None). The list is empty when the
        API returns a non-200 status or no data, and may be partial when an
        exception interrupts processing; errors are printed, not raised.
    """
    url = "https://api.myanimelist.net/v2/anime/ranking"
    anime_list = []
    params = {
        "ranking_type": "all",
        "limit": batch_size,
        "offset": start_offset,
        "fields": "genres,mean,rank,popularity,status,num_episodes,start_date,end_date,studios,source,broadcast,rating,duration,themes"
    }
    headers = {"X-MAL-CLIENT-ID": CLIENT_ID}
    try:
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
        if response.status_code == 200:
            data = response.json()
            if "data" not in data or not data["data"]:
                print(f"Empty data response for offset {start_offset}.")
                return []
            for entry in data["data"]:
                node = entry["node"]
                anime_list.append({
                    "MAL_ID": node.get("id"),
                    "MAL_Title": node.get("title"),
                    "MAL_Score": node.get("mean"),
                    "MAL_Rank": node.get("rank"),
                    "MAL_Popularity": node.get("popularity"),
                    "MAL_Status": node.get("status"),
                    "MAL_Genres": _join_names(node.get("genres")),
                    "MAL_Num_Episodes": node.get("num_episodes"),
                    "MAL_Start_Date": node.get("start_date"),
                    "MAL_End_Date": node.get("end_date"),
                    "MAL_Studios": _join_names(node.get("studios")),
                    "MAL_Source": node.get("source"),
                    "MAL_Broadcast": node.get("broadcast"),
                    "MAL_Rating": node.get("rating"),
                    "MAL_Duration": node.get("duration"),
                    "MAL_Themes": _join_names(node.get("themes")),
                    "AniDB_Link": None  # to be filled from MAL page scraping
                })
        else:
            print(f"MAL API error at offset {start_offset} (status {response.status_code}).")
    except Exception as e:
        # Best effort: report the failure and return whatever was collected so far.
        print(f"Exception during MAL API call at offset {start_offset}: {e}")
    # Pause between API calls, including after failures.
    time.sleep(SLEEP_INTERVAL)
    return anime_list

def get_anidb_link_from_mal_page(mal_driver, mal_url):
    """Scrape a MyAnimeList anime page for its titles and AniDB link.

    Loads mal_url in mal_driver, waits up to 10 seconds for the <body>
    element, then parses the page source.

    Returns a dict with keys 'AniDB_Link', 'MAL_Title_JP' and 'MAL_Title_EN'.
    Any value that cannot be found is None; if anything raises, the error is
    printed and all three values are None.
    """
    try:
        mal_driver.get(mal_url)
        body_present = EC.presence_of_element_located((By.TAG_NAME, "body"))
        WebDriverWait(mal_driver, 10).until(body_present)
        soup = BeautifulSoup(mal_driver.page_source, "html.parser")

        # Titles: prefer the dedicated title container on the page.
        title_jp = None
        title_en = None
        name_div = soup.find("div", itemprop="name")
        if name_div:
            main_heading = name_div.find("h1", class_=re.compile("title-name"))
            english_line = name_div.find("p", class_=re.compile("title-english"))
            if main_heading:
                title_jp = clean_text(main_heading.get_text(strip=True))
            if english_line:
                title_en = clean_text(english_line.get_text(strip=True))

        # Fallback: parse "<main title> (<english title>) - ..." out of the <title> tag.
        if not title_jp and soup.title:
            page_title = soup.title.get_text(strip=True)
            if "(" in page_title:
                leading_part = page_title.split(" - ")[0]
                title_match = re.match(r'^(.*?)\s*\((.*?)\)$', leading_part)
                if title_match:
                    title_jp = clean_text(title_match.group(1).strip())
                    title_en = clean_text(title_match.group(2).strip())

        # AniDB link: first anidb.net anchor in the "Resources" external-links block.
        link = None
        header = soup.find("h2", string=lambda text: text and "Resources" in text)
        links_div = header.find_next_sibling("div", class_="external_links") if header else None
        anchor = links_div.find("a", href=lambda href: href and "anidb.net" in href) if links_div else None
        if anchor:
            link = anchor.get("href")
            # Rewrite legacy perl-bin URLs into the /anime/<aid> form.
            if "perl-bin/animedb.pl" in link and "aid=" in link:
                aid = parse_qs(urlparse(link).query).get("aid", [None])[0]
                if aid:
                    link = f"https://anidb.net/anime/{aid}"

        return {'AniDB_Link': link, 'MAL_Title_JP': title_jp, 'MAL_Title_EN': title_en}
    except Exception as e:
        print(f"Error processing MAL page {mal_url}: {e}")
        return {'AniDB_Link': None, 'MAL_Title_JP': None, 'MAL_Title_EN': None}


def _is_tags_header_row(tag):
    """True for a <tr> whose first <th> contains the text "Tags"."""
    if tag.name != "tr":
        return False
    header_cell = tag.find("th")
    return header_cell is not None and "Tags" in header_cell.get_text()


def _parse_rating(rating_text):
    """Return the leading number of rating_text as a float, or None if there is none."""
    m = re.match(r"([\d\.]+)", rating_text)
    if not m:
        return None
    try:
        return float(m.group(1))
    except ValueError:
        # e.g. a lone "." matches the pattern but is not a number
        return None


def scrape_anidb_songs(ani_driver, url):
    """Scrape the song list of an AniDB anime page into a DataFrame.

    Args:
        ani_driver: Selenium driver used to load the page.
        url: AniDB anime page URL.

    Returns:
        A DataFrame with one row per song (columns: relation, song, song_url,
        episodes, rating, vocals, lyrics, composition, arrangement, chorus,
        tags, anime). Empty when the page cannot be loaded, the song list does
        not appear within 15 seconds, or the song table is missing.

    Raises:
        BotDetectionException: when the loaded page contains a bot-detection
            marker, whether or not the song list appeared.
    """
    try:
        ani_driver.get(url)
        try:
            WebDriverWait(ani_driver, 15).until(EC.presence_of_element_located((By.ID, "songlist")))
        except TimeoutException:
            # A ban page has no song list, so the wait times out before the
            # regular check below is reached; inspect the page here as well.
            if check_bot_detection(ani_driver.page_source or ""):
                raise BotDetectionException(f"Bot detection triggered on AniDB page: {url}")
            print(f"Timeout waiting for songlist on {url}.")
            return pd.DataFrame()
        page_source = ani_driver.page_source
        if check_bot_detection(page_source):
            raise BotDetectionException(f"Bot detection triggered on AniDB page: {url}")
    except BotDetectionException:
        raise
    except Exception as e:
        print(f"Exception accessing AniDB page {url}: {e}")
        return pd.DataFrame()

    soup = BeautifulSoup(page_source, "html.parser")

    anime_header = soup.find("h1", class_="anime")
    anime_name = anime_header.get_text(strip=True).replace("Anime: ", "") if anime_header else "Unknown"
    anime_name = clean_text(anime_name)

    tags = []
    tags_row = soup.find("tr", class_=lambda c: c and "tags" in c)
    if not tags_row:
        # The predicate must be passed as the tag filter (first argument);
        # as a second positional argument bs4 would treat it as a class filter.
        tags_row = soup.find(_is_tags_header_row)
    if tags_row:
        tag_cells = tags_row.find_all("span", class_="tagname")
        tags = [clean_text(tag.get_text(strip=True)) for tag in tag_cells]
    joined_tags = ", ".join(tags)

    table = soup.find("table", id="songlist")
    if table is None:
        print(f"Song table not found for {url}.")
        return pd.DataFrame()

    # Fall back to the table itself when the markup has no explicit <tbody>.
    tbody = table.find("tbody")
    rows = (tbody if tbody is not None else table).find_all("tr")

    # Credit keyword -> song field, checked in this order; first match wins.
    credit_fields = (
        ("Vocals", "vocals"),
        ("Lyrics", "lyrics"),
        ("Music Composition", "composition"),
        ("Music Arrangement", "arrangement"),
        ("Chorus", "chorus"),
    )

    songs = []
    current_relation = ""
    current_song = None  # credit rows attach to the most recent song row

    for row in rows:
        rel_cell = row.find("td", class_="reltype")
        if rel_cell:
            current_relation = clean_text(rel_cell.get_text(strip=True))

        song_cell = row.find("td", class_="name song")
        if song_cell:
            song_link = song_cell.find("a")
            eprange = row.find("td", class_="eprange")
            rating = row.find("td", class_="rating")
            current_song = {
                "relation": current_relation,
                "song": clean_text(song_cell.get_text(strip=True)),
                "song_url": song_link.get("href", "") if song_link else "",
                "episodes": clean_text(eprange.get_text(strip=True)) if eprange else "",
                "rating": _parse_rating(rating.get_text(strip=True)) if rating else None,
                "vocals": "",
                "lyrics": "",
                "composition": "",
                "arrangement": "",
                "chorus": "",
                "tags": joined_tags,
                "anime": anime_name
            }
            songs.append(current_song)

        credit_cell = row.find("td", class_="credit")
        creator_cell = row.find("td", class_="name creator")
        # Ignore credit rows that appear before any song row.
        if credit_cell and creator_cell and current_song is not None:
            credit_type = clean_text(re.sub(r'\s*\(.*?\)', '', credit_cell.get_text(strip=True)))
            creators = ", ".join([clean_text(a.get_text(strip=True)) for a in creator_cell.find_all("a")])
            for keyword, field in credit_fields:
                if keyword in credit_type:
                    current_song[field] = creators
                    break

    return pd.DataFrame(songs)

def append_to_csv(df, filename, write_header=False):
    """Write df to filename as CSV without the index.

    With write_header truthy the file is (re)created and the header row is
    written; otherwise the rows are appended without a header.
    """
    file_mode = "a"
    if write_header:
        file_mode = "w"
    df.to_csv(filename, index=False, header=write_header, mode=file_mode)


def _new_headless_driver(driver_path):
    """Start a headless Chrome session using the chromedriver binary at driver_path."""
    options = webdriver.ChromeOptions()
    options.add_argument("--headless")
    return webdriver.Chrome(service=Service(driver_path), options=options)


def _quit_drivers(*drivers):
    """Quit every driver that was started, continuing past individual failures."""
    for driver in drivers:
        if driver is None:
            continue
        try:
            driver.quit()
        except Exception as e:
            print(f"Error closing Selenium session: {e}")


def main():
    """Scrape songs for the top-ranked MAL anime and write them to OUTPUT_CSV.

    For each batch of ranking entries: look up the AniDB link on the MAL page,
    scrape the AniDB song list, attach the MAL metadata to every song row and
    write the batch to the CSV. Scraping stops early when AniDB bot detection
    is triggered; songs collected up to that point are still saved.
    """
    # Write a header (and truncate) unless we are appending to an existing file.
    first_batch = not (OUTPUT_MODE.lower() == "append" and os.path.exists(OUTPUT_CSV))

    mal_columns = ["MAL_ID", "MAL_Title", "MAL_Title_JP", "MAL_Title_EN",
                   "MAL_Score", "MAL_Rank", "MAL_Popularity", "MAL_Status",
                   "MAL_Genres", "MAL_Num_Episodes", "MAL_Start_Date", "MAL_End_Date",
                   "MAL_Studios", "MAL_Source", "MAL_Broadcast", "MAL_Rating",
                   "MAL_Duration", "MAL_Themes"]

    offset = START_OFFSET
    total_batches_processed = 0
    total_songs_count = 0

    mal_driver = None
    ani_driver = None
    try:
        driver_path = ChromeDriverManager().install()
        mal_driver = _new_headless_driver(driver_path)
        ani_driver = _new_headless_driver(driver_path)
    except Exception as e:
        print(f"Error initializing Selenium: {e}")
        # Do not leak a browser that started before the failure.
        _quit_drivers(mal_driver, ani_driver)
        return

    try:
        bot_detected = False
        while offset < MAX_OFFSET:
            print(f"\n offset {offset} ")
            batch_anime = get_top_anime_batch(BATCH_SIZE, offset)
            if not batch_anime:
                print(" no more mal ")
                break

            # Enrich every API record with the titles / AniDB link from its MAL page.
            enriched_records = []
            for anime in batch_anime:
                mal_url = f"https://myanimelist.net/anime/{anime['MAL_ID']}"
                try:
                    result = get_anidb_link_from_mal_page(mal_driver, mal_url)
                except Exception as e:
                    print(f"Error processing MAL page for anime ID {anime['MAL_ID']}: {e}. Skipping.")
                    continue
                enriched_records.append({
                    **anime,
                    "AniDB_Link": result.get("AniDB_Link"),
                    "MAL_Title_JP": result.get("MAL_Title_JP"),
                    "MAL_Title_EN": result.get("MAL_Title_EN"),
                })
                time.sleep(SLEEP_INTERVAL)

            batch_songs = []
            for anime in enriched_records:
                title_api = anime["MAL_Title"]
                anidb_link = anime.get("AniDB_Link")
                if not anidb_link:
                    print(f" '{title_api}'. anidb error .")
                    continue
                print(f" scraping songs for '{title_api}' from {anidb_link} ")

                try:
                    songs_df = scrape_anidb_songs(ani_driver, anidb_link)
                except BotDetectionException as e:
                    # Further requests would only hit the ban page; save what we have and stop.
                    print(f" Error  '{title_api}': {e}.")
                    print(" Bot detection triggered; stopping further scraping.")
                    bot_detected = True
                    break
                except Exception as e:
                    print(f" Error  '{title_api}': {e}.")
                    songs_df = pd.DataFrame()

                if songs_df.empty:
                    print(f" No songs for '{title_api}'.")
                else:
                    row_count = len(songs_df)
                    for col in mal_columns:
                        # Repeat the value explicitly: assigning a dict (e.g. MAL_Broadcast)
                        # directly would be index-aligned by pandas and yield NaN.
                        songs_df[col] = [anime.get(col)] * row_count
                    songs_df["AniDB_Link"] = anidb_link
                    match = re.search(r'/anime/(\d+)', anidb_link)
                    songs_df["AniDB_ID"] = match.group(1) if match else None
                    batch_songs.append(songs_df)
                time.sleep(SLEEP_INTERVAL)

            if batch_songs:
                combined_df = pd.concat(batch_songs, ignore_index=True)
                append_to_csv(combined_df, OUTPUT_CSV, write_header=first_batch)
                total_songs_count += len(combined_df)
                print(f"Appended batch of {len(combined_df)} songs to {OUTPUT_CSV}.")
                print(f"Total songs saved so far: {total_songs_count}")
                first_batch = False
            else:
                print(" No song data this batch.")

            offset += BATCH_SIZE
            total_batches_processed += 1
            if bot_detected:
                break

    except SystemExit as se:
        print(se)
    except Exception as e:
        print(f" error : {e}")
    finally:
        _quit_drivers(mal_driver, ani_driver)
        print("Selenium sessions closed.")
        print(f"Total batches processed: {total_batches_processed}")
        print(f"Total songs saved: {total_songs_count}")
        print("Processing completed.")

# Run the scraper when this code is executed directly (script or notebook cell), not on import.
if __name__ == "__main__":
    main()



 offset 1 
 scraping songs for 'Fullmetal Alchemist: Brotherhood' from https://anidb.net/anime/6107 
 scraping songs for 'Steins;Gate' from https://anidb.net/anime/7729 
 scraping songs for 'Shingeki no Kyojin Season 3 Part 2' from https://anidb.net/anime/14444 


  combined_df = pd.concat(non_empty_batch, ignore_index=True)


Appended batch of 45 songs to songs.csv.
Total songs saved so far: 45
Selenium sessions closed.
Total batches processed: 1
Total songs saved: 45
Processing completed.


In [25]:
import pandas as pd

# Sanity check: how many distinct anime (by MAL API title) ended up in the scraped CSV.
df = pd.read_csv("./songs.csv")

unique_titles_count = df['MAL_Title'].nunique()

# The label names the column that is actually counted.
print(f"Number of unique values in MAL_Title: {unique_titles_count}")


Number of unique values in MAL_Title_EN: 3


In [27]:
import pandas as pd

# Remove song rows duplicated across runs (same anime title, song URL and song name).
df = pd.read_csv('songs.csv')

df_unique = df.drop_duplicates(subset=['MAL_Title', 'song_url', 'song'])

# Persist only the de-duplicated rows. Writing `df` back afterwards would
# restore the duplicates and add a spurious index column to the file.
df_unique.to_csv('songs.csv', index=False)
