In [14]:
import sqlite3
import pandas as pd

# Database path, target stock and starting capital for this notebook.
db_path = r"C:\Users\BXDM\Documents\Notebook\Python 量化投资\历史数据管理\stock_data.db"
stock_code = '600893'
capital = 50000  # starting capital: 50,000 yuan

# Connect to the database and read the daily bars for `stock_code`.
# The stock code is passed as a bound parameter instead of being formatted
# into the SQL text, so quotes in the value cannot alter the statement.
conn = sqlite3.connect(db_path)
query = (
    "SELECT trade_date, open_price, high_price, low_price, close_price "
    "FROM stock_daily_data WHERE stock_code = ? ORDER BY trade_date"
)
try:
    df = pd.read_sql_query(query, conn, params=(stock_code,))
finally:
    # Always release the connection, even when the query raises.
    conn.close()

print(df.head())
print(f"初始本金：{capital} 元")

   trade_date  open_price  high_price  low_price  close_price
0  2023-01-03       42.17       44.10      42.10        43.78
1  2023-01-04       43.69       44.10      43.30        43.84
2  2023-01-05       43.68       44.32      43.50        44.14
3  2023-01-06       43.89       44.30      43.58        43.97
4  2023-01-09       43.90       44.18      42.20        42.54
初始本金：50000 元


In [15]:
def turtle_strategy(df, window=20):
    """Add simple channel-breakout signals to ``df`` in place and return it.

    Columns written on the passed-in frame:
      * ``high_max`` - rolling ``window`` maximum of ``high_price``, shifted
        by one row so the current bar is excluded.
      * ``low_min``  - rolling ``window`` minimum of ``low_price``, shifted
        by one row.
      * ``signal``   - 1 where ``close_price`` is above ``high_max``,
        -1 where it is below ``low_min``, otherwise 0.

    The input DataFrame is modified (no copy is taken); the same object is
    returned.
    """
    prior_channel_top = df['high_price'].rolling(window).max().shift(1)
    prior_channel_bottom = df['low_price'].rolling(window).min().shift(1)
    df['high_max'] = prior_channel_top
    df['low_min'] = prior_channel_bottom

    # Start flat everywhere, then stamp the breakout rows.
    df['signal'] = 0
    broke_above = df['close_price'] > df['high_max']
    broke_below = df['close_price'] < df['low_min']
    df.loc[broke_above, 'signal'] = 1   # close above the prior high: buy
    df.loc[broke_below, 'signal'] = -1  # close below the prior low: sell
    return df

# Example usage: df must contain the columns high_price, low_price and close_price.
# df = pd.read_csv('your_data.csv')
# result = turtle_strategy(df)

In [16]:
# Apply the breakout rule with the default 20-bar window. turtle_strategy
# writes high_max / low_min / signal onto `df` itself and returns that same
# frame, which is what this cell displays.
turtle_strategy(df)

Unnamed: 0,trade_date,open_price,high_price,low_price,close_price,high_max,low_min,signal
0,2023-01-03,42.17,44.10,42.10,43.78,,,0
1,2023-01-04,43.69,44.10,43.30,43.84,,,0
2,2023-01-05,43.68,44.32,43.50,44.14,,,0
3,2023-01-06,43.89,44.30,43.58,43.97,,,0
4,2023-01-09,43.90,44.18,42.20,42.54,,,0
...,...,...,...,...,...,...,...,...
612,2025-07-16,36.79,36.84,36.48,36.70,38.88,34.46,0
613,2025-07-17,36.70,38.00,36.69,37.95,38.88,34.46,0
614,2025-07-18,37.90,39.39,37.70,39.13,38.88,34.46,1
615,2025-07-21,36.79,36.84,36.48,36.70,39.39,34.46,0


In [48]:
import sqlite3
import pandas as pd
import numpy as np
import talib
import plotly.graph_objs as go

