In [1]:
# 本项目主要用于测试对主项目图像爬取目标网站中视频数据的爬取功能.
# 本地测试虚拟环境为：kf_bf.

In [2]:
"""
这个版本的代码没有考虑代理服务器的问题。不使用代理时，原始网站可以访问，但视频下载可能会受到限制。
在开启代理服务，而没有修改代码的情况下，可能会导致下载失败，报错如下：
代码运行后出现如下错误：正在处理第 1 页: https://c4afcd.com/video/kaifang/
请求失败: https://c4afcd.com/video/kaifang/, 错误: HTTPSConnectionPool(host='c4afcd.com', port=443): Max retries exceeded with url: /video/kaifang/ (Caused by ProxyError('Unable to connect to proxy', OSError(0, 'Error')))


关闭代理服务器后，直接运行，则报错信息如下：
正在处理第 1 页: https://c4afcd.com/video/kaifang/
本页找到 28 个视频
处理视频: 炮友激情操逼特写小嘴吃jb，小时激情不断操，骑乘暴插多毛骚穴，娇喘呻吟爽翻
下载失败: https://d1.xia12345.com/video/202503/67dda0bdba8bde5fd5418e2b/hd.mp4, 错误: ('Connection aborted.', ConnectionResetError(10054, '远程主机强迫关闭了一个现有的连接。', None, 10054, None))
"""
import requests
import base64
import csv
import os
import re
import time
import random
from urllib.parse import urljoin
from bs4 import BeautifulSoup

# Configuration
# BASE_URL is the first listing page; it is also sent as the Referer header.
BASE_URL = "https://c4afcd.com/video/kaifang/"
OUTPUT_DIR = "../downloaded_videos"
CSV_FILE = "./video_list.csv"
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Referer": BASE_URL
}
# Delay settings (unit: seconds); each tuple is unpacked into random.uniform(low, high)
DELAY_BETWEEN_REQUESTS = (5, 10)  # random delay range between requests
DELAY_BETWEEN_PAGES = (6, 8)         # random delay range between listing pages

# Create the output directory (no error if it already exists)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Decode helper (mirrors the d() function used in the page's scripts)
def decode_title(encoded_str):
    """Decode a Base64-encoded UTF-8 title.

    Args:
        encoded_str: Base64 text (str or bytes) extracted from the page.

    Returns:
        The decoded title, or the placeholder "解码失败" when the input is
        not valid Base64, does not decode as UTF-8, or is not str/bytes.
    """
    try:
        return base64.b64decode(encoded_str).decode('utf-8')
    except (ValueError, TypeError):
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses; TypeError covers non-string input such as None.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return "解码失败"

# Fetch the content of a page
def get_page(url):
    """Fetch *url* after a random pause and return the response body as text.

    The pause length is drawn from DELAY_BETWEEN_REQUESTS. The response is
    decoded as UTF-8.

    Returns:
        The page text, or None when the status code is not 200 or any
        exception is raised (a message is printed in both cases).
    """
    try:
        # Throttle: wait a random interval before every request
        time.sleep(random.uniform(*DELAY_BETWEEN_REQUESTS))

        response = requests.get(url, headers=HEADERS, timeout=15)
        response.encoding = 'utf-8'

        if response.status_code == 200:
            return response.text

        print(f"请求失败: {url}, 状态码: {response.status_code}")
        return None
    except Exception as e:
        print(f"请求失败: {url}, 错误: {e}")
        return None

# Extract the title of a single listing entry
def _extract_title(a_tag):
    """Return the title for one listing link, or "" when none can be found.

    The encoded title inside the link's <script> (the argument of d('...'))
    is preferred; the text of the link's <h3> is the fallback.
    """
    script_tag = a_tag.find('script')
    script_text = script_tag.string if script_tag else None
    if script_text:
        match = re.search(r"d\('([^']+)'\)", script_text)
        if match:
            return decode_title(match.group(1))

    # Fallback: plain <h3> text. The <h3> may be absent, so guard against
    # None instead of letting one odd entry abort the whole page.
    h3_tag = a_tag.find('h3')
    return h3_tag.get_text(strip=True) if h3_tag else ""


