In [1]:
import yfinance as yf
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import optuna
from optuna.visualization import plot_optimization_history
import plotly.graph_objects as go

In [2]:
# ======================
# 1. 数据获取与预处理
# ======================
def download_stock_data(ticker, start_date, end_date):
    """
    从Yahoo Finance下载股票数据
    参数：
    - ticker: 股票代码（如'AAPL'）
    - start_date/end_date: 日期字符串（'YYYY-MM-DD'）
    返回：
    - 包含OHLCV数据的DataFrame
    """
    data = yf.download(ticker, start=start_date, end=end_date)
    data = data[['Open', 'High', 'Low', 'Close', 'Volume']]  # 选择关键特征
    print(f"下载数据量：{len(data)}条")
    return data

# 下载示例数据（苹果公司股票）
raw_data = download_stock_data('AAPL', '2010-01-01', '2023-12-31')


[*********************100%***********************]  1 of 1 completed

下载数据量：3522条





In [3]:
# ======================
# 2. 特征工程
# ======================
def add_technical_indicators(df):
    """添加技术指标特征"""
    # 计算简单技术指标
    df['MA5'] = df['Close'].rolling(window=5).mean()
    df['MA20'] = df['Close'].rolling(window=20).mean()
    df['RSI'] = 100 - (100 / (1 + df['Close'].diff(1).clip(lower=0).rolling(14).mean() 
                             / df['Close'].diff(1).clip(upper=0).abs().rolling(14).mean()))
    df = df.dropna()
    return df

processed_data = add_technical_indicators(raw_data)

In [5]:
processed_data.head()

Price,Open,High,Low,Close,Volume,MA5,MA20,RSI
Ticker,AAPL,AAPL,AAPL,AAPL,AAPL,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2
2010-02-01,5.79547,5.90483,5.763235,5.866569,749876400,6.024735,6.238213,38.180105
2010-02-02,5.90212,5.914472,5.825899,5.900613,698342400,5.964,6.210873,40.705299
2010-02-03,5.879824,6.031362,5.857229,6.002139,615328000,5.91188,6.188052,41.111452
2010-02-04,5.926824,5.976232,5.771371,5.785832,757652000,5.868257,6.159552,37.62109
2010-02-05,5.803303,5.90483,5.749677,5.888561,850306800,5.888743,6.136776,42.533102


In [7]:
# ======================
# 3. 数据标准化
# ======================
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(processed_data)

In [10]:
# ======================
# 4. 动态序列生成（核心改进点）
# ======================
def create_sequences(data, window_size, target_col=3):  # target_col=3对应Close列
    """
    生成LSTM所需的序列数据
    参数：
    - data: 标准化后的数据矩阵
    - window_size: 时间窗口长度
    - target_col: 目标列索引（收盘价）
    返回：
    - X: 输入序列 (samples, window_size, features)
    - y: 输出值 (samples,)
    """
    X, y = [], []
    for i in range(len(data)-window_size):
        X.append(data[i:i+window_size])
        y.append(data[i+window_size, target_col])
    return np.array(X), np.array(y)

In [11]:
# ======================
# 5. Optuna优化目标函数
# ======================
def objective(trial):
    # 动态超参数定义
    window_size = trial.suggest_int('window_size', 10, 60, step=5)
    n_features = scaled_data.shape[1]
    
    # 生成动态序列
    X, y = create_sequences(scaled_data, window_size)
    
    # 数据集划分（每次试验独立划分）
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, shuffle=False)
    
    # 模型构建
    model = tf.keras.Sequential()
    
    # LSTM层配置
    n_layers = trial.suggest_int('n_layers', 1, 3)
    for i in range(n_layers):
        return_sequences = (i < n_layers-1)  # 最后一层不返回序列
        model.add(LSTM(
            units=trial.suggest_categorical(f'units_layer_{i}', [64, 128, 256]),
            return_sequences=return_sequences,
            input_shape=(window_size, n_features) if i==0 else None
        ))
        model.add(Dropout(trial.suggest_float(f'dropout_{i}', 0.1, 0.5)))
    
    # 输出层
    model.add(Dense(1))
    
    # 编译配置
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='mse',
        metrics=['mae']
    )
    
    # 训练配置
    batch_size = trial.suggest_categorical('batch_size', [32, 64, 128])
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=100,
        batch_size=batch_size,
        verbose=0,
        callbacks=[
            tf.keras.callbacks.EarlyStopping(
                patience=10,
                restore_best_weights=True
            )
        ]
    )
    
    return history.history['val_loss'][-1]

In [12]:
# ======================
# 6. 执行超参数优化
# ======================
study = optuna.create_study(
    direction='minimize',
    sampler=optuna.samplers.TPESampler(),
    pruner=optuna.pruners.MedianPruner()
)
study.optimize(objective, n_trials=50, show_progress_bar=True)

# 输出最佳参数
print("最佳参数:")
for key, value in study.best_params.items():
    print(f"{key}: {value}")
print(f"最佳验证MSE: {study.best_value:.6f}")

[I 2025-02-06 21:20:09,345] A new study created in memory with name: no-name-09bd86e1-2910-4816-9fbb-63e505d744e3


  0%|          | 0/50 [00:00<?, ?it/s]

  super().__init__(**kwargs)


