In [1]:
import numpy as np
from copy import deepcopy
import tensorflow as tf
import tensorflow.keras as keras
import matplotlib.pyplot as plt
from tensorflow.keras import layers,regularizers,metrics,optimizers
import random
import pandas as pd
from scipy.linalg import sqrtm
import pickle
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import math

In [2]:
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
import os
# Expose only CUDA device index 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# TF1-compat session configuration: allocate GPU memory on demand, with the
# per-process memory fraction set to 0.8.
config = ConfigProto(
    gpu_options=tf.compat.v1.GPUOptions(
        per_process_gpu_memory_fraction=0.8,
        allow_growth=True,
    )
)
sess = tf.compat.v1.Session(config=config)

In [None]:
"""
This code is used to train ResNet-32 using the complete CIFAR-10 dataset. 
It employs an explicit Weight Decay SGD optimizer along with WarmUp + 
Cosine Annealing Learning Rate Scheduler, and enhances the generalization 
ability of the trained network through data augmentation.
"""

In [3]:
# Training hyperparameters shared by the cells below.
num_classes = 10  # NOTE(review): not referenced in the visible cells; the one-hot encoding and the model head both use a literal 3 — confirm which is intended
initial_lr = 0.1  # peak learning rate, passed to WarmUpCosine as base_lr
weight_decay = 1e-4  # coefficient passed to CustomWeightDecaySGD
epochs = 200  # number of epochs passed to model.fit
warmup_epochs = 5  # warm-up length in epochs; converted to optimizer steps for the LR schedule
batch_size = 32  # mini-batch size used by datagen.flow and the step-count computations
image_size = 32  # NOTE(review): not referenced in the visible cells

In [4]:
# Read the pre-split dataset. The pickle must unpack into exactly four
# objects: train images, train labels, test images, test labels.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only load data.pkl from a trusted source.
with open('data.pkl', 'rb') as data_file:
    x_train, y_train, x_test, y_test = pickle.load(data_file)

# One-hot encode the integer labels into 3 columns, matching the default
# output width of build_resnet32.
y_train_onehot = keras.utils.to_categorical(y_train, num_classes=3)
y_test_onehot = keras.utils.to_categorical(y_test, num_classes=3)

In [5]:
# Filter counts consumed by build_resnet32: index 0 is the stem convolution,
# indices 1-15 are the residual blocks (16 -> 32 -> 64 filters).
NN_32 = [16] * 6 + [32] * 5 + [64] * 5

In [6]:
def conv_bn_relu(x, filters, kernel_size, strides=1):
    """Apply Conv2D -> BatchNormalization -> ReLU to a tensor.

    Args:
        x: Input Keras tensor.
        filters: Number of convolution output channels.
        kernel_size: Convolution kernel size.
        strides: Convolution stride (default 1).

    Returns:
        The activated output tensor.
    """
    # The convolution has no bias term; 'same' padding is used.
    conv_out = tf.keras.layers.Conv2D(
        filters,
        kernel_size,
        strides=strides,
        padding='same',
        use_bias=False,
    )(x)
    normed = tf.keras.layers.BatchNormalization()(conv_out)
    activated = tf.keras.layers.ReLU()(normed)
    return activated

def residual_block(x, filters, downsample=False):
    """Two-convolution residual block with an optional projection shortcut.

    Args:
        x: Input Keras tensor.
        filters: Number of output channels for both 3x3 convolutions.
        downsample: When True, the first convolution (and the shortcut
            projection) use stride 2.

    Returns:
        ReLU of the sum of the main path and the shortcut.
    """
    stride = 2 if downsample else 1
    identity = x

    # Main path: conv-bn-relu followed by conv-bn (no activation before the add).
    out = conv_bn_relu(x, filters, 3, stride)
    out = tf.keras.layers.Conv2D(
        filters, 3, strides=1, padding='same', use_bias=False
    )(out)
    out = tf.keras.layers.BatchNormalization()(out)

    # Shortcut: a 1x1 conv + BN projection whenever the stride or channel
    # count of the main path differs from the input's.
    needs_projection = downsample or identity.shape[-1] != filters
    if needs_projection:
        identity = tf.keras.layers.Conv2D(
            filters, 1, strides=stride, padding='same', use_bias=False
        )(identity)
        identity = tf.keras.layers.BatchNormalization()(identity)

    merged = tf.keras.layers.add([out, identity])
    return tf.keras.layers.ReLU()(merged)

