# 带有一维分类器的循环一致生成对抗网络模型

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np

获取样本数据和标签数据

In [None]:
source_signals = np.load('simulate_signals.npy')  
target_signals = np.load('real_signals.npy') 
source_labels = np.load('simulate_labels.npy')
target_labels = np.load('real_labels.npy')

print(source_signals.shape)
print(target_signals.shape)

创建生成器、判别器和分类器

In [None]:
seed = 1234
tf.random.set_seed(seed)
initializer = tf.keras.initializers.GlorotNormal()

class residual_block(layers.Layer):         #自定义残差块
    def __init__(self, filters=8, kernel_size=12, strides=1, **kwargs):
        super(residual_block, self).__init__(**kwargs)
        self.conv = layers.Conv1D(filters=filters, kernel_size=kernel_size, strides=strides, padding='same', kernel_initializer=initializer)
        self.bn = layers.BatchNormalization()
        self.relu = layers.ReLU()


    def call(self, inputs):
        x = self.conv(inputs)
        x = self.bn(x)
        x = layers.add([inputs, x])
        return self.relu(x)

def build_generator():
    model = tf.keras.Sequential()
    model.add(layers.Input(shape=(2048,1)))
    model.add(layers.Conv1D(filters=20, kernel_size=64, strides=8, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1D(filters=30, kernel_size=24, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1D(filters=40, kernel_size=12, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(residual_block(filters=40, kernel_size=12, strides=1))
    model.add(residual_block(filters=40, kernel_size=12, strides=1))
    model.add(residual_block(filters=40, kernel_size=12, strides=1))
    model.add(layers.Conv1DTranspose(filters=30, kernel_size=12, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1DTranspose(filters=20, kernel_size=24, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1DTranspose(filters=1, kernel_size=64, strides=8, padding='same', activation='tanh', kernel_initializer=initializer))
    return model

def build_discriminator():
    model = tf.keras.Sequential()
    model.add(layers.Input(shape=(2048,1)))
    model.add(layers.Conv1D(filters=20, kernel_size=64, strides=8, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1D(filters=30, kernel_size=24, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1D(filters=40, kernel_size=12, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.Flatten())
    model.add(layers.Dense(1, activation='sigmoid', kernel_initializer=initializer))
    return model

def build_classifier(num_class=4):
    model = tf.keras.Sequential()
    model.add(layers.Input(shape=(2048,1)))
    model.add(layers.Conv1D(filters=20, kernel_size=64, strides=8, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Conv1D(filters=30, kernel_size=24, strides=4, padding='same', kernel_initializer=initializer))
    model.add(layers.BatchNormalization())
    model.add(layers.ReLU())
    model.add(layers.Flatten())
    model.add(layers.Dense(500, kernel_initializer=initializer))
    model.add(layers.Dense(num_class, activation='softmax', kernel_initializer=initializer))
    return model

# generator = build_generator()
# discriminator = build_discriminator()
# classifier = build_classifier()

定义损失函数

In [None]:
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=False)

def discriminator_loss(real, fake):
    real_loss = loss_object(tf.ones_like(real), real)
    fake_loss = loss_object(tf.zeros_like(fake), fake)
    return real_loss + fake_loss

def generator_loss(fake):
    return loss_object(tf.ones_like(fake), fake)

def cycle_loss(real, cycled):
    return tf.reduce_mean(tf.abs(tf.cast(real, tf.float64) - tf.cast(cycled, tf.float64)))

def classifier_loss(real, pred):
    return tf.cast(loss_object(real, pred), tf.float64)


In [None]:
# 基本超参数
uncorresponded_weight = 0.3
cycle_loss_weight = 10
source_batch = 16
target_batch = 4


定义训练流程和关键模块

In [None]:
generator_G = build_generator()  # 从源域到目标域
generator_F = build_generator()  # 从目标域到源域
discriminator_Ds = build_discriminator()  # 判别源域
discriminator_Dt = build_discriminator()  # 判别目标域
classifier_Cs = build_classifier()  # 对源域信号进行分类
classifier_Ct = build_classifier()  # 对目标域信号进行分类

generator_G_optimizer = tf.keras.optimizers.Adam(learning_rate=0.002, beta_1=0.5)
generator_F_optimizer = tf.keras.optimizers.Adam(learning_rate=0.002, beta_1=0.5)
discriminator_Ds_optimizer = tf.keras.optimizers.Adam(learning_rate=0.002, beta_1=0.5)
discriminator_Dt_optimizer = tf.keras.optimizers.Adam(learning_rate=0.002, beta_1=0.5)
classifier_Cs_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
classifier_Ct_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)

# @tf.function
def train_on_batch(real_x, real_y, real_label_x, real_label_y, normal_count):
    with tf.GradientTape(persistent=True) as tape:
        fake_y = generator_G(real_x)
        cycled_x = generator_F(fake_y)

        fake_x = generator_F(real_y)
        cycled_y = generator_G(fake_x)

        # 判别器结果
        disc_real_x = discriminator_Ds(real_x)
        disc_fake_x = discriminator_Ds(fake_x)

        disc_real_y = discriminator_Dt(real_y)
        disc_fake_y = discriminator_Dt(fake_y)

        # 分类器结果
        pred_labels_real_x = classifier_Cs(real_x)
        pred_labels_fake_x = classifier_Cs(fake_x)
        
        pred_labels_real_y = classifier_Ct(real_y)
        pred_labels_fake_y = classifier_Ct(fake_y)

        # 判别器损失
        disc_Ds_loss = discriminator_loss(disc_real_x, disc_fake_x)
        disc_Dt_loss = discriminator_loss(disc_real_y, disc_fake_y)

        # 分类器损失
        class_Cs_loss = classifier_loss(real_label_x, pred_labels_real_x) + classifier_loss(real_label_y, pred_labels_fake_x)
        class_Ct_loss = classifier_loss(real_label_x[:normal_count], pred_labels_fake_y[:normal_count]) + uncorresponded_weight * classifier_loss(real_label_x[normal_count:], pred_labels_fake_y[normal_count:])

        # 生成器损失
        gen_G_loss = tf.cast(disc_Ds_loss, tf.float64) + tf.cast(disc_Dt_loss, tf.float64) + cycle_loss_weight * (cycle_loss(tf.reshape(real_x, (source_batch,2048,1)), cycled_x) + cycle_loss(tf.reshape(real_y, (target_batch,2048,1)), cycled_y)) + class_Cs_loss + class_Ct_loss
        gen_F_loss = tf.cast(disc_Ds_loss, tf.float64) + tf.cast(disc_Dt_loss, tf.float64) + cycle_loss_weight * (cycle_loss(tf.reshape(real_x, (source_batch,2048,1)), cycled_x) + cycle_loss(tf.reshape(real_y, (target_batch,2048,1)), cycled_y)) + class_Cs_loss + class_Ct_loss

    print(f'gen_loss: {gen_G_loss}, class_Ct_loss: {class_Ct_loss}, disc_Dt_loss: {disc_Dt_loss}')
    
    # 计算梯度
    gen_G_gradients = tape.gradient(gen_G_loss, generator_G.trainable_variables)
    gen_F_gradients = tape.gradient(gen_F_loss, generator_F.trainable_variables)
    disc_Ds_gradients = tape.gradient(disc_Ds_loss, discriminator_Ds.trainable_variables)
    disc_Dt_gradients = tape.gradient(disc_Dt_loss, discriminator_Dt.trainable_variables)
    class_Cs_gradients = tape.gradient(class_Cs_loss, classifier_Cs.trainable_variables)
    class_Ct_gradients = tape.gradient(class_Ct_loss, classifier_Ct.trainable_variables)

    # 更新权重参数
    generator_G_optimizer.apply_gradients(zip(gen_G_gradients, generator_G.trainable_variables))
    generator_F_optimizer.apply_gradients(zip(gen_F_gradients, generator_F.trainable_variables))
    discriminator_Ds_optimizer.apply_gradients(zip(disc_Ds_gradients, discriminator_Ds.trainable_variables))
    discriminator_Dt_optimizer.apply_gradients(zip(disc_Dt_gradients, discriminator_Dt.trainable_variables))
    classifier_Cs_optimizer.apply_gradients(zip(class_Cs_gradients, classifier_Cs.trainable_variables))
    classifier_Ct_optimizer.apply_gradients(zip(class_Ct_gradients, classifier_Ct.trainable_variables))



模型训练demo

In [None]:
num_class = 4
def get_train_batch(source_signals, target_signals, source_labels, target_labels, source_batch, target_batch):
    if (source_batch > len(source_labels)) or (target_batch > len(target_labels)):
        print("batch size is too large")
        return None, None, None, None, None
    selected_source_indices = np.random.choice(len(source_labels), size=source_batch, replace=False)
    selected_target_indices = np.random.choice(len(target_labels), size=target_batch, replace=False)

    selected_source_indices = np.sort(selected_source_indices)
    normal_count = np.sum(selected_source_indices < (len(source_labels) / num_class))     # 这里默认模拟生成的各类别样本数量相同，如果模拟样本数量不一样，则索引号应该小于总样本集中正常样本数量
    selected_source_signals = source_signals[selected_source_indices]
    selected_source_labels = source_labels[selected_source_indices]
    # print(selected_source_labels)
    selected_target_signals = target_signals[selected_target_indices]
    selected_target_labels = target_labels[selected_target_indices]

    return selected_source_signals, selected_source_labels, selected_target_signals, selected_target_labels, normal_count

def label_to_oneshot(label, num_classes):
    output = np.zeros((len(label), num_classes))
    for i in range(len(label)):
        index = label[i]
        output[i][int(index)] = 1
    return output

EPOCHS = 100
for epoch in range(EPOCHS):
    real_x, real_label_x, real_y, real_label_y, normal_count = get_train_batch(source_signals, target_signals, source_labels, target_labels, source_batch, target_batch)
    train_on_batch(real_x, real_y, label_to_oneshot(real_label_x, 4), label_to_oneshot(real_label_y, 4), normal_count)
    print(f'Epoch {epoch + 1}/{EPOCHS} completed.')

测试用例
(整体流程，只是一个能跑的例子，试了一下，效果很差，不过模型基本上是对的，样本和训练需要增加)
根据真实和模拟样本数量之间的比值，调整Ct损失函数中uncorresponded_weight的数值，范围（0，1），太大会过拟合，模拟样本数较少的话，可以适当调大

In [None]:
for test_signal in source_signals:
    test_signal = test_signal.reshape(1, 2048, 1)
    pred_label = classifier_Ct(generator_G(test_signal))
    print(pred_label)