In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model, Input
from itertools import combinations
from tensorflow.keras.callbacks import Callback
import tensorflow_probability as tfp
import os


In [None]:
config = {
    'epochs': 10,
    'batch_size': 64,
    'learning_rate': 0.001,
    'activation': 'gelu',
    'dropout_rate': 0.25,
    'val_size': 0.1,
}

# Data Loader

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def mixup_tf(x, y, alpha=1):
    lam = tfp.distributions.Beta(alpha, alpha).sample()
    index = tf.random.shuffle(tf.range(tf.shape(x)[0]))
    x_mix = lam * x + (1 - lam) * tf.gather(x, index)
    y_mix = lam * y + (1 - lam) * tf.gather(y, index)
    return x_mix, y_mix

In [None]:
file_dir = '/content/drive/MyDrive/ai_lab_wc/4-10-c8'

# training data
# input
file_path = os.path.join(file_dir, 'train_Covariance.mat')
mat = sio.loadmat(file_path)
train_input = mat['Covariance']
train_input = np.float32(train_input)
# print("train_input shape:", train_input.shape)

# output
file_path = os.path.join(file_dir, 'train_Label.mat')
mat = sio.loadmat(file_path)
train_output_old = mat['Label']
train_output_old = np.float32(train_output_old)
train_output = train_output_old

dataset = tf.data.Dataset.from_tensor_slices((train_input, train_output))
# print orginal number of samples in train_dataset
num_samples_train_org = sum(1 for _ in dataset)
print("Original number of samples in train_dataset:", num_samples_train_org)

# train_dataset = (
#     dataset
#     .shuffle(buffer_size=1000)
#     .batch(config['batch_size'])
#     .map(mixup_tf, num_parallel_calls=tf.data.AUTOTUNE)
#     .prefetch(tf.data.AUTOTUNE)
# )

# Mixup 的資料集（先 shuffle 才 batch 才 mix）
mix_dataset = (
    dataset
    .shuffle(num_samples_train_org)  # shuffle before batching
    .batch(config['batch_size'])
    .map(mixup_tf, num_parallel_calls=tf.data.AUTOTUNE)
    .unbatch()  # 重要！把 mixup 的 batch 還原，才能與原始資料合併
)
# # 輸出 mix_dataset 的label example
# for batch in mix_dataset.take(5):
#     mix_input, mix_output = batch
#     print("Mixup output:", mix_output)

# validation data
val_dataset = mix_dataset.take(int(num_samples_train_org * config['val_size']))
mix_dataset = mix_dataset.skip(int(num_samples_train_org * config['val_size']))

val_dataset = (
    val_dataset
    .batch(config['batch_size'])
    .prefetch(tf.data.AUTOTUNE)
)

# 合併原始與 mix 資料
combined_dataset = dataset.concatenate(mix_dataset).shuffle(num_samples_train_org * 2)

# for batch in combined_dataset.take(10):
#     input_example, output_example = batch
#     print("Combined output example:", output_example)



# 將 validation 資料從原始資料中移除
# train_dataset = combined_dataset.skip(int(num_samples_train_org * config['val_size']))

train_dataset = (
    combined_dataset
    .batch(config['batch_size'])
    .prefetch(tf.data.AUTOTUNE)
)

# print("train_dataset.shape:", train_dataset.element_spec)
# print number of samples in train_dataset
total_samples_train = sum(batch[0].shape[0] for batch in train_dataset)
print("Total number of training samples (after batching):", total_samples_train)

# print("val_dataset shape:", val_dataset.element_spec)
# print number of samples in val_dataset
total_samples_val = sum(batch[0].shape[0] for batch in val_dataset)
print("Total number of validation samples (after batching):", total_samples_val)


# testing data
# input
file_path = os.path.join(file_dir, 'test_Covariance.mat')
mat = sio.loadmat(file_path)
test_input = mat['Covariance']
test_input = np.float32(test_input)
# print("test_input shape:", test_input.shape)

# output
file_path = os.path.join(file_dir, 'test_Label.mat')
mat = sio.loadmat(file_path)
test_output_old = mat['Label']
test_output_old = np.float32(test_output_old)
test_output = test_output_old

dataset = tf.data.Dataset.from_tensor_slices((test_input, test_output))
test_dataset = dataset.batch(config['batch_size']).prefetch(tf.data.AUTOTUNE)