# Parse a listing page into video titles and detail-page links
def parse_list_page(html):
    """Collect (title, detail_url) pairs from a listing page.

    Args:
        html: Markup of the listing page.

    Returns:
        A list of (clean_title, absolute_url) tuples, one per <dd> element
        that has a link with an href and a non-empty title. Characters that
        are illegal in file names are removed from the title.
    """
    soup = BeautifulSoup(html, 'html.parser')
    videos = []

    for dd in soup.select('dd'):
        a_tag = dd.find('a')
        if not a_tag:
            continue

        # Turn the relative link into an absolute URL
        rel_url = a_tag.get('href')
        if not rel_url:
            continue
        full_url = urljoin(BASE_URL, rel_url)

        # Strip characters that are illegal in file names
        title = _extract_title(a_tag)
        clean_title = re.sub(r'[\\/*?:"<>|]', '', title).strip()

        if clean_title and full_url:
            videos.append((clean_title, full_url))

    return videos

# Parse a detail page for the video address
def parse_detail_page(html):
    """Return the value of <input id="url"> inside <div class="download">.

    Returns None when either element is missing.
    """
    soup = BeautifulSoup(html, 'html.parser')

    download_div = soup.find('div', class_='download')
    if not download_div:
        return None

    input_tag = download_div.find('input', id='url')
    if not input_tag:
        return None

    return input_tag.get('value')

