In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, MultiHeadAttention
from tensorflow.keras.models import Model
import yfinance as yf  # 导入获取股票数据的库

In [5]:
# --------------------------1. 从网上获取股票数据--------------------------
# 定义股票代码和时间范围，可自行修改
stock_code = "AAPL"  # 苹果公司股票代码，例如腾讯是0700.HK，阿里巴巴是BABA
start_date = "2018-01-01"
end_date = "2025-10-31"

# 下载股票数据，data包含开盘价、收盘价、最高价、最低价、成交量等信息
# 这里获取的就是多个截面特征，构成后续建模的特征矩阵
data = yf.download(stock_code, start=start_date, end=end_date)
# 保留需要的特征（可根据需求增减，比如去掉Volume）
features = ['Open', 'High', 'Low', 'Close', 'Volume']
data = data[features].dropna()  # 删除缺失值
print(f"获取到的股票数据形状: {data.shape}")
print("前5行数据：")
print(data.head())

  data = yf.download(stock_code, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed

1 Failed download:
['AAPL']: YFRateLimitError('Too Many Requests. Rate limited. Try after a while.')


获取到的股票数据形状: (0, 5)
前5行数据：
Empty DataFrame
Columns: [(Open, AAPL), (High, AAPL), (Low, AAPL), (Close, AAPL), (Volume, AAPL)]
Index: []


In [6]:
data

Price,Open,High,Low,Close,Volume
Ticker,AAPL,AAPL,AAPL,AAPL,AAPL
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2


In [None]:

# --------------------------2. 数据预处理--------------------------
# 初始化归一化器，对所有特征做归一化（股票价格和成交量量级差异大）
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(data)

# 划分训练集和测试集，80%训练，20%测试
train_size = int(len(scaled_data) * 0.8)
train_data = scaled_data[:train_size]
test_data = scaled_data[train_size:]

# 定义时间窗口，可根据数据频率调整
# 日度数据常用20-60，这里选30；若为小时级数据可缩小到5-10
time_window = 30
# 预测目标是下一个时间步的收盘价，先记录收盘价在特征中的索引
close_idx = features.index('Close')

In [None]:

# --------------------------3. 构建序列数据--------------------------
def create_sequences(data, time_window, target_idx):
    """
    生成LSTM输入序列和对应标签
    data: 归一化后的特征数据
    time_window: 时间窗口大小
    target_idx: 目标特征（收盘价）的索引
    返回：X(样本数, 时间窗口, 特征数)，y(样本数, 1)
    """
    X, y = [], []
    for i in range(time_window, len(data)):
        # 每个样本包含time_window个时间步的所有特征
        X.append(data[i - time_window:i])
        # 标签是下一个时间步的收盘价
        y.append(data[i, target_idx])
    return np.array(X), np.array(y)

# 生成训练集和测试集的序列数据
train_X, train_y = create_sequences(train_data, time_window, close_idx)
test_X, test_y = create_sequences(test_data, time_window, close_idx)

print(f"训练集输入形状: {train_X.shape}")  # (样本数, 30, 5)，5个特征
print(f"训练集标签形状: {train_y.shape}")
print(f"测试集输入形状: {test_X.shape}")

# --------------------------4. 构建LSTM+Attention模型--------------------------
def build_lstm_attention_model(input_shape):
    inputs = Input(shape=input_shape)
    # LSTM层：50个神经元，return_sequences=True返回所有时间步输出，供Attention使用
    lstm_out = LSTM(50, return_sequences=True, activation='tanh')(inputs)
    # 多头注意力层：8个注意力头，key_dim是每个头的键维度
    # 自注意力机制，对LSTM输出的整个序列计算注意力权重
    attention_out = MultiHeadAttention(num_heads=8, key_dim=64)(lstm_out, lstm_out)
    # 拼接LSTM输出和注意力输出，融合两种特征
    combined = lstm_out + attention_out
    # 取最后一个时间步的特征用于预测
    last_step = combined[:, -1, :]
    # 全连接层输出预测结果
    output = Dense(1, activation='linear')(last_step)

    model = Model(inputs=inputs, outputs=output)
    return model

# 初始化模型，输入形状为(时间窗口, 特征数)
model = build_lstm_attention_model((time_window, len(features)))
# 编译模型：adam优化器适合时序数据，均方误差适合回归问题
model.compile(optimizer='adam', loss='mean_squared_error')
model.summary()  # 打印模型结构

# --------------------------5. 模型训练--------------------------
# 训练模型，validation_split=0.2表示从训练集中分20%作为验证集
history = model.fit(
    train_X, train_y,
    epochs=50,  # 训练轮数，可根据验证集损失调整，避免过拟合
    batch_size=32,  # 批次大小，小批次训练更稳定但速度慢
    validation_split=0.2,
    verbose=1
)

# --------------------------6. 预测与反归一化--------------------------
# 模型预测
train_pred = model.predict(train_X, verbose=0)
test_pred = model.predict(test_X, verbose=0)

# 反归一化：需要构造和原始数据同维度的数组才能反变换
# 因为预测的是收盘价，先初始化全0数组，再填入预测值
train_pred_reshaped = np.zeros(shape=(len(train_pred), len(features)))
train_pred_reshaped[:, close_idx] = train_pred.flatten()
train_pred = scaler.inverse_transform(train_pred_reshaped)[:, close_idx]

test_pred_reshaped = np.zeros(shape=(len(test_pred), len(features)))
test_pred_reshaped[:, close_idx] = test_pred.flatten()
test_pred = scaler.inverse_transform(test_pred_reshaped)[:, close_idx]

# 对真实标签反归一化
train_y_reshaped = np.zeros(shape=(len(train_y), len(features)))
train_y_reshaped[:, close_idx] = train_y.flatten()
train_y_true = scaler.inverse_transform(train_y_reshaped)[:, close_idx]

test_y_reshaped = np.zeros(shape=(len(test_y), len(features)))
test_y_reshaped[:, close_idx] = test_y.flatten()
test_y_true = scaler.inverse_transform(test_y_reshaped)[:, close_idx]

# --------------------------7. 结果可视化--------------------------
# 构建时间索引，用于绘图
train_dates = data.index[time_window:train_size]
test_dates = data.index[train_size + time_window:]

# 绘制训练集预测结果
plt.figure(figsize=(12, 4))
plt.plot(train_dates, train_y_true, label='真实收盘价', color='blue')
plt.plot(train_dates, train_pred, label='预测收盘价', color='red', alpha=0.7)
plt.title(f'{stock_code} 训练集价格预测')
plt.xlabel('日期')
plt.ylabel('价格')
plt.legend()
plt.show()

# 绘制测试集预测结果
plt.figure(figsize=(12, 4))
plt.plot(test_dates, test_y_true, label='真实收盘价', color='blue')
plt.plot(test_dates, test_pred, label='预测收盘价', color='red', alpha=0.7)
plt.title(f'{stock_code} 测试集价格预测')
plt.xlabel('日期')
plt.ylabel('价格')
plt.legend()
plt.show()

# 绘制训练损失曲线
plt.figure(figsize=(8, 4))
plt.plot(history.history['loss'], label='训练损失')
plt.plot(history.history['val_loss'], label='验证损失')
plt.title('模型损失变化')
plt.xlabel('训练轮数')
plt.ylabel('损失值')
plt.legend()
plt.show()