In [None]:
# python = 3.13
# dependencies = [
#     "dataclass-wizard>=0.35.1",
#     "ipykernel>=7.1.0",
#     "requests>=2.32.5",
#     "tqdm>=4.67.1",
# ]

import json
import os
import re
import time
from dataclasses import dataclass
from urllib.parse import urlencode

import requests
from dataclass_wizard import JSONWizard
from tqdm import tqdm

In [2]:
# Load the source catalogue. The encoding is given explicitly because the file
# holds non-ASCII (CJK) titles and open() would otherwise decode it with the
# platform's locale encoding, which is not UTF-8 everywhere.
with open('data.json', encoding='utf-8') as f:
    data = json.load(f)

In [3]:
# Set the TMDB_API_KEY environment variable to use your own key. The fallback
# is the shared key taken from https://github.com/rickylawson/freekeys.
TMDB_API_KEY = os.environ.get('TMDB_API_KEY') or 'fb7bb23f03b6994dafc674c074d01761'

In [None]:
# Regexes that match a season marker at the very end of a title (every pattern
# is anchored with `$`). The matching loop removes each match with re.sub so
# that all seasons of a show are searched under the same base title.
possible_season_endding_patterns = [
    # Number first, season word second, optionally wrapped in ASCII or
    # full-width parentheses and optionally prefixed with 第. The number may be
    # Arabic digits, kanji numerals, FINAL, or an English ordinal (1st, 2nd,
    # 3rd, Nth).
    r'([\(（]?(第)?\s?(\d+|[一二三四五六七八九十]+|FINAL|1st|2nd|3rd|\d+th)\s?(SEASON|Season|season|シリーズ|シーズン|クール|期|章|季|部)[\)）]?$)',
    # Season word first (Season / Part / Volume / シーズン), number second. The
    # number may be Arabic digits, Unicode Roman-numeral characters, or ASCII
    # Roman numerals II through X.
    r'([\(（]?(Season|SEASON|season|Part|part|PART|Volume|シーズン)\s?(\d+|[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+|II|III|IV|V|VI|VII|VIII|IX|X)[\)）]?$)',
]

# Looser fallback: any run of digits at the end of the title.
# NOTE(review): nothing in the cells shown here reads this list.
aggressive_season_endding_patterns = [
    r'\d+$',
]

In [5]:
@dataclass
class TMDBItem(JSONWizard):
    """A single entry of the ``results`` list in a TMDB search response.

    The first five fields have no default. Every other field falls back to a
    falsy default of its declared type.

    NOTE(review): presumably movie results populate ``title`` /
    ``original_title`` / ``release_date`` while TV results populate ``name`` /
    ``original_name`` / ``first_air_date`` - confirm against the TMDB API
    reference.
    """
    backdrop_path: str
    original_language: str
    overview: str
    poster_path: str
    genre_ids: list[int]
    media_type: str = ''
    # Defaults below use the declared type (False / 0.0) instead of ''.
    adult: bool = False
    id: int = 0
    title: str = ''
    name: str = ''
    original_title: str = ''
    original_name: str = ''
    popularity: float = 0.0
    release_date: str = ''
    first_air_date: str = ''
    video: bool = False
    vote_average: float = 0.0
    vote_count: int = 0

@dataclass
class TMDBResponse(JSONWizard):
    """Top-level body of a TMDB search response, parsed via ``from_json``."""
    # Entries returned for the query; the only field without a default.
    results: list[TMDBItem]
    # Paging counters, each defaulting to 0.
    page: int = 0
    total_pages: int = 0
    total_results: int = 0

In [46]:
# Client-side throttle state shared by every tmdb_search call.
last_search_at = 0
rate_limit_per_second = 10
rate_limit_interval = 1 / rate_limit_per_second

def tmdb_search(query, year=None, type='tv', timeout=30, max_retries=5) -> tuple[TMDBResponse, str]:
    """Search TMDB for ``query`` and return ``(response, tmdb_type)``.

    Calls are throttled so that at most ``rate_limit_per_second`` requests are
    started per second.

    Args:
        query: Title text to search for.
        year: Optional year filter, sent as ``primary_release_year`` for movie
            searches and ``first_air_date_year`` for tv searches. A falsy
            value disables the filter.
        type: Source media type: 'movie', 'tv', 'ova' or 'web'. 'ova' and
            'web' are searched on the tv endpoint.
        timeout: Seconds to wait on the HTTP request before giving up on it.
        max_retries: Maximum number of attempts when a transient network
            failure occurs (values below 1 are treated as 1).

    Returns:
        The parsed TMDBResponse and the TMDB endpoint type ('movie' or 'tv')
        that was queried.

    Raises:
        KeyError: ``type`` is not one of the supported media types.
        requests.exceptions.RequestException: the request failed with a
            non-transient error, the retries were exhausted, or TMDB answered
            with an HTTP error status.
    """
    global last_search_at

    types = {
        'movie': 'movie',
        'tv': 'tv',
        'ova': 'tv',
        'web': 'tv',
    }
    year_params = {
        'movie': 'primary_release_year',
        'tv': 'first_air_date_year',
    }

    tmdb_type = types[type]

    params = {
        'query': query,
        'api_key': TMDB_API_KEY,
    }
    if year:
        params[year_params[tmdb_type]] = year

    url = f'https://api.themoviedb.org/3/search/{tmdb_type}?{urlencode(params)}'

    retry_delay = 10  # seconds to back off after a transient failure
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        # Throttle every attempt, retries included.
        wait = rate_limit_interval - (time.time() - last_search_at)
        if wait > 0:
            time.sleep(wait)
        last_search_at = time.time()

        try:
            http_response = requests.get(url, timeout=timeout)
        except requests.exceptions.RequestException as e:
            # Only timeouts and the sporadic TLS "unexpected EOF" are worth
            # retrying; anything else is re-raised unchanged.
            transient = (
                isinstance(e, requests.exceptions.Timeout)
                or 'UNEXPECTED_EOF_WHILE_READING' in repr(e)
            )
            if not transient or attempt >= attempts:
                raise
            time.sleep(retry_delay)
            continue

        # Fail with the HTTP status rather than a confusing parse error when
        # TMDB returns an error body that has no ``results`` field.
        http_response.raise_for_status()
        return TMDBResponse.from_json(http_response.text), tmdb_type


