In [1]:
import os
from sqlalchemy import create_engine, text

# --- 1. 数据库连接配置 (使用你之前提供的参数) ---
DB_HOST = 'localhost'
DB_NAME = 'mydb'
DB_USER = 'alan-hopiy'
DB_PASS = ''
DB_PORT = '5432'

# 创建数据库连接URI
db_uri = f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# --- 2. 您提供的完整SQL建表语句 ---
# 使用三引号 """ 来包裹多行SQL语句
SQL_SCRIPT = """
-- 如果表已存在，则先删除，方便重新运行脚本
DROP TABLE IF EXISTS stock_cap_classification;

-- 创建市值分类结果表
CREATE TABLE stock_cap_classification (
    ts_code VARCHAR(10) NOT NULL,
    name VARCHAR(50),
    industry VARCHAR(50),
    period_start_date DATE NOT NULL,
    period_end_date DATE NOT NULL,
    avg_total_mv NUMERIC(20, 4), -- 使用NUMERIC类型以保证精度，单位：万元
    classification VARCHAR(20),
    calculation_date DATE DEFAULT CURRENT_DATE, -- 记录计算当天的日期
    PRIMARY KEY (ts_code, period_start_date, period_end_date) -- 复合主键
);

-- 为表和列添加注释，增加可读性
COMMENT ON TABLE stock_cap_classification IS '存储A股按周期计算的市值分类结果';
COMMENT ON COLUMN stock_cap_classification.ts_code IS '股票代码';
COMMENT ON COLUMN stock_cap_classification.name IS '股票名称';
COMMENT ON COLUMN stock_cap_classification.industry IS '所属行业';
COMMENT ON COLUMN stock_cap_classification.period_start_date IS '市值计算周期的开始日期';
COMMENT ON COLUMN stock_cap_classification.period_end_date IS '市值计算周期的结束日期';
COMMENT ON COLUMN stock_cap_classification.avg_total_mv IS '周期内的日均总市值（万元）';
COMMENT ON COLUMN stock_cap_classification.classification IS '市值分类（大/中/小市值公司）';
COMMENT ON COLUMN stock_cap_classification.calculation_date IS '本条记录的计算日期';
"""

# --- 3. 执行SQL脚本的函数 ---
def create_table_in_db():
    """
    连接到数据库并执行SQL脚本来创建表和添加注释。
    """
    try:
        print(f"正在连接到数据库 '{DB_NAME}'...")
        engine = create_engine(db_uri)

        # 使用 with engine.connect() 来自动管理连接的开启和关闭
        with engine.connect() as connection:
            print("连接成功！准备执行SQL脚本...")
            
            # 使用事务（transaction）来确保所有SQL语句要么全部成功，要么全部失败
            # 这对于多步DDL（数据定义语言）操作是一个好习惯
            with connection.begin() as transaction:
                # SQLAlchemy 的 text() 函数用于安全地执行原始SQL
                # 它可以直接处理包含多个语句的字符串
                connection.execute(text(SQL_SCRIPT))
            
            print("--------------------------------------------------")
            print("✅ SQL脚本执行成功！")
            print("✅ 数据表 'stock_cap_classification' 已成功创建并配置。")
            print("--------------------------------------------------")

    except Exception as e:
        print("\n❌ 操作失败！发生错误：")
        print(e)

# --- 4. 运行主程序 ---
if __name__ == "__main__":
    create_table_in_db()

正在连接到数据库 'mydb'...
连接成功！准备执行SQL脚本...
--------------------------------------------------
✅ SQL脚本执行成功！
✅ 数据表 'stock_cap_classification' 已成功创建并配置。
--------------------------------------------------


In [14]:
import pandas as pd
import tushare as ts
import time
from sqlalchemy import create_engine, text
from sqlalchemy.types import VARCHAR, NUMERIC, DATE
from typing import Union  # <--- 在这里添加这一行
import os

# --- 1. 配置参数 (无变动) ---
TUSHARE_TOKEN = 'a872d82f46046d335ccf68ef591747ff66b9a9d598b40791b80f017a'
pro = ts.pro_api(TUSHARE_TOKEN)
DB_HOST = 'localhost'
DB_NAME = 'mydb'
DB_USER = 'alan-hopiy'
DB_PASS = ''
DB_PORT = '5432'
db_uri = f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
engine = create_engine(db_uri)
START_DATE = '20240101'
END_DATE = '20241231'