# print("test_output shape:", test_output.shape)
# print("test_dataset.shape:", test_dataset.element_spec)
# print number of samples in test_dataset
total_samples_test = sum(batch[0].shape[0] for batch in test_dataset)
print("Total number of testing samples (after batching):", total_samples_test)

Original number of samples in train_dataset: 57344
Total number of training samples (after batching): 108954
Total number of validation samples (after batching): 5734
Total number of testing samples (after batching): 14336


# model

In [None]:
def residual_block(x, filters, use_pooling=False):
    shortcut = x

    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(config['activation'])(x)
    x = layers.Dropout(config['dropout_rate'])(x)

    x = layers.Conv2D(filters, (3, 3), padding='same')(x)
    x = layers.BatchNormalization()(x)

    # 如果shortcut維度不一樣，需要用1x1 conv調整
    if shortcut.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, (1, 1), padding='same')(shortcut)
        shortcut = layers.BatchNormalization()(shortcut)

    x = layers.Add()([x, shortcut])
    x = layers.Activation(config['activation'])(x)
    x = layers.Dropout(config['dropout_rate'])(x)

    if use_pooling:
        x = layers.MaxPooling2D((2, 2))(x)

    return x


In [None]:
# input_layer = Input(shape=(train_input.shape[1], train_input.shape[2], 2))
# x = residual_block(input_layer, 64)
# x = residual_block(x, 128, use_pooling=True)
# # x = residual_block(x, 128)
# x = residual_block(x, 512, use_pooling=True)
# # x = residual_block(x, 512)
# x = residual_block(x, 1024, use_pooling=True)


# x = layers.Flatten()(x)
# x = layers.Dense(1024)(x)
# x = layers.BatchNormalization()(x)
# x = layers.Activation(config['activation'])(x)
# x = layers.Dropout(config['dropout_rate'])(x)

# x = layers.Dense(2048)(x)
# x = layers.BatchNormalization()(x)
# x = layers.Activation(config['activation'])(x)
# x = layers.Dropout(config['dropout_rate'])(x)

# x = layers.Dense(1024)(x)
# x = layers.BatchNormalization()(x)
# x = layers.Activation(config['activation'])(x)
# x = layers.Dropout(config['dropout_rate'])(x)

# x = layers.Dense(train_output.shape[1])(x)
# output_layer = layers.Activation('sigmoid')(x)


# model = Model(inputs=input_layer, outputs=output_layer)
# model.summary()


In [None]:
model = keras.Sequential([
    keras.layers.Conv2D(128, (3, 3), input_shape=(train_input.shape[1], train_input.shape[2], 2)),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Conv2D(128, (3, 3), padding='same'),
    keras.layers.MaxPooling2D((2, 2)),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),

    keras.layers.Conv2D(256, (3, 3), padding='same'),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Conv2D(256, (3, 3), padding='same'),
    keras.layers.MaxPooling2D((2, 2)),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),

    keras.layers.Conv2D(512, (3, 3), padding='same'),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Conv2D(512, (3, 3), padding='same'),
    # keras.layers.MaxPooling2D((2, 2)),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),

    keras.layers.Conv2D(1024, (3, 3), padding='same'),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Conv2D(1024, (3, 3), padding='same'),
    keras.layers.MaxPooling2D((2, 2)),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),

    keras.layers.Conv2D(1024, (3, 3), padding='same'),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Conv2D(1024, (3, 3), padding='same'),
    # keras.layers.MaxPooling2D((2, 2)),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.GlobalAveragePooling2D(),

    # keras.layers.Flatten(),
    keras.layers.Dense(512),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Dense(1024),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Dense(128),
    keras.layers.BatchNormalization(),
    keras.layers.Activation(config['activation']),
    keras.layers.Dropout(config['dropout_rate']),
    keras.layers.Dense(train_output.shape[1]),
    keras.layers.Activation('sigmoid'),
])

model.summary()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [None]:
# model = keras.Sequential([
#     keras.layers.Conv2D(64, (3, 3), padding='same', input_shape=(train_input.shape[1], train_input.shape[2], 2)),
#     keras.layers.BatchNormalization(),
#     keras.layers.Activation(config['activation']),
#     keras.layers.MaxPooling2D((2, 2)),

#     keras.layers.Conv2D(128, (3, 3), padding='same'),
#     keras.layers.BatchNormalization(),
#     keras.layers.Activation(config['activation']),
#     keras.layers.MaxPooling2D((2, 2)),

