## 处理 WebSocket 获取的单笔交易，统计为 Aggregated Data

In [1]:
import json
import threading
import websocket
import time
import csv
import os
import pytz
import logging
import requests
import pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict

### 配置参数

In [2]:
# Symbols to subscribe to on the WebSocket feed
SYMBOLS = ["AAPL", "GOOGL", "TSLA"]

# Aggregation intervals: name -> length in minutes.
# The names are also used as directory names under ./data/raw/ and as CSV file suffixes.
INTERVALS = {
    "1min": 1,
    "5min": 5,
    "15min": 15,
    "30min": 30,
    "1h": 60
}

# Smallest interval; used as the candle resolution when backfilling data (1 minute)
MIN_INTERVAL = "1min"

# API key. The FINNHUB_TOKEN environment variable takes precedence so the secret does
# not have to live in source control; the literal is kept only as a backward-compatible
# fallback and should be rotated and removed from the notebook.
FINNHUB_TOKEN = os.environ.get("FINNHUB_TOKEN") or "cvop3lhr01qihjtq3uvgcvop3lhr01qihjtq3v00"

# REST API URL (candle endpoint used for backfilling)
REST_URL = "https://finnhub.io/api/v1/stock/candle"

### 初始化准备

In [3]:
WS_URL = f"wss://ws.finnhub.io?token={FINNHUB_TOKEN}"

# Make sure the log directory and one output directory per interval exist
os.makedirs("./data/logs", exist_ok=True)
for interval in INTERVALS.keys():
    directory = f"./data/raw/{interval}"
    os.makedirs(directory, exist_ok=True)

