<a href="https://colab.research.google.com/github/Emmawalker-bloom/cninfo-annul-report-download/blob/main/main_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [26]:
import requests
import os
import time
import argparse
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import re

# 全局请求头，模拟浏览器访问
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Referer': 'http://www.cninfo.com.cn/',
}

def fetch_reports_for_keyword(session, stock_code, plate, keyword, year):
    """为单个年份和单个关键词获取报告"""
    stock_param = f"{stock_code},gssh0{stock_code}" if plate == 'sh' else f"{stock_code},gssz0{stock_code}"

    # 构建请求参数
    params = {
        'stock': stock_param,
        'tabName': 'fulltext',
        'pageSize': '50',  # 增加每页大小以确保获取所有相关报告
        'pageNum': '1',
        'column': f'sse{plate}',
        'plate': plate,
        'seDate': f'{year}-01-01~{year}-12-31',
        'searchkey': keyword,
        'secid': '',
        'sortName': 'announcementTime',
        'sortType': 'desc',
        'isHLtitle': 'true',
    }

    print(f"DEBUG: Requesting reports for stock={stock_param}, year={year}, keyword='{keyword}'") # Debug print
    try:
        # 发送POST请求
        response = session.post('http://www.cninfo.com.cn/new/hisAnnouncement/query', data=params, headers=HEADERS, timeout=10)
        response.raise_for_status()
        data = response.json()
        print(f"DEBUG: Received response data for year={year}, keyword='{keyword}': {data}") # Debug print
        if 'announcements' in data and data['announcements']:
            return data['announcements']
    except requests.exceptions.RequestException as e:
        print(f"请求失败: 年份={year}, 关键词='{keyword}', 错误: {e}")
    return []

def fetch_all_reports(stock_code, start_year, end_year):
    """获取指定股票在年份范围内的所有财报"""
    print(f"开始查询 {stock_code} 从 {start_year} 年到 {end_year} 年的财报...")

    # 判断股票属于上海还是深圳市场
    plate = 'sh' if stock_code.startswith('6') else 'sz'
    report_keywords = ['年度报告'] # Only search for annual reports

    all_announcements = []
    tasks = []

    # Use a single session for all requests within the thread pool
    with requests.Session() as session:
        # Use thread pool for concurrent requests
        with ThreadPoolExecutor(max_workers=10) as executor:
            for year in range(end_year, start_year - 1, -1):
                for keyword in report_keywords:
                    tasks.append(executor.submit(fetch_reports_for_keyword, session, stock_code, plate, keyword, year))

            for future in tqdm(as_completed(tasks), total=len(tasks), desc="查询进度"):
                result = future.result()
                if result:
                    all_announcements.extend(result)

    # 去重和筛选
    unique_announcements = {}
    # More specific exclusion keywords
    excluded_keywords = ['半年度报告', '季度报告', '持续督导', '受托管理', '问询函', '回复', '摘要', '英文版']

    for ann in all_announcements:
        title = ann['announcementTitle']
        # Check if the title contains "年度报告" and does not contain the specific excluded keywords
        if '年度报告' in title and not any(keyword in title for keyword in excluded_keywords):
             unique_announcements[ann['announcementId']] = ann


    # 按时间降序排序
    sorted_announcements = sorted(unique_announcements.values(), key=lambda x: x['announcementTime'], reverse=True)

    return sorted_announcements

def download_file(report, download_dir):
    """下载单个报告文件"""
    pdf_url = f"http://static.cninfo.com.cn/{report['adjunctUrl']}"
    date = time.strftime('%Y-%m-%d', time.localtime(report['announcementTime'] / 1000))
    # 清理文件名中的非法字符
    clean_title = report['announcementTitle'].replace('*', '').replace(':', '：').replace('?', '？').replace('/', ' ').replace('\\', ' ')
    filename = f"{date}_{clean_title}.pdf"
    filepath = os.path.join(download_dir, filename)

    try:
        response = requests.get(pdf_url, headers=HEADERS, stream=True, timeout=30)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))

        # 使用tqdm显示下载进度条
        with open(filepath, 'wb') as f, tqdm(
            desc=filename,
            total=total_size,
            unit='iB',
            unit_scale=True,
            unit_divisor=1024,
        ) as bar:
            for data in response.iter_content(chunk_size=1024):
                size = f.write(data)
                bar.update(size)
        return f"下载成功: {filename}"
    except requests.exceptions.RequestException as e:
        return f"下载失败: {filename}, 错误: {e}"