#     keras.layers.Conv2D(256, (3, 3), padding='same'),
#     keras.layers.BatchNormalization(),
#     keras.layers.Activation(config['activation']),
#     keras.layers.GlobalAveragePooling2D(),  # 重點改為 GAP

#     keras.layers.Dense(128),
#     keras.layers.BatchNormalization(),
#     keras.layers.Activation(config['activation']),
#     keras.layers.Dropout(0.3),

#     keras.layers.Dense(train_output.shape[1]),
#     keras.layers.Activation('sigmoid'),
# ])
# model.summary()

In [None]:
def top2_postprocess(y_pred):
    y_out = np.zeros_like(y_pred)
    top2_idx = np.argsort(y_pred, axis=1)[:, -2:]
    for i in range(len(y_pred)):
        y_out[i, top2_idx[i]] = 1
    return y_out

def exact_match_accuracy(y_true, y_pred):
    y_pred_binary = tf.cast(y_pred > 0.5, tf.float32)
    # top 2
    # y_pred_binary = top2_postprocess(y_pred.numpy())

    y_true_binary = tf.cast(y_true > 0.5, tf.float32)  # 確保 y_true 也是 float32

    # 比對是否逐 row 全部正確
    match = tf.reduce_all(tf.equal(y_true_binary, y_pred_binary), axis=1)

    # 計算準確率
    accuracy = tf.reduce_mean(tf.cast(match, tf.float32))

    return accuracy

In [None]:
def combination_loss(lambda_constraint=1):
    def loss_fn(y_true, y_pred):
        # 二進制交叉熵損失
        # bce_loss = keras.losses.binary_crossentropy(y_true, y_pred, from_logits=True)
        bce_loss = tf.reduce_mean(keras.losses.binary_crossentropy(y_true, y_pred, from_logits=False))

        # 約束：只有兩個正數
        positive_count = tf.reduce_sum(y_pred, axis=1)
        # positive_count = tf.reduce_sum(tf.cast(probs > 0.5, tf.float32), axis=1)  # 每行正數的個數
        sum_constraint = tf.reduce_mean(tf.square(positive_count - 2.0))  # 平均每行正數個數與2的差的平方
        # sum_constraint = tf.reduce_mean(tf.square(tf.reduce_sum(y_pred, axis=1) - 2.0))

        # 總損失
        total_loss = bce_loss + lambda_constraint * sum_constraint
        return total_loss

    return loss_fn

In [None]:
class PositiveCountMonitor(Callback):
    def __init__(self, test_dataset):
        super().__init__()
        self.test_dataset = test_dataset  # tf.data.Dataset 格式

    def on_epoch_end(self, epoch, logs=None):
        pos_counts = []

        for batch in self.test_dataset:
            x_batch, _ = batch
            y_pred = self.model.predict(x_batch, verbose=0)
            binary_preds = tf.cast(y_pred > 0.5, tf.float32)
            batch_pos_counts = tf.reduce_sum(binary_preds, axis=1).numpy()  # 每個樣本正例數
            pos_counts.extend(batch_pos_counts)

        pos_counts = np.array(pos_counts)
        avg_pos = np.mean(pos_counts)
        std_pos = np.std(pos_counts)

        print(f'\n[monitor] Epoch {epoch+1}: Average positive count: {avg_pos:.2f}, Std: {std_pos:.2f}')

In [None]:
from tensorflow.keras.optimizers.schedules import CosineDecay

