# Alpaca-py stock trading basic

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/alpacahq/alpaca-py/blob/master/examples/stocks/stocks-trading-basic.ipynb)

In [58]:
import pandas as pd
import numpy as np
import time
from datetime import datetime, timedelta
import pytz
from google.colab import userdata
from alpaca.trading.client import TradingClient
from alpaca.trading.requests import MarketOrderRequest, LimitOrderRequest
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest, StockLatestQuoteRequest, StockLatestTradeRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import DataFeed # 确保引入

# === 配置 ===
try:
    API_KEY = userdata.get('PK6HZ4U3ZEFRVBYMCKRJHKVJ7Z')
    SECRET_KEY = userdata.get('6y5BxmpkqvxegVKF8YKUUKzieU6WrA1kVZXEfBo9FazM')
except:
    API_KEY = "PK6HZ4U3ZEFRVBYMCKRJHKVJ7Z"
    SECRET_KEY = "6y5BxmpkqvxegVKF8YKUUKzieU6WrA1kVZXEfBo9FazM"

UNIVERSE = ['AAPL', 'MSFT', 'NVDA', 'TSLA', 'AMD', 'GOOGL', 'META', 'AMZN']

trading_client = TradingClient(API_KEY, SECRET_KEY, paper=True)
data_client = StockHistoricalDataClient(API_KEY, SECRET_KEY)

# ==========================================

def get_market_data(symbols):
    now = datetime.now(pytz.timezone('US/Eastern'))

    # === 测试模式: 强制使用昨天的数据 ===
    TEST_MODE = True
    today_str = "2026-01-08" if TEST_MODE else now.strftime('%Y-%m-%d')

    print(f"正在获取数据... (目标日期: {today_str})")

    # 1. 获取日线 (IEX)
    daily_req = StockBarsRequest(
        symbol_or_symbols=symbols,
        timeframe=TimeFrame.Day,
        start=now - timedelta(days=5),
        limit=5, # 稍微多拿一点防止只有一天
        feed=DataFeed.IEX
    )
    daily_bars = data_client.get_stock_bars(daily_req).df

    # 2. 获取分钟线 (IEX)
    # 强制加上时区信息，否则 Pandas 对比时会报警
    today_start = pd.Timestamp(f"{today_str} 09:30:00").tz_localize('US/Eastern')

    min_req = StockBarsRequest(
        symbol_or_symbols=symbols,
        timeframe=TimeFrame.Minute,
        start=today_start,
        feed=DataFeed.IEX
    )

    try:
        min_bars = data_client.get_stock_bars(min_req).df
    except:
        min_bars = pd.DataFrame()

    # 3. 获取实时价格 (IEX) - 仅作为非测试模式的默认值
    latest_trade_req = StockLatestTradeRequest(symbol_or_symbols=symbols, feed=DataFeed.IEX)
    try:
        latest_trades = data_client.get_stock_latest_trade(latest_trade_req)
    except:
        latest_trades = {} # 容错

    data_pack = {}

    for symbol in symbols:
        try:
            # --- 提取 Close_dt-1 ---
            if symbol in daily_bars.index.get_level_values(0):
                symbol_daily = daily_bars.xs(symbol)
                # 逻辑: 找到日期小于 today_str 的最后一天
                prev_days = symbol_daily[symbol_daily.index.strftime('%Y-%m-%d') < today_str]
                if not prev_days.empty:
                    close_dt_1 = prev_days.iloc[-1]['close']
                else:
                    # 如果实在找不到昨天的，就用列表里的第一个兜底
                    close_dt_1 = symbol_daily.iloc[0]['close']
            else:
                continue

            # --- 提取 Open_dt & Price_10:30 & Current_Price ---
            if not min_bars.empty and symbol in min_bars.index.get_level_values(0):
                symbol_min = min_bars.xs(symbol)
                open_dt = symbol_min.iloc[0]['open']

                # 10:30
                target_1030 = pd.Timestamp(f"{today_str} 10:30:00").tz_localize('US/Eastern')
                try:
                    idx_1030 = symbol_min.index.get_indexer([target_1030], method='nearest')[0]
                    price_1030 = symbol_min.iloc[idx_1030]['close']
                    prices_for_sharpe = symbol_min.loc[target_1030:]['close']
                except:
                    price_1030 = open_dt
                    prices_for_sharpe = symbol_min['close']

                # Current Price (关键修复点)
                # 如果是测试模式，用历史数据的最后一根 Bar 作为 "当前价格"
                if TEST_MODE:
                    current_price = symbol_min.iloc[-1]['close']
                else:
                    current_price = latest_trades[symbol].price

            else:
                # 只有日线没分钟线的情况
                open_dt = close_dt_1
                price_1030 = close_dt_1
                current_price = latest_trades[symbol].price if symbol in latest_trades else close_dt_1
                prices_for_sharpe = pd.Series([close_dt_1])

            data_pack[symbol] = {
                'close_dt_1': close_dt_1,
                'open_dt': open_dt,
                'price_1030': price_1030,
                'current_price': current_price,
                'price_series': prices_for_sharpe
            }
        except Exception as e:
            print(f"Skipping {symbol}: {e}")

    return data_pack

