In [1]:
import os
import json
import textwrap

# 创建项目文件夹结构 - 修正为平级目录
folders = [
    'crawler',
    'fixtures',  # 测试用的HTML文件
    'tests',     # 测试代码文件
    'outputs'
]

for folder in folders:
    os.makedirs(folder, exist_ok=True)
    print(f"创建文件夹: {folder}")

# 创建必要的文件
files_to_create = {
    'requirements.txt': 'aiohttp==3.9.0\naiosqlite==0.19.0\npandas==2.0.3\nplotly==5.15.0\nbeautifulsoup4==4.12.2\nhttpx==0.24.1\nnest_asyncio==1.5.8',
    'crawler/__init__.py': '# 爬虫包初始化',
    'README.md': '# Async MiniCrawler\n\n基于Jupyter Notebook的异步爬虫项目'
}

for file_path, content in files_to_create.items():
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"创建文件: {file_path}")

print(" 项目结构创建完成！")

# 创建测试fixtures - 放在fixtures目录（不是tests/fixtures）
test_fixtures = {
    'fixtures/quotes_page1.html': '''<!DOCTYPE html>
<html>
<head>
    <title>Quotes to Scrape</title>
    <meta charset="UTF-8">
</head>
<body>
    <div class="quote">
        <span class="text">"The best way to predict the future is to invent it."</span>
        <span>by <small class="author">Alan Kay</small></span>
        <div class="tags">
            Tags: 
            <a class="tag" href="/tag/future/">future</a>
            <a class="tag" href="/tag/invention/">invention</a>
        </div>
    </div>
    <nav>
        <ul class="pager">
            <li class="next">
                <a href="/page/2/">Next &rarr;</a>
            </li>
        </ul>
    </nav>
</body>
</html>''',
    
    'fixtures/books_page1.html': '''<!DOCTYPE html>
<html>
<head>
    <title>All products | Books to Scrape</title>
    <meta charset="UTF-8">
</head>
<body>
    <ul class="breadcrumb">
        <li><a href="index.html">Home</a></li>
        <li class="active">Books</li>
    </ul>
    
    <article class="product_pod">
        <div class="image_container">
            <a href="a-light-in-the-attic_1000/index.html"><img src="media/cache/2c/da/2cdad67c44b002e7ead0cc35693c0e8b.jpg" alt="A Light in the Attic" class="thumbnail"></a>
        </div>
        <h3><a href="a-light-in-the-attic_1000/index.html" title="A Light in the Attic">A Light in the Attic</a></h3>
        <div class="product_price">
            <p class="price_color">&pound;51.77</p>
            <p class="instock availability">
                <i class="icon-ok"></i>
                In stock
            </p>
        </div>
        <p class="star-rating Three">
            <i class="icon-star"></i>
            <i class="icon-star"></i>
            <i class="icon-star"></i>
            <i class="icon-star"></i>
            <i class="icon-star"></i>
        </p>
    </article>
    
    <li class="next">
        <a href="page-2.html">next</a>
    </li>
</body>
</html>'''
}

for file_path, content in test_fixtures.items():
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"创建测试文件: {file_path}")

# 创建CLI命令行接口
files_to_create['crawl.py'] = '''#!/usr/bin/env python3
"""
Async MiniCrawler 命令行接口
"""

import asyncio
import argparse
from main import run_crawler

async def main():
    parser = argparse.ArgumentParser(description='Async MiniCrawler - 异步网页爬虫')
    subparsers = parser.add_subparsers(dest='command', help='命令')
    
    # run 命令
    run_parser = subparsers.add_parser('run', help='运行爬虫')
    run_parser.add_argument('--site', type=str, choices=['quotes', 'books'], 
                          default='quotes', help='要爬取的站点 (quotes 或 books)')
    run_parser.add_argument('--concurrency', type=int, default=5, 
                          help='并发数 (默认: 5)')
    run_parser.add_argument('--max-pages', type=int, default=50, 
                          help='最大页面数 (默认: 50)')
    run_parser.add_argument('--delay', type=float, default=1.0, 
                          help='请求延迟 (默认: 1.0秒)')
    
    args = parser.parse_args()
    
    if args.command == 'run':
        print(f"开始爬取 {args.site} 站点")
        print(f"配置: 并发数={args.concurrency}, 最大页面={args.max_pages}, 延迟={args.delay}s")
        
        data, stats = await run_crawler(
            site=args.site,
            concurrency=args.concurrency,
            max_pages=args.max_pages,
            delay=args.delay
        )
        
        print(f"爬虫完成! 共收集 {len(data)} 条数据")
        print(f"统计信息: {stats['successful_pages']} 成功, {stats['failed_pages']} 失败")
        
    else:
        parser.print_help()

if __name__ == '__main__':
    asyncio.run(main())
'''