lr_schedule_fn = CosineDecay(initial_learning_rate=config['learning_rate'], decay_steps=config['epochs'] * (total_samples_train // config['batch_size']), alpha=0.1)
# optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule_fn)
optimizer = tf.keras.optimizers.Adam(learning_rate=config['learning_rate'])


model.compile(optimizer = optimizer,
              # loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True),
            #   loss = tf.keras.losses.BinaryCrossentropy(from_logits=False),
              loss = combination_loss(lambda_constraint = 0.3),
              metrics = [exact_match_accuracy])

lr_scheduler = keras.callbacks.ReduceLROnPlateau(
    monitor='val_exact_match_accuracy', factor=0.5, patience=5, min_lr=1e-6
)

early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_exact_match_accuracy', mode='max', patience=10, restore_best_weights=True
)

monitor = PositiveCountMonitor(train_dataset)

In [None]:
class LrLogger(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        lr = self.model.optimizer._decayed_lr(tf.float32).numpy()
        print(f"\nEpoch {epoch + 1}: Learning rate is {lr:.6f}")

In [None]:
# history = model.fit(train_dataset, validation_data=test_dataset, epochs=config['epochs'], batch_size=config['batch_size'], callbacks=[lr_scheduler, early_stopping, monitor], verbose=1)
history = model.fit(train_dataset, validation_data=val_dataset, epochs=config['epochs'], batch_size=config['batch_size'], verbose=1, callbacks=[lr_scheduler, early_stopping])
# history = model.fit(train_input, train_output, epochs=config['epochs'], batch_size=config['batch_size'], validation_split=0.2, callbacks=[lr_scheduler, early_stopping, monitor], verbose=1)

Epoch 1/10
   1703/Unknown [1m519s[0m 275ms/step - exact_match_accuracy: 0.0204 - loss: 0.6132



[1m1703/1703[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m539s[0m 287ms/step - exact_match_accuracy: 0.0204 - loss: 0.6131 - val_exact_match_accuracy: 0.3212 - val_loss: 0.5090 - learning_rate: 0.0010
Epoch 2/10
[1m1703/1703[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m460s[0m 269ms/step - exact_match_accuracy: 0.4938 - loss: 0.3109 - val_exact_match_accuracy: 0.5077 - val_loss: 0.3587 - learning_rate: 0.0010
Epoch 3/10
[1m1703/1703[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m501s[0m 269ms/step - exact_match_accuracy: 0.7475 - loss: 0.2028 - val_exact_match_accuracy: 0.6751 - val_loss: 0.3063 - learning_rate: 0.0010
Epoch 4/10
[1m  19/1703[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m7:15[0m 258ms/step - exact_match_accuracy: 0.7886 - loss: 0.1724

# Training result

In [None]:

# 取出歷史紀錄
# acc = history.history['accuracy']
acc = history.history['exact_match_accuracy']
val_acc = history.history['val_exact_match_accuracy']
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(acc) + 1)

# 畫準確率
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(epochs, acc, 'bo-', label='training accuracy')
plt.plot(epochs, val_acc, 'ro-', label='validation accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

# 畫損失
plt.subplot(1, 2, 2)
plt.plot(epochs, loss, 'bo-', label='training loss')
plt.plot(epochs, val_loss, 'ro-', label='validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()

# Test

In [None]:
model.evaluate(test_dataset, verbose=1)

In [None]:
predictions = model.predict(test_input)
# print('\nPredictions shape:', predictions.shape)
# predictions = (predictions > 0.5).astype(np.float32)  # 將概率轉換為二進制輸出
predictions = top2_postprocess(predictions)  # 將預測結果轉換為 top-2 的 one-hot 編碼
# print('predictions:', predictions[0:10])  # 顯示前10筆預測結果

sum_is_not_2_cnt = 0
accuracy = 0
for pred, label in zip(predictions, test_output):
    # print('Predicted:', pred, 'True:', label)
    accuracy += (pred == label).all()
    if np.sum(pred) != 2:
        sum_is_not_2_cnt += 1

accuracy /= len(predictions)
print('\nExact Match Accuracy:', accuracy)
print('Number of predictions not summing to 2:', sum_is_not_2_cnt)
print('Number of prediction summing to 2 ratio:', (len(predictions) - sum_is_not_2_cnt) / len(predictions))


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import multilabel_confusion_matrix, ConfusionMatrixDisplay

# y_true, y_pred 是多標籤格式，例如：
# y_true = np.array([[1, 0, 0, 1, 0, 0, 0, 0], [...], ...])
# y_pred = np.array([[1, 0, 1, 0, 0, 0, 0, 0], [...], ...])

# 計算每個類別的混淆矩陣
cm = multilabel_confusion_matrix(test_output, predictions)

# 類別名稱（可以自己命名）
class_names = [f"Class {i}" for i in range(test_output.shape[1])]

# 畫出每個類別的混淆矩陣
fig, axes = plt.subplots(2, (len(class_names)+1)//2, figsize=(15, 6))
axes = axes.ravel()

for i in range(len(class_names)):
    disp = ConfusionMatrixDisplay(confusion_matrix=cm[i],
                                   display_labels=["Not "+class_names[i], class_names[i]])
    disp.plot(ax=axes[i], values_format='d', cmap='Blues')
    axes[i].set_title(class_names[i])

plt.tight_layout()
plt.show()
