In [10]:
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder

In [4]:
df = pd.read_csv('cleaned_data.csv')

df['Date'] = pd.to_datetime(df['Date'])

df['T10Y3M'] = pd.to_numeric(df['T10Y3M'], errors='coerce')
df['T10YFF'] = pd.to_numeric(df['T10YFF'], errors='coerce')
df['T10Y2Y'] = pd.to_numeric(df['T10Y2Y'], errors='coerce')

print(df.dtypes)

Date                  datetime64[ns]
Open                         float64
High                         float64
Low                          float64
Close                        float64
Adj Close                    float64
Volume                         int64
sp500return                  float64
CPIAUCSL                     float64
FEDFUNDS                     float64
mktrf                        float64
smb                          float64
hml                          float64
rf                           float64
umd                          float64
GDP                          float64
PPIACO                       float64
T10Y2Y                       float64
T10Y3M                       float64
T10YFF                       float64
consumer_sentiment           float64
VIXCLS                       float64
WM2NS                        float64
dtype: object


In [5]:
window_size = 20
df['target'] = df['Close'].rolling(window=window_size).mean()
df['target'] = df['target'].pct_change()
df['target'] = df['target'].apply(lambda x: 1 if x > 0.0001 else (-1 if x < -0.0001 else 0))
df.dropna(inplace=True)

X = df.drop('target', axis=1)
y = df['target']
y = y.shift(-1)
y.dropna(inplace=True)
X = X.iloc[:len(y)]

start_date = pd.to_datetime("2018-01-01")
end_date = pd.to_datetime("2021-12-31")

# 创建一个布尔序列，指示每行数据是否属于测试集
is_test = (df['Date'] >= start_date) & (df['Date'] <= end_date)

X_train = X[~is_test].drop('Date', axis=1)
y_train = y[~is_test]
X_test = X[is_test].drop('Date', axis=1)
y_test = y[is_test]

  X_train = X[~is_test].drop('Date', axis=1)
  X_test = X[is_test].drop('Date', axis=1)


In [6]:
print(f'total data: {df.shape[0]}')
print(f'training data: {X_train.shape[0]}')
print(f'y training data: {y_train.shape[0]}')
print(f'testing data: {X_test.shape[0]}')
print(f'y testing data: {y_test.shape[0]}')

total data: 3968
training data: 2998
y training data: 2998
testing data: 969
y testing data: 969


In [7]:
unique, counts = np.unique(y_train.values, return_counts=True)
dict(zip(unique, counts))

{-1.0: 998, 0.0: 139, 1.0: 1861}

调参：使用时间序列交叉验证器

In [24]:
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

param_grid = {
    'max_depth': [2, 3, 4],
    'n_estimators': [150,200,250],
    'learning_rate': [0.01, 0.1, 0.5]
}

model = xgb.XGBClassifier(objective='multi:softprob', num_class=3)  # 适用于你的三分类任务

tscv = TimeSeriesSplit(n_splits=3)

grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=tscv, scoring='accuracy')  # 使用准确度作为评分指标

grid_search.fit(X_train, y_train_encoded)

print("Best parameters:", grid_search.best_params_)

Best parameters: {'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 250}


In [25]:
model = grid_search.best_estimator_
y_pred_encoded = model.predict(X_test)

y_pred = label_encoder.inverse_transform(y_pred_encoded)

accuracy = accuracy_score(y_test_encoded, y_pred_encoded)
print("Accuracy:", accuracy)

Accuracy: 0.5748194014447885


调参：使用贝叶斯优化

In [28]:
from hyperopt import hp, fmin, tpe, STATUS_OK, Trials
from sklearn.model_selection import cross_val_score
from sklearn.metrics import accuracy_score

def objective(space):
    clf = xgb.XGBClassifier(
        n_estimators =int(space['n_estimators']), 
        max_depth = int(space['max_depth']), 
        learning_rate = space['learning_rate'],
        min_child_weight = space['min_child_weight'],
        gamma = space['gamma'],
        colsample_bytree = space['colsample_bytree']
    )
    
    accuracy = cross_val_score(clf, X_train, y_train_encoded, cv=TimeSeriesSplit(n_splits=3), scoring="accuracy").mean()

    return {'loss': -accuracy, 'status': STATUS_OK }


space = {
    'max_depth': hp.quniform('max_depth', 3, 12, 1),
    'n_estimators': hp.quniform('n_estimators', 50, 500, 50),
    'learning_rate': hp.uniform('learning_rate', 0.01, 0.2),
    'min_child_weight': hp.quniform('min_child_weight', 1, 6, 1),
    'gamma': hp.uniform('gamma', 0, 0.5),
    'colsample_bytree': hp.uniform('colsample_bytree', 0.3, 1)
}


trials = Trials()
best_hyperparams = fmin(fn = objective,
                        space = space,
                        algo = tpe.suggest,
                        max_evals = 100,
                        trials = trials)

print("The best hyperparameters are : ","\n")
print(best_hyperparams)

100%|██████████| 100/100 [01:06<00:00,  1.50trial/s, best loss: -0.7058299955496218]
The best hyperparameters are :  

{'colsample_bytree': 0.5280025235020058, 'gamma': 0.1641097714897643, 'learning_rate': 0.15311594393133118, 'max_depth': 3.0, 'min_child_weight': 4.0, 'n_estimators': 300.0}


In [29]:
best_params = {
    'max_depth': int(best_hyperparams['max_depth']),
    'n_estimators': int(best_hyperparams['n_estimators']),
    'learning_rate': best_hyperparams['learning_rate'],
    'min_child_weight': best_hyperparams['min_child_weight'],
    'gamma': best_hyperparams['gamma'],
    'colsample_bytree': best_hyperparams['colsample_bytree']
}

# 创建并训练XGBoost模型
model_2 = xgb.XGBClassifier(objective='multi:softprob', num_class=3, **best_params)
model_2.fit(X_train, y_train_encoded)

# 进行预测
y_pred_encoded_2 = model_2.predict(X_test)

y_pred_2 = label_encoder.inverse_transform(y_pred_encoded_2)

# 计算准确度
accuracy = accuracy_score(y_test_encoded, y_pred_encoded_2)
print("Accuracy:", accuracy)


Accuracy: 0.6202270381836945