# 创建测试文件 - 修正路径引用
test_files = {
    'tests/test_crawler.py': '''#!/usr/bin/env python3
"""
爬虫测试文件 - 测试解析功能
"""

import asyncio
import os
import sys

# 修正路径导入
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)

try:
    from main import PageParser, AsyncCrawler
except ImportError as e:
    print(f"导入错误: {e}")
    # 在Jupyter中运行时可能需要重新定义类
    print("在Jupyter环境中运行测试...")

def test_parse_quotes():
    """测试名言页面解析"""
    print("测试名言页面解析...")
    
    # 使用绝对路径确保能找到文件（现在fixtures在项目根目录）
    fixture_path = os.path.join(parent_dir, 'fixtures', 'quotes_page1.html')
    
    with open(fixture_path, 'r', encoding='utf-8') as f:
        html = f.read()
    
    parser = PageParser()
    quotes, next_url = parser.parse_quotes(html, 'http://test.com')
    
    # 验证解析结果
    assert len(quotes) == 1, f"应该解析出1条名言，实际得到: {len(quotes)}"
    assert quotes[0]['author'] == 'Alan Kay', f"作者应该是Alan Kay，实际是: {quotes[0]['author']}"
    assert 'future' in quotes[0]['tags'], f"应该包含future标签，实际标签: {quotes[0]['tags']}"
    assert 'invention' in quotes[0]['tags'], f"应该包含invention标签，实际标签: {quotes[0]['tags']}"
    assert next_url == 'http://test.com/page/2/', f"下一页URL应该是http://test.com/page/2/，实际是: {next_url}"
    
    # 验证名言文本
    expected_text = '"The best way to predict the future is to invent it."'
    assert quotes[0]['text'] == expected_text, f"名言文本不匹配，期望: {expected_text}，实际: {quotes[0]['text']}"
    
    print("✓ 名言解析测试通过")
    return quotes

def test_parse_books():
    """测试图书页面解析"""
    print("测试图书页面解析...")
    
    # 使用绝对路径确保能找到文件（现在fixtures在项目根目录）
    fixture_path = os.path.join(parent_dir, 'fixtures', 'books_page1.html')
    
    with open(fixture_path, 'r', encoding='utf-8') as f:
        html = f.read()
    
    parser = PageParser()
    books, next_url = parser.parse_books(html, 'http://test.com')
    
    # 验证解析结果
    assert len(books) == 1, f"应该解析出1本图书，实际得到: {len(books)}"
    assert books[0]['title'] == 'A Light in the Attic', f"书名应该是A Light in the Attic，实际是: {books[0]['title']}"
    assert books[0]['price'] == '£51.77', f"价格应该是£51.77，实际是: {books[0]['price']}"
    assert books[0]['stock'] == 'In stock', f"库存状态应该是In stock，实际是: {books[0]['stock']}"
    assert books[0]['rating'] == 'Three', f"评分应该是Three，实际是: {books[0]['rating']}"
    assert next_url == 'http://test.com/page-2.html', f"下一页URL应该是http://test.com/page-2.html，实际是: {next_url}"
    
    print("✓ 图书解析测试通过")
    return books

def test_parse_empty_html():
    """测试空HTML解析"""
    print("测试空HTML解析...")
    
    parser = PageParser()
    
    # 测试空HTML
    quotes, next_url = parser.parse_quotes('', 'http://test.com')
    assert len(quotes) == 0, "空HTML应该返回0条名言"
    assert next_url is None, "空HTML应该返回None下一页URL"
    
    books, next_url = parser.parse_books('', 'http://test.com')
    assert len(books) == 0, "空HTML应该返回0本图书"
    assert next_url is None, "空HTML应该返回None下一页URL"
    
    print("✓ 空HTML解析测试通过")

def test_parse_invalid_html():
    """测试无效HTML解析"""
    print("测试无效HTML解析...")
    
    parser = PageParser()
    
    # 测试无效HTML
    invalid_html = '<div>Invalid HTML content</div>'
    quotes, next_url = parser.parse_quotes(invalid_html, 'http://test.com')
    assert len(quotes) == 0, "无效HTML应该返回0条名言"
    assert next_url is None, "无效HTML应该返回None下一页URL"
    
    books, next_url = parser.parse_books(invalid_html, 'http://test.com')
    assert len(books) == 0, "无效HTML应该返回0本图书"
    assert next_url is None, "无效HTML应该返回None下一页URL"
    
    print("✓ 无效HTML解析测试通过")

def test_parse_html_with_missing_elements():
    """测试缺失元素的HTML解析"""
    print("测试缺失元素的HTML解析...")
    
    parser = PageParser()
    
    # 测试缺失重要元素的HTML
    partial_html = ('<div class="quote">'
                   '<span class="text">"Test quote"</span>'
                   '<!-- 故意缺少author元素 -->'
                   '</div>')
    
    quotes, next_url = parser.parse_quotes(partial_html, 'http://test.com')
    assert len(quotes) == 0, "缺失author元素应该返回0条名言"
    
    print("✓ 缺失元素解析测试通过")

async def test_crawler_initialization():
    """测试爬虫初始化"""
    print("测试爬虫初始化...")
    
    # 测试quotes爬虫初始化
    quotes_crawler = AsyncCrawler('quotes', concurrency=2, max_pages=5, delay=1.0)
    assert quotes_crawler.site == 'quotes'
    assert quotes_crawler.base_url == 'https://quotes.toscrape.com'
    assert quotes_crawler.concurrency == 2
    assert quotes_crawler.max_pages == 5
    assert quotes_crawler.delay == 1.0
    
    # 测试books爬虫初始化
    books_crawler = AsyncCrawler('books', concurrency=3, max_pages=10, delay=2.0)
    assert books_crawler.site == 'books'
    assert books_crawler.base_url == 'https://books.toscrape.com'
    assert books_crawler.concurrency == 3
    assert books_crawler.max_pages == 10
    assert books_crawler.delay == 2.0
    
    print("✓ 爬虫初始化测试通过")

def simple_test():
    """简化测试 - 用于在Notebook中直接运行"""
    print("运行简化测试...")
    try:
        test_parse_quotes()
        test_parse_books()
        print(" 基本解析测试通过")
        return True
    except Exception as e:
        print(f" 测试失败: {e}")
        return False

if __name__ == '__main__':
    print("开始运行爬虫测试...\\n")
    
    try:
        test_parse_quotes()
        test_parse_books()
        test_parse_empty_html()
        test_parse_invalid_html()
        test_parse_html_with_missing_elements()
        asyncio.run(test_crawler_initialization())
        
        print("\\n 所有测试通过!")
        
    except Exception as e:
        print(f"\\n 测试失败: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
''',
    
    'tests/__init__.py': '# 测试包初始化',
    
    'tests/conftest.py': '''"""
Pytest 配置文件
"""
import sys
import os

# 添加项目根目录到Python路径
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)
'''
}

# 创建测试文件
for file_path, content in test_files.items():
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(content)
    print(f"创建测试文件: {file_path}")

# 创建crawl.py文件
crawl_py_content = files_to_create['crawl.py']
with open('crawl.py', 'w', encoding='utf-8') as f:
    f.write(crawl_py_content)