[I 2025-02-06 21:20:30,544] Trial 0 finished with value: 0.000501026923302561 and parameters: {'window_size': 50, 'n_layers': 1, 'units_layer_0': 64, 'dropout_0': 0.4121669798720007, 'learning_rate': 0.000623229327715508, 'batch_size': 64}. Best is trial 0 with value: 0.000501026923302561.
[I 2025-02-06 21:20:52,842] Trial 1 finished with value: 0.0024769639130681753 and parameters: {'window_size': 25, 'n_layers': 3, 'units_layer_0': 256, 'dropout_0': 0.4053122235583695, 'units_layer_1': 64, 'dropout_1': 0.137096083597686, 'units_layer_2': 64, 'dropout_2': 0.1349475946850077, 'learning_rate': 3.063218348677036e-05, 'batch_size': 128}. Best is trial 0 with value: 0.000501026923302561.
[I 2025-02-06 21:25:37,843] Trial 2 finished with value: 0.01954404078423977 and parameters: {'window_size': 55, 'n_layers': 3, 'units_layer_0': 256, 'dropout_0': 0.2695648977229521, 'units_layer_1': 128, 'dropout_1': 0.4725290486706535, 'units_layer_2': 256, 'dropout_2': 0.33487331455963226, 'learning_rat

In [13]:
# ======================
# 7. 使用最佳参数训练最终模型
# ======================
# 获取最佳参数
best_window = study.best_params['window_size']
X_full, y_full = create_sequences(scaled_data, best_window)

# 划分最终数据集
X_train_final, X_test_final, y_train_final, y_test_final = train_test_split(
    X_full, y_full, test_size=0.2, shuffle=False
)

In [14]:
# 构建最终模型
final_model = tf.keras.Sequential()
for i in range(study.best_params['n_layers']):
    return_sequences = (i < study.best_params['n_layers']-1)
    final_model.add(LSTM(
        units=study.best_params[f'units_layer_{i}'],
        return_sequences=return_sequences,
        input_shape=(best_window, X_full.shape[2])
    ))
    final_model.add(Dropout(study.best_params[f'dropout_{i}']))
final_model.add(Dense(1))

final_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=study.best_params['learning_rate']),
    loss='mse'
)

In [15]:
# 训练最终模型
history = final_model.fit(
    X_train_final, y_train_final,
    epochs=200,
    batch_size=study.best_params['batch_size'],
    validation_data=(X_test_final, y_test_final),
    callbacks=[
        tf.keras.callbacks.EarlyStopping(patience=20)
    ]
)


Epoch 1/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - loss: 0.0074 - val_loss: 0.0024
Epoch 2/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 7.0966e-04 - val_loss: 0.0041
Epoch 3/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 4.8436e-04 - val_loss: 6.3363e-04
Epoch 4/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 3.0969e-04 - val_loss: 0.0022
Epoch 5/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 3.5050e-04 - val_loss: 6.1080e-04
Epoch 6/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 3.0602e-04 - val_loss: 0.0033
Epoch 7/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - loss: 2.7611e-04 - val_loss: 0.0024
Epoch 8/200
[1m88/88[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - loss: 2.4310e-04 - val_loss: 6.2595e-04
Epoch 9/

In [17]:
# ======================
# 8. 结果可视化
# ======================
# 预测测试集
predictions = final_model.predict(X_test_final)
test_predictions = predictions.flatten()
# 反标准化
def inverse_scale_predictions(scaler, data, target_col=3):
    """
    专用反标准化函数（针对单列预测值）
    参数：
    - scaler: 已训练的MinMaxScaler对象
    - data: 预测值数组 (n_samples,)
    - target_col: 目标列在原始数据中的索引
    返回：
    - 反标准化后的目标列值 (n_samples,)
    """
    dummy = np.zeros((len(data), scaler.n_features_in_))
    dummy[:, target_col] = data
    return scaler.inverse_transform(dummy)[:, target_col]

[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step


In [18]:
# 反标准化预测值
test_predictions_inv = inverse_scale_predictions(scaler, test_predictions)

# 反标准化真实值
true_values_inv = inverse_scale_predictions(scaler, y_test_final)

# --- 可视化修正 ---
# 生成日期索引（确保长度匹配）
last_dates = processed_data.index[-len(true_values_inv):]

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=last_dates,
    y=true_values_inv,
    mode='lines',
    name='实际值',
    line=dict(color='#1f77b4')
))
fig.add_trace(go.Scatter(
    x=last_dates,
    y=test_predictions_inv,
    mode='lines',
    name='预测值',
    line=dict(color='#ff7f0e', dash='dot')
))

fig.update_layout(
    title=f'股票价格预测效果（最佳窗口：{best_window}天）',
    xaxis_title='日期',
    yaxis_title='收盘价（美元）',
    template='plotly_dark',
    hovermode='x unified'
)
fig.show()

In [19]:
# --- 性能指标计算 ---
mse = np.mean((true_values_inv - test_predictions_inv)**2)
print(f"\n测试集MSE: {mse:.4f}")
print(f"测试集RMSE: {np.sqrt(mse):.4f}")
print(f"测试集MAE: {np.mean(np.abs(true_values_inv - test_predictions_inv)):.4f}")


测试集MSE: 9.3911
测试集RMSE: 3.0645
测试集MAE: 2.4147