# Logging goes to a file and to the console. The file handler is opened as UTF-8
# explicitly because the log messages contain Chinese text; with the platform default
# encoding (e.g. cp1252 on Windows) such records can fail with UnicodeEncodeError.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - \"%(message)s\"',
    handlers=[
        logging.FileHandler("./data/logs/stock_data.log", encoding="utf-8"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Finished bars, one store per interval.
# Layout: interval_data[interval][symbol][timestamp] = {stats}
interval_data = {interval: defaultdict(lambda: defaultdict(dict)) for interval in INTERVALS.keys()}

# Running state of the period currently being accumulated, one cache per interval.
# Layout: current_period_data[interval][symbol] = {trades, stats...}
current_period_data = {}
for interval in INTERVALS.keys():
    current_period_data[interval] = defaultdict(lambda: {
        'trades': [],
        'first_trade_price': None,
        'high': float('-inf'),
        'low': float('inf'),
        'volume': 0,
        'value': 0,  # running sum of price * volume, used for VWAP
        'count': 0,
    })

# True until the first on_open has run the startup backfill
is_first_run = True

### Helper Methods

In [4]:
def get_eastern_time():
    """Return the current time as a tz-aware datetime in America/New_York."""
    utc_now = datetime.now(pytz.UTC)
    return utc_now.astimezone(pytz.timezone('America/New_York'))

def get_interval_timestamp(dt, interval_minutes):
    """
    Floor ``dt`` to the start of its ``interval_minutes`` bucket and format it.

    Buckets are counted from midnight of ``dt``'s own day.

    Args:
    - dt: datetime to floor (tzinfo, if any, is kept but not printed)
    - interval_minutes: bucket length in minutes

    Returns:
    - the floored time as a 'YYYY-MM-DD HH:MM:00' string
    """
    elapsed = dt.hour * 60 + dt.minute
    bucket_start = elapsed - elapsed % interval_minutes
    hour, minute = divmod(bucket_start, 60)
    floored = dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
    return floored.strftime('%Y-%m-%d %H:%M:00')

def get_interval_timestamp_from_millis(ts_millis, interval_minutes):
    """
    Convert an epoch timestamp in milliseconds into a US/Eastern bucket label.

    The instant is read as UTC, converted to America/New_York, and then floored
    with get_interval_timestamp.
    """
    eastern = datetime.fromtimestamp(ts_millis / 1000, tz=pytz.UTC).astimezone(
        pytz.timezone('America/New_York')
    )
    return get_interval_timestamp(eastern, interval_minutes)

def get_next_interval_time(interval_minutes, now=None):
    """
    Return the start of the bucket that follows the one containing ``now``.

    Buckets are aligned to midnight (the same rule as get_interval_timestamp) and
    the result keeps ``now``'s tzinfo.

    Args:
    - interval_minutes: bucket length in minutes
    - now: reference time; defaults to get_eastern_time() (mainly exposed for tests)

    Returns:
    - datetime of the next bucket boundary
    """
    if now is None:
        now = get_eastern_time()
    minutes_since_midnight = now.hour * 60 + now.minute
    # Floor to the start of the current bucket, then step forward one bucket.
    # timedelta arithmetic rolls over day/month/year boundaries correctly; the
    # previous now.replace(day=now.day + 1, ...) raised ValueError on the last day
    # of a month, which killed the calling timer thread.
    current_start = now.replace(second=0, microsecond=0) - timedelta(
        minutes=minutes_since_midnight % interval_minutes
    )
    return current_start + timedelta(minutes=interval_minutes)

### 数据补齐

In [5]:
def fetch_historical_data(symbol, start_timestamp, end_timestamp, resolution, timeout=10):
    """
    Fetch historical OHLCV candles from the Finnhub REST API (used for backfilling).

    Args:
    - symbol: ticker symbol
    - start_timestamp: range start, Unix seconds
    - end_timestamp: range end, Unix seconds
    - resolution: candle size; one of the keys of interval_mapping below
    - timeout: seconds to wait for the HTTP response. requests applies no timeout
      by default, so without one a stalled server would block the caller forever.

    Returns:
    - DataFrame with columns timestamp (naive, from the API's 't' epoch seconds,
      i.e. UTC), open, high, low, close, volume, symbol; or None when the resolution
      is unsupported, the request fails or times out, the API status is not 'ok',
      or no rows come back.
    """
    interval_mapping = {
        '1min': '1',
        '5min': '5',
        '15min': '15',
        '30min': '30',
        '1h': '60'
    }

    if resolution not in interval_mapping:
        logger.error(f"不支持的时间间隔: {resolution}")
        return None

    params = {
        'symbol': symbol,
        'resolution': interval_mapping[resolution],
        'from': int(start_timestamp),
        'to': int(end_timestamp),
        'token': FINNHUB_TOKEN
    }

    # NOTE: datetime.fromtimestamp without tz renders in the host's local time zone
    logger.info(f"获取历史数据: {symbol}, 从 {datetime.fromtimestamp(start_timestamp)} 到 {datetime.fromtimestamp(end_timestamp)}, 间隔: {resolution}")

    try:
        response = requests.get(REST_URL, params=params, timeout=timeout)
        if response.status_code != 200:
            logger.error(f"历史数据请求失败: {response.status_code} - {response.text}")
            return None

        data = response.json()
        if data.get('s') != 'ok':
            logger.warning(f"未获取到历史数据: {data}")
            return None

        # Parallel arrays t/o/h/l/c/v from the response become DataFrame columns
        df = pd.DataFrame({
            'timestamp': pd.to_datetime(data['t'], unit='s'),
            'open': data['o'],
            'high': data['h'],
            'low': data['l'],
            'close': data['c'],
            'volume': data['v'],
            'symbol': symbol
        })

        if df.empty:
            logger.warning(f"获取的历史数据为空: {symbol}")
            return None

        return df

    except Exception as e:
        # Best-effort: any failure (network, timeout, malformed payload) means "no data"
        logger.error(f"获取历史数据时发生异常: {e}")
        return None

def _aggregate_candles(candles):
    """Summarize a non-empty candle DataFrame into open/high/low/volume/value/count."""
    # tolist() yields plain Python scalars, and the sums below add in row order.
    opens = candles['open'].tolist()
    highs = candles['high'].tolist()
    lows = candles['low'].tolist()
    closes = candles['close'].tolist()
    volumes = candles['volume'].tolist()
    return {
        'open': opens[0],
        'high': max(highs),
        'low': min(lows),
        'volume': sum(volumes),
        # close * volume per candle approximates that minute's traded value (for VWAP)
        'value': sum(c * v for c, v in zip(closes, volumes)),
        # number of candles, not trades: the candle payload carries no trade count
        'count': len(opens),
    }


def _merge_backfill(state, agg_data, last_row):
    """Fold a backfill summary into one symbol's current-period state dict in place."""
    if state['first_trade_price'] is None:
        state['first_trade_price'] = agg_data['open']
    state['high'] = max(state['high'], agg_data['high'])
    state['low'] = min(state['low'], agg_data['low'])
    state['volume'] += agg_data['volume']
    state['value'] += agg_data['value']
    state['count'] += agg_data['count']
    # Synthetic trade at the last candle's close, so the period has a close price
    # even if no live trade arrives before the next aggregation.
    state['trades'].append({
        'price': last_row['close'],
        'volume': last_row['volume'],
        'timestamp': int(last_row['timestamp'].timestamp() * 1000)
    })


def fill_initial_interval_data():
    """
    Backfill the partially elapsed bucket of every interval at program start.

    Example: if the program starts at 10:03 and a 5-minute interval is configured,
    the 1-minute candles for 10:00-10:03 are fetched via REST, summarized, and
    merged into current_period_data so the first 5-minute bar includes them.
    """
    logger.info("开始补齐当前时间间隔的初始数据...")

    now = get_eastern_time()

    for interval_key, interval_minutes in INTERVALS.items():
        # Start of the current bucket, carrying now's tzinfo
        current_interval_start_str = get_interval_timestamp(now, interval_minutes)
        current_interval_start = datetime.strptime(current_interval_start_str, '%Y-%m-%d %H:%M:%S')
        current_interval_start = current_interval_start.replace(tzinfo=now.tzinfo)

        # Compared at minute granularity: skipped whenever now is within the bucket's
        # first minute (always the case for a 1-minute interval).
        if now.minute == current_interval_start.minute and now.hour == current_interval_start.hour:
            logger.info(f"当前时间正好是 {interval_key} 间隔的开始，无需补齐")
            continue

        start_timestamp = int(current_interval_start.timestamp())
        end_timestamp = int(now.timestamp())

        logger.info(f"补齐 {interval_key} 数据: 从 {current_interval_start_str} 到 {now.strftime('%Y-%m-%d %H:%M:%S')}")

        for symbol in SYMBOLS:
            # Always fetch at the finest resolution and combine locally
            historical_df = fetch_historical_data(symbol, start_timestamp, end_timestamp, MIN_INTERVAL)
            if historical_df is None or historical_df.empty:
                logger.warning(f"无法获取 {symbol} 的历史数据进行初始补齐")
                continue

            agg_data = _aggregate_candles(historical_df)
            _merge_backfill(current_period_data[interval_key][symbol], agg_data, historical_df.iloc[-1])

            logger.info(f"已成功补齐 {symbol} 的 {interval_key} 初始数据: 开盘价={agg_data['open']}, 最高价={agg_data['high']}, 最低价={agg_data['low']}, 成交量={agg_data['volume']}")

    logger.info("初始数据补齐完成")

### 数据聚合与写入

In [6]:
def aggregate_interval_data(interval_key, interval_minutes):
    """
    Close out the current period of one interval.

    For each symbol with at least one trade, this builds an OHLC/VWAP bar, stores
    it in interval_data, and appends it to the symbol's CSV. Symbols without trades
    in the period are skipped.

    The bar label is the current Eastern time floored to ``interval_minutes``. The
    timer thread calls this right after a bucket boundary, so the label is the
    boundary at which the period ended.
    NOTE(review): fill_initial_interval_data reasons in bucket-start terms; confirm
    which labeling convention downstream consumers expect.
    """
    now = get_eastern_time()
    interval_timestamp = get_interval_timestamp(now, interval_minutes)

    period_states = current_period_data[interval_key]
    # Iterate a snapshot of the symbols: the WebSocket thread may insert a new symbol
    # into this defaultdict meanwhile, which would raise
    # "RuntimeError: dictionary changed size during iteration" on the live dict.
    for symbol in list(period_states):
        data = period_states[symbol]
        if not data['trades']:
            continue  # no trades for this symbol in the period

        # Swap in a fresh state *before* computing and doing file I/O, so trades that
        # arrive from now on go to the next period instead of being wiped by a late
        # reset. (This narrows, but without a lock cannot fully close, the race.)
        period_states[symbol] = {
            'trades': [],
            'first_trade_price': None,
            'high': float('-inf'),
            'low': float('inf'),
            'volume': 0,
            'value': 0,
            'count': 0,
        }

        volume = data['volume']
        stats = {
            'timestamp': interval_timestamp,
            'open': data['first_trade_price'],
            'high': data['high'],
            'low': data['low'],
            'close': data['trades'][-1]['price'],
            'vwap': data['value'] / volume if volume > 0 else 0,
            'volume': volume,
            'num_transactions': data['count']
        }

        interval_data[interval_key][symbol][interval_timestamp] = stats
        write_to_csv(interval_key, symbol, interval_timestamp, stats)

def write_to_csv(interval_key, symbol, timestamp, stats):
    """
    Append one aggregated bar to ./data/raw/<interval_key>/<symbol>_<interval_key>.csv.

    The header row is written whenever the file is empty at write time (a new file,
    or an existing empty one), not only when the file does not exist yet.

    Args:
    - interval_key: interval name, used for the directory and the file suffix
    - symbol: ticker symbol
    - timestamp: bar label (not used here; stats['timestamp'] is what gets written)
    - stats: row dict; its keys must be among fieldnames (DictWriter rejects extras)
    """
    directory = f"./data/raw/{interval_key}"
    filename = f"{directory}/{symbol}_{interval_key}.csv"
    fieldnames = ['timestamp', 'open', 'high', 'low', 'close', 'vwap', 'volume', 'num_transactions']

    with open(filename, 'a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        # An append-mode stream starts at end of file, so position 0 means "empty"
        if csvfile.tell() == 0:
            writer.writeheader()
        writer.writerow(stats)

    logger.info(f"数据已写入 {filename}：{stats['timestamp']}")

### WebSocket回调函数

In [7]:
def on_message(ws, message):
    """
    WebSocket message callback: fold every incoming trade into each interval's
    current-period state.

    Only messages with "type" == "trade" are handled; each entry of "data"
    provides s (symbol), p (price), v (volume) and t (epoch milliseconds).
    """
    payload = json.loads(message)
    if payload.get("type") != "trade":
        return

    for tick in payload["data"]:
        symbol, price, volume, ts_millis = tick["s"], tick["p"], tick["v"], tick["t"]
        record = {'price': price, 'volume': volume, 'timestamp': ts_millis}

        for interval_key in INTERVALS:
            state = current_period_data[interval_key][symbol]
            # The first trade of a period fixes its open price
            if state['first_trade_price'] is None:
                state['first_trade_price'] = price
            state['trades'].append(record)
            state['high'] = max(state['high'], price)
            state['low'] = min(state['low'], price)
            state['volume'] += volume
            state['value'] += price * volume
            state['count'] += 1

        # Echo each trade to stdout (consider disabling under heavy volume)
        print(f"[{symbol}] Price={price} Volume={volume} Timestamp={ts_millis}")

def on_error(ws, error):
    """
    WebSocket error callback: print the error and log it at ERROR level.

    Logs through this module's named logger (getLogger(__name__), the same logger
    the rest of the file uses) instead of the root logger.
    """
    print(f"WebSocket错误: {error}")
    logging.getLogger(__name__).error(f"WebSocket错误: {error}")

def on_close(ws, close_status_code, close_msg):
    """
    WebSocket close callback: log the close and reconnect after 5 seconds.

    The reconnect runs start_websocket() on a new thread instead of calling it from
    inside this callback. Calling it here nested a fresh run_forever() inside the
    old connection's callback. Every reconnect therefore deepened the call stack,
    which could eventually hit the recursion limit, and the old run_forever()
    never returned.
    """
    print(f"WebSocket关闭: 状态码={close_status_code}, 消息={close_msg}，5秒后尝试重新连接...")
    logging.getLogger(__name__).error(f"WebSocket关闭: 状态码={close_status_code}, 消息={close_msg}，5秒后尝试重新连接...")

    def _reconnect():
        time.sleep(5)
        start_websocket()  # restart the WebSocket connection

    # Non-daemon so the process stays alive after the previous run_forever() returns
    threading.Thread(target=_reconnect, name="ws-reconnect", daemon=False).start()

def on_open(ws):
    """
    WebSocket open callback: on a background thread, run the startup backfill once
    (first connection only), then subscribe to every symbol in SYMBOLS.

    The work runs on its own thread so that this callback returns immediately.
    """
    def run():
        global is_first_run
        if is_first_run:
            try:
                fill_initial_interval_data()
            except Exception:
                # Backfill is best-effort: a failure here must not prevent the
                # subscriptions below, or the socket stays open but receives no trades.
                logger.exception("初始数据补齐失败，跳过补齐并继续订阅")
            # Backfill only makes sense at startup; do not repeat it on reconnects
            is_first_run = False

        for sym in SYMBOLS:
            sub_msg = json.dumps({"type": "subscribe", "symbol": sym})
            ws.send(sub_msg)
            logger.info(f"Subscribed to {sym}")
            time.sleep(0.1)  # small pause between subscription messages

    threading.Thread(target=run).start()


### 定时任务

In [8]:
def start_aggregation_timers():
    """
    Start one daemon thread per entry in INTERVALS. Each thread sleeps until its
    next bucket boundary and then calls aggregate_interval_data for its interval.
    """
    def run_timer(interval_key, minutes):
        while True:
            try:
                # Time remaining until the next bucket boundary
                next_time = get_next_interval_time(minutes)
                now = get_eastern_time()
                wait_seconds = (next_time - now).total_seconds()

                logger.info(f"等待 {interval_key} 下一次聚合，将在 {next_time.strftime('%Y-%m-%d %H:%M:%S')} 进行，等待 {wait_seconds:.2f} 秒")

                time.sleep(max(0, wait_seconds))

                aggregate_interval_data(interval_key, minutes)
            except Exception:
                # Without this, a single failure (e.g. a CSV write error) would end
                # the thread and silently stop all further bars for this interval.
                logger.exception(f"{interval_key} 聚合失败，稍后重试")
                time.sleep(1)  # avoid a hot loop if the failure repeats immediately

    for interval_key, minutes in INTERVALS.items():
        threading.Thread(target=run_timer, args=(interval_key, minutes), daemon=True).start()

def start_market_hours_check():
    """
    Optionally start a daemon thread that logs, every 30 minutes, whether the US
    market is open: Monday-Friday, from 09:30 up to (not including) 16:00 Eastern.

    It only logs the status; it does not start or stop data collection.
    """
    open_minute = 9 * 60 + 30   # 09:30
    close_minute = 16 * 60      # 16:00, exclusive

    def check_market_hours():
        while True:
            current = get_eastern_time()
            minute_of_day = current.hour * 60 + current.minute
            market_open = current.weekday() < 5 and open_minute <= minute_of_day < close_minute
            logger.info("市场开盘中，数据收集活跃" if market_open else "市场已关闭，等待开盘")
            time.sleep(1800)  # re-check every 30 minutes

    threading.Thread(target=check_market_hours, daemon=True).start()

def start_websocket():
    """
    Create a WebSocketApp wired to this module's callbacks and block in
    run_forever() until the connection ends. Kept as a function so it can be
    called again to reconnect.
    """
    callbacks = dict(
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    app = websocket.WebSocketApp(WS_URL, **callbacks)
    # ping_interval=30 sends a ping every 30 seconds to keep the connection alive
    app.run_forever(ping_interval=30)

### 开始统计

In [None]:
if __name__ == "__main__":
    # Start the per-interval aggregation timers (daemon threads)
    start_aggregation_timers()

    # ----------- Optional: periodically log whether the market is open (logging only) -----------
    # start_market_hours_check()

    # Connect through start_websocket(), the same entry point used for reconnects,
    # so the first connection also gets ping_interval=30 instead of a duplicated
    # WebSocketApp setup without it.
    start_websocket()

2025-05-09 04:21:54,869 - Thread-5 (run_timer) - INFO - "等待 1min 下一次聚合，将在 2025-05-09 07:22:00 进行，等待 5.13 秒"
2025-05-09 04:21:54,870 - Thread-8 (run_timer) - INFO - "等待 30min 下一次聚合，将在 2025-05-09 07:30:00 进行，等待 485.13 秒"
2025-05-09 04:21:54,871 - Thread-9 (run_timer) - INFO - "等待 1h 下一次聚合，将在 2025-05-09 08:00:00 进行，等待 2285.13 秒"
2025-05-09 04:21:54,872 - Thread-7 (run_timer) - INFO - "等待 15min 下一次聚合，将在 2025-05-09 07:30:00 进行，等待 485.13 秒"
2025-05-09 04:21:54,872 - Thread-6 (run_timer) - INFO - "等待 5min 下一次聚合，将在 2025-05-09 07:25:00 进行，等待 185.13 秒"
2025-05-09 04:21:55,038 - MainThread - INFO - "Websocket connected"
2025-05-09 04:21:55,038 - Thread-10 (run) - INFO - "开始补齐当前时间间隔的初始数据..."
2025-05-09 04:21:55,041 - Thread-10 (run) - INFO - "当前时间正好是 1min 间隔的开始，无需补齐"
2025-05-09 04:21:55,041 - Thread-10 (run) - INFO - "补齐 5min 数据: 从 2025-05-09 07:20:00 到 2025-05-09 07:21:55"
2025-05-09 04:21:55,042 - Thread-10 (run) - INFO - "获取历史数据: AAPL, 从 2025-05-09 04:20:00 到 2025-05-09 04:21:55, 间隔: 1min"
2025

[GOOGL] Price=154.68 Volume=10 Timestamp=1746789717474


2025-05-09 04:22:00,008 - Thread-5 (run_timer) - INFO - "数据已写入 ./data/raw/1min/GOOGL_1min.csv：2025-05-09 07:22:00"
2025-05-09 04:22:00,009 - Thread-5 (run_timer) - INFO - "等待 1min 下一次聚合，将在 2025-05-09 07:23:00 进行，等待 59.99 秒"


[GOOGL] Price=154.7 Volume=1 Timestamp=1746789727147
[GOOGL] Price=154.69 Volume=1 Timestamp=1746789727149
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727151
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727152
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727153
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727153
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727154
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727155
[GOOGL] Price=154.68 Volume=4 Timestamp=1746789727156
[GOOGL] Price=154.68 Volume=1 Timestamp=1746789727156
[GOOGL] Price=154.68 Volume=16 Timestamp=1746789728646
[GOOGL] Price=154.69 Volume=9 Timestamp=1746789731583
[GOOGL] Price=154.69 Volume=9 Timestamp=1746789731587
[TSLA] Price=288.33 Volume=20 Timestamp=1746789734681
[GOOGL] Price=154.69 Volume=25 Timestamp=1746789742739
[GOOGL] Price=154.68 Volume=1 Timestamp=1746789746908
[GOOGL] Price=154.67 Volume=22 Timestamp=1746789746908
[GOOGL] Price=154.72 Volume=100 Timestamp=1746789765298


2025-05-09 04:23:00,016 - Thread-5 (run_timer) - INFO - "数据已写入 ./data/raw/1min/GOOGL_1min.csv：2025-05-09 07:23:00"
2025-05-09 04:23:00,018 - Thread-5 (run_timer) - INFO - "数据已写入 ./data/raw/1min/TSLA_1min.csv：2025-05-09 07:23:00"
2025-05-09 04:23:00,019 - Thread-5 (run_timer) - INFO - "等待 1min 下一次聚合，将在 2025-05-09 07:24:00 进行，等待 59.98 秒"


[GOOGL] Price=154.68 Volume=4 Timestamp=1746789798817
[GOOGL] Price=154.68 Volume=46 Timestamp=1746789798818
[GOOGL] Price=154.7 Volume=4 Timestamp=1746789807509
[GOOGL] Price=154.7 Volume=4 Timestamp=1746789809659
