In [29]:
import asyncio
import json
import time
import aiohttp
import aiofiles
import os
from dataclasses import dataclass, field
from typing import List, Optional
from bs4 import BeautifulSoup
import cloudscraper
from tqdm.notebook import tqdm
from tqdm import tqdm as sync_tqdm  # 保留同步进度条
import requests



@dataclass
class DownloadLink:
    """One downloadable rendition of a video."""
    download_url: str   # Direct URL of the media file
    suffix: str         # File extension / container format (e.g. mp4, flv)
    clarity: int        # Vertical resolution as an integer (e.g. 720, 1080)

@dataclass
class Video:
    """One video page and the download sources discovered for it.

    ``download`` and ``download_async`` always use ``sources[0]``, so the
    preferred source is expected to be first in the list.
    """
    title: str                   # Video title; also the base of the output file name
    url: str                     # URL of the video's page
    # Quoted forward reference: the class body does not need DownloadLink to be
    # defined yet when this cell is evaluated.
    sources: List["DownloadLink"] = field(default_factory=list)  # Candidate download sources

    @staticmethod
    def _safe_file_name(name: str) -> str:
        """Return ``name`` with characters that are invalid in file names replaced by ``_``.

        Covers path separators, the characters Windows forbids and ASCII
        control characters, so a scraped title cannot escape the target
        folder or make ``open`` fail.
        """
        forbidden = '\\/:*?"<>|'
        return ''.join('_' if (ch in forbidden or ord(ch) < 32) else ch for ch in name)

    def add_download_link(self, DownloadLink):
        """Append one download source to ``sources``.

        The parameter name shadows the ``DownloadLink`` class inside this
        method; it is kept unchanged so keyword callers keep working.
        """
        self.sources.append(DownloadLink)

    def get_best_quality(self) -> Optional["DownloadLink"]:
        """Return the source with the highest ``clarity``, or None when there are no sources."""
        if not self.sources:
            return None
        return max(self.sources, key=lambda link: link.clarity)

    def download(self, path: str, retries: int = 3):
        """Synchronously download the first source into directory ``path``.

        Prints a notice and returns None when the video has no sources.
        """
        if not self.sources:
            print("未找到下载源")
            return

        return self._sync_download(path, self.sources[0], retries)

    async def download_async(self, path: str, retries: int = 3, position: int = 0):
        """Asynchronously download the first source into directory ``path``.

        ``position`` is forwarded to the progress bar. Returns the result dict
        of ``_async_download``, or None (after printing a notice) when the
        video has no sources.
        """
        if not self.sources:
            print(f"视频 {self.title} 未找到下载源")
            return

        return await self._async_download(path, self.sources[0], retries, position)

    def _sync_download(self, path: str, source: "DownloadLink", retries: int):
        """Stream ``source`` to ``<path>/<title>.<suffix>`` with a blocking request.

        Makes up to ``retries`` attempts with exponential back-off between
        them. Progress and failures are reported with ``print``; the method
        always returns None.
        """
        target_folder = path
        os.makedirs(target_folder, exist_ok=True)
        file_name = self._safe_file_name(f"{self.title}.{source.suffix}")
        full_path = os.path.join(target_folder, file_name)

        for attempt in range(retries):
            try:
                # Context managers guarantee the connection, the progress bar
                # and the file are released even when the transfer fails.
                with requests.get(
                    source.download_url,
                    stream=True,
                    timeout=(10, 60)  # 10 s to connect, 60 s between reads
                ) as response:
                    response.raise_for_status()

                    total_size = int(response.headers.get('content-length', 0))
                    with sync_tqdm(
                        total=total_size,
                        unit='B',
                        unit_scale=True,
                        desc=f"下载 {file_name}",
                        unit_divisor=1024
                    ) as progress_bar:
                        with open(full_path, 'wb') as file:
                            for chunk in response.iter_content(chunk_size=8192):
                                if chunk:
                                    file.write(chunk)
                                    progress_bar.update(len(chunk))

                print(f"下载完成: {full_path}")
                return

            except Exception as e:
                if attempt < retries - 1:
                    print(f"下载失败 ({attempt+1}/{retries}), 重试中...: {str(e)}")
                    time.sleep(2 ** attempt)  # exponential back-off
                else:
                    print(f"下载失败 ({attempt+1}/{retries}): {str(e)}")
                    return

    async def _async_download(self, path: str, source: "DownloadLink", retries: int, position: int = 0):
        """Download ``source`` with resume support and return a result dict.

        Data is written to ``<file>.part`` and moved to its final name only
        after the size has been verified. A ``<file>.part.meta`` sidecar
        stores size/ETag/Last-Modified so a later call can tell whether the
        partial file still matches the remote file.

        Returns a dict with ``success``, ``file_name`` and ``message``, plus
        ``file_size`` on success or ``error`` on failure. Exceptions other
        than ``aiohttp.ClientError`` / ``asyncio.TimeoutError`` raised during
        the transfer propagate to the caller.
        """
        target_folder = path
        os.makedirs(target_folder, exist_ok=True)
        file_name = self._safe_file_name(f"{self.title}.{source.suffix}")
        full_path = os.path.join(target_folder, file_name)
        temp_path = full_path + ".part"
        meta_path = temp_path + ".meta"

        # Skip the download when a finished file of the right size is already there.
        if os.path.exists(full_path):
            existing_size = os.path.getsize(full_path)
            async with aiohttp.ClientSession() as session:
                try:
                    async with session.head(
                        source.download_url,
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as head_response:
                        head_response.raise_for_status()
                        total_size = int(head_response.headers.get('content-length', 0))

                        if total_size > 0 and existing_size == total_size:
                            tqdm.write(f"文件已存在: {file_name}")
                            return {
                                "success": True,
                                "file_name": file_name,
                                "file_size": existing_size,
                                "message": f"文件已存在: {file_name}"
                            }
                except Exception:
                    # The remote size could not be checked: discard the local
                    # copy and download it again.
                    tqdm.write(f"文件已存在但无法验证完整性: {file_name} 即将重新下载")
                    os.remove(full_path)

        async with aiohttp.ClientSession() as session:
            for attempt in range(retries):
                try:
                    # Re-read the on-disk state on every attempt.
                    downloaded_size = 0
                    if os.path.exists(temp_path):
                        downloaded_size = os.path.getsize(temp_path)
                        tqdm.write(f"发现部分下载文件: {file_name} (已下载: {downloaded_size} bytes)")

                    total_size = 0
                    accept_ranges = "none"

                    # Ask the server for size, range support and validators.
                    async with session.head(
                        source.download_url,
                        timeout=aiohttp.ClientTimeout(total=10)
                    ) as head_response:
                        head_response.raise_for_status()
                        total_size = int(head_response.headers.get('content-length', 0))
                        accept_ranges = head_response.headers.get('Accept-Ranges', 'none').lower()
                        last_modified = head_response.headers.get('Last-Modified', '')
                        etag = head_response.headers.get('ETag', '')

                    if downloaded_size > 0:
                        if total_size > 0 and downloaded_size >= total_size:
                            # The partial file is already complete: promote it.
                            # os.replace overwrites an existing destination on
                            # every platform (os.rename fails on Windows).
                            os.replace(temp_path, full_path)
                            if os.path.exists(meta_path):
                                os.remove(meta_path)
                            return {
                                "success": True,
                                "file_name": file_name,
                                "file_size": downloaded_size,
                                "message": f"文件已完整: {file_name}"
                            }

                        # Resume only if the remote file is still the one the
                        # partial data came from.
                        if os.path.exists(meta_path):
                            try:
                                with open(meta_path, "r") as meta_file:
                                    saved_meta = json.load(meta_file)
                            except (OSError, ValueError):
                                # Unreadable sidecar: treat as "changed".
                                saved_meta = {}
                            # The sidecar is closed before it is removed;
                            # deleting an open file fails on Windows.
                            if saved_meta.get('size') != total_size or \
                               saved_meta.get('etag') != etag or \
                               saved_meta.get('last_modified') != last_modified:
                                tqdm.write("文件元数据已变更，重新下载")
                                os.remove(temp_path)
                                os.remove(meta_path)
                                downloaded_size = 0

                    # Build the Range request for a resume.
                    headers = {}
                    if downloaded_size > 0 and accept_ranges == 'bytes':
                        headers = {'Range': f'bytes={downloaded_size}-'}
                        if etag:
                            headers['If-Range'] = etag
                        elif last_modified:
                            headers['If-Range'] = last_modified

                    async with session.get(
                        source.download_url,
                        timeout=aiohttp.ClientTimeout(total=180),
                        headers=headers,
                        raise_for_status=True
                    ) as response:
                        if response.status == 206:
                            content_range = response.headers.get('Content-Range', '')
                            if 'bytes' in content_range and '/' in content_range:
                                total_size = int(content_range.split('/')[-1])

                        elif response.status == 200 and downloaded_size > 0:
                            # Full body returned: the partial data is useless.
                            tqdm.write("服务器忽略Range请求，重新下载完整文件")
                            if os.path.exists(temp_path):
                                os.remove(temp_path)
                            downloaded_size = 0

                        # Record the validators on every attempt so a later
                        # resume can always be verified, even when the first
                        # attempt failed before reaching this point.
                        if etag or last_modified:
                            with open(meta_path, "w") as meta_file:
                                json.dump({
                                    'size': total_size,
                                    'etag': etag,
                                    'last_modified': last_modified
                                }, meta_file)

                        with tqdm(
                            total=total_size if total_size > 0 else None,
                            unit='B',
                            unit_scale=True,
                            desc=f"下载 {file_name[:30]}",
                            unit_divisor=1024,
                            miniters=1,
                            mininterval=0.1,
                            ascii=True,
                            dynamic_ncols=True,
                            position=position,
                            leave=False,
                            initial=downloaded_size,
                            bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
                        ) as progress_bar:
                            # Append when resuming, otherwise start from scratch.
                            async with aiofiles.open(temp_path, 'ab' if downloaded_size > 0 else 'wb') as file:
                                async for chunk in response.content.iter_chunked(65536):
                                    await file.write(chunk)
                                    progress_bar.update(len(chunk))

                        final_size = os.path.getsize(temp_path)

                        # Verify before promoting, so a bad transfer never
                        # reaches the final file name.
                        if total_size > 0 and final_size != total_size:
                            mismatch = f"文件大小不匹配 ({final_size} vs {total_size})"
                            tqdm.write(mismatch)
                            os.remove(temp_path)
                            if os.path.exists(meta_path):
                                os.remove(meta_path)
                            if attempt < retries - 1:
                                tqdm.write(f"重试下载 ({attempt+1}/{retries})")
                                continue
                            # Last attempt: report the failure instead of
                            # claiming success for a file that was discarded.
                            return {
                                "success": False,
                                "file_name": file_name,
                                "error": mismatch,
                                "message": f"下载失败: {file_name}"
                            }

                        os.replace(temp_path, full_path)
                        if os.path.exists(meta_path):
                            os.remove(meta_path)

                        return {
                            "success": True,
                            "file_name": file_name,
                            "file_size": final_size,
                            "message": f"下载完成: {file_name}"
                        }

                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    current_size = os.path.getsize(temp_path) if os.path.exists(temp_path) else 0

                    if attempt < retries - 1:
                        wait_time = min(2 ** attempt, 30)
                        tqdm.write(f"下载失败 ({attempt+1}/{retries}), {wait_time}秒后重试: {str(e)}")
                        tqdm.write(f"已下载: {current_size} bytes")
                        await asyncio.sleep(wait_time)
                    else:
                        # Out of attempts: drop the sidecar, keep the partial data.
                        if os.path.exists(meta_path):
                            os.remove(meta_path)
                        return {
                            "success": False,
                            "file_name": file_name,
                            "error": str(e),
                            "message": f"下载失败: {file_name}"
                        }

        # Only reached when the loop body never ran (retries <= 0).
        return {
            "success": False,
            "file_name": file_name,
            "error": "no download attempt was made (retries <= 0)",
            "message": f"下载失败: {file_name}"
        }

In [30]:

class HanimeDownloadManage:
    """Collect the videos of a playlist page and download them.

    ``update()`` scrapes ``url`` for the playlist title and entries and
    resolves each entry's download sources; ``download()`` and
    ``download_async()`` then fetch the first source of every video into
    ``<download_root_dir>/<playlist title>``.
    """

    def __init__(self, url, download_root_dir='./download'):
        """Store the page URL and download root and create the HTTP scraper.

        Args:
            url: URL of a video page whose playlist should be downloaded.
            download_root_dir: Directory under which the per-playlist folder
                is created.
        """
        self.url = url
        self._download_root_dir = download_root_dir
        self.title = None           # Playlist title, filled in by update()
        self._download_dir = None   # Cached value of the download_dir property
        self.player_list = []       # Video objects found by update()
        self.headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
                'Referer': 'https://hanime1.me/'
            }
        self.scraper = cloudscraper.create_scraper(
                browser={
                    'browser': 'chrome',
                    'platform': 'windows',
                    'desktop': True
                }
            )

    def __str__(self):
        """Return a multi-line summary of the playlist and every video's sources."""
        # The title is None until update() has found it; render it as empty
        # instead of failing on len(None).
        title = self.title or ""
        rule = '=' * (54 - len(title) // 2)
        info_list = [f"{rule}❤{title}❤{rule}\n"]

        for index, video in enumerate(self.player_list, 1):
            title_str = f"{index}. 📺 视频标题: {video.title}\t"
            url_str = f"🌐 原始链接: {video.url}\n"

            if video.sources:
                sources_str = "\t📥 可用下载源:\n"
                for i, source in enumerate(video.sources, 1):
                    # Computed outside the f-string: nesting the same quote
                    # type inside an f-string only parses on Python 3.12+.
                    badge = "🏆 推荐" if i == 1 else ""
                    sources_str += (
                        f"\t\t🔢 源 {i}: {badge}\n"
                        f"\t\t\turl: {source.download_url}\n"
                        f"\t\t\tfile type: {source.suffix}\n"
                        f"\t\t\tresolution: {source.clarity}p\n"
                    )
            else:
                sources_str = "\t\t⚠️ 无可用下载源\n"

            info_list.append(''.join([title_str, url_str, sources_str]))
        return '\n'.join(info_list)

    @property
    def download_dir(self):
        """Directory for this playlist, or None while the title is unknown.

        The value is computed once from the root directory and the title and
        then cached.
        """
        if self._download_dir is not None:
            return self._download_dir
        if self.title is None:
            return None
        self._download_dir = os.path.join(self._download_root_dir, self.title)
        return self._download_dir

    def _target_dir(self):
        """Directory to download into: ``download_dir``, or the root when the title is unknown."""
        return self.download_dir or self._download_root_dir

    def update(self):
        """Scrape the page, rebuild ``player_list`` and print the summary.

        Raises whatever the HTTP client raises for connection problems or
        non-success status codes.
        """
        response = self.scraper.get(self.url, headers=self.headers, timeout=20)
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        title_node = soup.select_one('#video-playlist-wrapper > div.single-icon-wrapper.video-playlist-top > h4:nth-child(1)')
        if title_node:
            self.title = title_node.get_text(strip=True)

        # Build a fresh list so calling update() again does not duplicate or
        # reorder the entries collected by a previous call.
        videos = []
        playlist_scroll = soup.select_one('#playlist-scroll')
        if playlist_scroll:
            for element in playlist_scroll.find_all('div', recursive=False):
                link = element.select_one('a.overlay')
                title_el = element.select_one('div.card-mobile-title')
                # An entry without a link or a title cannot become a Video;
                # skip it instead of raising on the missing element.
                if link is None or not link.get('href') or title_el is None:
                    continue
                videos.append(Video(title=title_el.get_text(strip=True), url=link['href']))
            videos.reverse()

        self.player_list = videos
        if self.player_list:
            self._update_download_link()
        print(self)

    def _update_download_link(self):
        """Fetch every video's page and replace its ``sources``, best clarity first."""
        for video in self.player_list:
            response = self.scraper.get(video.url, headers=self.headers, timeout=20)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            links = []
            for source in soup.select('#player > source'):
                src = source.get('src')
                if not src:
                    continue  # nothing to download from this element

                # "video/mp4" -> "mp4"; a missing or malformed type gives "".
                parts = (source.get('type') or '').split('/', 1)
                suffix = parts[1] if len(parts) > 1 else ""

                try:
                    size = int(source.get('size'))
                except (TypeError, ValueError):
                    size = 0  # unknown resolution sorts last

                links.append(DownloadLink(download_url=src, suffix=suffix, clarity=size))

            # Replace rather than extend so repeated calls do not duplicate sources.
            video.sources = sorted(links, key=lambda x: x.clarity, reverse=True)

    def download(self):
        """Download every video in ``player_list`` one after another (blocking)."""
        target_dir = self._target_dir()
        for video in self.player_list:
            video.download(target_dir)
        return None

    async def download_async(self, max_concurrent=2):
        """Download every video in ``player_list``, at most ``max_concurrent`` at a time.

        Prints one success/failure line per video. A video counts as failed
        when its download raised, returned a dict without a truthy
        ``success``, or returned None.
        """
        semaphore = asyncio.Semaphore(max_concurrent)
        target_dir = self._target_dir()

        async def download_with_position(video, position):
            async with semaphore:
                # Return the result so the report below can inspect it.
                return await video.download_async(target_dir, position=position)

        tasks = [
            download_with_position(video, index)
            for index, video in enumerate(self.player_list)
        ]

        # return_exceptions=True: one failing video must not cancel the others.
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for video, result in zip(self.player_list, results):
            if isinstance(result, BaseException):
                print(f"文件 {video.title} 下载失败: {result}")
            elif isinstance(result, dict) and result.get("success"):
                print(f"文件 {video.title} 下载成功！")
            else:
                if isinstance(result, dict):
                    reason = result.get("error") or result.get("message")
                else:
                    reason = "未找到下载源"
                print(f"文件 {video.title} 下载失败: {reason}")

将下方单元格中的 url 替换为你要下载的视频链接。运行该单元格后，`update()` 会解析该视频以及同系列下所有视频的下载源并打印列表；随后运行最后一个单元格中的 `download_async()`，即可开始下载这些视频。

In [31]:
# Create a manager for one video page; replace the URL with the video you want.
hdm = HanimeDownloadManage('https://hanime1.me/watch?v=109872')
# Fetch the page, collect the playlist entries and their download sources, and print the summary.
hdm.update()


1. 📺 视频标题: 住在隔壁的她	🌐 原始链接: https://hanime1.me/watch?v=109872
	📥 可用下载源:
		🔢 源 1: 🏆 推荐
			url: https://vdownload.hembed.com/109872-1080p.mp4?secure=LfR2PAAsjLs0u8X6y6czNg==,1751295854
			file type: mp4
			resolution: 1080p
		🔢 源 2: 
			url: https://vdownload.hembed.com/109872-720p.mp4?secure=6Wg4NfwX9oV9X_l-oreDUA==,1751295854
			file type: mp4
			resolution: 720p
		🔢 源 3: 
			url: https://vdownload.hembed.com/109872-480p.mp4?secure=s90jz9TIlqZAN3luW1FSkA==,1751295854
			file type: mp4
			resolution: 480p

2. 📺 视频标题: 被玷污的她	🌐 原始链接: https://hanime1.me/watch?v=109873
	📥 可用下载源:
		🔢 源 1: 🏆 推荐
			url: https://vdownload.hembed.com/109873-1080p.mp4?secure=JIE4p2zHNhbT9IWO99ZTBg==,1751295854
			file type: mp4
			resolution: 1080p
		🔢 源 2: 
			url: https://vdownload.hembed.com/109873-720p.mp4?secure=UAS-qGatZUfkb_13Px3UnQ==,1751295854
			file type: mp4
			resolution: 720p
		🔢 源 3: 
			url: https://vdownload.hembed.com/109873-480p.mp4?secure=OyjJFWqGd9w00Yk67lidIA==,1751295854
			file type: mp4
			

In [None]:

# Download every video in hdm.player_list, at most two at a time (default max_concurrent=2).
await hdm.download_async()