<a href="https://colab.research.google.com/github/funway/nid-imbalance-study/blob/main/imbalance%20processing/CGAN_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 使用 ROS+CGAN 对训练集进行过采样
🚀 NYIT 880 | 🧑🏻‍💻 funway




## Modules import & Globals setup

In [1]:
### Modules ###
from pathlib import Path
from datetime import datetime
from collections import Counter

import os, sys
import pandas as pd
import numpy as np
import tensorflow as tf

## mount google drive
from google.colab import drive
drive.mount('/content/drive')


### Globals ###
## Data file directories (all rooted in the mounted Google Drive project folder)
dataset = 'cse-cic-ids2018'
project_folder = Path('/content/drive/MyDrive/NYIT/880')
preprocessed_folder = project_folder / 'data/preprocessed'
scaled_folder = preprocessed_folder / 'scaled'
splits_folder = preprocessed_folder / 'splits'
balanced_folder = project_folder / 'data/balanced'

## All possible values of the Label column (ordered).
## The position of a label in this list is its integer class id (see label_mapping).
unique_labels = ['Benign', 'Bot', 'Brute Force -Web', 'Brute Force -XSS', 'DDOS attack-HOIC', 'DDOS attack-LOIC-UDP', 'DDoS attacks-LOIC-HTTP', 'DoS attacks-GoldenEye', 'DoS attacks-Hulk', 'DoS attacks-SlowHTTPTest', 'DoS attacks-Slowloris', 'FTP-BruteForce', 'Infilteration', 'SQL Injection', 'SSH-Bruteforce']
label_mapping = {label: idx for idx, label in enumerate(unique_labels)}
print(f"[{datetime.now().strftime('%x %X')}] 🏷️ Label mapping: {label_mapping}")

# Class ids treated as minority classes: [13, 3, 2] are extreme minorities, [5, 10, 7] are minorities
minority_labels = [13, 3, 2, 5, 10, 7]

### Global random seeds ###
np.random.seed(42)
tf.random.set_seed(42)
op_seed = 42

### Utilities ###
# 导入 utility.ipynb 模块
%run /content/drive/MyDrive/NYIT/880/code/utils/utility.ipynb

Mounted at /content/drive
[06/25/25 06:14:22] 🏷️ Label mapping: {'Benign': 0, 'Bot': 1, 'Brute Force -Web': 2, 'Brute Force -XSS': 3, 'DDOS attack-HOIC': 4, 'DDOS attack-LOIC-UDP': 5, 'DDoS attacks-LOIC-HTTP': 6, 'DoS attacks-GoldenEye': 7, 'DoS attacks-Hulk': 8, 'DoS attacks-SlowHTTPTest': 9, 'DoS attacks-Slowloris': 10, 'FTP-BruteForce': 11, 'Infilteration': 12, 'SQL Injection': 13, 'SSH-Bruteforce': 14}
导入 utility.ipynb 模块. version 1.0.1


## 可调参数

In [2]:
# Whether to force re-training of the CGAN even if a saved generator exists
retrain = False

# Scaling method to use. Options: [standard, minmax, robust, l1pminmax]
scaling_method = 'minmax'

# Oversampling method. Options: [cgan-a, cgan-m, cgan-b, ros1+cgan-a, ros2+cgan-b, ...]
# A leading 'ros<N>' enables the ROS pre-oversampling step with scheme N;
# the last character selects the CGAN training subset mode passed to select_cgan_subset.
oversampling_method = 'cgan-m'

## CGAN parameters ##
noise_dim = 128     # noise dimension (input to the generator)
epochs = 500        # number of training epochs
batch_size = 512    # batch size used when batching the training data
buffer_size = 200_000  # shuffle buffer size used by tf.data

gen_embed_dim = 128                # generator label-embedding dimension
gen_hidden_dims = [128, 256, 512] # generator hidden layer widths
gen_learning_rate = 3e-4          # generator learning rate

disc_embed_dim = 64           # discriminator label-embedding dimension
disc_hidden_dims = [256, 128] # discriminator hidden layer widths
disc_learning_rate = 1e-4     # discriminator learning rate

# Resampling schemes to generate output files for (an empty list means no files are generated)
resample_outputs = []

# Whether to drop low-quality samples with the CGAN discriminator before generating the oversampled files.
# cgan_filter_strategy = 0 means the CGAN is not used to drop samples.
cgan_filter_strategy = 0 # value is looked up in the filter schemes defined in utility
# A discriminator trained only on minority classes (cgan-m) must not be used to filter majority classes.
if cgan_filter_strategy and 'cgan-m' in oversampling_method:
    raise Exception('不要使用 cgan-m 对多数类进行欠采样！因为 cgan-m 是仅针对 minority 进行训练的。')

## 加载训练集

