In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Concatenate, Lambda
from tensorflow.keras.models import Model
import numpy as np
import random
import os
import tensorflow.keras.backend as K
# **设置随机种子**
SEED = 200
np.random.seed(SEED)
tf.random.set_seed(SEED)
random.seed(SEED)
os.environ['PYTHONHASHSEED'] = str(SEED)

# **1️⃣ 加载数据**
X_train_dd = np.load(r"D:\Dataset\Temp_Results\DelayDoppler\X_train_dd_cnn.npy")
Y_train = np.load(r"D:\Dataset\Temp_Results\Train_Test\y_train.npy")
X_test_dd = np.load(r"D:\Dataset\Temp_Results\DelayDoppler\X_test_dd_cnn.npy")
Y_test = np.load(r"D:\Dataset\Temp_Results\Train_Test\y_test.npy")

X_train_mfcc = np.load(r"D:\Dataset\Temp_Results\MFCC\X_train_mfcc_cnn.npy")
X_test_mfcc = np.load(r"D:\Dataset\Temp_Results\MFCC\X_test_mfcc_cnn.npy")

X_train_wavelet = np.load(r"D:\Dataset\Temp_Results\Wavelet\X_train_cwt_abs.npy")
X_test_wavelet = np.load(r"D:\Dataset\Temp_Results\Wavelet\X_test_cwt_abs.npy")

# **2️⃣ 加载 SNR 和 LHR**
snr_lhr_train = np.load(r"D:\Dataset\Temp_Results\SNRandLHR\SNR_LHR_train.npy", allow_pickle=True)
snr_lhr_test = np.load(r"D:\Dataset\Temp_Results\SNRandLHR\SNR_LHR_test.npy", allow_pickle=True)

snr_lhr_train = snr_lhr_train[:, 1:].astype(np.float32)  # 保留数值列并转换为 float32
snr_lhr_test = snr_lhr_test[:, 1:].astype(np.float32)


# **3️⃣ 数据预处理**
X_train_dd = X_train_dd.astype(np.float32) / 255.0
X_test_dd = X_test_dd.astype(np.float32) / 255.0
X_train_mfcc = X_train_mfcc.astype(np.float32) / 255.0
X_test_mfcc = X_test_mfcc.astype(np.float32) / 255.0
X_train_wavelet = X_train_wavelet.astype(np.float32) / 255.0
X_test_wavelet = X_test_wavelet.astype(np.float32) / 255.0

X_train_dd = np.expand_dims(X_train_dd, axis=-1)
X_test_dd = np.expand_dims(X_test_dd, axis=-1)
X_train_mfcc = np.expand_dims(X_train_mfcc, axis=-1)
X_test_mfcc = np.expand_dims(X_test_mfcc, axis=-1)
X_train_wavelet = np.expand_dims(X_train_wavelet, axis=-1)
X_test_wavelet = np.expand_dims(X_test_wavelet, axis=-1)

num_classes = len(np.unique(Y_train))
Y_train = tf.keras.utils.to_categorical(Y_train, num_classes)
Y_test = tf.keras.utils.to_categorical(Y_test, num_classes)


# **4️⃣ 构建 CNN 模型（DelayDoppler, MFCC, Wavelet）**
def create_cnn(input_shape):
    model = tf.keras.Sequential([
        Conv2D(32, (3, 3), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0005), input_shape=input_shape),
        MaxPooling2D((2, 2)),
        Dropout(0.4),
        
        Conv2D(64, (3, 3), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0005)),
        MaxPooling2D((2, 2)),
        Dropout(0.4),
        
        Conv2D(128, (3, 3), activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0005)),
        MaxPooling2D((2, 2)),
        Dropout(0.4),
        
        Flatten(),
        Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0005)),
        Dropout(0.4),
        
        Dense(num_classes, activation='softmax')
    ])
    return model

dd_model = create_cnn((128, 128, 1))
mfcc_model = create_cnn((128, 128, 1))
wavelet_model = create_cnn((128, 128, 1))

# **5️⃣ 构建门控网络**
snr_lhr_input = Input(shape=(snr_lhr_train.shape[1],))  # 假设 SNR 和 LHR 数据是二维的

