In [None]:
# model modifying
dropout_rate = 0.054988

# bzq modifying
len_x_target = 3
len_y_target = 3
stride_x_target = 1
stride_y_target = 1

# mean, std proportion
alpha = 0.5

bins_size = 30  # 統計採樣數
poly_degree = bins_size - 1
window_size = 1

#target image preprocessing
angle = 0
pixels = 0

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks
import os
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt
import pickle
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
import seaborn as sns
from sklearn.preprocessing import PolynomialFeatures
import pandas as pd
from sklearn.metrics import r2_score
from scipy.interpolate import interp1d
import tensorflow_datasets as tfds
from collections import Counter
import scipy.ndimage
from tensorflow.keras.callbacks import ModelCheckpoint
import keras
import gc
import random

In [None]:
# Define the ResNet-20 V1 architecture
def resnet_block(inputs, filters, kernel_size=3, stride=1, activation='relu', dropout_rate=dropout_rate):
    x = layers.Dropout(dropout_rate)(inputs, training=True)  # Add Dropout layer here
    x = layers.Conv2D(filters, kernel_size, strides=stride, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(activation)(x)
    x = layers.Dropout(dropout_rate)(x, training=True)  # Add Dropout layer here
    x = layers.Conv2D(filters, kernel_size, strides=1, padding='same')(x)
    x = layers.BatchNormalization()(x)
    
    if stride != 1 or inputs.shape[-1] != filters:
        shortcut = layers.Conv2D(filters, kernel_size=1, strides=stride, padding='same')(inputs)
    else:
        shortcut = inputs
    
    x = layers.add([x, shortcut])
    x = layers.Activation(activation)(x)
    return x

def resnet_v1(input_shape, num_classes, dropout_rate=dropout_rate):
    inputs = layers.Input(shape=input_shape)
    x = layers.Dropout(dropout_rate)(inputs, training=True)  # Add Dropout layer here
    x = layers.Conv2D(16, kernel_size=3, strides=1, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    
    for _ in range(3):
        x = resnet_block(x, 16)
    for _ in range(3):
        x = resnet_block(x, 32, stride=2)
    for _ in range(3):
        x = resnet_block(x, 64, stride=2)
    
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(dropout_rate)(x, training=True)  # Add Dropout layer here
    x = layers.Dense(num_classes, activation='softmax', kernel_initializer='he_normal')(x)

    model = models.Model(inputs, x)
    return model

# Load CIFAR-10 data
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
y_train, y_test = tf.keras.utils.to_categorical(y_train), tf.keras.utils.to_categorical(y_test)

# 創建 tf.data.Dataset 並加入 RandomRotation 
data_augmentation = tf.keras.Sequential([
    layers.Lambda(lambda x: tf.image.random_flip_left_right(x)),
    layers.Lambda(lambda x: tf.pad(x, [[4, 4], [4, 4], [0, 0]])), 
    layers.Lambda(lambda x: tf.image.random_crop(x, (32, 32, 3))), 
    layers.Lambda(lambda x: tf.cast(x, tf.float32)),
    ]) # 隨機旋轉圖片
                                         
# Create tf.data.Dataset
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.map(lambda x, y: (data_augmentation(x, training=True), y), num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_dataset = train_dataset.shuffle(buffer_size=50000).batch(5).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_dataset = test_dataset.map(lambda x, y: (tf.cast(x, tf.float32), y)).batch(5)

# Define model
model = resnet_v1(input_shape=(32, 32, 3), num_classes=10)

# Compile model
initial_lr = 0.000250
model.compile(optimizer=optimizers.Adam(learning_rate=initial_lr),
              loss='categorical_crossentropy', 
              metrics=['categorical_accuracy'])

# Learning rate schedule
def lr_schedule(epoch):
    lr = initial_lr
    if epoch > 180:
        lr *= 0.5e-3
    elif epoch > 160:
        lr *= 1e-3
    elif epoch > 120:
        lr *= 1e-2
    elif epoch > 80:
        lr *= 1e-1
    return lr

lr_scheduler = callbacks.LearningRateScheduler(lr_schedule)

# Train model
epochs = 200
if os.path.exists('cifarcD.weights.h5'):
    model.load_weights("cifarcD.weights.h5")
    model.compile(loss=keras.losses.CategoricalCrossentropy(from_logits=True), metrics=[keras.metrics.CategoricalAccuracy()])
else:
    model.fit(train_dataset,
              validation_data=test_dataset,
              epochs=epochs,
              callbacks=[lr_scheduler])
    model.save_weights("cifarcD.weights.h5")


In [None]:

correct_predictions_cifar = []
incorrect_predictions_cifar = []


# 下載並準備CIFAR-10-C資料集
def load_cifar10_c():
    url = 'https://zenodo.org/record/2535967/files/CIFAR-10-C.tar'
    path = tf.keras.utils.get_file('CIFAR-10-C.tar', url, untar=True)
    return path

# 載入CIFAR-10-C資料集
def load_cifar10_c_data(data_dir):
    corruption_types = ['brightness', 'contrast', 'defocus_blur', 'elastic_transform', 'fog', 'frost', 'gaussian_blur', 'gaussian_noise', 'glass_blur', 'impulse_noise', 'pixelate', 'saturate', 'shot_noise', 'spatter', 'speckle_noise', 'zoom_blur']
    #corruption_types = ['gaussian_blur']
    images = []
    labels = []

    for corruption in corruption_types:
        print(corruption)
        file_path = os.path.join(data_dir.replace(".tar", ""), f'{corruption}.npy')
        with open(file_path, 'rb') as f:
            all_images = np.load(f)
            images.append(all_images[20000:30000])
            #images.append(np.load(f))
        labels.append(np.load(os.path.join(data_dir.replace(".tar", ""), 'labels.npy'))[20000:30000])

    images = np.concatenate(images, axis=0)
    labels = np.concatenate(labels, axis=0)
    return images, labels

# 下載資料集
cifar10_c_path = load_cifar10_c()

# 載入CIFAR-10-C資料集
test_images, test_labels = load_cifar10_c_data(cifar10_c_path)

test_images = np.float32(test_images)
preprocessed_data = test_images

test_labels = tf.keras.utils.to_categorical(test_labels, num_classes=10)


In [None]:
print(preprocessed_data.shape, test_labels.shape)
'''
def data_generator(preprocessed_data, test_labels):
    for image, label in zip(preprocessed_data, test_labels):
        yield image, label

batch_size = 32  # 可以根據需要調整批次大小
dataset = tf.data.Dataset.from_generator(
    lambda: data_generator(preprocessed_data, test_labels),
    output_signature=(
        tf.TensorSpec(shape=(32, 32, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(10,), dtype=tf.int64)
    )
)
dataset = dataset.batch(batch_size)

scores = model.evaluate(dataset)
print(f"Test Loss: {scores[0]}")
print(f"Test Accuracy: {scores[1]}")
'''


In [None]:
batch_size = 10000  # 設定批次大小

original_predictions_cifar = []
for start in range(0, len(preprocessed_data), batch_size):
    end = min(start + batch_size, len(preprocessed_data))
    batch_data = preprocessed_data[start:end]
    batch_labels = test_labels[start:end]

    batch_predictions_cifar = model.predict(batch_data, verbose=0)
    original_predictions_cifar.append(batch_predictions_cifar)

    for i in range(len(batch_data)):
        if np.argmax(batch_predictions_cifar[i]) == np.argmax(batch_labels[i]):
            correct_predictions_cifar.append(start + i)
        else:
            incorrect_predictions_cifar.append(start + i)

original_predictions_cifar = np.vstack(original_predictions_cifar)

print(f"{len(correct_predictions_cifar)}, {len(incorrect_predictions_cifar)}")

print(len(correct_predictions_cifar) / (len(correct_predictions_cifar) + len(incorrect_predictions_cifar)))

In [None]:
# Define MC Dropout prediction function
def mc_dropout_predictions(model, X, n_predictions):
    f = tf.function(lambda x: model(x, training=True))  # Keep dropout during prediction
    predictions = [f(X) for _ in range(n_predictions)]
    return tf.stack(predictions)

mean_preds = []
# Generate MC Dropout predictions
for start in range(0, len(preprocessed_data), batch_size):
    end = min(start + batch_size, len(preprocessed_data))
    n_predictions = 128
    batch_data = preprocessed_data[start:end]
    preds = mc_dropout_predictions(model, batch_data, n_predictions)
    mean_preds.extend(tf.reduce_mean(preds, axis=0))
    #std_preds += tf.math.reduce_std(preds, axis=0)
mean_preds = tf.stack(mean_preds)
confidence = np.max(mean_preds, axis=1)
print("平均預測值:", confidence)

In [None]:
from collections import defaultdict
#vanilla
# 初始化信心值和準確率列表
result_pred_cifar = np.ones(len(preprocessed_data)) 
for i in incorrect_predictions_cifar:
    result_pred_cifar[i] = 0

print(sum(result_pred_cifar))

# 初始化 confidence_map_vanilla 為 defaultdict
confidence_map_vanilla = defaultdict(list)

# 將預測結果和信心值存入字典
for i, _ in enumerate(original_predictions_cifar):
    conf = np.max(mean_preds[i])
    confidence_map_vanilla[conf].append(result_pred_cifar[i])

print("finish")
print(len(confidence_map_vanilla))

confidence_values_vanilla = []
accuracies_vanilla = []
element_counts_vanilla = []

# 計算每個信心值範圍的準確率
sorted_confidences = sorted(confidence_map_vanilla.keys(), reverse=True)
combined_results_vanilla = []

for confidence in sorted_confidences:
    combined_results_vanilla.extend(confidence_map_vanilla[confidence])
    element_count_vanilla = len(combined_results_vanilla)
    accuracy_vanilla = np.mean(combined_results_vanilla)
    confidence_values_vanilla.append(confidence)
    accuracies_vanilla.append(accuracy_vanilla)
    element_counts_vanilla.append(element_count_vanilla)
 


In [None]:
# 繪製圖形
plt.figure(figsize=(10, 6))
plt.plot(confidence_values_vanilla, element_counts_vanilla, marker='o', linestyle='-', color='b')
plt.xlabel('Confidence Threshold (τ)')
plt.ylabel('Number of Elements (p(y|x) >= τ)')
plt.title('Confidence Threshold vs Number of Elements')
plt.grid(True)
plt.show()

# 假設 confidence_values_vanilla、accuracies_vanilla、confidence_values_scaled 和 accuracies 已經定義
plt.figure(figsize=(10, 6))
plt.plot(confidence_values_vanilla, accuracies_vanilla, marker='.', linestyle='-', color='b', label='Vanilla', markersize=4)

# 設定 X 軸顯示範圍: 0.0 ~ 1.0
plt.xlim(0.0, 1.0)

# 設定 Y 軸顯示範圍: 0.65 ~ 1.0
plt.ylim(0.65, 1.0)

# 新增垂直線
plt.axvline(x=0.6827, color='g', linestyle='--', label='x=0.6827')
plt.axvline(x=0.9545, color='m', linestyle='--', label='x=0.9545')
plt.axvline(x=0.9973, color='c', linestyle='--', label='x=0.9973')

# 找到最接近的值
def find_nearest(array, value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx

idx_6827_vanilla = find_nearest(confidence_values_vanilla, 0.6827)
idx_9545_vanilla = find_nearest(confidence_values_vanilla, 0.9545)
idx_9973_vanilla = find_nearest(confidence_values_vanilla, 0.9973)

# 新增交點標記
plt.scatter([confidence_values_vanilla[idx_6827_vanilla], confidence_values_vanilla[idx_9545_vanilla], confidence_values_vanilla[idx_9973_vanilla]], 
            [accuracies_vanilla[idx_6827_vanilla], accuracies_vanilla[idx_9545_vanilla], accuracies_vanilla[idx_9973_vanilla]], 
            color='black', zorder=5)

plt.xlabel('Confidence Threshold (τ)')
plt.ylabel('Accuracy (p(y|x) >= τ)')
plt.title('Confidence vs Accuracy')
plt.legend()
plt.grid(True)
plt.show()

print(accuracies_vanilla[idx_6827_vanilla], accuracies_vanilla[idx_9545_vanilla], accuracies_vanilla[idx_9973_vanilla])

In [None]:
def calculate_ece(confidences, labels, num_bins=15):
    bin_boundaries = np.linspace(0, 1, num_bins + 1)
    bin_lowers = bin_boundaries[:-1]
    bin_uppers = bin_boundaries[1:]

    ece = 0.0
    for bin_lower, bin_upper in zip(bin_lowers, bin_uppers):
        in_bin = np.logical_and(confidences > bin_lower, confidences <= bin_upper)
        prop_in_bin = np.mean(in_bin)
        if prop_in_bin > 0:
            accuracy_in_bin = np.mean(labels[in_bin])
            avg_confidence_in_bin = np.mean(confidences[in_bin])
            ece += np.abs(avg_confidence_in_bin - accuracy_in_bin) * prop_in_bin

    return ece


print(np.max(mean_preds, axis=1))
# 計算ECE
ece = [calculate_ece(np.max(mean_preds, axis=1)[10000 * i : 10000 * (i + 1)], 
                     result_pred_cifar[10000 * i : 10000 * (i + 1)]) 
                     for i in range(16)]

print("Expected Calibration Error (ECE):", ece)
fig, ax = plt.subplots() 
ax.boxplot(ece) 
ax.set_title('ECE Boxplot') 
ax.set_ybound(0, 0.7)
ax.set_ylabel('ECE') 
plt.show()

print(np.percentile(ece, 25), np.percentile(ece, 50), np.percentile(ece, 75))