In [18]:
import ccxt
import pandas as pd
import numpy as np

# 创建Binance交易所实例
exchange = ccxt.binance()

# 获取BTC/USDT的历史K线数据，时间间隔为5分钟
symbol = 'BTC/USDT'
timeframe = '5m'
limit = 5000  # 可以根据需要调整数量
ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)

# 转换为pandas DataFrame
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

In [5]:
# 统计5分钟K线的波动幅度分布
df['5m_range'] = df['high'] - df['low']
range_distribution = df['5m_range'].describe()

print("5分钟K线波动幅度分布：")
print(range_distribution)

# 统计某个关键价格（例如移动均线）的突破情况
df['20_SMA'] = df['close'].rolling(window=20).mean()
df['breakout_above_20SMA'] = df['close'] > df['20_SMA']

# 计算突破20SMA后的涨跌幅度
df['returns'] = df['close'].pct_change()
breakout_returns = df.loc[df['breakout_above_20SMA'], 'returns'].mean()

print(f"突破20SMA后平均涨幅为: {breakout_returns:.2%}")

5分钟K线波动幅度分布：
count    500.000000
mean     108.155820
std      101.917989
min       11.910000
25%       50.162500
50%       78.430000
75%      132.990000
max      920.360000
Name: 5m_range, dtype: float64
突破20SMA后平均涨幅为: 0.03%


In [19]:
import ccxt
import pandas as pd
import numpy as np

# 创建Binance交易所实例
exchange = ccxt.binance()

# 获取BTC/USDT的历史K线数据，时间间隔为5分钟
symbol = 'BTC/USDT'
timeframe = '5m'
limit = 1000  # 可以根据需要调整数量
ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)

# 转换为pandas DataFrame
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

# 提取日期信息
df['date'] = df['timestamp'].dt.date

# 计算每日的最高价和最低价
daily_highs = df.groupby('date')['high'].max()
daily_lows = df.groupby('date')['low'].min()

# 添加每日最高价和最低价信息
df['daily_high'] = df['date'].map(daily_highs)
df['daily_low'] = df['date'].map(daily_lows)

# 判断每根K线是否为当天最高价或最低价
df['is_daily_high_reached'] = df['high'] >= df['daily_high']
df['is_daily_low_reached'] = df['low'] <= df['daily_low']

# 判断截至当前K线，是否已经达到过当天的最高价或最低价
df['high_or_low_reached'] = df.groupby('date').apply(lambda x: x['is_daily_high_reached'].cummax() | x['is_daily_low_reached'].cummax()).reset_index(level=0, drop=True)

# 计算每根K线后当天最高价或最低价已经出现的概率
probability_by_candle = df.groupby(df.index % (24*12))['high_or_low_reached'].mean() * 100

print("每根5分钟K线后，已出现当天最高价或最低价的概率：")
print(probability_by_candle)

每根5分钟K线后，已出现当天最高价或最低价的概率：
0       75.0
1       75.0
2       75.0
3       75.0
4      100.0
       ...  
283    100.0
284    100.0
285    100.0
286    100.0
287    100.0
Name: high_or_low_reached, Length: 288, dtype: float64


  df['high_or_low_reached'] = df.groupby('date').apply(lambda x: x['is_daily_high_reached'].cummax() | x['is_daily_low_reached'].cummax()).reset_index(level=0, drop=True)


In [15]:
import ccxt
import pandas as pd

# 创建Binance交易所实例
exchange = ccxt.binance()

# 获取BTC/USDT的历史K线数据，时间间隔为1天
symbol = 'BTC/USDT'
timeframe = '1d'
limit = 1000  # 可以根据需要调整数量
ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)

# 转换为pandas DataFrame
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

# 提取周几信息
df['weekday'] = df['timestamp'].dt.day_name()

# 计算每日波动幅度
df['daily_range'] = df['high'] - df['low']

# 按周分组并找出每周波动最大的一天
df['week_start'] = df['timestamp'].dt.to_period('W').apply(lambda r: r.start_time)
weekly_max = df.loc[df.groupby('week_start')['daily_range'].idxmax()]

# 统计每个周几成为波动最大一天的次数
weekday_counts = weekly_max['weekday'].value_counts(normalize=True) * 100

print("每周波动最大的一天是周几的概率：")
print(weekday_counts)

每周波动最大的一天是周几的概率：
weekday
Monday       23.076923
Wednesday    20.279720
Friday       18.881119
Tuesday      18.181818
Thursday     13.286713
Sunday        3.496503
Saturday      2.797203
Name: proportion, dtype: float64


In [3]:
import pandas as pd
import numpy as np
import ccxt
import mplfinance as mpf
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from pa.pa_index import pa_oscillator, check_zones