# 门控机制，使用全连接层计算加权系数
gate_layer = Dense(64, activation='relu')(snr_lhr_input)
gate_layer = Dense(3, activation='sigmoid')(gate_layer)  # 输出3个加权系数，用于控制三个CNN网络的融合

# 输入层
dd_input = Input(shape=(128, 128, 1))
mfcc_input = Input(shape=(128, 128, 1))
wavelet_input = Input(shape=(128, 128, 1))

# 各自的CNN网络输出
dd_output = dd_model(dd_input)
mfcc_output = mfcc_model(mfcc_input)
wavelet_output = wavelet_model(wavelet_input)

# 门控机制调整网络输出
dd_weighted = Lambda(lambda x: x[0] * x[1])([dd_output, gate_layer[:, 0:1]])  # 使用门控权重来加权输出
mfcc_weighted = Lambda(lambda x: x[0] * x[1])([mfcc_output, gate_layer[:, 1:2]])
wavelet_weighted = Lambda(lambda x: x[0] * x[1])([wavelet_output, gate_layer[:, 2:3]])

# 融合三个加权输出
merged = Concatenate()([dd_weighted, mfcc_weighted, wavelet_weighted])

# 最终分类层
final_output = Dense(num_classes, activation='softmax')(merged)

# **6️⃣ 定义整个模型**
model = Model(inputs=[dd_input, mfcc_input, wavelet_input, snr_lhr_input], outputs=final_output)

# **7️⃣ 编译模型**
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
              loss='categorical_crossentropy',
              metrics=['accuracy'])


# **8️⃣ 训练模型**
history = model.fit(
    [X_train_dd, X_train_mfcc, X_train_wavelet, snr_lhr_train], Y_train,
    epochs=30, batch_size=32,
    validation_data=([X_test_dd, X_test_mfcc, X_test_wavelet, snr_lhr_test], Y_test)
)

# 定义一个 Keras 函数，用于获取 gate_layer 输出
get_gate_layer_output = K.function([model.input], [model.get_layer('dense_124').output])

# 获取 gate_layer 的输出
gate_layer_output = get_gate_layer_output([X_train_dd, X_train_mfcc, X_train_wavelet, snr_lhr_train])

print(gate_layer_output)

# **9️⃣ 评估模型**
test_loss, test_acc = model.evaluate([X_test_dd, X_test_mfcc, X_test_wavelet, snr_lhr_test], Y_test, verbose=2)
print(f"\n✅ 测试集准确率: {test_acc:.4f}")

# **10️⃣ 保存模型**
model.save(r"D:\Dataset\Temp_Results\Model\dynamic_gate_model.keras")
print("✅ 模型已保存为 dynamic_gate_model.keras")


Epoch 1/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 335ms/step - accuracy: 0.3956 - loss: 1.5842 - val_accuracy: 0.4786 - val_loss: 1.3515
Epoch 2/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 314ms/step - accuracy: 0.4833 - loss: 1.3063 - val_accuracy: 0.5299 - val_loss: 1.1662
Epoch 3/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 316ms/step - accuracy: 0.5197 - loss: 1.1742 - val_accuracy: 0.5299 - val_loss: 1.0882
Epoch 4/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 388ms/step - accuracy: 0.5220 - loss: 1.1019 - val_accuracy: 0.5812 - val_loss: 1.0140
Epoch 5/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 384ms/step - accuracy: 0.6017 - loss: 1.0155 - val_accuracy: 0.7949 - val_loss: 0.9219
Epoch 6/30
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 386ms/step - accuracy: 0.7479 - loss: 0.9277 - val_accuracy: 0.7949 - val_loss: 0.9471
Epoch 7/30
[1m15/15[0m [3

AttributeError: module 'tensorflow.keras.backend' has no attribute 'function'

In [None]:
model.summary()  # 查看模型结构，确认输出层为 3 个神经元，并使用 softmax 激活


In [35]:
model.summary()  # 查看模型结构，确认输出层为 3 个神经元，并使用 softmax 激活