print(f"创建文件: crawl.py")


In [2]:
!pip install aiohttp aiosqlite pandas plotly beautifulsoup4 httpx nest_asyncio -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
!pip install aiohttp --verbose

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Using pip 22.3.1 from D:\Anaconda3_\envs\pytorch\lib\site-packages\pip (python 3.7)


In [3]:
import asyncio
import aiohttp
import aiosqlite
import pandas as pd
import plotly.express as px
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import hashlib
import json
import csv
from datetime import datetime
import time
import nest_asyncio
from urllib.robotparser import RobotFileParser
import os

nest_asyncio.apply()
print("所有库导入成功")

所有库导入成功


In [4]:
#URL数据库类
class URLDatabase:
    def __init__(self, db_path=':memory:'):
        self.db_path = db_path
        self.conn = None
        
    async def init_db(self):
        self.conn = await aiosqlite.connect(self.db_path)
        # 先删除旧表（如果存在），确保表结构正确
        await self.conn.execute('DROP TABLE IF EXISTS seen_urls')
        await self.conn.execute('''
            CREATE TABLE IF NOT EXISTS seen_urls (
                url TEXT PRIMARY KEY,
                status TEXT,
                last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                content_hash TEXT
            )
        ''')
        await self.conn.commit()
        print("数据库初始化完成")
    
    async def is_seen(self, url):
        async with self.conn.execute("SELECT url FROM seen_urls WHERE url = ?", (url,)) as cursor:
            result = await cursor.fetchone()
            return result is not None
    
    async def mark_seen(self, url, status, content_hash=None):
        await self.conn.execute('''
            INSERT OR REPLACE INTO seen_urls (url, status, content_hash) 
            VALUES (?, ?, ?)
        ''', (url, status, content_hash))
        await self.conn.commit()
    
    async def close(self):
        if self.conn:
            await self.conn.close()
            print("数据库连接已关闭。")
        else:
            # 就是这里！当 self.conn 为 None (即没有活跃连接) 时，会打印这句话
            print("尝试关闭数据库连接，但没有激活的连接。") # 可以通过这个打印来确认

In [5]:
class RobotsChecker:
    def __init__(self):
        self.robot_parsers = {}
    
    async def can_fetch(self, url, user_agent='*'):
        """检查是否允许爬取"""
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        
        if base_url not in self.robot_parsers:
            # 获取robots.txt
            robot_url = f"{base_url}/robots.txt"
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(robot_url, timeout=10) as response:
                        if response.status == 200:
                            robot_content = await response.text()
                            parser = RobotFileParser()
                            parser.parse(robot_content.splitlines())
                            self.robot_parsers[base_url] = parser
                        else:
                            # 如果没有robots.txt，默认允许
                            self.robot_parsers[base_url] = None
            except:
                self.robot_parsers[base_url] = None
        
        parser = self.robot_parsers[base_url]
        return parser is None or parser.can_fetch(user_agent, url)

In [6]:
class PageParser:
    @staticmethod
    def parse_quotes(html, page_url):
        """解析Quotes页面"""
        soup = BeautifulSoup(html, 'html.parser')
        quotes = []
        
        for quote_div in soup.find_all('div', class_='quote'):
            text_elem = quote_div.find('span', class_='text')
            author_elem = quote_div.find('small', class_='author')
            
            if text_elem and author_elem:
                text = text_elem.get_text(strip=True)
                author = author_elem.get_text(strip=True)
                
                # 获取作者链接
                author_link = quote_div.find('a', href=True)
                author_url = urljoin(page_url, author_link['href']) if author_link else None
                
                # 获取标签
                tags = []
                tags_container = quote_div.find('div', class_='tags')
                if tags_container:
                    tags = [tag.get_text(strip=True) for tag in tags_container.find_all('a', class_='tag')]
                
                quotes.append({
                    'text': text,
                    'author': author,
                    'tags': tags,
                    'author_url': author_url,
                    'page_url': page_url
                })
        
        # 查找下一页链接
        next_link = soup.find('li', class_='next')
        next_url = urljoin(page_url, next_link.find('a')['href']) if next_link and next_link.find('a') else None
        
        return quotes, next_url
    
    @staticmethod
    def parse_books(html, page_url):
        """解析Books页面"""
        soup = BeautifulSoup(html, 'html.parser')
        books = []
        
        for book in soup.find_all('article', class_='product_pod'):
            # 获取标题
            title_elem = book.find('h3').find('a') if book.find('h3') else None
            title = title_elem['title'] if title_elem and 'title' in title_elem.attrs else 'Unknown'
            
            # 获取价格
            price_elem = book.find('p', class_='price_color')
            price = price_elem.get_text(strip=True) if price_elem else 'Unknown'
            
            # 获取库存状态
            stock_elem = book.find('p', class_='instock')
            stock = stock_elem.get_text(strip=True) if stock_elem else 'Unknown'
            
            # 获取评分
            rating_elem = book.find('p', class_='star-rating')
            rating = rating_elem['class'][1] if rating_elem and len(rating_elem.get('class', [])) > 1 else 'None'
            
            # 获取分类（从面包屑导航）
            category = 'Unknown'
            breadcrumb = soup.find('ul', class_='breadcrumb')
            if breadcrumb:
                category_items = breadcrumb.find_all('li')
                if len(category_items) >= 2:
                    category = category_items[-2].get_text(strip=True)
            
            # 获取产品链接
            product_link = title_elem['href'] if title_elem and 'href' in title_elem.attrs else None
            product_url = urljoin(page_url, product_link) if product_link else None
            
            books.append({
                'title': title,
                'price': price,
                'stock': stock,
                'rating': rating,
                'category': category,
                'product_url': product_url,
                'page_url': page_url
            })
        
        # 查找下一页链接
        next_link = soup.find('li', class_='next')
        next_url = urljoin(page_url, next_link.find('a')['href']) if next_link and next_link.find('a') else None
        
        return books, next_url

