In [None]:
import requests  # 用于发送 HTTP 请求
import json  # 用于解析 JSON 数据
import time
from datetime import datetime, timezone  # 用于计算时间戳和日期
from sqlalchemy import create_engine, text, MetaData, Table, Column, String, Float, Date  # SQLAlchemy 用于数据库操作和表定义（添加 text）

# Database engine configuration.
# NOTE(review): the connection string embeds the database password in source;
# consider loading it from an environment variable or a config file instead.
DATABASE_URL = 'postgresql+psycopg2://postgres:admin1234@localhost:5432/bootcamp_2504p'
engine = create_engine(DATABASE_URL)

# MetaData registry that collects the table definitions declared below.
metadata = MetaData()

# stock_ohlcv table: both `symbol` and `date` are flagged primary_key=True, so
# together they form a composite primary key (one row per symbol per date).
stock_ohlcv_table = Table(
    'stock_ohlcv',
    metadata,
    Column('symbol', String(10), primary_key=True),  # ticker symbol, VARCHAR(10); part of the composite key
    Column('date', Date, primary_key=True, nullable=False),  # date of the bar; part of the composite key
    Column('open', Float, nullable=True),  # opening price
    Column('high', Float, nullable=True),  # highest price
    Column('low', Float, nullable=True),  # lowest price
    Column('close', Float, nullable=True),  # closing price
    Column('volume', Float, nullable=True)  # traded volume
)

# 步骤 1: 创建表（如果不存在）
def create_table():
    """Emit CREATE TABLE for the tables registered on ``metadata``.

    Delegates to ``MetaData.create_all`` bound to the module-level ``engine``
    and then prints a confirmation message.
    """
    metadata.create_all(bind=engine)
    print("表 stock_ohlcv 创建完成（如果需要）。")

# 步骤 2: 从数据库加载 symbols
def load_symbols():
    """Load the ticker symbols stored in the ``stocks`` table.

    Returns:
        A list holding the ``symbol`` column of every row returned by the
        query, or an empty list when connecting or querying fails (the error
        is printed rather than raised).
    """
    try:
        # The connection attempt lives inside the try block on purpose:
        # engine.connect() itself raises when the database is unreachable,
        # and that case must also fall back to an empty list.
        with engine.connect() as conn:
            result = conn.execute(text("SELECT symbol FROM stocks"))
            symbols = [row['symbol'] for row in result.mappings()]
    except Exception as e:
        # Best-effort boundary: report the problem and let the caller decide.
        print(f"加载股票代码失败: {e}")
        return []

    print(f"从数据库加载 {len(symbols)} 个股票代码: {symbols}")
    return symbols

# 步骤 3: 从 Yahoo Finance API 获取 JSON 数据
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def get_json_from_api(symbol, period1, period2, interval='1d', events='history'):
    """Fetch chart data for one symbol from the Yahoo Finance chart endpoint.

    Args:
        symbol: Ticker symbol placed in the URL path.
        period1: Start of the requested range, sent as the ``period1`` query
            parameter (the caller passes Unix epoch seconds).
        period2: End of the requested range, sent as ``period2``.
        interval: Value of the ``interval`` query parameter.
        events: Value of the ``events`` query parameter.

    Returns:
        The decoded JSON body, or ``None`` when the request fails, the server
        answers with an error status, or the body is not valid JSON.
    """
    url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}"
    # Let requests build and percent-encode the query string.
    params = {
        'period1': period1,
        'period2': period2,
        'interval': interval,
        'events': events,
    }
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    retry_strategy = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry_strategy)

    # The context manager closes the session (and its pooled connections)
    # on every exit path; the original left one open session per call.
    with requests.Session() as session:
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        try:
            response = session.get(url, params=params, headers=headers, timeout=30)  # 30-second timeout
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            print(f"API 请求失败 ({symbol}): {e}")
            return None