# 定义检测“看涨吞没”形态的函数
def is_bullish_engulfing(df, index):
    if index < 1:
        return False  # 必须要有前一根K线
    prev_candle = df.iloc[index - 1]
    current_candle = df.iloc[index]
    
    # 条件1：前一根是阴线
    if prev_candle['Close'] >= prev_candle['Open']:
        return False
    
    # 条件2：当前K线是阳线
    if current_candle['Close'] <= current_candle['Open']:
        return False
    
    # 条件3：当前K线吞没前一根K线
    if current_candle['Open'] > prev_candle['Low'] or current_candle['Close'] < prev_candle['High']:
        return False
    
    return True


# 新增：定义检测"看涨孕线"形态的函数
def is_bullish_harami(df, index):
    if index < 1:
        return False  # 必须要有前一根K线
    prev_candle = df.iloc[index - 1]
    current_candle = df.iloc[index]
    
    # 条件1：前一根是阴线
    if prev_candle['Close'] >= prev_candle['Open']:
        return False
    
    # 条件2：当前K线是阳线
    if current_candle['Close'] <= current_candle['Open']:
        return False
    
    # 条件3：当前K线完全包含在前一根K线实体内
    if (current_candle['Open'] <= prev_candle['Close'] and
        current_candle['Close'] >= prev_candle['Open'] and
        current_candle['High'] <= prev_candle['High'] and
        current_candle['Low'] >= prev_candle['Low']):
        return True
    
    return False


from pa.vector_candles import vector_candles