def calculate_features(data_pack):
    df = pd.DataFrame.from_dict(data_pack, orient='index')

    # F1: (Open - PrevClose) / PrevClose
    df['F1'] = (df['open_dt'] - df['close_dt_1']) / df['close_dt_1']

    # F2: (Price_1030 - Open) / Open
    df['F2'] = (df['price_1030'] - df['open_dt']) / df['open_dt']

    # F3: (Current - Price_1030) / Price_1030
    df['F3'] = (df['current_price'] - df['price_1030']) / df['price_1030']

    # F4: Sharpe
    sharpe_list = []
    for symbol in df.index:
        prices = data_pack[symbol]['price_series']
        returns = prices.pct_change().dropna()
        if len(returns) > 1 and returns.std() != 0:
            sharpe = returns.mean() / returns.std()
        else:
            sharpe = 0
        sharpe_list.append(sharpe)
    df['F4'] = sharpe_list

    # F5: Zscore Sum
    if len(df) > 1: # 防止只有1个股票时std为NaN
        df['z_F3'] = (df['F3'] - df['F3'].mean()) / df['F3'].std(ddof=1)
        df['z_F4'] = (df['F4'] - df['F4'].mean()) / df['F4'].std(ddof=1)
        # 如果std是0或者nan (所有股票涨幅一样), fillna 0
        df['z_F3'] = df['z_F3'].fillna(0)
        df['z_F4'] = df['z_F4'].fillna(0)
    else:
        df['z_F3'] = 0
        df['z_F4'] = 0

    df['F5'] = df['z_F3'] + df['z_F4']

    return df

def execute_limit_order_logic(symbol, side, qty, wait_seconds=5, cancel_on_timeout=True):
    try:
        # 获取报价 (IEX)
        quote_req = StockLatestQuoteRequest(symbol_or_symbols=symbol, feed=DataFeed.IEX)
        quotes = data_client.get_stock_latest_quote(quote_req)
        quote = quotes[symbol]

        limit_price = quote.bid_price if side == OrderSide.BUY else quote.ask_price

        # 价格为0时的兜底 (修复: 加上 feed=DataFeed.IEX)
        if limit_price == 0:
            trade_req = StockLatestTradeRequest(symbol_or_symbols=symbol, feed=DataFeed.IEX)
            limit_price = data_client.get_stock_latest_trade(trade_req)[symbol].price

        print(f"提交 LIMIT {side} {symbol}: {qty}股 @ ${limit_price}")

        limit_order_data = LimitOrderRequest(
            symbol=symbol,
            qty=qty,
            side=side,
            time_in_force=TimeInForce.DAY,
            limit_price=limit_price
        )
        order = trading_client.submit_order(limit_order_data)

        time.sleep(wait_seconds)

        updated_order = trading_client.get_order_by_id(order.id)
        if updated_order.status == 'filled':
            print(f"--> {symbol} LIMIT 成交!")
            return True
        else:
            print(f"--> {symbol} 未成交 ({updated_order.status}) -> 撤单")
            trading_client.cancel_order_by_id(order.id)

            if not cancel_on_timeout:
                print(f"--> 切换 Market 单强平")
                market_order = MarketOrderRequest(symbol=symbol, qty=qty, side=side, time_in_force=TimeInForce.DAY)
                trading_client.submit_order(market_order)
                return True
            return False

    except Exception as e:
        print(f"下单失败 {symbol}: {e}")
        return False

def run_strategy_logic(timestamp_label):
    print(f"\n=== 触发逻辑: {timestamp_label} ===")

    data_pack = get_market_data(UNIVERSE)
    df = calculate_features(data_pack)

    # 打印一下中间结果方便调试
    print("\n特征计算结果 (前3行):")
    print(df[['F1', 'F2', 'F3', 'F5']].head(3))

    # 过滤
    filtered_df = df[ (df['F1'].abs() <= 0.005) & (df['F2'].abs() <= 0.005) ].copy()
    if filtered_df.empty:
        print("没有股票满足 F1/F2 过滤条件。")
        return

    # 排名
    filtered_df = filtered_df.sort_values(by='F5', ascending=False)
    n = len(filtered_df)
    top_n = max(1, int(n * 0.3))

    top_stocks = filtered_df.index[:top_n].tolist()
    bottom_stocks = filtered_df.index[-top_n:].tolist()

    print(f"做多列表 (Top): {top_stocks}")
    print(f"做空列表 (Bottom): {bottom_stocks}")

    # 执行 (假设 15:58)
    qty = 1
    if timestamp_label == '15:58':
        for sym in top_stocks:
            execute_limit_order_logic(sym, OrderSide.BUY, qty)
        for sym in bottom_stocks:
            execute_limit_order_logic(sym, OrderSide.SELL, qty)

# === 直接运行测试 ===
if __name__ == "__main__":
    print("开始测试 (基于 2026-01-08 数据)...")
    run_strategy_logic('15:58')

开始测试 (基于 2026-01-08 数据)...

=== 触发逻辑: 15:58 ===
正在获取数据... (目标日期: 2026-01-08)

特征计算结果 (前3行):
            F1        F2        F3        F5
AAPL -0.012790 -0.000992  0.011548  1.414214
AMD  -0.048675 -0.014218 -0.012976 -1.414214
没有股票满足 F1/F2 过滤条件。