In [3]:
# Features are stored per scaling method; labels are shared across scaling methods.
X_file = splits_folder / f'train_X_{scaling_method}.npy'
y_file = splits_folder / f'train_y.npy'

# Load the training split from disk
X = np.load(X_file)
y = np.load(y_file)

# Report array shapes and the per-class sample counts (sorted by class id)
print(f'[{now()}] {X_file.name} shape: {X.shape}, {y_file.name} shape: {y.shape}')
print(f'Labels: { {int(k): v for k, v in sorted(Counter(y).items())} }\n')

[06/24/25 23:14:38 PDT] train_X_minmax.npy shape: (3797547, 70), train_y.npy shape: (3797547,)
Labels: {0: 1600000, 1: 228953, 2: 489, 3: 184, 4: 548809, 5: 1384, 6: 460953, 7: 33206, 8: 369530, 9: 111912, 10: 8792, 11: 154683, 12: 128511, 13: 70, 14: 150071}



## CGAN 定义





### 定义辅助函数与类

In [4]:
from imblearn.under_sampling import RandomUnderSampler

def select_cgan_subset(X, y, mode, rus_target=100_000):
    """
    Select the subset of the training data used to train the cGAN.

    Args:
        X: Feature matrix, indexed in step with ``y``.
        y: Integer class labels.
        mode: Selection mode, one of
            'a' - all data, returned unchanged;
            'm' - only rows whose label is in the global ``minority_labels``;
            'f' - rows whose label is in ``minority_labels`` or is 9 or 12;
            'b' - balanced: every class outside ``minority_labels`` is randomly
                  under-sampled to at most ``rus_target`` rows.
        rus_target: Per-class cap applied to non-minority classes in mode 'b'.

    Returns:
        Tuple ``(X_subset, y_subset)``.

    Raises:
        ValueError: If ``mode`` is not one of the values listed above.
    """
    if mode == 'a':
        return X, y

    if mode in ('m', 'f'):
        wanted = minority_labels if mode == 'm' else minority_labels + [9, 12]
        mask = np.isin(y, wanted)
        return X[mask], y[mask]

    if mode == 'b':
        labels, counts = np.unique(y, return_counts=True)
        # Cap each target at the class's actual size: an under-sampler cannot be
        # asked for more rows than a class has, so smaller classes are kept whole.
        strategy = {
            label: min(rus_target, int(count))
            for label, count in zip(labels, counts)
            if label not in minority_labels
        }
        rus = RandomUnderSampler(sampling_strategy=strategy, random_state=op_seed)
        return rus.fit_resample(X, y)

    raise ValueError(f"Invalid mode: {mode}")

In [5]:
# Callback: adjust learning rates based on delta_loss
class MyReduceLR(tf.keras.callbacks.Callback):
    """
    Shrink an optimizer's learning rate when the two CGAN losses stay lopsided.

    At the end of every epoch the ``delta_loss`` log entry is inspected:
      * ``delta_loss <= -delta_threshold`` for ``patience`` consecutive epochs
        -> the discriminator optimizer's learning rate is multiplied by ``factor``;
      * ``delta_loss >= delta_threshold`` for ``patience`` consecutive epochs
        -> the generator optimizer's learning rate is multiplied by ``factor``.
    A learning rate is never lowered below ``min_lr``.
    """
    def __init__(self, delta_threshold=1.0, patience=5, factor=0.5, min_lr=1e-8):
        super().__init__()
        self.delta_threshold, self.patience = delta_threshold, patience
        self.factor, self.min_lr = factor, min_lr

        # Consecutive-epoch counters for each direction of imbalance.
        self.d_bad = self.g_bad = 0

    def _lowered_lr(self, optimizer):
        """Scale the optimizer's learning rate down; return the new value, or None if it did not decrease."""
        old_lr = float(optimizer.learning_rate)
        new_lr = max(old_lr * self.factor, self.min_lr)
        if not new_lr < old_lr:
            return None
        optimizer.learning_rate.assign(new_lr)
        return new_lr

    def on_epoch_end(self, epoch, logs=None):
        delta = (logs or {}).get('delta_loss')
        if delta is None:
            return

        # A streak is broken (counter reset) as soon as one epoch falls outside the band.
        self.d_bad = self.d_bad + 1 if delta <= -self.delta_threshold else 0
        self.g_bad = self.g_bad + 1 if delta >= self.delta_threshold else 0

        if self.d_bad >= self.patience:
            new_lr = self._lowered_lr(self.model.d_optimizer)
            if new_lr is not None:
                self.d_bad = 0
                print(f"\n[{now()}] Adjust discriminator\'s learning rate to {new_lr}")

        if self.g_bad >= self.patience:
            new_lr = self._lowered_lr(self.model.g_optimizer)
            if new_lr is not None:
                self.g_bad = 0
                print(f"\n[{now()}] Adjust generator\'s learning rate to {new_lr}")


