# 数据导入与清洗

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import shutup

# NOTE(review): presumably silences Python warnings for the whole session
# (third-party `shutup` package) — confirm; hidden warnings can mask real problems.
shutup.please()

# Read the tab-separated file "EURGBP.csv" into the working frame `data`.
data = pd.read_csv("EURGBP.csv", sep='\t')
# data.drop(columns=['<TIME>'], inplace=True)
data

In [None]:
# Drop the last two columns by position.
# NOTE(review): `data` is overwritten with a positional slice of itself, so
# re-running this cell removes two more columns each time — run exactly once.
data = data.iloc[:, :-2]
data

## 构建乖离度指标

In [None]:
# Bias: relative deviation of the close from its 5-row rolling mean
# (close / MA5 - 1). The first four rows are NaN because the window is incomplete.
data['bias'] = data['<CLOSE>'].div(data['<CLOSE>'].rolling(window=5).mean()).sub(1)
data

In [None]:
# 5-row return: close_t / close_{t-5} - 1.
# `shift(5)` leaves the first five rows NaN; the previous `np.roll` wrapped
# around and divided them by the LAST five closes of the series instead.
# NOTE(review): this is a trailing (already realised) return. If it is meant
# to be a prediction target, a forward return (`shift(-5)`) is needed — confirm.
data['return5'] = data['<CLOSE>'] / data['<CLOSE>'].shift(5) - 1
data

### 对乖离度指标进行回归

In [None]:
import statsmodels.api as sm
# OLS of return5 on the bias indicator (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['bias']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

In [None]:
# Scatter of fitted values (x axis) against the observed return5 (y axis),
# overlaid with the y = x reference line on [-0.04, 0.04].
fig, ax = plt.subplots()
ax.scatter(res.predict(x1), data_to_reg['return5'])
x_set = np.linspace(-0.04, 0.04, num=1000)
y_set = x_set
ax.plot(x_set, y_set)
plt.show()

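乖离度指标的拟合效果很好。注意：`return5` 是过去 5 期的收益率（`close_t / close_{t-5} - 1`），与基于同一段价格计算的乖离度存在机械相关，因此该结果本身并不能说明乖离度具有预测能力。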

## 构建ATR指标

In [None]:
from ta.volatility import AverageTrueRange

# 5-period Average True Range from the high / low / close columns,
# stored in the new column `ATR`.
ATR = AverageTrueRange(
    high=data['<HIGH>'],
    low=data['<LOW>'],
    close=data['<CLOSE>'],
    window=5,
).average_true_range()
data['ATR'] = ATR
data

In [None]:
# OLS of return5 on ATR (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['ATR']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

ATR指标的回归效果很差

In [None]:
# Scatter of fitted values (x axis) against the observed return5 (y axis),
# overlaid with the y = x reference line on [-0.04, 0.04].
fig, ax = plt.subplots()
ax.scatter(res.predict(x1), data_to_reg['return5'])
x_set = np.linspace(-0.04, 0.04, num=1000)
y_set = x_set
ax.plot(x_set, y_set)
plt.show()

## 构建CCI指标

In [None]:
from ta.trend import CCIIndicator

# 5-period Commodity Channel Index from the high / low / close columns,
# stored in the new column `cci`.
CCI = CCIIndicator(
    high=data['<HIGH>'],
    low=data['<LOW>'],
    close=data['<CLOSE>'],
    window=5,
).cci()
data['cci'] = CCI
data

In [None]:
# OLS of return5 on CCI (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['cci']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

In [None]:
# OLS of return5 on CCI and bias jointly (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['cci', 'bias']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

Bias是比CCI更好的指标

## 构建RSI

In [None]:
from ta.momentum import RSIIndicator

# 5-period Relative Strength Index of the close, stored in the new column `rsi`.
RSI = RSIIndicator(
    close=data['<CLOSE>'],
    window=5,
).rsi()
data['rsi'] = RSI
data

In [None]:
# OLS of return5 on RSI (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['rsi']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

In [None]:
# OLS of return5 on RSI and bias jointly (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['rsi', 'bias']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

## 构建ADX

In [None]:
from ta.trend import ADXIndicator

# 5-period Average Directional Index from the high / low / close columns,
# stored in the new column `adx`.
ADX = ADXIndicator(
    high=data['<HIGH>'],
    low=data['<LOW>'],
    close=data['<CLOSE>'],
    window=5,
).adx()
data['adx'] = ADX
data

In [None]:
# OLS of return5 on ADX (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['adx']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

ADX指标的回归效果同样很差

## 试一试mean reversion

In [None]:
# `mv`: 5-row rolling sum of the overnight log gap, log(open_t / close_{t-1}).
# `shift(1)` leaves row 0 NaN; the previous `np.roll` wrapped the LAST close
# into row 0, which silently corrupted the first complete rolling window.
data['mv'] = np.log(data['<OPEN>'] / data['<CLOSE>'].shift(1)).rolling(5).sum()
data

In [None]:
# OLS of return5 on the `mv` factor (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['mv']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

In [None]:
# Pairwise Pearson correlation between the rsi, bias and mv columns.
data.loc[:, ['rsi', 'bias', 'mv']].corr(method='pearson')

In [None]:
# OLS of return5 on RSI and bias jointly (with intercept).
# NOTE(review): the regressors are the same as in the earlier rsi + bias cell;
# only the sample differs (the `mv` column now takes part in dropna and the
# outlier filter). Including 'mv' may have been intended here — confirm.
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['rsi', 'bias']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

## 构建动量因子

In [None]:
# `momentum`: 5-row rolling sum of the previous row's intraday log return,
# log(close_{t-1} / open_{t-1}).
# `shift(1)` leaves row 0 NaN; the previous `np.roll` wrapped the LAST row into
# row 0 and corrupted the first complete window. Staying in Series form also
# keeps the result on `data`'s own index instead of a fresh positional one.
data['momentum'] = np.log(data['<CLOSE>'] / data['<OPEN>']).shift(1).rolling(5).sum()
data

In [None]:
# OLS of return5 on the momentum factor (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['momentum']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

In [None]:
# Pairwise Pearson correlation between the rsi, bias and momentum columns.
data.loc[:, ['rsi', 'bias', 'momentum']].corr(method='pearson')

In [None]:
# OLS of return5 on momentum, RSI and bias jointly (with intercept).
# Restrict to numeric columns first: with a non-numeric column such as `<DATE>`
# in the frame, DataFrame.mean()/std() raise TypeError on pandas >= 2.0.
data_to_reg = data.dropna().select_dtypes(include='number')
# Keep only rows in which every numeric column lies strictly within
# mean +/- 3 standard deviations.
# NOTE(review): the filter spans all numeric columns, not just the regression
# variables, so the sample depends on which indicator columns `data` holds.
lower = data_to_reg.mean() - 3 * data_to_reg.std()
upper = data_to_reg.mean() + 3 * data_to_reg.std()
data_to_reg = data_to_reg[((data_to_reg > lower) & (data_to_reg < upper)).all(axis=1)]
y1 = data_to_reg['return5']
x1 = sm.add_constant(data_to_reg[['momentum', 'rsi', 'bias']])
mod = sm.OLS(y1, x1)
res = mod.fit()
print(res.summary())

# 建立随机森林模型

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split

feature_cols = ['momentum', 'rsi', 'bias']

# Complete rows only, restricted to the three features and the return.
data_for_test = data.dropna(axis=0, how='any')[feature_cols + ['return5']]
# Drop rows where any of these four columns falls outside mean +/- 3 std.
lower = data_for_test.mean() - 3 * data_for_test.std()
upper = data_for_test.mean() + 3 * data_for_test.std()
data_for_test = data_for_test[(data_for_test > lower) & (data_for_test < upper)].dropna()
# drop=True: the old index is not needed, so do not keep it as a column.
data_for_test = data_for_test.reset_index(drop=True)

# Binary label: 1.0 when return5 is strictly positive, otherwise 0.0.
# Vectorised replacement for the per-row `.loc` loop; kept as float to match
# the dtype the loop produced.
# NOTE(review): return5 is built from close_t / close_{t-5}, i.e. a return that
# has already happened when the features are observed — confirm this is the
# intended target.
data_for_test['bullish'] = (data_for_test['return5'] > 0).astype(float)
data_for_test = data_for_test[feature_cols + ['bullish']]

X = data_for_test[feature_cols]
y = data_for_test[['bullish']]

# NOTE(review): the split is shuffled; neighbouring rows share overlapping
# 5-row windows, so a chronological split (shuffle=False) may be more honest.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Fit the scaler on the training rows only and apply it to both parts, so that
# test-set statistics do not leak into training. `X` itself is left unscaled.
scaler = StandardScaler()
X_train = pd.DataFrame(scaler.fit_transform(X_train), columns=feature_cols, index=X_train.index)
X_test = pd.DataFrame(scaler.transform(X_test), columns=feature_cols, index=X_test.index)
X_train

In [None]:
# Display the training labels (the single `bullish` column).
y_train

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error, recall_score, f1_score

# Random forest of 100 trees with a fixed seed; bootstrap=False means each tree
# is fit on the whole training set rather than on a bootstrap sample.
rf_classifier = RandomForestClassifier(n_estimators=100, random_state=42, max_features='sqrt', bootstrap=False)
# `y_train` is a one-column frame; flatten it to the 1-D shape that `fit`
# expects instead of relying on scikit-learn's implicit conversion.
rf_classifier.fit(X_train, np.ravel(y_train))

y_pred = rf_classifier.predict(X_test)

# F1 score on the held-out rows (positive class = 1).
f1 = f1_score(y_test, y_pred)
# recall = recall_score(y_test, y_pred)

print(f'表现{f1}')

# 开始回测

## Version 2 回测程序（重新开始）

## Version 1 回测程序（有错误，以下代码已全部注释停用）

In [None]:
# import datetime
# 
# # 交易规则：预测涨全仓买入，否则直接卖出，这规则错了
# data_to_backtest = data.dropna()
# data_to_backtest = data_to_backtest.iloc[1:, :]
# data_to_backtest.reset_index(inplace=True)
# backtest_dict = {'capital': [100000], 'position': [0], 'date': [data_to_backtest['<DATE>'][0]]}
# backtest = pd.DataFrame(backtest_dict)
# 
# 
# def buy(day_data):
#     global backtest
#     capital = backtest.iloc[len(backtest) - 1, :]['capital']
#     position = backtest.iloc[len(backtest) - 1, :]['position']
#     if capital > 0:
#         print(f'{day_data["<DATE>"]} buy at price {day_data["<OPEN>"]}')
#     if capital > 0:
#         position += capital / day_data['<OPEN>']
#         capital = 0
#     backtest.loc[len(backtest), :] = [capital, position, day_data['<DATE>']]
# 
# 
# def sell(day_data):
#     global backtest
#     capital = backtest.iloc[len(backtest) - 1, :]['capital']
#     position = backtest.iloc[len(backtest) - 1, :]['position']
#     if position > 0:
#         print(f'{day_data["<DATE>"]} sell at price {day_data["<OPEN>"]}')
#     if position > 0:
#         capital += position * day_data['<OPEN>']
#         position = 0
#     backtest.loc[len(backtest), :] = [capital, position, day_data['<DATE>']]
# 
# 
# def no_trade():
#     global backtest
#     capital = backtest.iloc[len(backtest) - 1, :]['capital']
#     position = backtest.iloc[len(backtest) - 1, :]['position']
#     last_date = backtest.iloc[len(backtest) - 1, :]['date']
#     backtest.loc[len(backtest), :] = [capital, position, datetime.datetime.strftime(datetime.datetime.strptime(last_date, '%Y.%m.%d') + datetime.timedelta(days=1), '%Y.%m.%d')]
# 
# 
# hold_iter = -100
# for i in range(len(data_to_backtest) - 1):
#     # 只能看到今天的数据，明天开盘根据今天收盘的数据进行买入
#     tomorrow_data = data_to_backtest.iloc[i+1, :]
#     if backtest.iloc[i, 1] > 0 and i <= hold_iter + 5:
#         # 如果还处于五天的持有期，不进行交易，否则根据预测进行交易
#         no_trade()
#     else:
#         # 如果今天预测股价会涨，那么全仓买入并持有五天，第五天的时候如果还预测涨再持有五天，否则卖出
#         if rf_classifier.predict(pd.DataFrame((data_to_backtest.loc[i, ['momentum', 'rsi', 'bias']] - scaler.mean_) / scaler.scale_).T)[0] == 1:
#             print(f'{data_to_backtest.iloc[i, :]["<DATE>"]} predict bullish')
#             buy(tomorrow_data)
#             hold_iter = i
#         else:
#             print(f'{data_to_backtest.iloc[i, :]["<DATE>"]} predict bearish')
#             sell(tomorrow_data)
# 
# backtest

In [None]:
# data_to_backtest

In [None]:
# result = pd.merge(data_to_backtest, backtest.rename(columns={'date': '<DATE>'}))
# result

In [None]:
# result['asset'] = result['capital'] + result['position'] * result['<CLOSE>']
# result

In [None]:
# import seaborn as sns
# import matplotlib.pyplot as plt
# import matplotlib.dates as mdates
# 
# plt.rcParams['font.family'] = 'Arial Unicode MS'
# plt.rcParams['axes.unicode_minus'] = False
# 
# ax = sns.lineplot(data=result, x='<DATE>', y='asset', color='#d20000')
# sns.set_style('white')
# sns.despine()
# ax.figure.set_size_inches(16, 4)
# ax.set_xlabel('日期')
# ax.set_ylabel('总资产')
# ax.xaxis.set_major_locator(mdates.DayLocator(interval=50))
# plt.xticks(rotation=45)
# ax.hlines(y=100000, xmin=result.loc[0, :]['<DATE>'], xmax=result.loc[len(result) - 1, :]['<DATE>'], colors='#8c8c8c', linestyles='--')
# ax2 = plt.twinx()
# sns.lineplot(data=result, x='<DATE>', y='<CLOSE>', color='blue', ax=ax2)
# plt.show()

In [None]:
# result['predict'] = rf_classifier.predict(pd.DataFrame((result[['momentum', 'rsi', 'bias']] - scaler.mean_) / scaler.scale_))
# result