# Download a video
def download_video(url, filename):
    """Stream the video at *url* into OUTPUT_DIR as "<filename>.mp4".

    Data is written to "<name>.mp4.part" and renamed to the final name only
    after the transfer finishes, so a failed or interrupted download never
    leaves a truncated file that the "already exists" check would later
    mistake for a finished one.

    Args:
        url: Direct address of the video file.
        filename: Title used as the file name; characters that are illegal
            in file names are removed.

    Returns:
        The path of the finished (or already existing) file, or None when
        the download failed.
    """
    safe_filename = re.sub(r'[\\/*?:"<>|]', '', filename)
    filepath = os.path.join(OUTPUT_DIR, f"{safe_filename}.mp4")

    if os.path.exists(filepath):
        print(f"文件已存在: {safe_filename}")
        return filepath

    temp_path = filepath + ".part"

    try:
        # Throttle: wait a random interval before the request
        delay = random.uniform(*DELAY_BETWEEN_REQUESTS)
        time.sleep(delay)

        with requests.get(url, stream=True, headers=HEADERS, timeout=60) as r:
            r.raise_for_status()

            # 0 means the server did not report a size; progress is then skipped
            total_size = int(r.headers.get('content-length', 0))
            downloaded_size = 0

            with open(temp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        if total_size > 0:
                            percent = downloaded_size / total_size * 100
                            print(f"下载中: {safe_filename} - {percent:.1f}%", end='\r')

        # Publish the file under its final name only once it is complete
        os.replace(temp_path, filepath)
        print(f"下载成功: {safe_filename} ({downloaded_size//1024}KB)")
        return filepath
    except Exception as e:
        print(f"下载失败: {url}, 错误: {e}")
        return None
    finally:
        # Runs on failure and on Ctrl+C alike; after a successful rename the
        # temporary file no longer exists and this is a no-op.
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass  # best-effort cleanup; a stale .part file is harmless

# Main routine
def main():
    """Crawl the first 10 listing pages, log each video to the CSV and download it.

    CSV_FILE is rewritten from scratch with a header row. For every video
    whose address can be extracted, a (title, address) row is written and
    flushed before the download starts. A random pause drawn from
    DELAY_BETWEEN_PAGES follows every successfully fetched page except the
    last one.
    """
    with open(CSV_FILE, 'w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(['视频文件名', '视频地址'])

        for page in range(1, 11):
            # Page 1 is BASE_URL itself; later pages are index_<n>.html
            list_url = BASE_URL if page == 1 else urljoin(BASE_URL, f"index_{page}.html")

            print(f"正在处理第 {page} 页: {list_url}")
            html = get_page(list_url)
            if not html:
                continue

            videos = parse_list_page(html)
            print(f"本页找到 {len(videos)} 个视频")

            for title, detail_url in videos:
                print(f"处理视频: {title}")

                detail_html = get_page(detail_url)
                if not detail_html:
                    continue

                video_url = parse_detail_page(detail_html)
                if video_url:
                    # Record the row and flush it to disk before downloading
                    csv_writer.writerow([title, video_url])
                    csvfile.flush()
                    download_video(video_url, title)
                else:
                    print(f"未找到视频地址: {title}")

            # Pause between pages (not after the last one)
            if page < 10:
                delay = random.uniform(*DELAY_BETWEEN_PAGES)
                print(f"等待 {delay:.1f} 秒后继续下一页...")
                time.sleep(delay)

# Script entry point: run the crawl, then report where the outputs were written.
if __name__ == '__main__':
    main()
    print("任务完成! 视频已保存到", OUTPUT_DIR)
    print("视频列表已保存到", CSV_FILE)

正在处理第 1 页: https://c4afcd.com/video/kaifang/


KeyboardInterrupt: 

In [4]:
import requests
import base64
import csv
import os
import re
import time
import random
import urllib3
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Suppress urllib3's InsecureRequestWarning (the requests in this file pass verify=False)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ======== Configuration section ========
# Base URL (first listing page; also sent as the Referer header)
BASE_URL = "https://c4afcd.com/video/kaifang/"
# Output directory
OUTPUT_DIR = "../downloaded_videos"
# CSV file name
CSV_FILE = "video_list.csv"
# Use the system proxy settings
USE_SYSTEM_PROXY = True  # set to True to use the system proxy settings
# =========================

# Request headers
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Referer": BASE_URL,
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "DNT": "1"
}

# Delay settings (unit: seconds); each tuple is unpacked into random.uniform(low, high)
DELAY_BETWEEN_REQUESTS = (1.5, 3.0)
DELAY_BETWEEN_PAGES = (3, 6)

# Create the output directory (no error if it already exists)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Build a session with a retry policy
def create_session():
    """Return a requests.Session whose HTTP and HTTPS adapter retries requests.

    The retry policy allows 3 retries in total with a backoff factor of 1,
    retries on status codes 429/500/502/503/504, and applies to GET and POST.
    One adapter instance is mounted for both URL prefixes.
    """
    adapter = HTTPAdapter(
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
        )
    )

    new_session = requests.Session()
    for prefix in ("http://", "https://"):
        new_session.mount(prefix, adapter)
    return new_session

# Module-wide session shared by all requests
session = create_session()

# Read the system proxy settings
def get_system_proxies():
    """Return the proxy mapping reported by urllib.request.getproxies().

    The detected mapping is printed before it is returned.

    Returns:
        The mapping from getproxies(), or an empty dict when the lookup
        raises.
    """
    try:
        from urllib.request import getproxies
        proxies = getproxies()
    except Exception:
        # Best-effort lookup. Catching Exception instead of using a bare
        # except lets KeyboardInterrupt and SystemExit propagate.
        print("未检测到系统代理设置")
        return {}

    print(f"检测到系统代理设置: {proxies}")
    return proxies