# --- 中间结果的存档文件名 (无变动) ---
CACHE_FILE = 'intermediate_results.parquet'

final_results = None

# --- 断点续传逻辑 (无变动) ---
if os.path.exists(CACHE_FILE):
    print(f"发现存档文件 '{CACHE_FILE}'，直接加载数据...")
    final_results = pd.read_parquet(CACHE_FILE)
    print("数据加载成功，将跳过API获取和计算步骤。")
else:
    print(f"未发现存档文件，将执行完整的数据获取流程。")

# 只有在未加载到缓存数据时，才执行下面的数据获取和处理流程
if final_results is None:
    # --- 2. 从数据库读取股票列表 (无变动) ---
    try:
        print("正在从数据库 stock_basic_info 表中读取A股上市公司列表...")
        # 核心逻辑调整：在这里加入筛选条件
        query = """
        SELECT ts_code, name, industry 
        FROM stock_basic_info 
        WHERE 
            list_date <= '20240101' AND -- 筛选掉在统计周期内上市的新股
            market IN ('主板', '创业板', '科创板') -- 暂时只包含这三个板块
        """
        stock_list = pd.read_sql(query, engine)
        print(f"经过筛选，从数据库获取 {len(stock_list)} 家符合条件的上市公司信息。")
    except Exception as e:
        print(f"从数据库获取股票列表失败，错误：{e}")
        exit()

    # --- 3. 获取数据函数 (内部逻辑不变) ---
    def fetch_daily_data_for_stock(ts_code: str) -> Union[pd.DataFrame, None]:
        try:
            # 每次调用都是一次API请求
            daily_df = pro.daily_basic(ts_code=ts_code, start_date=START_DATE, end_date=END_DATE, fields='ts_code,trade_date,total_mv')
            if not daily_df.empty:
                return daily_df
        except Exception as e:
            print(f"获取 {ts_code} Tushare数据时出错: {e}")
        return None

    # --- 修改：从多线程并行改为单线程串行获取数据 ---
    all_daily_data = []
    total_stocks = len(stock_list)
    processed_count = 0
    # 200次/分钟 -> 60秒/200次 = 0.3秒/次。设置0.33秒确保安全。
    SLEEP_INTERVAL = 0.33 

    print(f"\n开始通过【单线程串行】模式获取 {total_stocks} 只股票的日度总市值...")
    print(f"为遵守API频率限制 (200次/分钟)，每次请求后将暂停 {SLEEP_INTERVAL:.2f} 秒。")

    # 使用 for 循环代替 ThreadPoolExecutor
    for index, stock_row in stock_list.iterrows():
        ts_code = stock_row['ts_code']
        
        # 调用函数获取数据
        result_df = fetch_daily_data_for_stock(ts_code)
        
        processed_count += 1
        if result_df is not None:
            all_daily_data.append(result_df)
        
        # 打印进度 (逻辑不变)
        if processed_count % 100 == 0 or processed_count == total_stocks:
            print(f"已完成 {processed_count}/{total_stocks} ({(processed_count / total_stocks) * 100:.2f}%)")

        # 核心新增：每次循环后暂停，以控制API请求频率
        time.sleep(SLEEP_INTERVAL)

    print("\n所有股票的日度数据获取完成。")
    
    # --- 4, 5, 6. 数据处理与整合 (无变动) ---
    if not all_daily_data:
        print("未能获取到任何股票的市值数据，程序终止。")
        exit()

    market_values_df = pd.concat(all_daily_data, ignore_index=True)
    
    # 新增筛选：只保留在周期内有超过120天交易记录的股票
    trade_day_counts = market_values_df.groupby('ts_code').size()
    valid_ts_codes = trade_day_counts[trade_day_counts > 120].index
    market_values_df = market_values_df[market_values_df['ts_code'].isin(valid_ts_codes)]
    print(f"筛选后，剩余 {len(valid_ts_codes)} 只股票（年内交易日 > 120天）进入市值计算。")

    print("正在计算每家公司的年内平均市值...")
    avg_market_value = market_values_df.groupby('ts_code')['total_mv'].mean().dropna().reset_index()
    avg_market_value.rename(columns={'total_mv': 'avg_total_mv'}, inplace=True)
    print("平均市值计算完成。")
    
    print("正在根据平均市值进行大、中、小盘划分...")
    # 使用 30% 和 70% 分位数进行划分
    p30 = avg_market_value['avg_total_mv'].quantile(0.3)
    p70 = avg_market_value['avg_total_mv'].quantile(0.7)
    print(f"市值划分阈值：小市值上限 {p30:,.2f} 万元, 大市值下限 {p70:,.2f} 万元")
    def classify_market_cap(mv):
        if mv >= p70: return '大盘股'
        elif mv < p30: return '小盘股'
        else: return '中盘股'
    avg_market_value['classification'] = avg_market_value['avg_total_mv'].apply(classify_market_cap)
    print("市值划分完成。")

    print("\n正在整合最终结果...")
    final_results = pd.merge(stock_list, avg_market_value, on='ts_code', how='inner')
    final_results['period_start_date'] = pd.to_datetime(START_DATE, format='%Y%m%d').date()
    final_results['period_end_date'] = pd.to_datetime(END_DATE, format='%Y%m%d').date()
    final_results = final_results[['ts_code', 'name', 'industry', 'period_start_date', 'period_end_date', 'avg_total_mv', 'classification']]

    # --- 存档逻辑 (无变动) ---
    try:
        print(f"正在将中间结果保存到存档文件 '{CACHE_FILE}'...")
        final_results.to_parquet(CACHE_FILE, index=False)
        print("存档成功！")
    except Exception as e:
        print(f"保存存档文件失败，错误：{e}")