# Callback: early stopping when balanced
class MyEarlyStopping(tf.keras.callbacks.Callback):
    """
    Stop training once the CGAN losses have stayed balanced for long enough.

    An epoch counts as balanced when ``|delta_loss| <= balance_tolerance``.
    After ``patience`` consecutive balanced epochs, ``model.stop_training`` is set.
    """
    def __init__(self, balance_tolerance=0.1, patience=10):
        super().__init__()
        self.balance_tolerance = balance_tolerance
        self.patience = patience
        # Number of consecutive balanced epochs seen so far.
        self.balance = 0

    def on_epoch_end(self, epoch, logs=None):
        delta = (logs or {}).get('delta_loss')
        if delta is None:
            return

        # Extend the streak on a balanced epoch, otherwise start over.
        is_balanced = abs(delta) <= self.balance_tolerance
        self.balance = self.balance + 1 if is_balanced else 0

        if self.balance < self.patience:
            return

        self.model.stop_training = True
        print(f"\n[{datetime.now().strftime('%x %X')}] Early stopping triggered. CGAN is balanced.")


class BalanceCallback(tf.keras.callbacks.Callback):
    """
    Rebalance discriminator/generator training after every batch.

    Reads ``discriminator_loss`` and ``generator_loss`` from the batch logs and
    sets the model's ``pause_D`` / ``pause_G`` / ``extra_steps_D`` /
    ``extra_steps_G`` attributes:
      * delta < -delta_threshold: pause D, give G ``extra_steps`` extra updates;
      * delta >  delta_threshold: pause G, give D ``extra_steps`` extra updates;
      * otherwise: neither network is paused and no extra updates are given.

    All four attributes are rewritten on every batch, so a flag left over from
    the opposite imbalance can never keep both networks paused at once.

    NOTE(review): these are plain Python attributes on the model; confirm that
    the model's train_step actually observes changes made between batches
    (e.g. that it is not running from a previously traced graph).
    """
    def __init__(self, delta_threshold=0.8, extra_steps=2):
        super().__init__()
        self.delta_threshold = delta_threshold
        self.extra_steps = extra_steps

    def on_train_batch_end(self, batch, logs=None):
        logs = logs or {}
        d_loss = logs.get('discriminator_loss')
        g_loss = logs.get('generator_loss')

        if d_loss is None or g_loss is None:
            return

        delta_loss = d_loss - g_loss
        # D-loss far below G-loss: the discriminator is ahead and should wait.
        d_ahead = bool(delta_loss < -self.delta_threshold)
        # G-loss far below D-loss: the generator is ahead and should wait.
        g_ahead = bool(delta_loss > self.delta_threshold)

        self.model.pause_D = d_ahead
        self.model.pause_G = g_ahead
        self.model.extra_steps_G = self.extra_steps if d_ahead else 0
        self.model.extra_steps_D = self.extra_steps if g_ahead else 0

class SaveOnInterval(tf.keras.callbacks.Callback):
    """
    Save intermediate generator/discriminator checkpoints every ``interval`` epochs.

    Checkpoint names are derived from the final destination file names by
    replacing the ``e<total epochs>,`` tag with ``e<current epoch>,``.
    The final epoch is skipped here.
    """
    def __init__(self, interval, dest_gen_file, dest_dis_file):
        super().__init__()
        self.interval = interval
        self.dest_gen_file = dest_gen_file
        self.dest_dis_file = dest_dis_file

    def on_epoch_end(self, epoch, logs=None):
        # Keras passes a 0-based epoch index; convert to a 1-based epoch number.
        epoch_num = epoch + 1
        max_epochs = self.params['epochs']

        if epoch_num % self.interval != 0 or epoch_num >= max_epochs:
            return

        final_tag = f'e{max_epochs},'
        current_tag = f'e{epoch_num},'
        gen_file = self.dest_gen_file.with_name(self.dest_gen_file.name.replace(final_tag, current_tag))
        dis_file = self.dest_dis_file.with_name(self.dest_dis_file.name.replace(final_tag, current_tag))

        self.model.generator.save(gen_file)
        self.model.discriminator.save(dis_file)

        print(f'\n[{now()}] 💾 Saved model at epoch {epoch_num} to: {gen_file}')

### 定义 cGAN

In [6]:
from tensorflow.keras import layers, Model