# Decode helper (mirrors the d() function used in the page's scripts)
def decode_title(encoded_str):
    """Decode a Base64-encoded UTF-8 title.

    Args:
        encoded_str: Base64 text (str or bytes) extracted from the page.

    Returns:
        The decoded title, or the placeholder "解码失败" when the input is
        not valid Base64, does not decode as UTF-8, or is not str/bytes.
    """
    try:
        return base64.b64decode(encoded_str).decode('utf-8')
    except (ValueError, TypeError):
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses; TypeError covers non-string input such as None.
        # Anything else (e.g. KeyboardInterrupt) is allowed to propagate.
        return "解码失败"

# Fetch the content of a page
def get_page(url):
    """Fetch *url* through the shared session and return the body as text.

    A random pause drawn from DELAY_BETWEEN_REQUESTS precedes the request.
    Certificate verification is disabled (verify=False). A 200 response is
    decoded as UTF-8.

    Returns:
        The page text, or None when the status code is not 200 or any
        exception is raised (a message is printed in both cases).
    """
    try:
        # Throttle: wait a random interval before every request
        time.sleep(random.uniform(*DELAY_BETWEEN_REQUESTS))

        print(f"请求: {url}")

        # Pick the proxy mapping according to the configuration switch.
        # NOTE(review): with proxies={} requests may still pick up proxies
        # from the environment (Session.trust_env) - confirm if "no proxy"
        # must really bypass them.
        proxies = get_system_proxies() if USE_SYSTEM_PROXY else {}

        response = session.get(
            url,
            headers=HEADERS,
            timeout=15,
            verify=False,  # skip SSL certificate verification
            proxies=proxies
        )

        if response.status_code == 200:
            response.encoding = 'utf-8'
            return response.text

        print(f"请求失败: {url}, 状态码: {response.status_code}")
        return None
    except Exception as e:
        print(f"请求失败: {url}, 错误: {e}")
        return None

# Parse a listing page into video titles and detail-page links
def parse_list_page(html):
    """Collect (title, detail_url) pairs from a listing page.

    For each <dd> with a link, the title comes from the encoded argument of
    d('...') in the link's <script> when present, otherwise from the link's
    <h3> text. Characters that are illegal in file names are removed, and
    entries whose title ends up empty are dropped.

    Returns:
        A list of (clean_title, absolute_url) tuples; an empty list when
        *html* is empty or parsing raises (a message is printed then).
    """
    if not html:
        return []

    try:
        soup = BeautifulSoup(html, 'html.parser')
        videos = []

        for dd in soup.select('dd'):
            a_tag = dd.find('a')
            if not a_tag:
                continue

            # Turn the relative link into an absolute URL
            rel_url = a_tag.get('href')
            if not rel_url:
                continue
            full_url = urljoin(BASE_URL, rel_url)

            # Look for the encoded title: d('...') inside the link's <script>
            script_tag = a_tag.find('script')
            script_text = script_tag.string if script_tag else None
            match = re.search(r"d\('([^']+)'\)", script_text) if script_text else None

            title = ""
            if match:
                title = decode_title(match.group(1))
            else:
                # No encoded title available: fall back to the <h3> text
                h3_tag = a_tag.find('h3')
                if h3_tag:
                    title = h3_tag.get_text(strip=True)

            # Strip characters that are illegal in file names
            clean_title = re.sub(r'[\\/*?:"<>|]', '', title).strip()

            if clean_title and full_url:
                videos.append((clean_title, full_url))

        return videos
    except Exception as e:
        print(f"解析列表页失败: {e}")
        return []

# Parse a detail page for the video address
def parse_detail_page(html):
    """Return the value of <input id="url"> inside <div class="download">.

    Returns None when *html* is empty, when either element is missing, or
    when parsing raises (a message is printed in the last case).
    """
    if not html:
        return None

    try:
        soup = BeautifulSoup(html, 'html.parser')
        download_div = soup.find('div', class_='download')
        input_tag = download_div.find('input', id='url') if download_div else None
        return input_tag.get('value') if input_tag else None
    except Exception as e:
        print(f"解析详情页失败: {e}")
        return None