def main():
    # --- 获取用户输入 ---
    while True:
        code = input("请输入需要查询的公司代码 (例如: 000001): ")
        if code:
            break
        else:
            print("公司代码不能为空，请重新输入。")

    while True:
        try:
            start_year = int(input("请输入查询的开始年份 (例如: 2020): "))
            end_year = int(input("请输入查询的结束年份 (例如: 2024): "))
            if 1990 <= start_year <= end_year <= time.localtime().tm_year:
                break
            else:
                print("年份范围无效，请重新输入。")
        except ValueError:
            print("输入无效，请输入有效的年份数字。")

    # --- 获取报告列表 ---
    reports = fetch_all_reports(code, start_year, end_year)

    if not reports:
        print("未找到任何相关报告。")
        return

    # --- 展示并选择报告 ---
    print("\n找到以下报告：")
    for i, report in enumerate(reports):
        date = time.strftime('%Y-%m-%d', time.localtime(report['announcementTime'] / 1000))
        print(f"{i + 1:2d}: {date} - {report['announcementTitle']}")

    # --- 用户选择下载项 ---
    while True:
        try:
            choice = input("\n请输入要下载的报告编号（用逗号隔开，或输入'all'下载全部），按回车键取消：\n> ")
            if not choice:
                print("操作已取消。")
                return

            selected_indices = []
            if choice.lower() == 'all':
                selected_indices = list(range(len(reports)))
                break

            parts = choice.replace(' ', '').split(',')
            for part in parts:
                if '-' in part:
                    start, end = map(int, part.split('-'))
                    selected_indices.extend(range(start - 1, end))
                else:
                    selected_indices.append(int(part) - 1)

            # 验证输入是否有效
            if all(0 <= i < len(reports) for i in selected_indices):
                break
            else:
                print("输入无效，包含超出范围的编号。请重试。")
        except ValueError:
            print("输入格式错误，请输入数字、逗号或连字符。请重试。")

    # --- 确认下载目录 ---
    download_dir = os.path.join("downloads", code)
    print(f"\n默认下载目录为: {os.path.abspath(download_dir)}")
    while True:
        confirm = input("是否使用此目录下载文件？ (y/n): ").lower()
        if confirm in ['y', 'yes']:
            os.makedirs(download_dir, exist_ok=True)
            print(f"文件将保存到: {os.path.abspath(download_dir)}")
            break
        elif confirm in ['n', 'no']:
            print("下载已取消。")
            return
        else:
            print("输入无效，请输入 'y' 或 'n'。")


    # --- 执行下载 ---
    selected_reports = [reports[i] for i in sorted(list(set(selected_indices)))]

    # Use thread pool for concurrent downloads
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(download_file, report, download_dir) for report in selected_reports]
        for future in as_completed(futures):
            print(future.result())

    print("\n所有选定任务已完成。")

if __name__ == "__main__":
    main()

请输入需要查询的公司代码 (例如: 000001): 301486
请输入查询的开始年份 (例如: 2020): 2023
请输入查询的结束年份 (例如: 2024): 2025
开始查询 301486 从 2023 年到 2025 年的财报...
DEBUG: Requesting reports for stock=301486,gssz0301486, year=2025, keyword='年度报告'
DEBUG: Requesting reports for stock=301486,gssz0301486, year=2024, keyword='年度报告'
DEBUG: Requesting reports for stock=301486,gssz0301486, year=2023, keyword='年度报告'


查询进度:  33%|███▎      | 1/3 [00:01<00:02,  1.02s/it]

DEBUG: Received response data for year=2025, keyword='年度报告': {'classifiedAnnouncements': None, 'totalSecurities': 0, 'totalAnnouncement': 0, 'totalRecordNum': 0, 'announcements': None, 'categoryList': None, 'hasMore': False, 'totalpages': 0}
DEBUG: Received response data for year=2023, keyword='年度报告': {'classifiedAnnouncements': None, 'totalSecurities': 0, 'totalAnnouncement': 0, 'totalRecordNum': 0, 'announcements': None, 'categoryList': None, 'hasMore': False, 'totalpages': 0}


查询进度: 100%|██████████| 3/3 [00:01<00:00,  1.74it/s]

DEBUG: Received response data for year=2024, keyword='年度报告': {'classifiedAnnouncements': None, 'totalSecurities': 0, 'totalAnnouncement': 0, 'totalRecordNum': 0, 'announcements': None, 'categoryList': None, 'hasMore': False, 'totalpages': 0}
未找到任何相关报告。





In [27]:
class Args:
    def __init__(self, code, start, end):
        self.code = code
        self.start = start
        self.end = end

# Replace with your desired stock code, start year, and end year
args = Args(code='000001', start=2020, end=2024)

main(args)

TypeError: main() takes 0 positional arguments but 1 was given

In [None]:
from google.colab import drive
drive.mount('/content/drive')