# ---------------- Generator Definition ----------------
def build_generator(num_classes: int,
                    feature_dim: int,
                    noise_dim: int = 128,
                    embed_dim: int = 64,
                    hidden_dims: list = [128, 256, 512]
                    ) -> Model:
    """
    Build the conditional generator.

    The model takes a noise vector and an integer class label, embeds the label,
    concatenates it with the noise and passes the result through a stack of
    ReLU Dense layers (each followed by BatchNormalization).

    Args:
        num_classes: Size of the label-embedding vocabulary.
        feature_dim: Width of the generated feature vector.
        noise_dim: Width of the noise input.
        embed_dim: Width of the label embedding.
        hidden_dims: Widths of the hidden Dense layers, in order.

    Returns:
        A Keras model named "Generator" mapping ``[noise, label]`` to a
        ``feature_dim``-wide output. The output activation is 'sigmoid' when the
        global ``scaling_method`` is 'minmax' or 'l1pminmax', else 'linear'.
    """
    print(f"[{datetime.now().strftime('%x %X')}] Building generator with: num_classes={num_classes}, feature_dim={feature_dim}, noise_dim={noise_dim}, embed_dim={embed_dim}, hidden_dims={hidden_dims}")

    # Inputs: a noise vector and a single integer class label.
    z_in = layers.Input(shape=(noise_dim,), name='noise_input')
    cls_in = layers.Input(shape=(1,), dtype='int32', name='label_input')

    # Label -> embedding vector, flattened from (1, embed_dim) to (embed_dim,).
    cls_vec = layers.Embedding(input_dim=num_classes, output_dim=embed_dim)(cls_in)
    cls_vec = layers.Flatten()(cls_vec)

    # Joint representation of width noise_dim + embed_dim.
    h = layers.Concatenate()([z_in, cls_vec])

    for width in hidden_dims:
        h = layers.Dense(width, activation='relu')(h)
        h = layers.BatchNormalization()(h)

    # The output activation has to match the range produced by the data scaling.
    if scaling_method in ['minmax', 'l1pminmax']:
        out_activation = 'sigmoid'
    else:
        out_activation = 'linear'

    generated = layers.Dense(feature_dim, activation=out_activation, name='generated_data')(h)

    return Model([z_in, cls_in], generated, name="Generator")


# ---------------- Discriminator Definition ----------------
def build_discriminator(num_classes: int, feature_dim: int,
                        embed_dim: int = 64,
                        hidden_dims: list = [256, 128]
                        ) -> Model:
    """
    Build the conditional discriminator.

    The model takes a feature vector and an integer class label, embeds the
    label, concatenates it with the features and passes the result through a
    stack of Dense -> LeakyReLU(0.2) -> Dropout(0.4) blocks.

    Args:
        num_classes: Size of the label-embedding vocabulary.
        feature_dim: Width of the input feature vector.
        embed_dim: Width of the label embedding.
        hidden_dims: Widths of the hidden Dense layers, in order.

    Returns:
        A Keras model named "Discriminator" mapping ``[features, label]`` to a
        single sigmoid output (close to 1: judged real, close to 0: judged fake).
    """
    print(f"[{datetime.now().strftime('%x %X')}] Building discriminator with: num_classes={num_classes}, feature_dim={feature_dim}, embed_dim={embed_dim}, hidden_dims={hidden_dims}")

    # Inputs: a feature vector and a single integer class label.
    x_in = layers.Input(shape=(feature_dim,), name='data_input')
    cls_in = layers.Input(shape=(1,), dtype='int32', name='label_input')

    # Label -> embedding vector, flattened from (1, embed_dim) to (embed_dim,).
    cls_vec = layers.Embedding(input_dim=num_classes, output_dim=embed_dim)(cls_in)
    cls_vec = layers.Flatten()(cls_vec)

    # Joint representation of width feature_dim + embed_dim.
    h = layers.Concatenate()([x_in, cls_vec])

    for width in hidden_dims:
        h = layers.Dense(width)(h)
        h = layers.LeakyReLU(0.2)(h)
        h = layers.Dropout(0.4)(h)

    validity = layers.Dense(1, activation='sigmoid', name='validity')(h)

    return Model([x_in, cls_in], validity, name="Discriminator")