# Download a video
def download_video(url, filename):
    """Stream the video at *url* into OUTPUT_DIR as "<filename>.mp4".

    Data is written to "<name>.mp4.part" and renamed to the final name only
    after the transfer finishes, so a failed or interrupted download never
    leaves a truncated file that the "already exists" check would later
    mistake for a finished one.

    Args:
        url: Direct address of the video file.
        filename: Title used as the file name; characters that are illegal
            in file names are removed.

    Returns:
        The path of the finished (or already existing) file, or None when
        the download failed.
    """
    safe_filename = re.sub(r'[\\/*?:"<>|]', '', filename)
    filepath = os.path.join(OUTPUT_DIR, f"{safe_filename}.mp4")

    if os.path.exists(filepath):
        print(f"文件已存在: {safe_filename}")
        return filepath

    temp_path = filepath + ".part"

    try:
        # Throttle: wait a random interval before the request
        delay = random.uniform(*DELAY_BETWEEN_REQUESTS)
        time.sleep(delay)

        print(f"开始下载: {url}")

        # Pick the proxy mapping according to the configuration switch
        if USE_SYSTEM_PROXY:
            proxies = get_system_proxies()
        else:
            proxies = {}

        with session.get(
            url,
            stream=True,
            headers=HEADERS,
            timeout=60,
            verify=False,  # skip SSL certificate verification
            proxies=proxies
        ) as r:
            r.raise_for_status()

            # 0 means the server did not report a size; progress is then skipped
            total_size = int(r.headers.get('content-length', 0))
            downloaded_size = 0

            with open(temp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
                        downloaded_size += len(chunk)
                        if total_size > 0:
                            percent = downloaded_size / total_size * 100
                            print(f"下载中: {safe_filename} - {percent:.1f}%", end='\r')

        # Publish the file under its final name only once it is complete
        os.replace(temp_path, filepath)
        print(f"\n下载成功: {safe_filename} ({downloaded_size//1024}KB)")
        return filepath
    except Exception as e:
        print(f"下载失败: {url}, 错误: {e}")
        return None
    finally:
        # Runs on failure and on Ctrl+C alike; after a successful rename the
        # temporary file no longer exists and this is a no-op.
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass  # best-effort cleanup; a stale .part file is harmless

# Main routine
def main(max_pages=10):
    """Crawl the listing pages, log each video to the CSV and download it.

    CSV_FILE is rewritten from scratch with a header row. For every video
    whose address can be extracted, a (title, address) row is written and
    flushed before the download starts.

    Args:
        max_pages: Number of listing pages to process, starting at page 1.
            The same value decides which page is the last one, so the
            inter-page delay is applied between every pair of pages.
    """
    with open(CSV_FILE, 'w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(['视频文件名', '视频地址'])

        for page in range(1, max_pages + 1):
            # Page 1 is BASE_URL itself; later pages are index_<n>.html
            if page == 1:
                list_url = BASE_URL
            else:
                list_url = urljoin(BASE_URL, f"index_{page}.html")

            print(f"正在处理第 {page} 页: {list_url}")
            html = get_page(list_url)
            if not html:
                print(f"跳过第 {page} 页")
                continue

            videos = parse_list_page(html)
            print(f"本页找到 {len(videos)} 个视频")

            for i, (title, detail_url) in enumerate(videos, 1):
                print(f"处理视频 [{i}/{len(videos)}]: {title}")

                # Fetch the detail page
                detail_html = get_page(detail_url)
                if not detail_html:
                    print(f"跳过视频: {title}")
                    continue

                # Extract the video address
                video_url = parse_detail_page(detail_html)
                if not video_url:
                    print(f"未找到视频地址: {title}")
                    continue

                # Record the row and flush it to disk before downloading
                csv_writer.writerow([title, video_url])
                csvfile.flush()

                download_video(video_url, title)

            # Pause between pages; skipped only after the final page
            if page < max_pages:
                delay = random.uniform(*DELAY_BETWEEN_PAGES)
                print(f"等待 {delay:.1f} 秒后继续下一页...")
                time.sleep(delay)

# Script entry point: check connectivity, report the proxy mode, then crawl.
if __name__ == '__main__':
    try:
        # Connectivity check against a well-known site before crawling
        test_url = "https://www.baidu.com"
        print(f"测试网络连接: {test_url}")
        try:
            response = session.get(test_url, timeout=5, verify=False)
            print(f"网络连接测试成功! 状态码: {response.status_code}")
        except Exception as e:
            print(f"网络连接测试失败! 请检查网络设置: {e}")
            print("提示: 可能需要启用系统代理或VPN")
            # exit() is an interactive helper added by the site module and is
            # not meant for programs; raising SystemExit directly gives the
            # same exit status without depending on it.
            raise SystemExit(1)

        # Report which proxy mode is active
        if USE_SYSTEM_PROXY:
            print("=" * 60)
            print("使用系统代理设置")
            print("=" * 60)
            print(f"检测到的系统代理: {get_system_proxies()}")
        else:
            print("=" * 60)
            print("不使用代理")
            print("=" * 60)

        main()
        print("任务完成! 视频已保存到", OUTPUT_DIR)
        print("视频列表已保存到", CSV_FILE)
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"程序运行出错: {e}")

测试网络连接: https://www.baidu.com
网络连接测试成功! 状态码: 200
使用系统代理设置
检测到系统代理设置: {}
检测到的系统代理: {}
正在处理第 1 页: https://c4afcd.com/video/kaifang/
请求: https://c4afcd.com/video/kaifang/
检测到系统代理设置: {}
本页找到 28 个视频
处理视频 [1/28]: 炮友激情操逼特写小嘴吃jb，小时激情不断操，骑乘暴插多毛骚穴，娇喘呻吟爽翻
请求: https://c4afcd.com/html/202506/111982.html
检测到系统代理设置: {}
文件已存在: 炮友激情操逼特写小嘴吃jb，小时激情不断操，骑乘暴插多毛骚穴，娇喘呻吟爽翻
处理视频 [2/28]: 激情操逼特写小嘴吃jb，小时激情不断操，骑乘暴插多毛骚穴，娇喘呻吟爽翻
请求: https://c4afcd.com/html/202506/111981.html
检测到系统代理设置: {}
文件已存在: 激情操逼特写小嘴吃jb，小时激情不断操，骑乘暴插多毛骚穴，娇喘呻吟爽翻
处理视频 [3/28]: 激情啪啪，口活很棒让小哥吃奶舔逼舔菊花，各种体位无套抽插好刺激
请求: https://c4afcd.com/html/202506/111977.html
检测到系统代理设置: {}
文件已存在: 激情啪啪，口活很棒让小哥吃奶舔逼舔菊花，各种体位无套抽插好刺激
处理视频 [4/28]: 激情名场面，双屌轮流吃，唇钉嫩妹妹，穿上白丝骑乘位，自己上下动，侧入操嫩穴
请求: https://c4afcd.com/html/202506/111922.html
检测到系统代理设置: {}
文件已存在: 激情名场面，双屌轮流吃，唇钉嫩妹妹，穿上白丝骑乘位，自己上下动，侧入操嫩穴
处理视频 [5/28]: 漏奶情趣装，口交乳交大鸡巴，床上床下多体位蹂躏爆草，浪叫呻吟不止
请求: https://c4afcd.com/html/202506/111921.html
检测到系统代理设置: {}
文件已存在: 漏奶情趣装，口交乳交大鸡巴，床上床下多体位蹂躏爆草，浪叫呻吟不止
处理视频 [6/28]: 漂亮美眉身材娇小家具厂慰问工人小哥捉迷藏谁先找到有逼操否则就是看别人