In [None]:
def is_single_episode(bangumi, timeout=30):
    """Ask the Bangumi API whether ``bangumi`` has exactly one episode.

    Args:
        bangumi: Item dict with a 'sites' list whose entries carry 'site' and
            'id' keys.
        timeout: Seconds to wait on the HTTP request before giving up on it.

    Returns:
        True only when the subject's 'eps' value equals 1. False when the item
        has no 'bangumi' site entry, when the API answers with an error
        status, or when 'eps' is missing or different from 1.

    Raises:
        requests.exceptions.RequestException: the request itself failed.
    """
    bangumi_ids = [site['id'] for site in bangumi['sites'] if site['site'] == 'bangumi']
    if not bangumi_ids:
        return False

    resp = requests.get(f"https://next.bgm.tv/p1/subjects/{bangumi_ids[0]}", timeout=timeout)
    if not resp.ok:
        # An error response may carry a non-JSON body; treat it as "unknown".
        return False
    return json.loads(resp.text).get('eps', -1) == 1

In [48]:
# Timeout errors happen occasionally, so the results are kept in their own
# cell: the matching loop can be re-run and carries on from what is here.

# Season-stripped title -> "<tmdb_type>/<tmdb id>" for titles with exactly one hit.
title_to_tmdb: dict[str, str] = {}
# Original titles for which no search returned a result.
no_match_titles: list[str] = []
# Original titles whose search returned more than one result.
unsure_titles: list[str] = []

In [61]:
def save_checkpoint(progress_index):
    """Write the current matching state to matched_data.json.

    ``progress_index`` is stored under the 'progress' key. The file is written
    as UTF-8 because ensure_ascii=False emits the raw (CJK) titles, which the
    platform's default encoding may be unable to encode.
    """
    with open('matched_data.json', 'w', encoding='utf-8') as f:
        json.dump({
            'title_to_tmdb': title_to_tmdb,
            'no_match_titles': no_match_titles,
            'unsure_titles': unsure_titles,
            'progress': progress_index,
        }, f, ensure_ascii=False, indent=2)


progress = tqdm(enumerate(data['items']), total=len(data['items']), maxinterval=0.75)

# Index of the last item that was fully handled (classified or skipped).
completed_index = None

try:
    for i, item in progress:
        title = item['title']
        # Not named `type`, so the builtin stays usable in later cells.
        item_type = item['type']
        begin_date = item['begin']
        # 1000 stands in for a missing begin date.
        # NOTE(review): presumably chosen so the year-filtered searches find
        # nothing and the final unfiltered search decides - confirm.
        year = int(begin_date[:4] or 1000)
        is_first_day = begin_date == f'{year}-01-01'  # first day of the year
        is_last_day = begin_date == f'{year}-12-31'  # last day of the year

        # Strip a possible season suffix.
        modified_title = title
        for pattern in possible_season_endding_patterns:
            modified_title = re.sub(pattern, '', modified_title).strip()

        # Skip titles that were already handled (e.g. by an earlier run of this cell).
        already_handled = (
            modified_title in title_to_tmdb
            or title in no_match_titles
            or title in unsure_titles
        )
        if already_handled:
            completed_index = i
            continue

        # Search with the year filter first.
        resp, tmdb_type = tmdb_search(modified_title, year=year, type=item_type)
        # Dates on the first/last day of a year: also try the adjacent year.
        if resp.total_results == 0 and is_first_day:
            resp, tmdb_type = tmdb_search(modified_title, year=year - 1, type=item_type)
        if resp.total_results == 0 and is_last_day:
            resp, tmdb_type = tmdb_search(modified_title, year=year + 1, type=item_type)
        # Still nothing and it is a single-episode non-movie: search it as a movie.
        if resp.total_results == 0 and item_type != 'movie' and is_single_episode(item):
            resp, tmdb_type = tmdb_search(modified_title, year=year, type='movie')
        # Last resort: search without a year filter.
        if resp.total_results == 0:
            resp, tmdb_type = tmdb_search(modified_title, type=item_type)

        if resp.total_results == 1:
            # Exactly one hit: accept it.
            title_to_tmdb[modified_title] = f"{tmdb_type}/{resp.results[0].id}"
        elif resp.total_results > 1:
            # Several hits: leave for manual review.
            unsure_titles.append(title)
        else:
            # No hit at all.
            no_match_titles.append(title)

        progress.set_postfix({'matched': len(title_to_tmdb), 'no_match': len(no_match_titles), 'unsure': len(unsure_titles)})
        progress.refresh()

        completed_index = i
        if i % 50 == 0:
            save_checkpoint(i)
finally:
    # Always persist the final state. The old in-loop check for the last index
    # was skipped whenever the last item hit `continue`, and nothing was
    # written when a request raised part-way through.
    if completed_index is not None:
        save_checkpoint(completed_index)


100%|██████████| 8292/8292 [00:57<00:00, 143.73it/s, matched=6358, no_match=1208, unsure=329]  