In [7]:
class AsyncCrawler:
    def __init__(self, site, concurrency=5, max_pages=50, delay=1.0):
        self.site = site
        self.base_url = "https://quotes.toscrape.com" if site == "quotes" else "https://books.toscrape.com"
        self.concurrency = concurrency
        self.max_pages = max_pages
        self.delay = delay
        self.semaphore = asyncio.Semaphore(concurrency)
        
        self.db = URLDatabase()
        self.robots_checker = RobotsChecker()
        self.parser = PageParser()
        
        self.collected_data = []
        self.stats = {
            'total_pages': 0,
            'successful_pages': 0,
            'failed_pages': 0,
            'duplicate_pages': 0,
            'start_time': None,
            'end_time': None
        }
    
    async def fetch_url(self, session, url, retries=3):
        for attempt in range(retries):
            try:
                async with self.semaphore:
                    await asyncio.sleep(self.delay)

                    # 直接使用传入的 session 参数
                    async with session.get(url, timeout=30) as response:
                        if response.status == 200:
                            content = await response.text()
                            content_hash = hashlib.md5(content.encode()).hexdigest()
                            return content, content_hash, True
                        else:
                            print(f"尝试 {attempt + 1}: {url} 返回状态码 {response.status}")
            except Exception as e:
                print(f"尝试 {attempt + 1}: {url} 错误: {e}")

            await asyncio.sleep(2 ** attempt)

        return None, None, False
    
    async def process_page(self, session, url):
        """处理单个页面"""
        try:
            self.stats['total_pages'] += 1
        
            # 检查robots.txt
            if not await self.robots_checker.can_fetch(url):
                print(f"由于robots.txt限制，跳过: {url}")
                return None
        
            # 检查是否已访问
            if await self.db.is_seen(url):
                print(f"跳过已访问URL: {url}")
                self.stats['duplicate_pages'] += 1
                return None
        
            # 抓取页面
            content, content_hash, success = await self.fetch_url(session, url)
        
            if not success:
                print(f"抓取失败: {url}")
                await self.db.mark_seen(url, "failed", content_hash)
                self.stats['failed_pages'] += 1
                return None
        
            # 解析页面
            if self.site == "quotes":
                data, next_url = self.parser.parse_quotes(content, url)
            else:
                data, next_url = self.parser.parse_books(content, url)
        
            # 保存数据
            self.collected_data.extend(data)
        
            # 标记为成功访问
            await self.db.mark_seen(url, "success", content_hash)
            self.stats['successful_pages'] += 1
        
            print(f"成功处理: {url} (找到 {len(data)} 条数据)")
            return next_url
            
        except Exception as e:
            print(f"解析错误 {url}: {e}")
            # 即使出错也要标记为已访问
            if 'content_hash' in locals():
                await self.db.mark_seen(url, "parse_error", content_hash)
            else:
                await self.db.mark_seen(url, "parse_error", None)
            self.stats['failed_pages'] += 1
            return None
       
    
    async def run(self):
        """运行爬虫"""
        print(f" 开始爬取 {self.site} 站点...")
        self.stats['start_time'] = datetime.now()
        
        # 初始化数据库
        await self.db.init_db()
        
        try:
            async with aiohttp.ClientSession() as session:
                queue = asyncio.Queue()
                await queue.put(self.base_url)

                workers = []

                for i in range(self.concurrency):
                    worker = asyncio.create_task(self.worker(f"worker-{i}", session, queue))
                    workers.append(worker)

                while self.stats['total_pages'] < self.max_pages and not queue.empty():
                    await asyncio.sleep(0.1)

                for worker in workers:
                    worker.cancel()

                await asyncio.gather(*workers, return_exceptions=True)

        finally:
            # 确保数据库连接关闭
            if hasattr(self, 'db') and self.db.conn:
                await self.db.close()
            # 理论上这里的session会被async with自动关闭，但如果仍然有问题，可以考虑在这里显式检查和关闭
            # if session and not session.closed:
            #     await session.close()
                
        self.stats['end_time'] = datetime.now()
        print(f" 爬取完成！总共处理 {self.stats['total_pages']} 个页面")
        
        
        return self.collected_data, self.stats
    
    # 修复worker方法
    async def worker(self, name, session, queue):
        """工作线程"""
        while True:
            try:
                # 检查是否达到最大页面限制
                if self.stats['total_pages'] >= self.max_pages:
                    break
                
                # 获取URL（带超时）
                try:
                    url = await asyncio.wait_for(queue.get(), timeout=5.0)
                except asyncio.TimeoutError:
                    # 队列为空且超时，退出worker
                    if queue.empty():
                        break
                    continue
                
                # 处理页面
                next_url = await self.process_page(session, url)
            
                # 添加新URL到队列
                if next_url and self.stats['total_pages'] < self.max_pages:
                    await queue.put(next_url)
                
                queue.task_done()
            
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Worker {name} 错误: {e}")
                try:
                    queue.task_done()
                except:
                    pass

