In [3]:
# 导入依赖库
import akshare as ak
import pandas as pd
import matplotlib.pyplot as plt
import os
from datetime import datetime
import numpy as np
import seaborn as sns

# 设置中文字体支持
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题

# 配置项目路径
BASE_DIR = os.path.abspath('.')
DATA_DIR = os.path.join(BASE_DIR, "data/raw")
RESULT_DIR = os.path.join(BASE_DIR, "results")
FIG_DIR = os.path.join(RESULT_DIR, "figures")
TABLE_DIR = os.path.join(RESULT_DIR, "tables")

# 创建目录
for dir_path in [DATA_DIR, RESULT_DIR, FIG_DIR, TABLE_DIR]:
    os.makedirs(dir_path, exist_ok=True)

In [4]:
# ============== 数据下载模块 ==============
def download_stock_data():
    """下载数据并打印实际字段（关键调试步骤）"""
    print("开始下载数据...")
    try:
        sh_list = ak.stock_info_sh_name_code()  # 上交所接口
        sz_list = ak.stock_info_sz_name_code()  # 深交所接口
        bj_list = ak.stock_info_bj_name_code()  # 北交所接口
        
        # 打印各接口实际字段和记录数（必须执行！）
        print("\n=== 原始数据 ===")
        print(f"上交所: {len(sh_list)}条 | 字段: {sh_list.columns.tolist()}")
        print(f"深交所: {len(sz_list)}条 | 字段: {sz_list.columns.tolist()}")
        print(f"北交所: {len(bj_list)}条 | 字段: {bj_list.columns.tolist()}")
        
        # 保存原始数据
        sh_list.to_csv(os.path.join(DATA_DIR, "shanghai_stocks.csv"), index=False)
        sz_list.to_csv(os.path.join(DATA_DIR, "shenzhen_stocks.csv"), index=False)
        bj_list.to_csv(os.path.join(DATA_DIR, "beijing_stocks.csv"), index=False)
        
        return sh_list, sz_list, bj_list
    except Exception as e:
        print(f"数据下载失败: {str(e)}")
        return None, None, None

In [5]:
# ============== 数据清洗模块 ==============
def clean_data(sh_list, sz_list, bj_list):
    """根据实际字段清洗数据（最终修正版）"""
    print("\n开始数据清洗...")
    
    # ================== 上交所处理 ==================
    print(f"上交所实际列名: {sh_list.columns.tolist()}, 列数: {len(sh_list.columns)}")
    
    # 上交所列处理（根据实际列数动态处理）
    if len(sh_list.columns) == 4:  # 新版本接口
        sh_list.columns = ["代码", "名称", "上市日期", "行业"]
        sh_list["板块"] = "上交所"
        sh_list["交易所"] = "SSE"
    elif len(sh_list.columns) == 6:  # 旧版本接口
        sh_list.columns = ["代码", "名称", "上市日期", "行业", "板块", "交易所"]
    else:
        raise ValueError(f"未知的上交所数据结构，列数: {len(sh_list.columns)}")
    
    print(f"上交所样本:\n{sh_list.head(1)}")
    
    # ================== 深交所处理 ==================
    print(f"深交所实际列名: {sz_list.columns.tolist()}, 列数: {len(sz_list.columns)}")
    
    # 深交所列处理（动态适应不同版本）
    if "交易所" in sz_list.columns:  # 如果有交易所列
        sz_cols = ["板块", "A股代码", "A股简称", "A股上市日期", "所属行业", "交易所"]
    else:  # 如果没有交易所列
        sz_cols = ["板块", "A股代码", "A股简称", "A股上市日期", "所属行业"]
    
    try:
        sz_list = sz_list[sz_cols].copy()
        if len(sz_cols) == 5:  # 如果只有5列
            sz_list["交易所"] = "SZSE"  # 手动添加交易所列
        
        sz_list.columns = ["板块", "代码", "名称", "上市日期", "行业", "交易所"]
    except KeyError as e:
        print(f"深交所列匹配失败，实际列: {sz_list.columns.tolist()}")
        raise
    
    print(f"深交所样本:\n{sz_list.head(1)}")
    
    # ================== 北交所处理 ==================
    print(f"北交所实际列名: {bj_list.columns.tolist()}, 列数: {len(bj_list.columns)}")
    
    # 北交所列处理
    bj_cols = ["证券代码", "证券简称", "上市日期", "所属行业"]
    if "交易所" in bj_list.columns:
        bj_cols.append("交易所")
    
    bj_list = bj_list[bj_cols].copy()
    bj_list.columns = ["代码", "名称", "上市日期", "行业", "交易所"] if len(bj_cols) == 5 else ["代码", "名称", "上市日期", "行业"]
    
    if "交易所" not in bj_list.columns:
        bj_list["交易所"] = "BJSE"
    bj_list["板块"] = "北交所"
    
    print(f"北交所样本:\n{bj_list.head(1)}")
    
    # ================== 合并数据 ==================
    all_stocks = pd.concat([sh_list, sz_list, bj_list], ignore_index=True)
    print(f"\n合并后总数(初始): {len(all_stocks)}")
    
    # ================== 日期处理 ==================
    def safe_parse_date(date_str):
        try:
            # 尝试多种日期格式
            for fmt in ["%Y-%m-%d", "%Y/%m/%d", "%Y%m%d"]:
                try:
                    return datetime.strptime(str(date_str), fmt).year
                except:
                    continue
            return None
        except:
            return None
    
    all_stocks["上市年份"] = all_stocks["上市日期"].apply(safe_parse_date)
    
    # 缺失值统计
    missing = all_stocks["上市年份"].isnull().sum()
    print(f"无法解析的日期数量: {missing} ({missing/len(all_stocks):.1%})")
    
    # 删除无效记录
    all_stocks = all_stocks[all_stocks["上市年份"].notna() | all_stocks["上市日期"].notna()]
    print(f"清洗后有效记录: {len(all_stocks)}")
    
    return all_stocks