def build_resnet32(NN, input_shape=(32, 32, 3), num_classes=3):
    """Build the ResNet-style classifier from a list of filter counts.

    The network is a stem conv-bn-relu, 15 residual blocks, global average
    pooling and a softmax Dense head.

    Args:
        NN: Sequence of at least 16 filter counts. NN[0] is used for the
            stem, NN[1]..NN[15] for the residual blocks in order.
        input_shape: Shape of one input image (default (32, 32, 3)).
        num_classes: Width of the softmax output layer (default 3).

    Returns:
        An uncompiled tf.keras.Model.
    """
    # Residual blocks at these positions in NN use stride 2.
    downsample_at = (6, 11)

    inputs = tf.keras.Input(shape=input_shape)
    x = conv_bn_relu(inputs, NN[0], 3)
    for idx in range(1, 16):
        x = residual_block(x, NN[idx], downsample=(idx in downsample_at))

    pooled = tf.keras.layers.GlobalAveragePooling2D()(x)
    outputs = tf.keras.layers.Dense(num_classes, activation='softmax')(pooled)
    return tf.keras.Model(inputs, outputs)
# Instantiate the network with the NN_32 filter layout; the input shape and
# the 3-way output head are spelled out rather than taken from the defaults.
model = build_resnet32(NN_32, input_shape=(32, 32, 3), num_classes=3)

In [7]:
# Partition the trainable variables by name: variables whose name contains
# 'kernel' (and not 'bn') go to decay_vars, everything else to no_decay_vars.
# NOTE(review): neither list is referenced by the other visible cells.
decay_vars, no_decay_vars = [], []
for var in model.trainable_variables:
    is_decayed = ('kernel' in var.name) and ('bn' not in var.name.lower())
    bucket = decay_vars if is_decayed else no_decay_vars
    bucket.append(var)