In [8]:
class DataExporter:
    @staticmethod
    async def _save_stats(stats, site):
        """保存统计信息 - 符合项目要求"""
        try:
            stats_path = 'outputs/stats.json'
            stats_to_save = stats.copy()
        
            # 转换所有datetime对象为字符串
            for key, value in stats_to_save.items():
                if isinstance(value, datetime):
                    stats_to_save[key] = value.isoformat()
        
            # 计算项目要求的统计信息
            if 'start_time' in stats_to_save and 'end_time' in stats_to_save:
                try:
                    # 解析时间字符串
                    if isinstance(stats_to_save['start_time'], str):
                        start_time = datetime.fromisoformat(stats_to_save['start_time'].replace('Z', '+00:00'))
                    else:
                        start_time = stats_to_save['start_time']
                    
                    if isinstance(stats_to_save['end_time'], str):
                        end_time = datetime.fromisoformat(stats_to_save['end_time'].replace('Z', '+00:00'))
                    else:
                        end_time = stats_to_save['end_time']
                
                    if isinstance(start_time, datetime) and isinstance(end_time, datetime):
                        # 总耗时
                        total_duration = (end_time - start_time).total_seconds()
                        stats_to_save['total_duration'] = round(total_duration, 2)
                        
                        # 获取其他统计值
                        total_pages = stats.get('total_pages', 0)
                        duplicate_pages = stats.get('duplicate_pages', 0)
                        successful_pages = stats.get('successful_pages', 0)
                        
                        # 平均延迟
                        if total_pages > 0:
                            stats_to_save['avg_duration_per_page'] = round(total_duration / total_pages, 2)
                        else:
                            stats_to_save['avg_duration_per_page'] = 0
                            
                        # 重复比例
                        if total_pages > 0:
                            stats_to_save['duplicate_rate'] = round(duplicate_pages / total_pages, 3)
                        else:
                            stats_to_save['duplicate_rate'] = 0
                            
                        # 成功率
                        if total_pages > 0:
                            stats_to_save['success_rate'] = round(successful_pages / total_pages, 3)
                        else:
                            stats_to_save['success_rate'] = 0
                except Exception as e:
                    print(f"计算统计信息时出错: {e}")
                    # 如果计算失败，设置默认值
                    stats_to_save.update({
                        'total_duration': 0,
                        'avg_duration_per_page': 0,
                        'duplicate_rate': 0,
                        'success_rate': 0
                    })
        
            # 添加站点信息
            stats_to_save['site'] = site
            stats_to_save['crawled_at'] = datetime.now().isoformat()
        
            with open(stats_path, 'w', encoding='utf-8') as f:
                json.dump(stats_to_save, f, indent=2, ensure_ascii=False)
            print(f" 统计信息已保存: {stats_path}")
            
            return stats_to_save  # 返回计算后的统计信息
        
        except Exception as e:
            print(f" 保存统计信息失败: {e}")
            return stats
    
    @staticmethod
    async def export_data(data, stats, site):
        """导出数据到CSV和JSONL - 符合项目要求"""
        # 确保outputs目录存在
        os.makedirs('outputs', exist_ok=True)
        
        # 处理空数据的情况
        if not data or len(data) == 0:
            print("没有数据，创建符合要求的空文件")
            
            # 根据站点类型创建正确的表头
            if site == 'quotes':
                columns = ['text', 'author', 'tags', 'author_url', 'page_url']
            else:  # books
                columns = ['title', 'price', 'stock', 'rating', 'category', 'product_url', 'page_url']
            
            df_empty = pd.DataFrame(columns=columns)
            
            # 导出CSV - UTF-8编码，字段顺序固定
            csv_path = 'outputs/data.csv'
            df_empty.to_csv(csv_path, index=False, encoding='utf-8')
            print(f" CSV文件已创建: {csv_path}")
            
            # 导出JSONL
            jsonl_path = 'outputs/data.jsonl'
            with open(jsonl_path, 'w', encoding='utf-8') as f:
                pass  # 创建空文件
            print(f" JSONL文件已创建: {jsonl_path}")
            
            # 保存统计信息并获取计算后的统计信息
            calculated_stats = await DataExporter._save_stats(stats, site)
            
            return df_empty, calculated_stats  # 返回DataFrame和计算后的统计信息
        
        # 有数据的情况
        try:
            # 转换为DataFrame并确保字段顺序
            df = pd.DataFrame(data)
            
            # 确保字段顺序符合项目要求
            if site == 'quotes':
                expected_columns = ['text', 'author', 'tags', 'author_url', 'page_url']
            else:  # books
                expected_columns = ['title', 'price', 'stock', 'rating', 'category', 'product_url', 'page_url']
            
            # 只保留需要的列并按正确顺序排列
            available_columns = [col for col in expected_columns if col in df.columns]
            df = df[available_columns]
            
            # 导出CSV - UTF-8编码
            csv_path = 'outputs/data.csv'
            df.to_csv(csv_path, index=False, encoding='utf-8')
            print(f" CSV文件已保存: {csv_path} ({len(df)} 条记录)")
            
            # 导出JSONL
            jsonl_path = 'outputs/data.jsonl'
            df.to_json(jsonl_path, orient='records', lines=True, force_ascii=False)
            print(f" JSONL文件已保存: {jsonl_path}")
            
            # 保存统计信息并获取计算后的统计信息
            calculated_stats = await DataExporter._save_stats(stats, site)
            
            return df, calculated_stats
            
        except Exception as e:
            print(f" 保存数据文件失败: {e}")
            import traceback
            traceback.print_exc()
            return None, stats

    @staticmethod
    async def generate_report(df, stats, site):
        """生成HTML报告 - 符合项目要求的可视化"""
        # 使用传入的stats，而不是重新复制
        stats_for_report = stats
        
        if df.empty:
            print("没有数据生成报告")
            return await DataExporter._generate_basic_report(stats_for_report, site)
    
        try:
            # 确保统计信息中有正确的值
            total_pages = stats_for_report.get('total_pages', 0)
            success_rate = stats_for_report.get('success_rate', 0) * 100
            duplicate_rate = stats_for_report.get('duplicate_rate', 0) * 100
            total_duration = stats_for_report.get('total_duration', 0)
            
            print(f"调试信息 - 统计值: total_pages={total_pages}, success_rate={success_rate}%, duplicate_rate={duplicate_rate}%, total_duration={total_duration}s")

            # 创建包含可视化图表的HTML报告
            html_content = [
                "<!DOCTYPE html>",
                "<html>",
                "<head>",
                "    <meta charset=\"UTF-8\">",
                "    <title>Async MiniCrawler Report - " + site.capitalize() + "</title>",
                "    <script src='https://cdn.plot.ly/plotly-latest.min.js'></script>",
                "    <style>",
                "        body { font-family: Arial, sans-serif; margin: 40px; background: #f8f9fa; }",
                "        .header { text-align: center; margin-bottom: 30px; }",
                "        .kpi-container { display: flex; justify-content: center; flex-wrap: wrap; gap: 20px; margin-bottom: 40px; }",
                "        .kpi-card { ",
                "            background: white; padding: 25px; border-radius: 12px;",
                "            box-shadow: 0 2px 10px rgba(0,0,0,0.1); min-width: 200px; text-align: center;",
                "        }",
                "        .kpi-value { font-size: 28px; font-weight: bold; color: #007bff; margin: 10px 0; }",
                "        .kpi-label { color: #6c757d; font-size: 14px; }",
                "        .chart-container { background: white; padding: 20px; border-radius: 12px;",
                "            box-shadow: 0 2px 10px rgba(0,0,0,0.1); margin: 20px 0; }",
                "        .data-table { margin: 30px 0; }",
                "        table { width: 100%; border-collapse: collapse; background: white; }",
                "        th, td { padding: 12px; text-align: left; border-bottom: 1px solid #dee2e6; }",
                "        th { background-color: #f8f9fa; font-weight: bold; }",
                "        .positive { color: #28a745; }",
                "        .negative { color: #dc3545; }",
                "    </style>",
                "</head>",
                "<body>",
                "    <div class='header'>",
                "        <h1>📊 迷你爬虫报告</h1>",
                "        <h2>" + ("名言警句" if site == 'quotes' else "图书") + "站点分析</h2>",
                "        <p>生成时间: " + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + "</p>",
                "    </div>",
            ]
        
            # KPI 概览卡片
            html_content.extend([
                "    <div class='kpi-container'>",
                "        <div class='kpi-card'>",
                "            <div class='kpi-label'>总页面数</div>",
                "            <div class='kpi-value'>" + str(total_pages) + "</div>",
                "        </div>",
                "        <div class='kpi-card'>",
                "            <div class='kpi-label'>成功率</div>",
                "            <div class='kpi-value'>" + f"{success_rate:.1f}%" + "</div>",
                "        </div>",
                "        <div class='kpi-card'>",
                "            <div class='kpi-label'>重复率</div>",
                "            <div class='kpi-value'>" + f"{duplicate_rate:.1f}%" + "</div>",
                "        </div>",
                "        <div class='kpi-card'>",
                "            <div class='kpi-label'>总耗时</div>",
                "            <div class='kpi-value'>" + f"{total_duration:.1f}s" + "</div>",
                "        </div>",
                "    </div>",
            ])
        
            # 可视化图表 - Top 10 作者/分类
            if site == 'quotes' and 'author' in df.columns and not df.empty:
                author_counts = df['author'].value_counts().head(10)
                if not author_counts.empty:
                    html_content.extend(DataExporter._create_author_chart(author_counts))
            elif site == 'books' and 'category' in df.columns and not df.empty:
                category_counts = df['category'].value_counts().head(10)
                if not category_counts.empty:
                    html_content.extend(DataExporter._create_category_chart(category_counts))
        
            # 数据预览
            html_content.extend([
                "    <div class='chart-container'>",
                "        <h3>数据预览 (前10条记录)</h3>",
                "        <div class='data-table'>" + df.head(10).to_html(classes='table table-striped', index=False, escape=False) + "</div>",
                "    </div>",
            ])
        
            # 详细统计信息
            html_content.extend([
                "    <div class='chart-container'>",
                "        <h3>详细统计信息</h3>",
                "        <pre style='background: #f8f9fa; padding: 20px; border-radius: 5px;'>" + 
                json.dumps(stats_for_report, indent=2, ensure_ascii=False) + "</pre>",
                "    </div>",
            ])
        
            html_content.extend([
                "</body>",
                "</html>"
            ])
        
            # 保存报告
            report_path = 'outputs/report.html'
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(html_content))
        
            print(f" HTML报告已生成: {report_path}")
            return report_path
        
        except Exception as e:
            print(f" 生成报告失败: {e}")
            import traceback
            traceback.print_exc()
            return await DataExporter._generate_basic_report(stats_for_report, site)

    
    @staticmethod
    def _create_author_chart(author_counts):
        """创建作者条形图"""
        authors = author_counts.index.tolist()
        counts = author_counts.values.tolist()
        
        return [
            "    <div class='chart-container'>",
            "        <h3>Top 10 作者</h3>",
            "        <div id='author-chart'></div>",
            "        <script>",
            "            var authorData = {",
            "                x: " + json.dumps(counts) + ",",
            "                y: " + json.dumps(authors) + ",",
            "                type: 'bar',",
            "                orientation: 'h',",
            "                marker: { color: '#007bff' }",
            "            };",
            "            var layout = {",
            "                height: 400,",
            "                margin: { l: 150 },",
            "                title: 'Top 10 作者'",
            "            };",
            "            Plotly.newPlot('author-chart', [authorData], layout);",
            "        </script>",
            "    </div>"
        ]
    
    @staticmethod
    def _create_category_chart(category_counts):
        """创建分类条形图"""
        categories = category_counts.index.tolist()
        counts = category_counts.values.tolist()
        
        return [
            "    <div class='chart-container'>",
            "        <h3>Top 10 分类</h3>",
            "        <div id='category-chart'></div>",
            "        <script>",
            "            var categoryData = {",
            "                x: " + json.dumps(counts) + ",",
            "                y: " + json.dumps(categories) + ",",
            "                type: 'bar',",
            "                orientation: 'h',",
            "                marker: { color: '#28a745' }",
            "            };",
            "            var layout = {",
            "                height: 400,",
            "                margin: { l: 150 },",
            "                title: 'Top 10 分类'",
            "            };",
            "            Plotly.newPlot('category-chart', [categoryData], layout);",
            "        </script>",
            "    </div>"
        ]
    
    @staticmethod
    async def _generate_basic_report(stats, site):
        """生成基本报告（当没有数据时）"""
        try:
            # 确保stats中没有datetime对象
            stats_for_report = stats.copy()
            for key, value in stats_for_report.items():
                if isinstance(value, datetime):
                    stats_for_report[key] = value.isoformat()
        
            html_content = [
                "<!DOCTYPE html>",
                "<html>",
                "<head>",
                "    <meta charset=\"UTF-8\">",  # 添加UTF-8编码声明
                "    <title>迷你爬虫报告</title>",
                "</head>",
                "<body>",
                "    <h1>迷你爬虫报告 - " + ("名言警句" if site == 'quotes' else "图书") + "</h1>",
                "    <p>生成时间: " + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + "</p>",
                "    <h2>没有收集到数据</h2>",
                "    <p>爬虫已完成但没有收集到任何数据。</p>",
                "    <h3>统计信息:</h3>",
                "    <pre>" + json.dumps(stats_for_report, indent=2, ensure_ascii=False) + "</pre>",
                "</body>",
                "</html>"
            ]
        
            report_path = 'outputs/report.html'
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(html_content))
        
            print(f" 基本HTML报告已生成: {report_path}")
            return report_path
        
        except Exception as e:
            print(f" 生成基本报告失败: {e}")
            return None