In [6]:
# ============== 统计分析模块 ==============
def analyze_data(all_stocks):
    """执行统计分析（增强版）"""
    print("\n开始统计分析...")
    
    # 1. 各年度总数
    annual_count = all_stocks.groupby("上市年份")["代码"].count().reset_index()
    annual_count.columns = ["年份", "上市公司总数"]
    
    # 2. 交易所+板块分布
    exchange_board = all_stocks.groupby(["交易所", "板块"])["代码"].count().unstack(fill_value=0)
    
    # 3. 行业分析
    latest_year = all_stocks["上市年份"].max()
    industry_stats = {
        "latest_top10": all_stocks[all_stocks["上市年份"]==latest_year]
                          .groupby("行业")["代码"].count()
                          .sort_values(ascending=False).head(10),
        "all_top10": all_stocks.groupby("行业")["代码"].count()
                      .sort_values(ascending=False).head(10)
    }
    
    # 4. 保存完整数据
    all_stocks.to_csv(os.path.join(DATA_DIR, "all_stocks_cleaned.csv"), index=False)
    
    return {
        "annual": annual_count,
        "exchange_board": exchange_board,
        "industry": industry_stats,
        "total_companies": len(all_stocks),
        "latest_year": latest_year
    }

In [7]:
# ============== 可视化模块 ==============
def visualize_results(results):
    """生成图表（增强版）"""
    print("\n生成可视化图表...")
    
    # 1. 年度趋势图
    plt.figure(figsize=(14, 7))
    plt.plot(results["annual"]["年份"], results["annual"]["上市公司总数"], 
             marker='o', linestyle='-', color='#1f77b4', linewidth=2)
    plt.title(f"中国上市公司数量年度趋势（总计{results['total_companies']}家）", fontsize=14)
    plt.xlabel("年份", fontsize=12)
    plt.ylabel("公司数量", fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.savefig(os.path.join(FIG_DIR, "annual_trend.png"), bbox_inches='tight', dpi=300)
    plt.close()
    
    # 2. 板块分布
    results["exchange_board"].plot(kind='bar', stacked=True, figsize=(12, 7), 
                                 colormap='viridis')
    plt.title("各交易所板块分布", fontsize=14)
    plt.xlabel("交易所", fontsize=12)
    plt.ylabel("公司数量", fontsize=12)
    plt.legend(title="板块", bbox_to_anchor=(1.05, 1))
    plt.savefig(os.path.join(FIG_DIR, "board_distribution.png"), 
                bbox_inches='tight', dpi=300)
    plt.close()
    
    # 3. 行业分布
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 7))
    
    results["industry"]["latest_top10"].plot(kind='barh', ax=ax1, color='#2ca02c')
    ax1.set_title(f"{results['latest_year']}年新增上市公司行业TOP10", fontsize=12)
    ax1.set_xlabel("公司数量")
    
    results["industry"]["all_top10"].plot(kind='barh', ax=ax2, color='#d62728')
    ax2.set_title("全市场行业分布TOP10", fontsize=12)
    ax2.set_xlabel("公司数量")
    
    plt.tight_layout()
    plt.savefig(os.path.join(FIG_DIR, "industry_comparison.png"), dpi=300)
    plt.close()