# --- 7. 将结果写入数据库 (无变动) ---
if final_results is not None and not final_results.empty:
    try:
        print(f"\n准备将结果写入数据库表 'stock_cap_classification'...")
        # 注意：此处 if_exists='replace' 会清空并重建表
        dtype_mapping = {'ts_code': VARCHAR(10), 'name': VARCHAR(50), 'industry': VARCHAR(50), 'period_start_date': DATE, 'period_end_date': DATE, 'avg_total_mv': NUMERIC(20, 4), 'classification': VARCHAR(20)}
        final_results.to_sql('stock_cap_classification', engine, if_exists='replace', index=False, dtype=dtype_mapping, method='multi')
        print("数据成功写入数据库！")

        with engine.connect() as connection:
            count = connection.execute(text("SELECT COUNT(1) FROM stock_cap_classification")).scalar()
            print(f"数据库表 'stock_cap_classification' 中现在共有 {count} 条记录。")

    except Exception as e:
        print(f"数据写入数据库失败，错误：{e}")
else:
    print("没有最终数据可写入数据库。")

print("\n--- 任务执行完毕 ---")

发现存档文件 'intermediate_results.parquet'，直接加载数据...
数据加载成功，将跳过API获取和计算步骤。

准备将结果写入数据库表 'stock_cap_classification'...
数据成功写入数据库！
数据库表 'stock_cap_classification' 中现在共有 5031 条记录。

--- 任务执行完毕 ---


In [21]:
import pandas as pd
from sqlalchemy import create_engine
import os
from datetime import datetime  # <--- 新增：导入datetime模块用于获取当前日期

# --- 1. 从config文件加载核心配置 ---
import config

DB_HOST = config.DB_HOST
DB_NAME = config.DB_NAME
DB_USER = config.DB_USER
DB_PASS = config.DB_PASS
DB_PORT = config.DB_PORT
output_directory = config.BASE_OUTPUT_DIR

# --- 2. 动态生成文件名 ---
# 获取当前日期的字符串，格式为 YYYYMMDD (例如 '20250611')
current_date_str = datetime.now().strftime('%Y%m%d')
# 将日期加入文件名中
output_filename = f"stock_cap_classification_{current_date_str}.xlsx"
# 组合成完整的文件路径
output_filepath = os.path.join(output_directory, output_filename)


# --- 3. 主脚本逻辑 ---
print("--- 开始从数据库导出数据到Excel ---")
print(f"本次导出的文件将保存为: {output_filename}")

# 使用加载的配置构建URI
db_uri = f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

try:
    print(f"正在连接到数据库 '{DB_NAME}'...")
    engine = create_engine(db_uri)
    
    table_name = 'stock_cap_classification'
    query = f"SELECT * FROM {table_name}"
    
    print("正在读取全部数据...")
    df = pd.read_sql(query, engine)
    print(f"成功读取 {len(df)} 行数据。")

    # 确保输出目录存在
    os.makedirs(output_directory, exist_ok=True)
    
    print(f"正在将数据写入到: {output_filepath}")
    df.to_excel(output_filepath, index=False)
    
    print("\n--- ✅ 导出成功！ ---")
    print(f"文件已保存在: {output_filepath}")