In [9]:
# ==================== 测试运行部分 ====================

print(" 设置测试环境...")

# 确保测试文件存在且路径正确
import os

# 检查测试文件是否存在
test_files = [
    'tests/fixtures/quotes_page1.html',
    'tests/fixtures/books_page1.html',
    'tests/test_crawler.py'
]

for file in test_files:
    if os.path.exists(file):
        print(f" {file} 存在")
    else:
        print(f" {file} 不存在，请先创建测试文件")

print("\n 开始运行测试...")
print("=" * 50)

# 直接在notebook中运行测试函数
def run_tests_in_notebook():
    """在Notebook中直接运行测试"""
    
    # 临时定义测试函数（因为import可能有问题）
    def test_parse_quotes():
        """测试名言页面解析"""
        print("测试名言页面解析...")
        
        try:
            with open('tests/fixtures/quotes_page1.html', 'r', encoding='utf-8') as f:
                html = f.read()
            
            parser = PageParser()
            quotes, next_url = parser.parse_quotes(html, 'http://test.com')
            
            # 验证解析结果
            assert len(quotes) == 1, f"应该解析出1条名言，实际得到: {len(quotes)}"
            assert quotes[0]['author'] == 'Alan Kay', f"作者应该是Alan Kay，实际是: {quotes[0]['author']}"
            assert 'future' in quotes[0]['tags'], f"应该包含future标签，实际标签: {quotes[0]['tags']}"
            assert 'invention' in quotes[0]['tags'], f"应该包含invention标签，实际标签: {quotes[0]['tags']}"
            assert next_url == 'http://test.com/page/2/', f"下一页URL应该是http://test.com/page/2/，实际是: {next_url}"
            
            expected_text = '\"The best way to predict the future is to invent it.\"'
            assert quotes[0]['text'] == expected_text, f"名言文本不匹配"
            
            print(" 名言解析测试通过")
            return True
            
        except Exception as e:
            print(f" 名言解析测试失败: {e}")
            return False

    def test_parse_books():
        """测试图书页面解析"""
        print("测试图书页面解析...")
        
        try:
            with open('tests/fixtures/books_page1.html', 'r', encoding='utf-8') as f:
                html = f.read()
            
            parser = PageParser()
            books, next_url = parser.parse_books(html, 'http://test.com')
            
            # 验证解析结果
            assert len(books) == 1, f"应该解析出1本图书，实际得到: {len(books)}"
            assert books[0]['title'] == 'A Light in the Attic', f"书名应该是A Light in the Attic，实际是: {books[0]['title']}"
            assert books[0]['price'] == '£51.77', f"价格应该是£51.77，实际是: {books[0]['price']}"
            assert books[0]['stock'] == 'In stock', f"库存状态应该是In stock，实际是: {books[0]['stock']}"
            assert books[0]['rating'] == 'Three', f"评分应该是Three，实际是: {books[0]['rating']}"
            assert next_url == 'http://test.com/page-2.html', f"下一页URL应该是http://test.com/page-2.html，实际是: {next_url}"
            
            print(" 图书解析测试通过")
            return True
            
        except Exception as e:
            print(f" 图书解析测试失败: {e}")
            return False

    # 运行测试
    tests = [test_parse_quotes, test_parse_books]
    results = []
    
    for test in tests:
        try:
            result = test()
            results.append(result)
        except Exception as e:
            print(f" 测试运行错误: {e}")
            results.append(False)
    
    # 显示结果
    print("\\n" + "=" * 50)
    print(" 测试结果汇总:")
    print(f"总测试数: {len(tests)}")
    print(f"通过数: {sum(results)}")
    print(f"失败数: {len(tests) - sum(results)}")
    
    if all(results):
        print(" 所有测试通过！")
    else:
        print(" 有测试失败！")
    
    return all(results)

