-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Description
import pandas as pd
import numpy as np
import requests
import time
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
class ETHETCQuantStrategy:
    """Backtest a long-only, indicator-driven strategy on ETH/ETC prices.

    Typical usage chains the methods in this order:
    ``fetch_historical_data`` -> ``calculate_indicators`` ->
    ``generate_signals`` -> ``execute_strategy`` ->
    ``calculate_performance_metrics`` / ``plot_results``.
    """

    # Bars skipped at the start of a backtest so the 50-period moving
    # average (the longest look-back used here) has a full window.
    WARMUP_PERIODS = 50

    def __init__(self, initial_capital=10000):
        """Create a strategy holding ``initial_capital`` in cash and no position."""
        self.initial_capital = initial_capital
        self.capital = initial_capital
        self.eth_position = 0
        self.etc_position = 0
        self.trades = []
        self.portfolio_values = []

    def fetch_historical_data(self, symbol='ETHETC', timeframe='1h', limit=1000):
        """Return simulated OHLCV price data.

        This generates a seeded random walk; a real deployment should query
        an exchange API instead.

        Args:
            symbol: Trading pair name. Currently unused by the simulation.
            timeframe: pandas frequency string for the bar spacing.
            limit: Number of bars to generate.

        Returns:
            DataFrame with ``timestamp``, ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns. ``open`` equals ``close``.
        """
        # A private generator seeded with 42 yields the same stream as
        # ``np.random.seed(42)`` without clobbering the global NumPy RNG.
        rng = np.random.RandomState(42)
        base_price = 0.1  # starting ETH/ETC price
        dates = pd.date_range(end=datetime.now(), periods=limit, freq=timeframe)

        prices = []
        current_price = base_price
        for _ in range(limit):
            # Per-bar return drawn with a 2% standard deviation.
            change = rng.normal(0, 0.02)
            current_price *= (1 + change)
            current_price = max(0.01, current_price)  # keep the price positive
            prices.append(current_price)

        # Draw order (high, low, volume) matters for reproducibility.
        highs = [p * (1 + abs(rng.normal(0, 0.01))) for p in prices]
        lows = [p * (1 - abs(rng.normal(0, 0.01))) for p in prices]
        volumes = [abs(rng.normal(1000, 200)) for _ in prices]

        return pd.DataFrame({
            'timestamp': dates,
            'open': prices,
            'high': highs,
            'low': lows,
            'close': prices,
            'volume': volumes,
        })

    def calculate_indicators(self, df):
        """Add technical-indicator columns to ``df`` in place and return it.

        Columns added: ``MA20``, ``MA50``, ``BB_Middle``, ``BB_Std``,
        ``BB_Upper``, ``BB_Lower``, ``RSI``, ``MACD``, ``MACD_Signal``,
        ``VWAP`` and ``Volume_MA20``.
        """
        close = df['close']

        # Moving averages
        df['MA20'] = close.rolling(window=20).mean()
        df['MA50'] = close.rolling(window=50).mean()

        # Bollinger Bands: 20-period mean +/- 2 standard deviations
        df['BB_Middle'] = close.rolling(window=20).mean()
        df['BB_Std'] = close.rolling(window=20).std()
        df['BB_Upper'] = df['BB_Middle'] + 2 * df['BB_Std']
        df['BB_Lower'] = df['BB_Middle'] - 2 * df['BB_Std']

        # RSI
        df['RSI'] = self.calculate_rsi(close, 14)

        # MACD
        df['MACD'], df['MACD_Signal'] = self.calculate_macd(close)

        # Cumulative volume-weighted average of the typical price
        typical_price = (df['high'] + df['low'] + close) / 3
        df['VWAP'] = (df['volume'] * typical_price).cumsum() / df['volume'].cumsum()

        # Rolling average volume, stored as a column so per-row code
        # (calculate_signal_strength) can read it from a single row.
        df['Volume_MA20'] = df['volume'].rolling(20).mean()
        return df

    def calculate_rsi(self, prices, period=14):
        """Return the RSI of ``prices`` using simple rolling means.

        Gains and losses are averaged with a plain rolling window of
        ``period`` bars. A window with no losses yields 100; a window with
        neither gains nor losses yields NaN.
        """
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def calculate_macd(self, prices, fast=12, slow=26, signal=9):
        """Return ``(macd, macd_signal)`` series built from EMAs of ``prices``."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        macd_signal = macd.ewm(span=signal).mean()
        return macd, macd_signal

    def generate_signals(self, df):
        """Add a ``signal`` column to ``df`` (1 = buy, -1 = sell, 0 = hold).

        Expects the columns produced by ``calculate_indicators``. Rules are
        applied in priority order: mean reversion (buy), momentum breakout
        (sell), then trend reversal (buy) on rows still at 0.
        """
        df['signal'] = 0

        # Mean-reversion buy: below the lower band, oversold RSI, above MA20.
        # NOTE(review): BB_Lower is MA20 minus two standard deviations, so
        # close < BB_Lower and close > MA20 cannot both hold; this rule never
        # fires as written. Confirm the intended trend filter.
        mean_reversion_signal = (
            (df['close'] < df['BB_Lower']) &
            (df['RSI'] < 30) &
            (df['close'] > df['MA20'])
        )

        # Momentum-breakout sell: above the upper band, RSI > 70, MACD above
        # its signal line.
        momentum_signal = (
            (df['close'] > df['BB_Upper']) &
            (df['RSI'] > 70) &
            (df['MACD'] > df['MACD_Signal'])
        )

        # Trend buy: MA20 above MA50, MACD above its signal line, and volume
        # above its 20-period average.
        trend_reversal_signal = (
            (df['MA20'] > df['MA50']) &
            (df['MACD'] > df['MACD_Signal']) &
            (df['volume'] > df['volume'].rolling(20).mean())
        )

        df.loc[mean_reversion_signal, 'signal'] = 1
        df.loc[momentum_signal & ~mean_reversion_signal, 'signal'] = -1
        df.loc[trend_reversal_signal & (df['signal'] == 0), 'signal'] = 1
        return df

    def calculate_position_size(self, current_price, signal_strength, risk_per_trade=0.02):
        """Return the number of units to buy at ``current_price``.

        The order value is ``capital * risk_per_trade`` scaled by
        ``1 + signal_strength`` and capped at 10% of ``self.capital``.
        A non-positive price yields a size of 0.0.
        """
        if current_price <= 0:
            return 0.0
        risk_amount = self.capital * risk_per_trade
        position_value = risk_amount * (1 + signal_strength)
        position_size = position_value / current_price
        # A single trade never exceeds 10% of capital.
        return min(position_size, self.capital * 0.1 / current_price)

    def execute_strategy(self, df):
        """Run the backtest over ``df`` and return the equity curve.

        Starts from ``initial_capital`` with no position, skips the first
        ``WARMUP_PERIODS`` bars, buys on signal 1 when flat and sells the
        whole position on signal -1. Each call starts fresh: ``self.trades``
        is reset, and ``self.capital`` tracks the cash balance.

        Returns:
            List with one portfolio value (cash + position value) per bar
            processed; also stored in ``self.portfolio_values``.
        """
        current_cash = self.initial_capital
        self.capital = current_cash
        self.trades = []  # do not accumulate trades across repeated runs
        position = 0
        portfolio_values = []

        for i in range(self.WARMUP_PERIODS, len(df)):
            current_data = df.iloc[i]
            signal = current_data['signal']
            current_price = current_data['close']

            if signal == 1 and position == 0:  # buy signal while flat
                # Strength is only needed for sizing, so compute it lazily.
                signal_strength = self.calculate_signal_strength(current_data)
                position_size = self.calculate_position_size(current_price, signal_strength)
                cost = position_size * current_price
                # A NaN or zero cost fails this check and skips the trade.
                if 0 < cost <= current_cash:
                    position = position_size
                    current_cash -= cost
                    self.capital = current_cash
                    self.trades.append({
                        'timestamp': current_data['timestamp'],
                        'action': 'BUY',
                        'price': current_price,
                        'size': position_size,
                        'value': cost
                    })
            elif signal == -1 and position > 0:  # sell signal while holding
                revenue = position * current_price
                current_cash += revenue
                self.capital = current_cash
                self.trades.append({
                    'timestamp': current_data['timestamp'],
                    'action': 'SELL',
                    'price': current_price,
                    'size': position,
                    'value': revenue
                })
                position = 0

            # Mark the portfolio to market at this bar's close.
            portfolio_values.append(current_cash + (position * current_price))

        self.portfolio_values = portfolio_values
        return portfolio_values

    def calculate_signal_strength(self, data):
        """Return a signal-strength score in [0, 1] for one row of data.

        ``data`` is a mapping-like row (for example a DataFrame row) with
        ``RSI``, ``close``, ``BB_Lower``, ``BB_Upper`` and ``volume``, plus an
        optional ``Volume_MA20``. Components whose inputs are missing, NaN or
        degenerate contribute nothing.
        """
        strength = 0

        # RSI component: distance beyond the 30 / 70 thresholds.
        rsi = data['RSI']
        if rsi < 30:
            strength += (30 - rsi) / 30 * 0.3
        elif rsi > 70:
            strength += (rsi - 70) / 30 * 0.3

        # Bollinger component: how close the price sits to either band edge.
        band_width = data['BB_Upper'] - data['BB_Lower']
        if pd.notna(band_width) and band_width > 0:
            bb_position = (data['close'] - data['BB_Lower']) / band_width
            if bb_position < 0.2:
                strength += (0.2 - bb_position) / 0.2 * 0.3
            elif bb_position > 0.8:
                strength += (bb_position - 0.8) / 0.2 * 0.3

        # Volume component: volume relative to its 20-period average, with
        # the ratio capped at 2. The average must be precomputed because a
        # single row cannot be rolled.
        volume_ma = data.get('Volume_MA20')
        if volume_ma is not None and pd.notna(volume_ma) and volume_ma > 0:
            strength += min(data['volume'] / volume_ma, 2) / 2 * 0.4

        return min(strength, 1.0)

    def calculate_performance_metrics(self, periods_per_year=365 * 24):
        """Summarize the most recent backtest.

        Args:
            periods_per_year: Bars per year used to annualize the Sharpe
                ratio. The default assumes hourly bars.

        Returns:
            Dict with ``total_return``, ``sharpe_ratio``, ``max_drawdown``,
            ``win_rate``, ``avg_trade_return`` and ``total_trades``; an empty
            dict when no backtest has produced portfolio values.
        """
        if not self.portfolio_values:
            return {}

        equity = pd.Series(self.portfolio_values, dtype=float)
        returns = equity.pct_change().dropna()
        total_return = (self.portfolio_values[-1] - self.initial_capital) / self.initial_capital

        # A flat or single-point equity curve has zero or undefined
        # volatility; report 0.0 instead of NaN or inf.
        volatility = returns.std()
        if pd.notna(volatility) and volatility > 0:
            sharpe_ratio = returns.mean() / volatility * np.sqrt(periods_per_year)
        else:
            sharpe_ratio = 0.0

        # Maximum drawdown relative to the running peak.
        peak = equity.expanding().max()
        drawdown = (equity - peak) / peak
        max_drawdown = drawdown.min()

        # Trades are recorded as alternating BUY / SELL entries; pair them up.
        trade_returns = [
            (sell['value'] - buy['value']) / buy['value']
            for buy, sell in zip(self.trades[::2], self.trades[1::2])
            if buy['action'] == 'BUY' and sell['action'] == 'SELL'
        ]
        if trade_returns:
            win_rate = sum(1 for r in trade_returns if r > 0) / len(trade_returns)
            avg_trade_return = np.mean(trade_returns)
        else:
            win_rate = 0
            avg_trade_return = 0

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'avg_trade_return': avg_trade_return,
            'total_trades': len(self.trades) // 2
        }

    def plot_results(self, df):
        """Plot price with indicators and trades, plus the equity curve."""
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

        # Price and indicator panel
        ax1.plot(df['timestamp'], df['close'], label='ETH/ETC Price', linewidth=1)
        ax1.plot(df['timestamp'], df['MA20'], label='20-period MA', alpha=0.7)
        ax1.plot(df['timestamp'], df['BB_Upper'], label='Bollinger Upper', linestyle='--', alpha=0.7)
        ax1.plot(df['timestamp'], df['BB_Lower'], label='Bollinger Lower', linestyle='--', alpha=0.7)

        # Mark executed trades
        buys = [t for t in self.trades if t['action'] == 'BUY']
        sells = [t for t in self.trades if t['action'] == 'SELL']
        ax1.scatter([t['timestamp'] for t in buys], [t['price'] for t in buys],
                    color='green', marker='^', s=100, label='Buy', zorder=5)
        ax1.scatter([t['timestamp'] for t in sells], [t['price'] for t in sells],
                    color='red', marker='v', s=100, label='Sell', zorder=5)
        ax1.set_title('ETH/ETC Trading Strategy')
        ax1.set_ylabel('Price')
        ax1.legend()
        ax1.grid(True)

        # Equity-curve panel; values start after the warm-up bars.
        start = self.WARMUP_PERIODS
        stop = start + len(self.portfolio_values)
        ax2.plot(df['timestamp'].iloc[start:stop], self.portfolio_values, label='Portfolio Value')
        ax2.axhline(y=self.initial_capital, color='r', linestyle='--', label='Initial Capital')
        ax2.set_title('Portfolio Value Over Time')
        ax2.set_ylabel('Portfolio Value (USD)')
        ax2.set_xlabel('Date')
        ax2.legend()
        ax2.grid(True)

        plt.tight_layout()
        plt.show()
# Strategy execution example
def main():
    """Run the demo backtest end to end and print a performance report.

    Builds a strategy with 10000 of starting capital, generates simulated
    data, computes indicators and signals, runs the backtest, prints the
    metrics, and finally shows the result charts.
    """
    # Initialize the strategy
    strategy = ETHETCQuantStrategy(initial_capital=10000)

    # Fetch data
    print("获取历史数据...")
    df = strategy.fetch_historical_data()

    # Compute indicators
    print("计算技术指标...")
    df = strategy.calculate_indicators(df)

    # Generate signals
    print("生成交易信号...")
    df = strategy.generate_signals(df)

    # Run the backtest; the equity curve is kept on the strategy object.
    print("执行交易策略...")
    strategy.execute_strategy(df)

    # Compute performance metrics
    metrics = strategy.calculate_performance_metrics()
    print("\n=== 策略表现报告 ===")
    if not metrics:
        # calculate_performance_metrics returns {} when the backtest produced
        # no portfolio values; indexing it below would raise KeyError.
        print("无可用的回测结果")
        return
    print(f"总收益率: {metrics['total_return']:.2%}")
    print(f"夏普比率: {metrics['sharpe_ratio']:.2f}")
    print(f"最大回撤: {metrics['max_drawdown']:.2%}")
    print(f"胜率: {metrics['win_rate']:.2%}")
    print(f"平均交易收益率: {metrics['avg_trade_return']:.2%}")
    print(f"总交易次数: {metrics['total_trades']}")

    # Plot charts
    strategy.plot_results(df)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
# Parameter optimization
# Use grid search to optimize the parameters
def optimize_parameters(df):
    """Placeholder grid search over RSI thresholds and moving-average windows.

    Walks every combination of the candidate values but does not evaluate
    any of them yet, so the returned dict of best parameters is always
    empty. ``df`` is accepted for the future evaluation step and is
    currently unused.
    """
    rsi_low_candidates = [25, 30, 35]
    rsi_high_candidates = [65, 70, 75]
    ma_short_candidates = [15, 20, 25]
    ma_long_candidates = [45, 50, 55]

    top_sharpe = -np.inf  # best score seen so far (nothing is scored yet)
    winning_params = {}

    grid = [
        (rsi_low, rsi_high, ma_short, ma_long)
        for rsi_low in rsi_low_candidates
        for rsi_high in rsi_high_candidates
        for ma_short in ma_short_candidates
        for ma_long in ma_long_candidates
    ]
    for _combination in grid:
        # Test this parameter combination (not implemented).
        continue

    return winning_params
def enhanced_risk_management(self, current_price, position, df=None,
                             stop_loss_pct=0.05, take_profit_pct=0.08,
                             max_position_fraction=0.1, risk_target=0.05):
    """Return stop-loss / take-profit levels and a volatility-scaled size.

    Args:
        self: Unused; kept so the function can be attached to a strategy
            class as a method.
        current_price: Reference price for the stop and target levels.
        position: Current position size. Currently unused.
        df: Optional DataFrame with a ``close`` column. When given, a
            position fraction is derived from the volatility of its returns.
        stop_loss_pct: Fractional drop that triggers the stop (default 5%).
        take_profit_pct: Fractional rise that triggers the target (default 8%).
        max_position_fraction: Upper bound on the position fraction.
        risk_target: Numerator of the volatility scaling; the fraction is
            ``risk_target / volatility`` before capping.

    Returns:
        Dict with ``stop_loss_price`` and ``take_profit_price``, plus
        ``position_size`` (a fraction) when ``df`` is provided.
    """
    # Dynamic stop-loss and take-profit levels
    levels = {
        'stop_loss_price': current_price * (1 - stop_loss_pct),
        'take_profit_price': current_price * (1 + take_profit_pct),
    }

    # Risk-control enhancement
    # Scale the position by market volatility: higher volatility, smaller size.
    if df is not None:
        volatility = df['close'].pct_change().std()
        if pd.notna(volatility) and volatility > 0:
            position_size = min(max_position_fraction, risk_target / volatility)
        else:
            # Zero or undefined volatility would divide by zero; use the cap.
            position_size = max_position_fraction
        levels['position_size'] = position_size

    return levels