In [8]:
# ============== 主程序 ==============
if __name__ == "__main__":
    # 1. 下载数据
    sh_data, sz_data, bj_data = download_stock_data()
    
    if sh_data is not None:
        # 2. 数据清洗
        cleaned_data = clean_data(sh_data, sz_data, bj_data)
        
        # 3. 统计分析
        analysis_results = analyze_data(cleaned_data)
        print(f"\n=== 分析结果 ===")
        print(f"全市场上市公司总数: {analysis_results['total_companies']}")
        print(f"最新年度: {analysis_results['latest_year']}")
        print("\n各交易所板块分布:")
        print(analysis_results["exchange_board"])
        
        # 4. 可视化
        visualize_results(analysis_results)
        
        # 5. 保存结果
        analysis_results["annual"].to_csv(os.path.join(TABLE_DIR, "annual_count.csv"), index=False)
        analysis_results["exchange_board"].to_csv(os.path.join(TABLE_DIR, "exchange_board.csv"))
        analysis_results["industry"]["latest_top10"].to_frame().to_csv(
            os.path.join(TABLE_DIR, "latest_industry_top10.csv"))
        
        print("\n分析完成！结果已保存至以下目录:")
        print(f"数据文件: {DATA_DIR}")
        print(f"图表文件: {FIG_DIR}")
        print(f"统计表格: {TABLE_DIR}")
    else:
        print("数据下载失败，请检查网络连接或AKShare接口状态")

开始下载数据...


  0%|          | 0/14 [00:00<?, ?it/s]


=== 原始数据 ===
上交所: 1695条 | 字段: ['证券代码', '证券简称', '公司全称', '上市日期']
深交所: 2867条 | 字段: ['板块', 'A股代码', 'A股简称', 'A股上市日期', 'A股总股本', 'A股流通股本', '所属行业']
北交所: 266条 | 字段: ['证券代码', '证券简称', '总股本', '流通股本', '上市日期', '所属行业', '地区', '报告日期']

开始数据清洗...
上交所实际列名: ['证券代码', '证券简称', '公司全称', '上市日期'], 列数: 4
上交所样本:
       代码    名称            上市日期          行业   板块  交易所
0  600000  浦发银行  上海浦东发展银行股份有限公司  1999-11-10  上交所  SSE
深交所实际列名: ['板块', 'A股代码', 'A股简称', 'A股上市日期', 'A股总股本', 'A股流通股本', '所属行业'], 列数: 7
深交所样本:
   板块      代码    名称        上市日期     行业   交易所
0  主板  000001  平安银行  1991-04-03  J 金融业  SZSE
北交所实际列名: ['证券代码', '证券简称', '总股本', '流通股本', '上市日期', '所属行业', '地区', '报告日期'], 列数: 8
北交所样本:
       代码    名称        上市日期     行业   交易所   板块
0  430017  星昊医药  2023-05-31  医药制造业  BJSE  北交所

合并后总数(初始): 4828
无法解析的日期数量: 1695 (35.1%)
清洗后有效记录: 4828

开始统计分析...

=== 分析结果 ===
全市场上市公司总数: 4828
最新年度: 2025.0

各交易所板块分布:
板块     上交所    主板   创业板  北交所
交易所                        
BJSE     0     0     0  266
SSE   1695     0     0    0
SZSE     0  1486  1381  