print("\n 测试部分完成！")

 设置测试环境...
 tests/fixtures/quotes_page1.html 存在
 tests/fixtures/books_page1.html 存在
 tests/test_crawler.py 存在

 开始运行测试...

 测试部分完成！


In [10]:
import sqlite3

def force_close_database():
    """强制关闭所有可能的数据库连接"""
    try:
        # 尝试连接并立即关闭来释放文件锁
        if os.path.exists('seen_urls.db'):
            conn = sqlite3.connect('seen_urls.db')
            conn.close()
            print("数据库连接已强制关闭")
    except Exception as e:
        print(f"强制关闭数据库时出错: {e}")
    
    # 尝试删除数据库文件
    db_files = ['seen_urls.db', 'seen_urls.db-journal']
    for db_file in db_files:
        if os.path.exists(db_file):
            try:
                os.remove(db_file)
                print(f"已删除数据库文件: {db_file}")
            except Exception as e:
                print(f"无法删除 {db_file}: {e}")
# 彻底清理outputs目录和数据库文件
# 彻底清理
import shutil
import os
import glob

# 先强制关闭数据库
force_close_database()

# 清理outputs目录
if os.path.exists('outputs'):
    shutil.rmtree('outputs')
os.makedirs('outputs', exist_ok=True)
print("已清理outputs目录")