except Exception as e:
    print(f"\n--- ❌ 发生错误 ---")
    print(f"Error: {e}")

--- 开始从数据库导出数据到Excel ---
本次导出的文件将保存为: stock_cap_classification_20250611.xlsx
正在连接到数据库 'mydb'...
正在读取全部数据...
成功读取 5031 行数据。
正在将数据写入到: /Users/alan-hopiy/Documents/个人研究/基本面量化/stock_cap_classification_20250611.xlsx

--- ✅ 导出成功！ ---
文件已保存在: /Users/alan-hopiy/Documents/个人研究/基本面量化/stock_cap_classification_20250611.xlsx


In [18]:
from sqlalchemy import create_engine, text

# --- 1. 配置参数 (请确保与您的配置一致) ---
DB_HOST = 'localhost'
DB_NAME = 'mydb'
DB_USER = 'alan-hopiy'
DB_PASS = ''  # 无密码
DB_PORT = '5432'
TABLE_NAME = 'stock_cap_classification'

# 构建数据库连接URI
db_uri = f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# --- 2. 定义SQL更新语句 ---
# 使用 CASE 语句可以一次性完成所有替换，效率最高
update_sql = f"""
UPDATE {TABLE_NAME}
SET classification = CASE
    WHEN classification = '大盘股' THEN 'Large'
    WHEN classification = '中盘股' THEN 'Mid'
    WHEN classification = '小盘股' THEN 'Small'
    ELSE classification -- 保留其他可能的值不变
END
WHERE classification IN ('大盘股', '中盘股', '小盘股'); -- 只对需要修改的行进行操作，提高效率
"""

print("--- 开始更新数据库中的分类名称 ---")

try:
    engine = create_engine(db_uri)
    with engine.connect() as connection:
        # 开始一个事务
        with connection.begin() as transaction:
            print("正在执行更新操作...")
            # 执行更新语句
            result = connection.execute(text(update_sql))
            print(f"操作成功！共影响了 {result.rowcount} 行数据。")
            # 事务在此处自动提交

    print("\n--- 更新完成，正在验证结果 ---")

    # --- 3. 验证更新后的结果 ---
    with engine.connect() as connection:
        verify_sql = f"SELECT DISTINCT classification FROM {TABLE_NAME};"
        verification_result = connection.execute(text(verify_sql)).fetchall()
        
        print("数据库中 'classification' 列当前所有的值为:")
        for row in verification_result:
            print(f"- {row[0]}")

    print("\n✅ 任务成功完成！")

except Exception as e:
    print(f"\n❌ 操作失败，发生错误: {e}")

--- 开始更新数据库中的分类名称 ---
正在执行更新操作...
操作成功！共影响了 5031 行数据。

--- 更新完成，正在验证结果 ---
数据库中 'classification' 列当前所有的值为:
- Small
- Large
- Mid

✅ 任务成功完成！


In [19]:
import pandas as pd
from sqlalchemy import create_engine

# --- 1. 配置参数 (请确保与您的配置一致) ---
DB_HOST = 'localhost'
DB_NAME = 'mydb'
DB_USER = 'alan-hopiy'
DB_PASS = ''  # 无密码
DB_PORT = '5432'
TABLE_NAME = 'stock_cap_classification'

# 构建数据库连接URI
db_uri = f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

print(f"--- 准备从数据库表 '{TABLE_NAME}' 中读取前10行数据 ---")

try:
    # 创建数据库引擎
    engine = create_engine(db_uri)

    # 定义SQL查询语句，使用 LIMIT 10 获取前10条记录
    query = f"SELECT * FROM {TABLE_NAME} LIMIT 10;"

    print("正在连接数据库并执行查询...")
    
    # 使用pandas读取SQL查询结果
    df_head = pd.read_sql(query, engine)

    print("\n✅ 查询成功！以下是表的前10行内容：\n")
    
    # 直接打印DataFrame，pandas会自动进行格式化，非常清晰
    print(df_head)

except Exception as e:
    print(f"\n❌ 操作失败，发生错误: {e}")

