In [None]:
import numpy as np
from pandas import read_csv, DataFrame, concat
from sklearn.preprocessing import MinMaxScaler
from keras.models import Model
from keras.layers import Input, LSTM, Dense, Dropout, Concatenate
from keras.regularizers import l2
from keras.callbacks import EarlyStopping, ReduceLROnPlateau


def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Frame a time series as a supervised-learning table of lagged columns.

    Each output row holds the ``n_in`` previous observations followed by the
    current observation and the next ``n_out - 1`` observations.

    Args:
        data: Observations ordered in time. Anything ``DataFrame`` accepts:
            a flat list or 1-D array (one variable), or a 2-D array,
            nested list or DataFrame with one column per variable.
        n_in: Number of lagged steps (t-n_in, ..., t-1) used as inputs.
        n_out: Number of forecast steps (t, t+1, ..., t+n_out-1).
        dropnan: When true, drop the rows that contain NaN; the shifts
            introduce NaN at the start and end of the table.

    Returns:
        A DataFrame whose columns are named ``var<j>(t-<i>)``, ``var<j>(t)``
        and ``var<j>(t+<i>)``, ordered from the oldest lag to the furthest
        forecast step.
    """
    df = DataFrame(data)
    # Count variables on the constructed frame instead of inspecting ``data``:
    # this also covers 1-D arrays (which have no shape[1]) and nested lists
    # with several columns (which are not a single variable).
    n_vars = df.shape[1]

    cols, names = [], []
    # Input sequence (t-n_in, ..., t-1).
    for lag in range(n_in, 0, -1):
        cols.append(df.shift(lag))
        names.extend('var%d(t-%d)' % (j + 1, lag) for j in range(n_vars))
    # Forecast sequence (t, t+1, ...).
    for step in range(n_out):
        cols.append(df.shift(-step))
        if step == 0:
            names.extend('var%d(t)' % (j + 1) for j in range(n_vars))
        else:
            names.extend('var%d(t+%d)' % (j + 1, step) for j in range(n_vars))

    agg = concat(cols, axis=1)
    agg.columns = names
    if dropnan:
        agg = agg.dropna()
    return agg


class MultiModalCurrencyLSTMModel:
    """Two-branch LSTM that forecasts the next price from price and sentiment history.

    One LSTM branch reads a window of ``look_back`` past prices, a second
    branch reads a window of ``look_back`` past sentiment values; their
    outputs are concatenated and passed through dense layers to a single
    regression output.

    Both CSV files must contain exactly one value column besides the index
    column, because every window is reshaped to ``(samples, look_back, 1)``.
    """

    def __init__(self, price_path, sentiment_path, look_back=10):
        """Store the data locations and create the (still unfitted) scalers.

        Args:
            price_path: Path of the price CSV (first column is the index).
            sentiment_path: Path of the sentiment CSV (first column is the index).
            look_back: Number of past time steps fed to each LSTM branch.
        """
        self.price_path = price_path
        self.sentiment_path = sentiment_path
        self.look_back = look_back
        # Scalers for the two input branches; fitted on training windows only.
        self.scaler_price = MinMaxScaler(feature_range=(0, 1))
        self.scaler_sentiment = MinMaxScaler(feature_range=(0, 1))
        # Scaler for the target; fitted on the raw training targets so that
        # inverse_transform() yields values in the original price units.
        self.scaler_y = MinMaxScaler(feature_range=(0, 1))
        self.model = None

    @staticmethod
    def _read_single_series(path, label):
        """Read a CSV and return its single value column as a float32 (n, 1) array."""
        values = read_csv(path, header=0, index_col=0).values.astype('float32')
        if values.shape[1] != 1:
            raise ValueError(
                '%s CSV must contain exactly one value column besides the index, got %d'
                % (label, values.shape[1])
            )
        return values

    @staticmethod
    def _scale_windows(scaler, windows, fit=False):
        """Scale a 2-D window matrix of one variable, optionally fitting the scaler first."""
        # Every column holds the same variable at a different lag, so the
        # matrix is flattened to one feature column for the scaler.
        flat = windows.reshape(-1, 1)
        if fit:
            scaler.fit(flat)
        return scaler.transform(flat).reshape(windows.shape)

    def load_and_prepare_data(self):
        """Load both CSV files and build scaled train/test tensors.

        The first 70% of the aligned windows form the training set, the
        rest the test set. All three scalers are fitted on the training
        portion only, so scaled test values may fall outside ``[0, 1]``.

        Returns:
            ``((price_train_X, sentiment_train_X, price_train_y),
            (price_test_X, sentiment_test_X, price_test_y))`` where each ``X``
            has shape ``(samples, look_back, 1)`` and each ``y`` has shape
            ``(samples, 1)``.

        Raises:
            ValueError: If a CSV does not have exactly one value column, or
                if there are too few rows to form a non-empty training set.
        """
        # 1. Read the raw series.
        price_values = self._read_single_series(self.price_path, 'price')
        sentiment_values = self._read_single_series(self.sentiment_path, 'sentiment')

        # 2. Build lagged windows from the RAW values; scaling happens after
        #    the split so that no test-period statistics reach the scalers.
        price_windows = series_to_supervised(price_values, self.look_back, 1).values
        sentiment_windows = series_to_supervised(sentiment_values, self.look_back, 1).values

        # 3. Align row counts by keeping the most recent windows of each series.
        #    NOTE(review): alignment is positional (the CSV index is discarded),
        #    so this assumes both series end at the same time step - confirm.
        n_samples = min(len(price_windows), len(sentiment_windows))
        train_size = int(n_samples * 0.7)
        if train_size < 1:
            raise ValueError(
                'Not enough rows for look_back=%d: only %d aligned window(s) available'
                % (self.look_back, n_samples)
            )
        price_windows = price_windows[-n_samples:]
        sentiment_windows = sentiment_windows[-n_samples:]

        # 4. Chronological train/test split (70% train).
        price_train, price_test = price_windows[:train_size], price_windows[train_size:]
        sentiment_train, sentiment_test = sentiment_windows[:train_size], sentiment_windows[train_size:]

        # 5. Inputs are every column but the last (the value at time t);
        #    the target is the price at time t.
        price_train_X = self._scale_windows(self.scaler_price, price_train[:, :-1], fit=True)
        price_test_X = self._scale_windows(self.scaler_price, price_test[:, :-1])
        sentiment_train_X = self._scale_windows(self.scaler_sentiment, sentiment_train[:, :-1], fit=True)
        sentiment_test_X = self._scale_windows(self.scaler_sentiment, sentiment_test[:, :-1])

        # 6. Normalise the target with its own scaler, fitted on raw training
        #    targets, so predict() can map model outputs back to price units.
        price_train_y = self.scaler_y.fit_transform(price_train[:, -1].reshape(-1, 1))
        price_test_y = self.scaler_y.transform(price_test[:, -1].reshape(-1, 1))

        # 7. Reshape to the RNN layout (samples, time steps, features=1).
        price_train_X = price_train_X.reshape((price_train_X.shape[0], self.look_back, 1))
        price_test_X = price_test_X.reshape((price_test_X.shape[0], self.look_back, 1))
        sentiment_train_X = sentiment_train_X.reshape((sentiment_train_X.shape[0], self.look_back, 1))
        sentiment_test_X = sentiment_test_X.reshape((sentiment_test_X.shape[0], self.look_back, 1))

        return (price_train_X, sentiment_train_X, price_train_y), (price_test_X, sentiment_test_X, price_test_y)

    def build_model(self):
        """Build and compile the two-branch network, store it on ``self.model`` and return it."""
        # Price branch.
        price_input = Input(shape=(self.look_back, 1), name="price_input")
        price_lstm = LSTM(50, return_sequences=True, kernel_regularizer=l2(0.01))(price_input)
        price_lstm = Dropout(0.3)(price_lstm)
        price_lstm = LSTM(100, return_sequences=False, kernel_regularizer=l2(0.01))(price_lstm)
        price_lstm = Dropout(0.3)(price_lstm)

        # Sentiment branch.
        sentiment_input = Input(shape=(self.look_back, 1), name="sentiment_input")
        sentiment_lstm = LSTM(30, return_sequences=True, kernel_regularizer=l2(0.01))(sentiment_input)
        sentiment_lstm = Dropout(0.3)(sentiment_lstm)
        sentiment_lstm = LSTM(60, return_sequences=False, kernel_regularizer=l2(0.01))(sentiment_lstm)
        sentiment_lstm = Dropout(0.3)(sentiment_lstm)

        # Fuse the two branches.
        merged = Concatenate()([price_lstm, sentiment_lstm])

        # Fully connected head with a single regression output.
        dense1 = Dense(50, activation='relu')(merged)
        dense2 = Dropout(0.3)(dense1)
        output = Dense(1)(dense2)

        model = Model(inputs=[price_input, sentiment_input], outputs=output)
        model.compile(loss='mae', optimizer='adam')
        self.model = model
        return model

    def train(self, epochs=100, batch_size=64):
        """Load the data, build a fresh model and fit it.

        Args:
            epochs: Maximum number of training epochs.
            batch_size: Mini-batch size.

        Returns:
            The history object returned by ``self.model.fit``.
        """
        (price_train_X, sentiment_train_X, train_y), (price_test_X, sentiment_test_X, test_y) = self.load_and_prepare_data()
        self.build_model()

        # NOTE(review): the test split doubles as the validation set, so early
        # stopping and the LR schedule are tuned on the data later used for
        # evaluation - consider carving a validation split out of the train set.
        early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
        lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

        history = self.model.fit(
            [price_train_X, sentiment_train_X], train_y,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=([price_test_X, sentiment_test_X], test_y),
            verbose=2,
            shuffle=False,  # keep the chronological order of the windows
            callbacks=[early_stopping, lr_scheduler]
        )
        return history

    def predict(self, price_X, sentiment_X):
        """Predict prices for already-scaled input windows.

        Args:
            price_X: Scaled price windows, as returned by ``load_and_prepare_data``.
            sentiment_X: Scaled sentiment windows, as returned by
                ``load_and_prepare_data``.

        Returns:
            The model output passed through ``scaler_y.inverse_transform``,
            i.e. predictions in the units of the price CSV.

        Raises:
            RuntimeError: If no model has been built yet.
        """
        if self.model is None:
            raise RuntimeError('No model available: call train() (or build_model()) before predict().')
        yhat = self.model.predict([price_X, sentiment_X])
        # Undo the target normalisation.
        return self.scaler_y.inverse_transform(yhat)




In [None]:
# === Usage example ===
# model = MultiModalCurrencyLSTMModel('price_data.csv', 'sentiment_data.csv', look_back=10)
# history = model.train(epochs=50, batch_size=32)
#
# # Prepare the test data
# (price_train_X, sentiment_train_X, train_y), (price_test_X, sentiment_test_X, test_y) = model.load_and_prepare_data()
#
# # Predict
# predictions = model.predict(price_test_X, sentiment_test_X)
#
# # To evaluate with sklearn metrics
# from sklearn.metrics import mean_absolute_error, mean_squared_error
# y_true = model.scaler_y.inverse_transform(test_y.reshape(-1, 1))
# mae = mean_absolute_error(y_true, predictions)
# rmse = np.sqrt(mean_squared_error(y_true, predictions))
# print(f'MAE: {mae:.4f}, RMSE: {rmse:.4f}')