# ---------------- CGAN Model with Custom train_step ----------------
class ConditionalGAN(Model):
    """
    Conditional GAN that trains a generator/discriminator pair through ``fit``.

    Each ``train_step`` updates the discriminator and then the generator on one
    batch, and reports the running means of both losses plus their difference.

    Attributes:
        generator / discriminator: The two wrapped Keras models.
        seen_labels: int32 tensor of class ids that fake samples are drawn for.
        noise_dim: Width of the generator's noise input (read from its input shape).
        pause_D / pause_G: When true, ``train_step`` skips that network's update.
        extra_steps_D / extra_steps_G: Extra updates per batch for that network.
    """
    def __init__(self,
                 generator,
                 discriminator,
                 seen_labels: list = None,
                 **kwargs):
        """
        Args:
            generator: Model taking ``[noise, label]``.
            discriminator: Model taking ``[features, label]``.
            seen_labels: Class ids to sample fake labels from. ``None`` is
                treated as an empty list.
            **kwargs: Forwarded to ``Model.__init__``.
        """
        super().__init__(**kwargs)
        self.generator = generator
        self.discriminator = discriminator
        # Explicit None check: callers may pass a NumPy array, whose truth value is ambiguous.
        self.seen_labels = tf.constant([] if seen_labels is None else seen_labels, dtype=tf.int32)

        self.noise_dim = generator.input_shape[0][1]

        self.pause_D = False
        self.pause_G = False
        self.extra_steps_D = 0
        self.extra_steps_G = 0

        self.gen_loss_metric = tf.keras.metrics.Mean(name="generator_loss")
        self.disc_loss_metric = tf.keras.metrics.Mean(name="discriminator_loss")
        self.delta_loss_metric = tf.keras.metrics.Mean(name="delta_loss")

    @property
    def metrics(self):
        """Metrics tracked by this model, in the order they are reported."""
        return [self.disc_loss_metric,
                self.gen_loss_metric,
                self.delta_loss_metric,
                ]

    def compile(self, g_optimizer, d_optimizer, loss_fn):
        """Store the two optimizers and the shared loss function."""
        super().compile()
        self.g_optimizer = g_optimizer
        self.d_optimizer = d_optimizer
        self.loss_fn = loss_fn

    def _sample_fake_labels(self, batch_size):
        """
        Draw ``batch_size`` labels uniformly from ``seen_labels``; shape (batch_size, 1).

        Uniform sampling treats every seen class equally, regardless of how
        often it appears in the real data. (An alternative would be to shuffle
        the real labels of the batch, which keeps the original class ratio.)
        """
        idx = tf.random.uniform((batch_size,), maxval=tf.shape(self.seen_labels)[0], dtype=tf.int32)
        return tf.expand_dims(tf.gather(self.seen_labels, idx), axis=1)

    def _train_D(self, real_x, real_y):
        """Run one discriminator update on a real batch plus an equally sized fake batch."""
        real_size = tf.shape(real_x)[0]
        fake_y = self._sample_fake_labels(real_size)

        with tf.GradientTape() as tape_d:
            self.discriminator.trainable = True
            # The generator is only used to produce inputs here, so it runs in inference mode.
            fake_x = self.generator([tf.random.normal((real_size, self.noise_dim)), fake_y], training=False)

            real_validity = self.discriminator([real_x, real_y], training=True)
            fake_validity = self.discriminator([fake_x, fake_y], training=True)

            d_loss_real = self.loss_fn(tf.ones_like(real_validity), real_validity)   # loss on real samples
            d_loss_fake = self.loss_fn(tf.zeros_like(fake_validity), fake_validity)  # loss on generated samples
            d_loss = 0.5 * (d_loss_real + d_loss_fake)

        # Compute gradients and update the discriminator weights only.
        grads_d = tape_d.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(zip(grads_d, self.discriminator.trainable_weights))

        self.disc_loss_metric.update_state(d_loss)

    def _train_G(self, real_x, real_y):
        """Run one generator update; ``real_x`` is only used to size the fake batch."""
        real_size = tf.shape(real_x)[0]
        fake_y = self._sample_fake_labels(real_size)

        with tf.GradientTape() as tape_g:
            self.discriminator.trainable = False
            # The generator must be called inside its own tape so its gradients are recorded.
            fake_x = self.generator([tf.random.normal((real_size, self.noise_dim)), fake_y], training=True)

            fake_validity = self.discriminator([fake_x, fake_y], training=False)
            # The generator is rewarded when the discriminator labels its output as real.
            g_loss = self.loss_fn(tf.ones_like(fake_validity), fake_validity)

        # Compute gradients and update the generator weights only.
        grads_g = tape_g.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads_g, self.generator.trainable_weights))
        self.discriminator.trainable = True

        self.gen_loss_metric.update_state(g_loss)

    # fit() calls train_step automatically, passing one batch of data each time.
    def train_step(self, data):
        """
        Train on one ``(real_x, real_y)`` batch.

        Returns:
            Dict with the running means ``discriminator_loss`` and
            ``generator_loss``, and ``delta_loss`` (the running mean of the
            difference between those two running means).
        """
        real_x, real_y = data

        if not self.pause_D:
            for _ in range(1 + self.extra_steps_D):
                self._train_D(real_x, real_y)

        if not self.pause_G:
            for _ in range(1 + self.extra_steps_G):
                self._train_G(real_x, real_y)

        batch_delta = self.disc_loss_metric.result() - self.gen_loss_metric.result()
        self.delta_loss_metric.update_state(batch_delta)

        return {
            "discriminator_loss": self.disc_loss_metric.result(),
            "generator_loss": self.gen_loss_metric.result(),
            "delta_loss": self.delta_loss_metric.result(),
            }