# 修改回测函数以包含两种形态
def backtest_and_plot(df, initial_capital=10000, risk_per_trade=0.02, dca_levels=[0.05, 0.1]):

    tick_size = 0.01
    successful_trades = 0
    total_trades = 0
    total_profit_loss = 0
    buy_signals = []
    sell_signals = []
    capital = initial_capital
    position_size = 0
    avg_entry_price = 0
    dca_count = 0
    df = vector_candles(df)

    df.rename(columns={'open': 'Open', 'high': 'High', 'low': 'Low', 'close': 'Close', 'volume': 'Volume'}, inplace=True)

    df = pa_oscillator(df)
    df = check_zones(df)

    for i in range(1, len(df) - 1):
        if position_size == 0 and (is_bullish_engulfing(df, i) or is_bullish_harami(df, i)) and df.iloc[i]['PA'] < -10:
            signal_candle = df.iloc[i]
            next_candle = df.iloc[i + 1]
            
            entry_price = signal_candle['High'] + tick_size
            
            if next_candle['High'] >= entry_price:
                total_trades += 1
                
                stop_loss = signal_candle['Low'] - tick_size
                target_price = entry_price + 2 * (entry_price - stop_loss)
                
                # 计算头寸大小
                risk_amount = capital * risk_per_trade
                position_size = risk_amount / (entry_price - stop_loss)
                avg_entry_price = entry_price
                
                buy_signals.append((next_candle['timestamp'], entry_price))
                trade_exited = False
                dca_count = 0
                
                for j in range(i + 2, len(df)):
                    current_price = df.iloc[j]['Close']
                    
                    # 检查是否需要补仓
                    for level in dca_levels:
                        if current_price <= avg_entry_price * (1 - level) and dca_count < len(dca_levels):
                            dca_amount = capital * risk_per_trade * (1 + dca_count * 0.5)  # 增加补仓金额
                            dca_size = dca_amount / current_price
                            position_size += dca_size
                            avg_entry_price = (avg_entry_price * position_size + current_price * dca_size) / (position_size + dca_size)
                            position_size += dca_size
                            dca_count += 1
                            buy_signals.append((df.iloc[j]['timestamp'], current_price))
                    
                    if df.iloc[j]['Low'] <= stop_loss:
                        sell_signals.append((df.iloc[j]['timestamp'], stop_loss))
                        trade_exited = True
                        trade_profit = (stop_loss - avg_entry_price) * position_size
                        total_profit_loss += trade_profit
                        capital += trade_profit
                        break
                    if df.iloc[j]['High'] >= target_price:
                        successful_trades += 1
                        sell_signals.append((df.iloc[j]['timestamp'], target_price))
                        trade_exited = True
                        trade_profit = (target_price - avg_entry_price) * position_size
                        total_profit_loss += trade_profit
                        capital += trade_profit
                        break
                
                if not trade_exited:
                    exit_price = df.iloc[-1]['Close']
                    sell_signals.append((df.iloc[-1]['timestamp'], exit_price))
                    trade_profit = (exit_price - avg_entry_price) * position_size
                    total_profit_loss += trade_profit
                    capital += trade_profit

                position_size = 0
                avg_entry_price = 0

    # 绘制K线图并标注买卖点
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df.set_index('timestamp', inplace=True)
    
    
    # Filter buy and sell signals to only include those within the DataFrame's time range
    df_start, df_end = df.index[0], df.index[-1]
    buy_signals = [(t, p) for t, p in buy_signals if df_start <= t <= df_end]
    sell_signals = [(t, p) for t, p in sell_signals if df_start <= t <= df_end]

    # Convert buy and sell signals to DataFrames
    buy_df = pd.DataFrame(buy_signals, columns=['timestamp', 'price']).set_index('timestamp')
    sell_df = pd.DataFrame(sell_signals, columns=['timestamp', 'price']).set_index('timestamp')

    # Ensure buy and sell signals are within the DataFrame's index
    buy_df = buy_df[buy_df.index.isin(df.index)]
    sell_df = sell_df[sell_df.index.isin(df.index)]

    # 创建K线图
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, row_heights=[0.7, 0.3])

    # 定义颜色映射
    color_map = {
        ('up', 1): 'lime',      # Climax Up
        ('down', 1): 'red',     # Climax Down
        ('up', 2): 'blue',      # Above Average Up
        ('down', 2): 'fuchsia', # Above Average Down
        ('up', 0): 'gray',      # Normal Up
        ('down', 0): 'darkgray' # Normal Down
    }
    # 为每个K线分配颜色
    colors_kline = [color_map[(row['candle_direction'], row['va'])] for _, row in df.iterrows()]

    # 创建自定义K线图
    for i in range(len(df)):
        # 绘制影线
        fig.add_trace(go.Scatter(
            x=[df.index[i], df.index[i]],
            y=[df['Low'].iloc[i], df['High'].iloc[i]],
            mode='lines',
            line=dict(color=colors_kline[i], width=1),
            showlegend=False
        ), row=1, col=1)
        
        # 绘制实体
        open_price = df['Open'].iloc[i]
        close_price = df['Close'].iloc[i]
        fig.add_trace(go.Bar(
            x=[df.index[i]],
            y=[abs(close_price - open_price)],
            base=min(open_price, close_price),
            marker_color=colors_kline[i],
            marker_line_width=0,
            width=0.6,  # 调整K线宽度
            showlegend=False
        ), row=1, col=1)

    # 更新布局
    fig.update_layout(
        xaxis_rangeslider_visible=False,
        bargap=0,  # 移除K线之间的间隔
        bargroupgap=0.05  # 微调组内间隔
    )
    # 更新x轴设置，移除时间戳
    fig.update_xaxes(
        showticklabels=False,  # 隐藏x轴标签
        type='category',
        categoryorder='category ascending'
    )
    # 添加买入点
    fig.add_trace(go.Scatter(
        x=buy_df.index,
        y=buy_df['price'],
        mode='markers',
        marker=dict(symbol='triangle-up', size=10, color='green'),
        name='买入点'
    ), row=1, col=1)

    # 添加卖出点
    fig.add_trace(go.Scatter(
        x=sell_df.index,
        y=sell_df['price'],
        mode='markers',
        marker=dict(symbol='triangle-down', size=10, color='red'),
        name='卖出点'
    ), row=1, col=1)

    # 添加PA振荡器
    fig.add_trace(go.Scatter(
        x=df.index,
        y=df['PA'],
        mode='lines',
        name='PA振荡器'
    ), row=2, col=1)

    # 添加超买和超卖线
    fig.add_hline(y=80, line_dash="dash", line_color="gray", row=2, col=1)
    fig.add_hline(y=-80, line_dash="dash", line_color="gray", row=2, col=1)

    # 添加震荡区域线
    fig.add_hline(y=5, line_dash="dash", line_color="gray", row=2, col=1)
    fig.add_hline(y=-5, line_dash="dash", line_color="gray", row=2, col=1)

    # 更新图表布局
    fig.update_layout(
        title='BTC/USDT K线图 (带买卖点标记和PA振荡器)',
        xaxis_title='时间',
        yaxis_title='价格',
        xaxis_rangeslider_visible=False
    )

    # 显示图表
    fig.show()

    # 计算成功率和总盈亏
    success_rate = successful_trades / total_trades if total_trades > 0 else 0
    print(f"成功率: {success_rate:.2%} ({successful_trades}/{total_trades})")
    print(f"总盈亏: {total_profit_loss:.2f} USDT")
    print(f"最终资金: {capital:.2f} USDT")
    print(f"总收益率: {(capital - initial_capital) / initial_capital:.2%}")

# 创建Binance交易所实例
exchange = ccxt.binance()


# 获取BTC/USDT的历史K线数据，时间间隔为5分钟
symbol = 'BTC/USDT'
timeframe = '5m'
limit = 1000  # 可以根据需要调整数量
ohlcv = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=limit)

# 转换为pandas DataFrame
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

# 调用回测和绘图函数
backtest_and_plot(df)

成功率: 66.67% (2/3)
总盈亏: 599.68 USDT
最终资金: 10599.68 USDT
总收益率: 6.00%
