版本:2.0.0 (模块化架构) 更新时间:2026-03-22
飞书数据采集系统是一个模块化、可扩展的数据采集和推送工具,支持:
- 🔍 多搜索引擎支持(Bing)
- 🕷️ 多爬虫策略(简单/增强)
- 📊 多数据库支持(SQLite)
- 📨 多消息渠道(飞书)
- 🐳 Docker 容器化部署
- 模块化:每个功能独立成模块,低耦合高内聚
- 插件化:搜索引擎、爬虫、消息发送器都可以灵活替换
- 配置驱动:通过配置文件和环境变量控制行为
- 易扩展:基于抽象基类,方便添加新功能
crawler_project/
├── src/
│ ├── __init__.py # 包初始化
│ ├── config.py # 配置管理
│ ├── database.py # 数据库操作
│ ├── pipeline.py # 数据处理流水线
│ ├── searchers/ # 搜索引擎模块
│ │ ├── __init__.py
│ │ ├── base.py # 搜索引擎基类
│ │ └── bing.py # Bing搜索实现
│ ├── crawlers/ # 爬虫模块
│ │ ├── __init__.py
│ │ ├── base.py # 爬虫基类
│ │ └── simple.py # 爬虫实现
│ └── senders/ # 消息发送模块
│ ├── __init__.py
│ ├── base.py # 发送器基类
│ └── feishu.py # 飞书发送实现
├── main.py # 主程序入口
├── config.json # 配置文件
├── keywords.json # 关键词配置
├── requirements.txt # 依赖列表
├── Dockerfile # Docker配置
├── docker-compose.yml # Docker Compose配置
└── .dockerignore # Docker忽略文件
用户请求
↓
Pipeline (流水线)
↓
Searcher (搜索)
↓
Crawler (爬取)
↓
Database (存储)
↓
Sender (发送)
↓
飞书群
类:Config
配置管理类,支持从配置文件和环境变量读取配置。
核心方法:
__init__(config_file): 初始化配置get(key, default=None): 获取配置项(支持嵌套,优先环境变量)get_all(): 获取所有配置reload(): 重新加载配置
使用示例:
from src.config import config
# 获取配置
api_key = config.get('bing.api_key')
webhook = config.get('feishu.webhook')
# 支持嵌套
count = config.get('max_articles_per_category', 10)类:Database
数据库操作类,封装了所有数据库操作。
核心方法:
__init__(db_path): 初始化数据库is_url_exists(url): 检查URL是否已存在save_article(article): 保存文章batch_save_articles(articles): 批量保存文章get_articles_by_category(category, limit): 获取指定分类未发送的文章mark_as_sent(url): 标记文章已发送batch_mark_as_sent(urls): 批量标记get_stats(): 获取统计信息save_crawl_stat(category, keyword, count): 保存爬取统计clear_old_data(days): 清理旧数据
数据表结构:
-- 文章表
CREATE TABLE articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT UNIQUE NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
source TEXT NOT NULL,
category TEXT NOT NULL,
crawl_time TIMESTAMP NOT NULL,
is_sent BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
-- 统计表
CREATE TABLE crawl_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
keyword TEXT NOT NULL,
article_count INTEGER DEFAULT 0,
crawl_time TIMESTAMP NOT NULL,
UNIQUE(category, keyword, crawl_time)
)使用示例:
from src.database import Database
db = Database('crawler.db')
# 保存文章
article = {
'url': 'https://example.com/article',
'title': '文章标题',
'content': '文章内容',
'source': 'example.com',
'category': '科技'
}
db.save_article(article)
# 获取未发送文章
articles = db.get_articles_by_category('科技', limit=10)
# 标记为已发送
for article in articles:
db.mark_as_sent(article['url'])所有搜索引擎的基类,定义了统一接口。
抽象方法:
validate_config(): 验证配置search(keyword, count, freshness, language): 执行搜索
方法:
search_by_keywords(keywords, count, **kwargs): 批量搜索
Bing搜索引擎实现。
配置参数:
{
"api_key": "你的Bing API密钥"
}使用示例:
from src.searchers import BingSearcher
config = {'api_key': 'your-api-key'}
searcher = BingSearcher(config)
# 搜索
results = searcher.search(
keyword='人工智能',
count=10,
freshness='day',
language='zh-CN'
)
# 批量搜索
keywords = ['人工智能', '机器学习', '深度学习']
results_dict = searcher.search_by_keywords(keywords, count=5)支持更多功能,如安全搜索、新闻搜索。
额外方法:
search_news(keyword, count): 搜索新闻
所有爬虫的基类。
配置参数:
{
"delay_min": 1,
"delay_max": 3,
"timeout": 10,
"max_retries": 3
}抽象方法:
crawl(url): 爬取单个URL
方法:
batch_crawl(urls): 批量爬取crawl_with_retry(url): 带重试的爬取_random_delay(): 随机延迟
简单爬虫实现,使用 requests + BeautifulSoup。
特性:
- 自动提取标题和正文
- 支持多种内容容器识别
- 文本清理和长度限制
- 自动检测编码
增强版爬虫,支持更多功能。
额外特性:
- 提取发布时间
- 提取作者信息
- 支持重定向
- SSL验证控制
使用示例:
from src.crawlers import SimpleCrawler, AdvancedCrawler
# 简单爬虫
config = {'delay_min': 1, 'delay_max': 3}
crawler = SimpleCrawler(config)
article = crawler.crawl('https://example.com')
# 增强爬虫
advanced_crawler = AdvancedCrawler(config)
article = advanced_crawler.crawl('https://example.com')
print(f"作者: {article.get('author')}")
print(f"发布时间: {article.get('pub_date')}")所有消息发送器的基类。
抽象方法:
validate_config(): 验证配置send(title, content, **kwargs): 发送消息
方法:
send_batch(messages): 批量发送enable()/disable(): 启用/禁用is_enabled(): 检查是否启用
飞书消息发送器。
配置参数:
{
"webhook_url": "https://open.feishu.cn/open-apis/bot/v2/hook/xxx",
"enabled": true
}方法:
send(title, content, msg_type, **kwargs): 发送消息_send_text(text): 发送文本消息_send_post(title, content): 发送富文本消息_send_interactive(title, content): 发送卡片消息send_article_card(category, articles): 发送文章卡片send_summary(stats): 发送汇总信息
消息类型:
text: 纯文本post: 富文本interactive: 交互卡片
增强版飞书发送器,支持更多功能。
额外方法:
send_multi_category_report(articles_by_category, stats): 发送多分类报告
使用示例:
from src.senders import FeishuSender, FeishuSenderV2
# 基础发送器
config = {'webhook_url': 'your-webhook-url'}
sender = FeishuSender(config)
# 发送消息
sender.send(
title='测试消息',
content='这是测试内容',
msg_type='interactive'
)
# 增强发送器
sender_v2 = FeishuSenderV2(config)
sender_v2.send_article_card('科技', articles)类:DataPipeline
数据处理流水线,协调各个模块完成完整的数据采集流程。
工作流程:
- 加载配置
- 初始化组件(搜索引擎、爬虫、数据库、发送器)
- 数据采集(搜索 → 爬取 → 存储)
- 发送消息(飞书)
- 输出统计
方法:
__init__(): 初始化流水线run(keywords_file): 运行完整流程_load_keywords(keywords_file): 加载关键词配置_collect_data(keywords_config): 数据采集_send_to_feishu(keywords_config): 发送到飞书_print_stats(): 输出统计
使用示例:
from src.pipeline import DataPipeline
# 创建流水线
pipeline = DataPipeline()
# 运行
pipeline.run('keywords.json')class Config:
def __init__(self, config_file: str = "config.json")
def get(self, key: str, default: Any = None) -> Any
def get_all(self) -> Dict[str, Any]
def reload(self)设计要点:
- 支持环境变量覆盖配置文件
- 支持嵌套配置访问(如
bing.api_key) - 自动类型转换(int, bool)
class Database:
def __init__(self, db_path: str = "crawler.db")
def is_url_exists(self, url: str) -> bool
def save_article(self, article: Dict[str, Any]) -> bool
def batch_save_articles(self, articles: List[Dict[str, Any]]) -> int
def get_articles_by_category(self, category: str, limit: int = 10) -> List[Dict[str, Any]]
def mark_as_sent(self, url: str)
def batch_mark_as_sent(self, urls: List[str])
def get_stats(self) -> Dict[str, Any]
def save_crawl_stat(self, category: str, keyword: str, article_count: int)
def clear_old_data(self, days: int = 7)设计要点:
- 使用上下文管理器管理连接
- 支持批量操作
- 自动处理重复URL
class BaseSearcher(ABC):
@abstractmethod
def validate_config(self)
@abstractmethod
def search(self, keyword: str, count: int = 10, freshness: Optional[str] = None, language: str = "zh-CN") -> List[Dict[str, Any]]
def search_by_keywords(self, keywords: List[str], count: int = 10, **kwargs) -> Dict[str, List[Dict[str, Any]]]设计要点:
- 抽象基类,定义接口
- 统一搜索结果格式
- 支持批量搜索
class BaseCrawler(ABC):
@abstractmethod
def crawl(self, url: str) -> Optional[Dict[str, Any]]
def batch_crawl(self, urls: List[str]) -> List[Dict[str, Any]]
def crawl_with_retry(self, url: str) -> Optional[Dict[str, Any]]
def _random_delay(self)
def validate_url(self, url: str) -> bool设计要点:
- 抽象基类,定义接口
- 内置重试机制
- 随机延迟防止被封
class BaseSender(ABC):
@abstractmethod
def validate_config(self)
@abstractmethod
def send(self, title: str, content: str, **kwargs) -> bool
def send_batch(self, messages: List[Dict[str, Any]]) -> Dict[str, bool]
def is_enabled(self) -> bool
def enable(self)
def disable(self)设计要点:
- 抽象基类,定义接口
- 支持启用/禁用
- 统一发送接口
步骤:
- 在
src/searchers/创建新文件,如google.py - 继承
BaseSearcher - 实现
validate_config()和search()方法 - 在
src/searchers/__init__.py导出
示例:
# src/searchers/google.py
from typing import List, Dict, Any, Optional
from .base import BaseSearcher
class GoogleSearcher(BaseSearcher):
def validate_config(self):
if not self.config.get('api_key'):
raise ValueError("Google搜索器需要 api_key 配置")
def search(
self,
keyword: str,
count: int = 10,
freshness: Optional[str] = None,
language: str = "zh-CN"
) -> List[Dict[str, Any]]:
# 实现Google搜索逻辑
results = []
# ... 搜索代码 ...
return results步骤:
- 在
src/crawlers/创建新文件,如selenium.py - 继承
BaseCrawler - 实现
crawl()方法 - 在
src/crawlers/__init__.py导出
示例:
# src/crawlers/selenium.py
from typing import Dict, Any, Optional
from .base import BaseCrawler
class SeleniumCrawler(BaseCrawler):
def crawl(self, url: str) -> Optional[Dict[str, Any]]:
# 实现Selenium爬取逻辑
# ... 爬取代码 ...
return {
'url': url,
'title': '标题',
'content': '内容',
'crawler': 'selenium'
}步骤:
- 在
src/senders/创建新文件,如wechat.py - 继承
BaseSender - 实现
validate_config()和send()方法 - 在
src/senders/__init__.py导出
示例:
# src/senders/wechat.py
from typing import Dict, Any
from .base import BaseSender
class WeChatSender(BaseSender):
def validate_config(self):
if not self.config.get('webhook_url'):
raise ValueError("微信发送器需要 webhook_url 配置")
def send(self, title: str, content: str, **kwargs) -> bool:
# 实现微信发送逻辑
# ... 发送代码 ...
return True| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
bing.api_key |
string | 必填 | Bing API密钥 |
feishu.webhook |
string | 必填 | 飞书Webhook地址 |
db_path |
string | "crawler.db" | 数据库文件路径 |
max_articles_per_category |
int | 10 | 每分类最大文章数 |
bing_count_per_search |
int | 10 | 每次搜索结果数 |
| 环境变量 | 对应配置项 | 示例 |
|---|---|---|
BING_API_KEY |
bing.api_key |
export BING_API_KEY=xxx |
FEISHU_WEBHOOK |
feishu.webhook |
export FEISHU_WEBHOOK=xxx |
DB_PATH |
db_path |
export DB_PATH=/app/data/crawler.db |
{
'title': '标题',
'url': 'https://example.com',
'snippet': '摘要',
'source': 'example.com',
'search_engine': 'bing'
}{
'url': 'https://example.com',
'title': '标题',
'content': '正文内容',
'source': 'example.com',
'crawler': 'simple'
}{
'total': 100,
'by_category': {
'科技': 50,
'财经': 30,
'娱乐': 20
},
'unsent': 10
}- 使用环境变量:敏感信息(API密钥)使用环境变量
- 批量操作:尽量使用批量操作提高性能
- 错误处理:每个模块都有完善的错误处理
- 日志记录:重要操作都有日志输出
- 配置管理:配置文件和环境变量结合使用
- 数据库索引:已为关键字段创建索引
- 批量操作:支持批量保存和批量标记
- 连接池:使用session管理HTTP连接
- 延迟控制:随机延迟避免被封
A: 修改 pipeline.py 中的 _init_searcher() 方法:
from src.searchers import GoogleSearcher
def _init_searcher(self):
config = {'api_key': self.bing_api_key}
return GoogleSearcher(config)A: 参考上面的"添加新的爬虫"章节。
A: 修改 pipeline.py 中的 _init_sender() 方法:
def _init_sender(self):
senders = [
FeishuSender(self.feishu_config),
WeChatSender(self.wechat_config)
]
return senders- ✅ 完全模块化重构
- ✅ 插件化架构
- ✅ 支持多种搜索引擎
- ✅ 支持多种爬虫策略
- ✅ 支持多种消息渠道
- ✅ 完整的代码文档
- ✅ MVP版本
- ✅ 基础数据采集功能
- ✅ 飞书消息推送
欢迎贡献代码!请遵循以下步骤:
- Fork 本项目
- 创建特性分支 (
git checkout -b feature/AmazingFeature) - 提交更改 (
git commit -m 'Add some AmazingFeature') - 推送到分支 (
git push origin feature/AmazingFeature) - 开启 Pull Request
MIT License
如有问题或建议,请提交 Issue。