## 1️⃣ 利用 ROS 提前补充极少数类样本
- 先用 ROS 随机复制的方式，将极少数类样本扩展到可接受的程度后再进行 oversampling


In [7]:
from imblearn.over_sampling import RandomOverSampler

oversample_to = {}
# ROS pre-oversampling only runs when oversampling_method starts with 'ros'
if oversampling_method.startswith('ros'):
    # The single character right after 'ros' is the scheme number (e.g. 'ros2+cgan-b' -> 2)
    ros_scheme = int(oversampling_method[3])
    # ros_schemes is not defined in this notebook; presumably provided by utility.ipynb — verify
    oversample_to = ros_schemes[ros_scheme]
    print(f'[{now()}] Applying ROS oversampling to: {oversample_to}')

    # X and y are replaced by their randomly oversampled versions
    oversampler = RandomOverSampler(sampling_strategy=oversample_to, random_state=op_seed)
    X, y = oversampler.fit_resample(X, y)

    print(f'[{now()}] After ROS oversampling:')
    print(f'  X.shape: {X.shape}, y.shape: {y.shape}')
    print(f'  Labels: { {int(k): v for k, v in sorted(Counter(y).items())} }\n')
else:
    print(f'[{now()}] No need to apply ROS oversampling.')

[06/24/25 23:14:40 PDT] No need to apply ROS oversampling.


## 2️⃣ 初始化 并 训练 cGAN

In [None]:
feature_dim = X.shape[1]
# Always use the full-size num_classes, even when the CGAN is trained on minority classes only,
# so that no extra label remapping is needed.
# NOTE(review): this counts the distinct labels present in y; if y ever lacks a class the value
# would be smaller than len(unique_labels) — confirm that y always contains every class.
num_classes = len(np.unique(y))

# Model save directory and file names (the file name encodes every hyper-parameter)
save_dir = balanced_folder / 'models' / scaling_method
os.makedirs(save_dir, exist_ok=True)
generator_file = save_dir / (
    f'{oversampling_method}('
    f'n{noise_dim},f{feature_dim},c{num_classes},e{epochs},b{batch_size},'
    f'gen[{gen_embed_dim},{gen_hidden_dims},{gen_learning_rate}],'
    f'dis[{disc_embed_dim},{disc_hidden_dims},{disc_learning_rate}]'
    ')_generator.keras'
)
# stem[:-10] strips the trailing '_generator' (10 characters) to recover the shared prefix
discriminator_file = save_dir / (generator_file.stem[:-10] + '_discriminator.keras')
cgan_file = save_dir / (generator_file.stem[:-10] + '_cgan.keras')


# NOTE(review): only the generator file's existence is checked, yet the discriminator is loaded too.
if generator_file.exists() and not retrain:
    # A pre-trained generator already exists: load the saved models instead of training
    print(f"[{now()}] 📡 Loading pre-trained generator from {generator_file}")
    generator = tf.keras.models.load_model(generator_file)

    print(f"[{now()}] 📡 Loading pre-trained discriminator from {discriminator_file}")
    discriminator = tf.keras.models.load_model(discriminator_file)
else:
    print(f"[{now()}] 🚀 Training cGAN [{generator_file.stem}]...")

    ## 1️⃣ Select the samples used for training (the mode is the last character of oversampling_method)
    selected_X, selected_y = select_cgan_subset(X, y, oversampling_method[-1])
    print(f"[{now()}] Selected X.shape: {selected_X.shape}, y.shape: {selected_y.shape}")
    print(f"[{now()}] Selected labels: { {int(k): v for k, v in sorted(Counter(selected_y).items())} }")

    train_dataset = tf.data.Dataset.from_tensor_slices((selected_X, selected_y))
    train_dataset = train_dataset.shuffle(buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)

    ## 2️⃣ Initialise the cGAN (fake labels are drawn only from the classes present in the selected subset)
    generator = build_generator(num_classes, feature_dim, noise_dim=noise_dim, embed_dim=gen_embed_dim, hidden_dims=gen_hidden_dims)
    discriminator = build_discriminator(num_classes, feature_dim, embed_dim=disc_embed_dim, hidden_dims=disc_hidden_dims)
    cgan = ConditionalGAN(generator, discriminator, seen_labels=np.unique(selected_y))
    cgan.compile(
        g_optimizer=tf.keras.optimizers.Adam(learning_rate=gen_learning_rate, beta_1=0.5),
        d_optimizer=tf.keras.optimizers.Adam(learning_rate=disc_learning_rate, beta_1=0.5),
        loss_fn=tf.keras.losses.BinaryCrossentropy(from_logits=False)
        )

    ## 3️⃣ Train the cGAN
    cgan.fit(train_dataset, epochs=epochs,
             callbacks=[
                 MyReduceLR(delta_threshold=0.8, patience=5, factor=0.5),
                #  BalanceCallback(delta_threshold=0.3, extra_steps=3),
                 SaveOnInterval(interval=100, dest_gen_file=generator_file, dest_dis_file=discriminator_file),
                 MyEarlyStopping(),
                 ]
             )

    ## 4️⃣ Save the models
    generator.save(generator_file)
    discriminator.save(discriminator_file)
    cgan.build(input_shape=[(None, noise_dim), (None, 1)]) # the cgan has to be built before saving, otherwise a warning is emitted
    cgan.save(cgan_file)
    print(f"[{now()}] ✅ Saved generator model to {generator_file}")  # saving the generator alone would actually be enough

    # Release temporary resources
    del train_dataset
    del selected_X
    del selected_y