--- 准备从数据库表 'stock_cap_classification' 中读取前10行数据 ---
正在连接数据库并执行查询...

✅ 查询成功！以下是表的前10行内容：

     ts_code  name industry period_start_date period_end_date  avg_total_mv  \
0  000020.SZ  深华发A      元器件        2024-01-01      2024-12-31       36.0873   
1  000021.SZ   深科技      元器件        2024-01-01      2024-12-31      238.6745   
2  000025.SZ   特力A      综合类        2024-01-01      2024-12-31       66.7836   
3  000026.SZ   飞亚达       服饰        2024-01-01      2024-12-31       40.3415   
4  000027.SZ  深圳能源     火力发电        2024-01-01      2024-12-31      319.4096   
5  000028.SZ  国药一致     医药商业        2024-01-01      2024-12-31      171.7735   
6  000029.SZ  深深房A     区域地产        2024-01-01      2024-12-31      124.7122   
7  000030.SZ  富奥股份     汽车配件        2024-01-01      2024-12-31       89.3700   
8  000031.SZ   大悦城     全国地产        2024-01-01      2024-12-31      118.3465   
9  000032.SZ  深桑达A     建筑工程        2024-01-01      2024-12-31      190.0538   

  classification  
0            Mid  
1

In [17]:
import pandas as pd
from sqlalchemy import create_engine, text

# --- 1. 配置参数 (请确保与您的配置一致) ---
DB_HOST = 'localhost'
DB_NAME = 'mydb'
DB_USER = 'alan-hopiy'
DB_PASS = ''  # 无密码
DB_PORT = '5432'
TABLE_NAME = 'stock_cap_classification'
COLUMN_TO_UPDATE = 'avg_total_mv'

# 构建数据库连接URI
db_uri = f'postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# --- 2. 定义SQL更新语句 ---
# 将目标列的每一个值都除以10000
update_sql = f"""
UPDATE {TABLE_NAME}
SET {COLUMN_TO_UPDATE} = {COLUMN_TO_UPDATE} / 10000;
"""

print(f"--- 开始将 '{TABLE_NAME}' 表中 '{COLUMN_TO_UPDATE}' 列的单位从 '万' 修改为 '亿' ---")

try:
    engine = create_engine(db_uri)
    with engine.connect() as connection:
        # 使用事务确保操作的原子性
        with connection.begin() as transaction:
            print("正在执行单位更新操作...")
            # 执行更新
            result = connection.execute(text(update_sql))
            print(f"操作成功！共更新了 {result.rowcount} 行数据。")
            # 事务在此处自动提交

    print("\n--- 更新完成，正在查询前10行数据以验证结果 ---")

    # --- 3. 验证更新后的结果 ---
    with engine.connect() as connection:
        # 查询前10行，重点关注修改后的 avg_total_mv 列
        verify_sql = f"SELECT ts_code, name, avg_total_mv, classification FROM {TABLE_NAME} LIMIT 10;"
        df_new = pd.read_sql(verify_sql, engine)
        
        print("✅ 单位修改成功！以下是更新后的数据示例（市值单位已是'亿'）:\n")
        # 设置pandas显示格式，让小数更清晰
        pd.options.display.float_format = '{:,.4f}'.format
        print(df_new)


except Exception as e:
    print(f"\n❌ 操作失败，发生错误: {e}")

--- 开始将 'stock_cap_classification' 表中 'avg_total_mv' 列的单位从 '万' 修改为 '亿' ---
正在执行单位更新操作...
操作成功！共更新了 5031 行数据。

--- 更新完成，正在查询前10行数据以验证结果 ---
✅ 单位修改成功！以下是更新后的数据示例（市值单位已是'亿'）:

     ts_code  name  avg_total_mv classification
0  000020.SZ  深华发A       36.0873            中盘股
1  000021.SZ   深科技      238.6745            大盘股
2  000025.SZ   特力A       66.7836            中盘股
3  000026.SZ   飞亚达       40.3415            中盘股
4  000027.SZ  深圳能源      319.4096            大盘股
5  000028.SZ  国药一致      171.7735            大盘股
6  000029.SZ  深深房A      124.7122            大盘股
7  000030.SZ  富奥股份       89.3700            大盘股
8  000031.SZ   大悦城      118.3465            大盘股
9  000032.SZ  深桑达A      190.0538            大盘股