# 步骤 4: 解析 JSON 并保存到 stock_ohlcv 表（修复：使用 text() 执行 SQL）
def _build_ohlcv_records(symbol, json_data):
    """Turn a chart API payload into insert-ready row dicts.

    Returns:
        A ``(records, problem)`` tuple. ``problem`` is ``None`` on success;
        otherwise ``records`` is empty and ``problem`` is the message to print.
    """
    chart = json_data.get('chart') if isinstance(json_data, dict) else None
    results = chart.get('result') if isinstance(chart, dict) else None
    if not isinstance(results, list) or not results or not isinstance(results[0], dict):
        return [], f"无效的 JSON 数据 ({symbol})"

    result = results[0]
    # `or []` also covers keys that are present but explicitly null.
    timestamps = result.get('timestamp') or []
    if not timestamps:
        return [], f"没有数据 ({symbol})"

    # An absent, null or empty quote list is treated as "no series" instead
    # of raising IndexError/TypeError.
    quote_list = (result.get('indicators') or {}).get('quote') or []
    quotes = (quote_list[0] if quote_list else None) or {}

    fields = ('open', 'high', 'low', 'close', 'volume')
    series = {name: quotes.get(name) or [] for name in fields}
    if any(len(values) != len(timestamps) for values in series.values()):
        return [], f"数据长度不一致 ({symbol})"

    records = [
        {
            'symbol': symbol,
            # The timestamp is interpreted in UTC to derive the row's date.
            'date': datetime.fromtimestamp(ts, tz=timezone.utc).date(),
            'open': open_price,
            'high': high,
            'low': low,
            'close': close,
            'volume': volume,
        }
        for ts, open_price, high, low, close, volume in zip(
            timestamps, *(series[name] for name in fields)
        )
    ]
    return records, None


def save_to_postgres(symbol, json_data):
    """Parse a chart API payload and insert its rows into ``stock_ohlcv``.

    Args:
        symbol: Ticker symbol stored in the ``symbol`` column of every row.
        json_data: Decoded JSON payload; rows are read from
            ``chart.result[0]`` (``timestamp`` plus ``indicators.quote[0]``).

    Returns:
        The number of rows submitted to the database, or ``0`` when the
        payload is invalid, empty or inconsistent, or when the insert fails.
        Rows skipped by ``ON CONFLICT ... DO NOTHING`` are still counted,
        because the count is taken from the submitted batch.
    """
    records, problem = _build_ohlcv_records(symbol, json_data)
    if problem is not None:
        print(problem)
        return 0

    try:
        # engine.begin() commits when the block succeeds and rolls back when
        # it raises; keeping it inside the try also catches connection errors.
        with engine.begin() as conn:
            conn.execute(
                text("""
                    INSERT INTO stock_ohlcv (symbol, date, open, high, low, close, volume)
                    VALUES (:symbol, :date, :open, :high, :low, :close, :volume)
                    ON CONFLICT (symbol, date) DO NOTHING;
                """),
                records
            )
    except Exception as e:
        # Best-effort boundary: one failing symbol must not abort the run.
        print(f"插入数据失败 ({symbol}): {e}")
        return 0

    print(f"成功插入 {len(records)} 条数据 ({symbol})")
    return len(records)

# 主程序
def main():
    """Download daily OHLCV data for every stored symbol and save it.

    Creates the table when needed, loads the symbols from the database, then
    fetches and stores each symbol's history from 2022-07-01 00:00 UTC up to
    the current time, pausing between API requests.
    """
    # Both ends of the range are timezone-aware UTC datetimes; the API
    # parameters are their Unix epoch seconds.
    start_dt = datetime(2022, 7, 1, 0, 0, 0, tzinfo=timezone.utc)
    end_dt = datetime.now(timezone.utc)
    period1 = int(start_dt.timestamp())
    period2 = int(end_dt.timestamp())

    # Seconds to wait between consecutive API requests to stay under the
    # API rate limit.
    request_delay_seconds = 3

    # Create the table (if needed).
    create_table()

    # Load the symbols from the database.
    symbols = load_symbols()
    if not symbols:
        # Returning ends the run without relying on the site-provided
        # exit() helper, which is not guaranteed to be available.
        print("没有可用的股票代码，退出程序")
        return

    # Process each symbol, pausing between requests (not after the last one).
    total_records = 0
    for index, symbol in enumerate(symbols):
        if index:
            time.sleep(request_delay_seconds)
        print(f"处理股票: {symbol}")
        json_data = get_json_from_api(symbol, period1, period2)
        if json_data:
            total_records += save_to_postgres(symbol, json_data)
    print(f"总共插入 {total_records} 条数据")


if __name__ == "__main__":
    main()


表 stock_ohlcv 创建完成（如果需要）。
成功插入 799 条数据 (AOS)