In [8]:
class WarmUpCosine(tf.keras.optimizers.schedules.LearningRateSchedule):
    """WarmUp + cosine-annealing learning-rate schedule.

    The schedule has two stages:

    1) Warm-up (step < warmup_steps): the learning rate increases linearly
       from warmup_lr to base_lr, which limits the size of the first
       parameter updates.
    2) Cosine annealing (step >= warmup_steps): the learning rate follows
       half a cosine period from base_lr down to 0 at total_steps. For
       steps beyond total_steps it stays at 0.
    """

    def __init__(self, base_lr, total_steps, warmup_steps, warmup_lr=0.0):
        """
        Parameter description:
        - base_lr: The maximum learning rate, reached at the end of warm-up
        - total_steps: The total number of training steps (epochs x steps_per_epoch)
        - warmup_steps: The number of steps in the warm-up phase
        - warmup_lr: The initial learning rate for warm-up, default is 0
        """
        super().__init__()
        self.base_lr = base_lr
        self.total_steps = total_steps
        self.warmup_steps = warmup_steps
        self.warmup_lr = warmup_lr

    def __call__(self, step):
        """Return the learning rate for optimizer step `step` (None is treated as 0)."""
        if step is None:
            step = tf.constant(0)
        step = tf.cast(step, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        total_steps = tf.cast(self.total_steps, tf.float32)

        # Linear warm-up. The denominator is floored at 1 so that
        # warmup_steps == 0 does not evaluate a division by zero.
        warmup_percent_done = step / tf.maximum(warmup_steps, 1.0)
        warmup_value = self.warmup_lr + (self.base_lr - self.warmup_lr) * warmup_percent_done

        # Cosine decay. The span is floored at 1 (total_steps == warmup_steps
        # would otherwise give 0/0 -> NaN) and progress is clipped to [0, 1]
        # so the rate does not climb back up once step exceeds total_steps.
        decay_span = tf.maximum(total_steps - warmup_steps, 1.0)
        progress = tf.clip_by_value((step - warmup_steps) / decay_span, 0.0, 1.0)
        cosine_value = self.base_lr * 0.5 * (1.0 + tf.cos(math.pi * progress))

        learning_rate = tf.where(step < warmup_steps, warmup_value, cosine_value)
        return learning_rate

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized."""
        return {
            "base_lr": self.base_lr,
            "total_steps": self.total_steps,
            "warmup_steps": self.warmup_steps,
            "warmup_lr": self.warmup_lr,
        }
# Optimizer steps per epoch (floor division drops any partial final batch);
# this is the same expression passed to model.fit as steps_per_epoch.
steps_per_epoch = x_train.shape[0] // batch_size
total_steps = epochs * steps_per_epoch
warmup_steps = warmup_epochs * steps_per_epoch
lr_schedule = WarmUpCosine(
    base_lr=initial_lr,
    total_steps=total_steps,
    warmup_steps=warmup_steps,
)

In [9]:
class CustomWeightDecaySGD(tf.keras.optimizers.SGD):
    """
    SGD optimizer with explicit (decoupled) weight decay.

    After the regular SGD update, every variable whose name contains
    'kernel' (and does not contain 'bn') is shrunk in place by
    `weight_decay * var`. The decay is applied directly to the parameters
    rather than through an L2 term in the loss, in the spirit of
    AdamW/SGDW, and it is not multiplied by the learning rate.
    """

    def __init__(self, weight_decay, **kwargs):
        """
        Parameter description:
        - weight_decay: Weight decay coefficient (usually ranging from 1e-4 to 1e-2)
        - **kwargs: Other parameters are directly passed to tf.keras.optimizers.SGD,
        such as learning_rate, momentum, nesterov, etc.
        """
        super().__init__(**kwargs)
        self.weight_decay = weight_decay

    def apply_gradients(self, grads_and_vars, name=None, experimental_aggregate_gradients=True):
        """Apply the SGD update, then decay the selected variables.

        Returns whatever the base-class apply_gradients returns.
        """
        # Callers may pass a one-shot iterator such as zip(grads, vars). The
        # base class would exhaust it, leaving nothing for the decay loop, so
        # materialise it once up front.
        grads_and_vars = list(grads_and_vars)
        result = super().apply_gradients(grads_and_vars, name, experimental_aggregate_gradients)
        for _, var in grads_and_vars:
            if ('kernel' in var.name) and ('bn' not in var.name.lower()):
                var.assign_sub(self.weight_decay * var)
        return result

    def get_config(self):
        """Extend the base SGD config with the weight-decay coefficient."""
        config = super().get_config()
        config.update({
            "weight_decay": float(self.weight_decay),  # ensure it is a plain float
        })
        return config
# Nesterov-momentum SGD driven by the warm-up/cosine schedule, with the
# explicit weight-decay step defined in CustomWeightDecaySGD.
optimizer = CustomWeightDecaySGD(
    weight_decay, learning_rate=lr_schedule, momentum=0.9, nesterov=True
)

In [10]:
# Training-time augmentation: small rotations, shifts and horizontal flips.
datagen = ImageDataGenerator(
    # Geometric augmentation.
    rotation_range=15,        # random rotation, in degrees
    width_shift_range=0.1,    # random horizontal shift, fraction of total width
    height_shift_range=0.1,   # random vertical shift, fraction of total height
    horizontal_flip=True,     # random left-right flip
    vertical_flip=False,      # no up-down flip
    # Normalisation / whitening: all disabled.
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
)
# fit() computes dataset statistics (mean, std, ZCA principal components)
# used by the featurewise/ZCA options, all of which are disabled above.
datagen.fit(x_train)

In [11]:
# Categorical cross-entropy on the one-hot labels; accuracy is tracked as the
# only metric.
loss_fn = tf.keras.losses.CategoricalCrossentropy()
model.compile(optimizer=optimizer, loss=loss_fn, metrics=['accuracy'])

In [12]:
# Stream augmented mini-batches from the generator and validate on the
# un-augmented test split after every epoch.
train_batches = datagen.flow(x_train, y_train_onehot, batch_size=batch_size)
model.fit(
    train_batches,
    steps_per_epoch=x_train.shape[0] // batch_size,
    epochs=epochs,
    validation_data=(x_test, y_test_onehot),
    verbose=2,
)

Epoch 1/200
93/93 - 9s - loss: 1.0264 - accuracy: 0.4956 - val_loss: 1.0140 - val_accuracy: 0.4600 - 9s/epoch - 93ms/step
Epoch 2/200
93/93 - 3s - loss: 0.9712 - accuracy: 0.5573 - val_loss: 1.0958 - val_accuracy: 0.4813 - 3s/epoch - 37ms/step
Epoch 3/200
93/93 - 3s - loss: 0.8534 - accuracy: 0.6078 - val_loss: 7.8685 - val_accuracy: 0.3913 - 3s/epoch - 37ms/step
Epoch 4/200
93/93 - 3s - loss: 0.7470 - accuracy: 0.6698 - val_loss: 7.2596 - val_accuracy: 0.4057 - 3s/epoch - 36ms/step
Epoch 5/200
93/93 - 3s - loss: 0.6966 - accuracy: 0.7042 - val_loss: 5.7838 - val_accuracy: 0.3870 - 3s/epoch - 36ms/step
Epoch 6/200
93/93 - 3s - loss: 0.6624 - accuracy: 0.7099 - val_loss: 3.2887 - val_accuracy: 0.4503 - 3s/epoch - 36ms/step
Epoch 7/200
93/93 - 3s - loss: 0.6263 - accuracy: 0.7446 - val_loss: 1.1592 - val_accuracy: 0.5833 - 3s/epoch - 37ms/step
Epoch 8/200
93/93 - 3s - loss: 0.5815 - accuracy: 0.7564 - val_loss: 5.2499 - val_accuracy: 0.3943 - 3s/epoch - 36ms/step
Epoch 9/200
93/93 - 3s -

<keras.callbacks.History at 0x14a4d613f28>

In [13]:
# Save the trained model to a single HDF5 file.
# NOTE(review): the model was compiled with CustomWeightDecaySGD and the
# WarmUpCosine schedule; reloading it presumably requires passing both via
# custom_objects (or loading with compile=False) — confirm.
model.save("Res_32.h5")

  layer_config = serialize_layer_fn(layer)