print("清理完成，准备运行爬虫")

已清理outputs目录
清理完成，准备运行爬虫


In [11]:
import os
import glob

# 彻底删除所有旧文件
files_to_remove = ['seen_urls.db'] + glob.glob('outputs/*')
for file in files_to_remove:
    if os.path.exists(file):
        try:
            if os.path.isfile(file):
                os.remove(file)
                print(f"已删除: {file}")
            elif os.path.isdir(file):
                import shutil
                shutil.rmtree(file)
                print(f"已删除目录: {file}")
        except Exception as e:
            print(f"无法删除 {file}: {e}")

print("清理完成")

清理完成


In [12]:
# 先测试数据库功能
async def test_database():
    db = URLDatabase()
    await db.init_db()
    
    # 测试插入和查询
    test_url = "https://quotes.toscrape.com/test"
    await db.mark_seen(test_url, "test", "test_hash")
    
    is_seen = await db.is_seen(test_url)
    print(f"数据库测试: {'成功' if is_seen else '失败'}")
    
    await db.close()

await test_database()

数据库初始化完成
数据库测试: 成功
数据库连接已关闭。


In [13]:
async def run_crawler(site='quotes', concurrency=3, max_pages=20, delay=1.0):
    """运行爬虫并导出所有要求的文件"""
    print(f"开始爬取 {site} 站点")
    print(f"配置: {concurrency} 并发, 最多 {max_pages} 页面, 延迟 {delay}s")
    
    crawler = None
    try:
        crawler = AsyncCrawler(site, concurrency, max_pages, delay)
        data, stats = await crawler.run()
        
        print(f"爬虫完成，获得 {len(data) if data else 0} 条数据")
        
        # 导出数据
        print("开始导出数据文件...")
        df, calculated_stats = await DataExporter.export_data(data, stats, site)
        
        if df is not None:
            print(f"数据导出成功，共 {len(df)} 条记录")
        
            # 生成HTML报告，使用计算后的统计信息
            print("生成可视化报告...")
            report_path = await DataExporter.generate_report(df, calculated_stats, site)
            
            if report_path:
                print(f"所有文件生成完成！")
                print(f" - CSV: outputs/data.csv")
                print(f" - JSONL: outputs/data.jsonl") 
                print(f" - Stats: outputs/stats.json")
                print(f" - Report: outputs/report.html")
            else:
                print("报告生成失败")
        else:
            print("数据导出失败")
        
        return data, calculated_stats  # 返回计算后的统计信息
        
    except Exception as e:
        print(f"爬虫运行失败: {e}")
        import traceback
        traceback.print_exc()
        
        # 即使失败也生成统计文件
        error_stats = {
            'total_pages': 0,
            'successful_pages': 0,
            'failed_pages': 1,
            'duplicate_pages': 0,
            'start_time': datetime.now().isoformat(),
            'end_time': datetime.now().isoformat(),
            'error': str(e),
            'status': 'failed'
        }
        await DataExporter.export_data([], error_stats, site)
        
        return [], error_stats
    finally:
        # 确保数据库连接关闭（安全地关闭）
        try:
            if crawler and hasattr(crawler, 'db') and crawler.db.conn:
                await crawler.db.close()
                print("数据库连接已安全关闭")
        except Exception as e:
            print(f"关闭数据库连接时出错: {e}")

In [14]:
# 清理outputs目录
import shutil
if os.path.exists('outputs'):
    shutil.rmtree('outputs')
os.makedirs('outputs', exist_ok=True)
print("已清理outputs目录")

# 运行爬虫
print("开始运行爬虫...")
try:
    data, stats = await run_crawler('quotes', concurrency=2, max_pages=50, delay=1.0)
    
    # 检查所有生成的文件
    print("\n=== 文件生成情况 ===")
    import glob
    files = glob.glob('outputs/*')
    
    if not files:
        print(" 没有生成任何文件")
    else:
        for file in files:
            if os.path.exists(file):
                file_size = os.path.getsize(file)
                print(f" {file} - {file_size} 字节")
                
                # 显示文件内容预览
                if file.endswith('.csv') and file_size > 0:
                    try:
                        df_check = pd.read_csv(file)
                        print(f"  数据行数: {len(df_check)}")
                        print(f"  列名: {list(df_check.columns)}")
                    except Exception as e:
                        print(f"  读取CSV文件失败: {e}")
                
                elif file.endswith('.json') and file_size > 0:
                    try:
                        with open(file, 'r', encoding='utf-8') as f:
                            stats_content = json.load(f)
                        print(f"  统计信息: {stats_content.get('total_pages', '未知')} 页面")
                    except Exception as e:
                        print(f"  读取JSON文件失败: {e}")
            
except Exception as e:
    print(f"爬虫运行失败: {e}")
    import traceback
    traceback.print_exc()

已清理outputs目录
开始运行爬虫...
开始爬取 quotes 站点
配置: 2 并发, 最多 50 页面, 延迟 1.0s
 开始爬取 quotes 站点...
数据库初始化完成
成功处理: https://quotes.toscrape.com (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/2/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/3/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/4/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/5/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/6/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/7/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/8/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/9/ (找到 10 条数据)
成功处理: https://quotes.toscrape.com/page/10/ (找到 10 条数据)
数据库连接已关闭。
 爬取完成！总共处理 10 个页面
爬虫完成，获得 100 条数据
开始导出数据文件...
 CSV文件已保存: outputs/data.csv (100 条记录)
 JSONL文件已保存: outputs/data.jsonl
 统计信息已保存: outputs/stats.json
数据导出成功，共 100 条记录
生成可视化报告...
调试信息 - 统计值: total_pages=10, success_rate=100.0%, duplicate_rate=0.0%, total_duration=19.31s
 HTML报告已生成: outputs/report.html
所有文件生成完成！
 - CSV: outputs/data.csv
 - JSONL: outputs/data.jsonl
 -