# Data loading
def load_data(db_path, stock_code):
    """Load one stock's daily bars from the SQLite database.

    Parameters
    ----------
    db_path : str
        Path of the SQLite database file.
    stock_code : str
        Value matched against the ``stock_code`` column of
        ``stock_daily_data``. It is passed as a bound parameter, so quotes
        or SQL fragments in it are treated as plain data.

    Returns
    -------
    pandas.DataFrame
        Columns ``trade_date`` (converted with ``pd.to_datetime``),
        ``open_price``, ``high_price``, ``low_price``, ``close_price`` and
        ``volume``, sorted by ``trade_date`` with a fresh 0..n-1 index.
        Empty when no row matches.
    """
    query = """
    SELECT trade_date, open_price, high_price, low_price, close_price, volume
    FROM stock_daily_data
    WHERE stock_code = ?
    ORDER BY trade_date
    """
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(query, conn, params=(stock_code,))
    finally:
        # Close the connection even if the query fails.
        conn.close()
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    df = df.sort_values('trade_date').reset_index(drop=True)
    return df

# Turtle strategy

def turtle_strategy_original(df, entry_window=20, exit_window=10, atr_period=20, risk_pct=0.01, max_units=4, capital=50000):
    """Simulate a long-only Turtle breakout system and mark entry/exit bars.

    Works on a copy of ``df``; the caller's frame is not modified.

    Rules, all evaluated on the bar's close:
      * Entry: flat and close above ``entry_high`` (rolling ``entry_window``
        maximum of ``high_price``, shifted one bar). Size is
        ``int((cash * risk_pct) // ATR)`` shares; the stop is set to
        ``close - 2 * ATR``. ``signal`` is set to 1.
      * Add-on: while long with fewer than ``max_units`` units and
        ``close >= entry_price + 0.5 * ATR * units`` (current bar's ATR),
        buy another unit sized the same way. Add-ons are not written to
        ``signal`` and do not move the stop.
      * Exit: while long and close is below the stop or below ``exit_low``
        (rolling ``exit_window`` minimum of ``low_price``, shifted one bar),
        sell everything. ``signal`` is set to -1.

    Bars whose ATR is missing or not strictly positive never open or add
    a position: sizing divides by ATR, so a zero ATR would otherwise raise.

    NOTE(review): the share count is not capped by available cash, so
    ``cash`` can go negative when ATR is small relative to price.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``high_price``, ``low_price`` and ``close_price``.
    entry_window, exit_window : int
        Look-back lengths of the entry and exit channels.
    atr_period : int
        ``timeperiod`` passed to ``talib.ATR``.
    risk_pct : float
        Fraction of current cash used in the unit-size formula.
    max_units : int
        Maximum number of units (initial entry plus add-ons).
    capital : number
        Starting cash.

    Returns
    -------
    (pandas.DataFrame, list)
        The copied frame with ``ATR``, ``entry_high``, ``exit_low``,
        ``signal`` and ``equity`` (cash + shares * close per bar) added, and
        a list with one entry per closed trade: exit close minus the first
        entry close (per share; add-on prices are not averaged in).
    """
    df = df.copy()
    df['ATR'] = talib.ATR(df['high_price'], df['low_price'], df['close_price'], timeperiod=atr_period)
    df['entry_high'] = df['high_price'].rolling(entry_window).max().shift(1)
    df['exit_low'] = df['low_price'].rolling(exit_window).min().shift(1)
    df['signal'] = 0

    position = 0
    entry_price = 0
    stop_price = 0
    units = 0
    shares = 0
    cash = capital
    equity_curve = []
    trades = []

    for i, row in df.iterrows():
        price = row['close_price']
        atr = row['ATR']
        # NaN > 0 is False, so this rejects warm-up bars as well as a zero
        # ATR, which would make the `// atr` sizing below fail.
        atr_usable = atr > 0
        # Entry
        if position == 0 and price > row['entry_high'] and atr_usable:
            unit_size = int((cash * risk_pct) // atr)
            if unit_size > 0:
                shares += unit_size
                cash -= unit_size * price
                entry_price = price
                stop_price = price - 2 * atr
                position = 1
                units = 1
                df.at[i, 'signal'] = 1
        # Add-on
        elif position == 1 and units < max_units and atr_usable and price >= entry_price + 0.5 * atr * units:
            unit_size = int((cash * risk_pct) // atr)
            if unit_size > 0:
                shares += unit_size
                cash -= unit_size * price
                units += 1
        # Stop-loss or channel exit (checked on every bar, including the
        # bar on which a position was just opened or increased)
        if position == 1 and (price < stop_price or price < row['exit_low']):
            cash += shares * price
            trades.append(price - entry_price)
            shares = 0
            position = 0
            units = 0
            entry_price = 0
            stop_price = 0
            df.at[i, 'signal'] = -1
        total_asset = cash + shares * price
        equity_curve.append(total_asset)
    df['equity'] = equity_curve
    return df, trades


# Reverse (anti) Turtle strategy
def anti_turtle_strategy(df,
                         entry_window=20,  # look-back of the breakout-high channel
                         exit_window=10,  # look-back of the breakout-low channel
                         atr_period=20,  # ATR calculation period
                         risk_pct=0.01,  # fraction of cash used in unit sizing
                         max_units=4,  # maximum number of units held
                         capital=50000):  # starting cash
    """Simulate a contrarian Turtle system: short new highs, buy new lows.

    Works on a copy of ``df``; the caller's frame is not modified.

    Rules, all evaluated on the bar's close:
      * Open short: flat and close above ``entry_high``; stop at
        ``close + 2 * ATR``; ``signal`` = -1.
      * Open long: flat and close below ``exit_low``; stop at
        ``close - 2 * ATR``; ``signal`` = 1.
      * Add to short / long: fewer than ``max_units`` units and the close has
        moved ``0.5 * ATR * units`` in the position's favour from the first
        entry price. Add-ons are not written to ``signal``.
      * Cover short: close above the stop or below ``exit_low``;
        ``signal`` = 2.
      * Close long: close below the stop or above ``entry_high``;
        ``signal`` = -2.

    Every unit is sized as ``int((cash * risk_pct) // ATR)`` shares. Bars whose
    ATR is missing or not strictly positive never open or add a position:
    sizing divides by ATR, so a zero ATR would otherwise raise.

    Returns
    -------
    (pandas.DataFrame, list)
        The copied frame with ``ATR``, ``entry_high``, ``exit_low``,
        ``signal`` and ``equity`` (cash + shares * close, where ``shares`` is
        negative while short) added, and a list with one per-share result per
        closed trade, measured from the first entry close.
    """
    df = df.copy()
    df['ATR'] = talib.ATR(df['high_price'], df['low_price'], df['close_price'], timeperiod=atr_period)
    df['entry_high'] = df['high_price'].rolling(entry_window).max().shift(1)
    df['exit_low'] = df['low_price'].rolling(exit_window).min().shift(1)
    df['signal'] = 0

    position = 0  # 0: flat, 1: long, -1: short
    entry_price = 0
    stop_price = 0
    units = 0
    shares = 0
    cash = capital
    equity_curve = []
    trades = []

    for i, row in df.iterrows():
        price = row['close_price']
        atr = row['ATR']
        # NaN > 0 is False, so this rejects warm-up bars as well as a zero
        # ATR, which would make the `// atr` sizing below fail.
        atr_usable = atr > 0
        # Short signal: close breaks above the prior high
        if position == 0 and price > row['entry_high'] and atr_usable:
            unit_size = int((cash * risk_pct) // atr)
            if unit_size > 0:
                shares -= unit_size  # sell short
                cash += unit_size * price
                entry_price = price
                stop_price = price + 2 * atr
                position = -1
                units = 1
                df.at[i, 'signal'] = -1
        # Long signal: close breaks below the prior low
        elif position == 0 and price < row['exit_low'] and atr_usable:
            unit_size = int((cash * risk_pct) // atr)
            if unit_size > 0:
                shares += unit_size
                cash -= unit_size * price
                entry_price = price
                stop_price = price - 2 * atr
                position = 1
                units = 1
                df.at[i, 'signal'] = 1
        # Add to the short position
        elif position == -1 and units < max_units and atr_usable and price <= entry_price - 0.5 * atr * units:
            unit_size = int((cash * risk_pct) // atr)
            if unit_size > 0:
                shares -= unit_size
                cash += unit_size * price
                units += 1
        # Add to the long position
        elif position == 1 and units < max_units and atr_usable and price >= entry_price + 0.5 * atr * units:
            unit_size = int((cash * risk_pct) // atr)
            if unit_size > 0:
                shares += unit_size
                cash -= unit_size * price
                units += 1
        # Cover the short position
        if position == -1 and (price > stop_price or price < row['exit_low']):
            cash -= abs(shares) * price
            trades.append(entry_price - price)
            shares = 0
            position = 0
            units = 0
            entry_price = 0
            stop_price = 0
            df.at[i, 'signal'] = 2  # short covered
        # Close the long position
        if position == 1 and (price < stop_price or price > row['entry_high']):
            cash += shares * price
            trades.append(price - entry_price)
            shares = 0
            position = 0
            units = 0
            entry_price = 0
            stop_price = 0
            df.at[i, 'signal'] = -2  # long closed
        total_asset = cash + shares * price
        equity_curve.append(total_asset)
    df['equity'] = equity_curve
    return df, trades

# Hybrid Turtle strategy
def hybrid_turtle_strategy(df, ma_period=60, **kwargs):
    """Pick between the regular and the reverse Turtle signal per bar.

    A ``ma_period`` simple moving average of ``close_price`` defines
    ``trend``: 1 when the close is above the average, -1 when below, 0
    otherwise (including bars where the average is not yet available).

    Both ``turtle_strategy_original`` and ``anti_turtle_strategy`` are run on
    the frame with ``**kwargs`` forwarded unchanged. The resulting ``signal``
    column is:
      * the regular Turtle signal on bars with ``trend == -1``,
      * the reverse Turtle signal on bars with ``trend == 1``,
      * 0 elsewhere.

    Returns only the DataFrame (a copy of the input) with ``MA``, ``trend``,
    ``ATR``, ``entry_high``, ``exit_low`` and ``signal`` added; no trade list
    is returned.
    """
    df = df.copy()

    # Trend filter from the moving average.
    moving_avg = df['close_price'].rolling(ma_period).mean()
    df['MA'] = moving_avg
    df['trend'] = 0
    close_above_ma = df['close_price'] > df['MA']
    close_below_ma = df['close_price'] < df['MA']
    df.loc[close_above_ma, 'trend'] = 1
    df.loc[close_below_ma, 'trend'] = -1

    # Run both strategies first; only their frames are needed here.
    breakout_frame, _ = turtle_strategy_original(df, **kwargs)
    reverse_frame, _ = anti_turtle_strategy(df, **kwargs)

    # Carry over the helper columns the back-test relies on.
    for helper_col in ('ATR', 'entry_high', 'exit_low'):
        df[helper_col] = breakout_frame[helper_col]

    # Switch the signal source according to the trend.
    df['signal'] = 0
    downtrend = df['trend'] == -1
    uptrend = df['trend'] == 1
    df.loc[downtrend, 'signal'] = breakout_frame['signal']
    df.loc[uptrend, 'signal'] = reverse_frame['signal']

    return df

# Back-test (supports scaling in over several buys)
def backtest_improved(df, initial_capital=50000, risk_pct=0.3, atr_stop=2.2, max_add=2):
    """Replay the ``signal`` column as a long-only cash account.

    Per bar, using ``close_price``, ``signal`` and ``ATR``:
      * ``signal == 1`` while fewer than ``max_add`` buys were made since the
        last exit: spend up to ``cash * risk_pct`` on whole shares
        (floor division by the close) and update the average entry price.
      * otherwise, while holding: sell everything when the close falls below
        ``average entry - atr_stop * ATR`` or when ``signal == -1``; the
        per-share result (close minus average entry) is appended to the
        trade list and the buy counter resets.

    The ``equity`` column (cash + shares * close) is written onto the
    passed-in ``df`` itself, which is also returned.

    Returns
    -------
    (pandas.DataFrame, list)
        The same ``df`` object and the list of per-share trade results.
    """
    available_cash = initial_capital
    held_shares = 0
    in_market = 0
    avg_entry = 0
    buys_done = 0
    equity_points = []
    closed_trades = []

    for _, bar in df.iterrows():
        close = bar['close_price']
        bar_signal = bar['signal']
        bar_atr = bar['ATR']

        if bar_signal == 1 and buys_done < max_add:
            budget = available_cash * risk_pct
            lot = budget // close
            if lot > 0:
                if held_shares > 0:
                    # Blend the new lot into the running average cost.
                    avg_entry = (avg_entry * held_shares + close * lot) / (held_shares + lot)
                else:
                    avg_entry = close
                available_cash -= lot * close
                held_shares += lot
                in_market = 1
                buys_done += 1
        elif in_market == 1:
            # Stop-loss or sell signal
            if (close < avg_entry - atr_stop * bar_atr) or bar_signal == -1:
                available_cash += held_shares * close
                closed_trades.append(close - avg_entry)
                held_shares = 0
                in_market = 0
                buys_done = 0

        equity_points.append(available_cash + held_shares * close)

    df['equity'] = equity_points
    return df, closed_trades

# Statistics and visualisation
def analyze_and_plot(df, trades, capital):
    """Print performance figures and show two Plotly charts.

    Adds ``profit`` (equity minus ``capital``), ``equity_max`` (running
    maximum of equity) and ``drawdown`` (relative distance from that maximum)
    to the passed-in ``df``.

    Printed figures: total profit, total return in percent, maximum drawdown,
    profit/loss ratio (mean winning trade over mean losing trade; needs at
    least one of each) and a Sharpe-style ratio computed from daily equity
    returns scaled by 252, with no risk-free rate subtracted.

    Charts: cumulative profit with shaded profit/loss areas, and the close
    price with markers on rows where ``signal`` is 1 (buy) or -1 (sell).

    Parameters
    ----------
    df : pandas.DataFrame
        Needs ``equity``, ``trade_date``, ``close_price`` and ``signal``.
    trades : list
        Per-trade results; positives count as wins, negatives as losses.
    capital : number
        Starting capital used as the profit baseline.
    """
    equity = df['equity']
    df['profit'] = equity - capital
    df['equity_max'] = equity.cummax()
    df['drawdown'] = (equity - df['equity_max']) / df['equity_max']

    total_profit = equity.iloc[-1] - capital
    total_return = total_profit / capital * 100
    max_drawdown = df['drawdown'].min()

    # Profit/loss ratio: only defined with at least one win and one loss.
    winners = [t for t in trades if t > 0]
    losers = [t for t in trades if t < 0]
    pl_ratio = np.nan
    if winners and losers:
        mean_win = np.mean(winners)
        mean_loss = -np.mean(losers)
        if mean_loss != 0:
            pl_ratio = mean_win / mean_loss

    # Sharpe ratio from daily equity returns
    sharpe = np.nan
    daily_returns = equity.pct_change().dropna()
    if not daily_returns.empty:
        annual_return = np.mean(daily_returns) * 252
        annual_vol = np.std(daily_returns) * np.sqrt(252)
        if annual_vol != 0:
            sharpe = annual_return / annual_vol

    print(f"总收益: {total_profit:.2f} 元")
    print(f"总收益率: {total_return:.2f}%")
    print(f"最大回撤: {max_drawdown:.2%}")
    if np.isnan(pl_ratio):
        print("无有效盈亏数据，无法计算盈亏比。")
    else:
        print(f"盈亏比: {pl_ratio:.2f}")
    if np.isnan(sharpe):
        print("无有效数据，无法计算夏普比率。")
    else:
        print(f"夏普比率: {sharpe:.2f}")

    def _dark_layout(title, y_title):
        # Shared dark theme; a fresh dict is built for every figure.
        return dict(
            title=title,
            xaxis_title='日期', yaxis_title=y_title,
            plot_bgcolor='#23272E',
            paper_bgcolor='#23272E',
            width=900, height=400,
            font=dict(color='#E0E6F1'),
            legend=dict(bgcolor='rgba(0,0,0,0)'),
        )

    # Cumulative profit area chart
    profit = df['profit']
    dates = df['trade_date']
    profit_traces = [
        go.Scatter(
            x=dates, y=profit,
            mode='lines',
            line=dict(color='#00BFFF', width=2),
            name='累计盈利',
        ),
        go.Scatter(
            x=dates, y=profit.where(profit >= 0, 0),
            fill='tozeroy', mode='none',
            fillcolor='rgba(255,140,0,0.25)', name='盈利区',
        ),
        go.Scatter(
            x=dates, y=profit.where(profit < 0, 0),
            fill='tozeroy', mode='none',
            fillcolor='rgba(0,255,200,0.18)', name='亏损区',
        ),
    ]
    fig = go.Figure()
    for trace in profit_traces:
        fig.add_trace(trace)
    fig.update_layout(**_dark_layout('累计盈利面积图', '累计盈利（元）'))
    fig.show()

    # Buy / sell markers on the close price
    buy_signals = df[df['signal'] == 1]
    sell_signals = df[df['signal'] == -1]
    fig2 = go.Figure()
    fig2.add_trace(go.Scatter(
        x=df['trade_date'], y=df['close_price'],
        mode='lines', name='收盘价',
        line=dict(color='#E0E6F1', width=2)
    ))
    buy_marker = dict(symbol='triangle-up', color='rgba(255,180,80,0.55)', size=14,
                      line=dict(width=1, color='#FFD700'))
    fig2.add_trace(go.Scatter(
        x=buy_signals['trade_date'], y=buy_signals['close_price'],
        mode='markers', name='买入',
        marker=buy_marker
    ))
    sell_marker = dict(symbol='triangle-down', color='rgba(80,220,255,0.45)', size=14,
                       line=dict(width=1, color='#00BFFF'))
    fig2.add_trace(go.Scatter(
        x=sell_signals['trade_date'], y=sell_signals['close_price'],
        mode='markers', name='卖出',
        marker=sell_marker
    ))
    fig2.update_layout(**_dark_layout('买卖点示意', '收盘价'))
    fig2.show()

In [45]:
# Main flow: regular Turtle strategy
if __name__ == "__main__":
    db_path = r"C:\Users\BXDM\Documents\Notebook\Python 量化投资\历史数据管理\stock_data.db"
    stock_code = '002911'
    capital = 50000

    # Load the daily bars for the chosen stock.
    df = load_data(db_path, stock_code)
    # Generate signals; the strategy's own trade list is discarded.
    df, _ = turtle_strategy_original(df)
    # Re-simulate from the signal/ATR columns; this replaces the 'equity' column.
    df, trades = backtest_improved(df, initial_capital=capital)
    analyze_and_plot(df, trades, capital)

总收益: -856.89 元
总收益率: -1.71%
最大回撤: -5.61%
盈亏比: 1.21
夏普比率: -0.12


In [46]:
# Reverse (anti) Turtle strategy
if __name__ == "__main__":
    db_path = r"C:\Users\BXDM\Documents\Notebook\Python 量化投资\历史数据管理\stock_data.db"
    stock_code = '002911'
    capital = 50000

    # Load the daily bars for the chosen stock.
    df = load_data(db_path, stock_code)
    # Generate signals; the strategy's own trade list is discarded.
    df, _ = anti_turtle_strategy(df)
    # backtest_improved acts only on signal values 1 and -1; the 2 / -2
    # close markers written by anti_turtle_strategy are not handled there.
    df, trades = backtest_improved(df, initial_capital=capital)
    analyze_and_plot(df, trades, capital)

总收益: 1205.84 元
总收益率: 2.41%
最大回撤: -5.41%
盈亏比: 1.44
夏普比率: 0.23


In [49]:
# Hybrid Turtle strategy
if __name__ == "__main__":
    db_path = r"C:\Users\BXDM\Documents\Notebook\Python 量化投资\历史数据管理\stock_data.db"
    stock_code = '002911'
    capital = 50000

    # Load the daily bars for the chosen stock.
    df = load_data(db_path, stock_code)
    # hybrid_turtle_strategy returns only the DataFrame (no trade list).
    df = hybrid_turtle_strategy(df)
    # Re-simulate from the signal/ATR columns and report the results.
    df, trades = backtest_improved(df, initial_capital=capital)
    analyze_and_plot(df, trades, capital)

总收益: 1315.52 元
总收益率: 2.63%
最大回撤: -1.52%
无有效盈亏数据，无法计算盈亏比。
夏普比率: 0.71