[06/24/25 23:14:42 PDT] 🚀 Training cGAN [cgan-m(n128,f70,c15,e500,b512,gen[128,[128, 256, 512],0.0003],dis[64,[256, 128],0.0001])_generator]...
[06/24/25 23:14:42 PDT] Selected X.shape: (44125, 70), y.shape: (44125,)
[06/24/25 23:14:42 PDT] Selected labels: {2: 489, 3: 184, 5: 1384, 7: 33206, 10: 8792, 13: 70}
[06/25/25 06:14:44] Building generator with: num_classes=15, feature_dim=70, noise_dim=128, embed_dim=128, hidden_dims=[128, 256, 512]
[06/25/25 06:14:45] Building discriminator with: num_classes=15, feature_dim=70, embed_dim=64, hidden_dims=[256, 128]
Epoch 1/500
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 71ms/step - delta_loss: -0.4150 - discriminator_loss: 0.5059 - generator_loss: 1.1715
Epoch 2/500
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - delta_loss: -2.1886 - discriminator_loss: 0.1846 - generator_loss: 2.6399
Epoch 3/500
[1m87/87[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - delta_loss: -3.1044 - di

## 3️⃣ 使用 cGAN 生成新样本

In [None]:
def generate_samples(generator: tf.keras.Model, target_class: int, num_samples: int):
    """
    Produce synthetic feature vectors of a single class with the generator.

    Args:
        generator (tensorflow.keras.Model): Generator taking ``[noise, label]``.
        target_class (int): Class id every generated sample is conditioned on.
        num_samples (int): How many samples to generate.

    Returns:
        numpy.ndarray: The generator's predictions, one row per sample.
    """
    # The noise width is read from the generator's first input.
    latent_size = generator.input_shape[0][1]

    # Standard-normal noise, shape (num_samples, latent_size).
    z = np.random.normal(0, 1, size=(num_samples, latent_size))

    # Constant label column, shape (num_samples, 1).
    class_column = np.full((num_samples, 1), fill_value=target_class, dtype=np.int32)

    return generator.predict([z, class_column], verbose=0)


def cgan_undersample(cgan_discriminator: tf.keras.Model, sampling_strategy: dict, X: np.ndarray, y: np.ndarray,
                     score_batch_size: int = 8192):
    """
    Under-sample selected classes, keeping the rows the CGAN discriminator scores highest.

    Args:
        cgan_discriminator: Discriminator taking ``[features, label]`` and
            returning one score per row.
        sampling_strategy: Mapping ``class id -> number of rows to keep``.
            Classes that already have at most that many rows are kept whole;
            classes not mentioned are kept whole as well.
        X: Feature matrix.
        y: Class labels, aligned with ``X``.
        score_batch_size: Batch size used when scoring a class, so that very
            large classes are not pushed through the model in a single call.

    Returns:
        Tuple ``(X_res, y_res)`` with the kept rows in their original order.
    """
    print(f'[{now()}] 📉 CGAN Undersampling ...')
    print(f'  original X.shape: {X.shape}')
    print(f'  original labels_counts: {get_label_counts(y)}')
    print(f'  undersample to: {sampling_strategy}')

    # Index arrays of the rows to keep, one entry per processed class.
    kept_parts = []

    for cls, target_n in sampling_strategy.items():
        idxs = np.where(y == cls)[0]

        # Nothing to drop if the class is already at or below the target.
        if len(idxs) <= target_n:
            kept_parts.append(idxs)
            print(f'[{now()}] Skipping class [{cls}]: {len(idxs)} ≤ {target_n}')
            continue

        # Score every sample of this class in batches (inference mode).
        scores = cgan_discriminator.predict(
            [X[idxs], y[idxs].reshape(-1, 1)],
            batch_size=score_batch_size,
            verbose=0,
        )
        scores = np.asarray(scores).reshape(-1)

        # Sort by score in descending order and keep the top target_n rows.
        top_idxs = idxs[np.argsort(scores)[::-1][:target_n]]

        kept_parts.append(top_idxs)
        print(f'[{now()}] Dropping {len(idxs) - target_n} samples for class [{cls}]: {len(idxs)} -> {target_n}')

    # Classes that are not part of sampling_strategy are kept in full.
    untouched = ~np.isin(y, list(sampling_strategy.keys()))
    kept_parts.append(np.where(untouched)[0])

    # Restore the original row order in the result.
    keep_idxs = np.sort(np.concatenate(kept_parts))
    X_res = X[keep_idxs]
    y_res = y[keep_idxs]

    print(f'[{now()}] 📉 After CGAN Undersampling:')
    print(f'  X_res.shape: {X_res.shape}')
    print(f'  Labels: {get_label_counts(y_res)}')

    return X_res, y_res

def cgan_oversample(cgan_generator: tf.keras.Model, sampling_strategy: dict, X: np.ndarray, y: np.ndarray):
    """
    Over-sample selected classes by appending CGAN-generated rows.

    Args:
        cgan_generator: Generator passed on to ``generate_samples``.
        sampling_strategy: Mapping ``class id -> desired number of rows``.
            Classes that already have at least that many rows are left as is.
        X: Feature matrix.
        y: Class labels, aligned with ``X``.

    Returns:
        Tuple ``(X_res, y_res)``: the original rows followed by the generated
        rows, class by class in the iteration order of ``sampling_strategy``.
    """
    current_counts = get_label_counts(y)

    print(f'[{now()}] 📈 CGAN Oversampling ...')
    print(f'  original X.shape: {X.shape}')
    print(f'  original labels_counts: {current_counts}')
    print(f'  oversample to: {sampling_strategy}')

    all_X = [X]
    all_y = [y]

    for cls, desired_n in sampling_strategy.items():
        current_n = current_counts.get(cls, 0)
        n_to_gen = desired_n - current_n
        if n_to_gen <= 0:
            print(f'[{now()}] Skipping class [{cls}]: {current_n} ≥ {desired_n}')
            continue

        print(f'[{now()}] Generating {n_to_gen} samples for class [{cls}]: {current_n} -> {desired_n}')
        all_X.append(generate_samples(cgan_generator, cls, n_to_gen))
        # Use the dtype of the existing labels so the concatenated label array keeps it.
        all_y.append(np.full(n_to_gen, cls, dtype=y.dtype))

    X_res = np.concatenate(all_X)
    y_res = np.concatenate(all_y)

    print(f'[{now()}] 📈 After CGAN Oversampling:')
    print(f'  X_res.shape: {X_res.shape}')
    print(f'  Labels: {get_label_counts(y_res)}')

    return X_res, y_res


In [None]:
if not resample_outputs:
    print(f'[{now()}] 不生成过采样的数据文件')
else:
    # The CGAN filtering step does not depend on the resample scheme, so it is
    # performed once up front instead of being repeated for every scheme.
    if cgan_filter_strategy:
        print(f'[{now()}] 🟡 Apply CGAN Undersampling.')
        X_filtered, y_filtered = cgan_undersample(discriminator, cgan_filter_schemes[cgan_filter_strategy], X, y)
    else:
        X_filtered, y_filtered = X, y

    # Output file suffix: the model's hyper-parameter prefix (generator stem without
    # '_generator'), plus an 'f<strategy>' tag when CGAN filtering was applied.
    filename = generator_file.stem[:-10] + (f'f{cgan_filter_strategy}.npy' if cgan_filter_strategy else '.npy')

    for resample_scheme in resample_outputs:
        print(f'[{now()}] 🚀 resample_scheme: {resample_scheme}')

        # CGAN oversampling up to the per-class targets of this scheme
        print(f'[{now()}] 🟢 Apply CGAN Oversampling.')
        resample_to = resample_schemes[resample_scheme]
        X_resampled, y_resampled = cgan_oversample(generator, resample_to, X_filtered, y_filtered)

        # Save the resampled arrays
        X_resampled_file = balanced_folder / f'train_X_{scaling_method}_s{resample_scheme}_{filename}'
        y_resampled_file = balanced_folder / f'train_y_{scaling_method}_s{resample_scheme}_{filename}'
        np.save(X_resampled_file, X_resampled)
        np.save(y_resampled_file, y_resampled)
        print(f"[{now()}] 💾 Saved resampled data to {X_resampled_file} & {y_resampled_file.name}\n")

In [None]:
from google.colab import runtime
# Log that the notebook has finished, then release the Colab runtime
# (per the message printed below, this shuts the runtime down).
print(f'[{now()}] ⛔ 运行结束. shutdown now...')
runtime.unassign()