In [None]:
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K
from tensorflow import keras
from sklearn import model_selection
import imgaug.augmenters as iaa
import imgaug as ia
import os

In [None]:
#unzip miniImageNet dataset
!unzip '/content/drive/MyDrive/miniimagenet/mini/mini-imagenet.zip' -d '/content'
!rm /content/images/.DS_Store

[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
  inflating: /content/images/n0679411000000879.jpg  
  inflating: /content/images/n0679411000000880.jpg  
  inflating: /content/images/n0679411000000881.jpg  
  inflating: /content/images/n0679411000000882.jpg  
  inflating: /content/images/n0679411000000885.jpg  
  inflating: /content/images/n0679411000000894.jpg  
  inflating: /content/images/n0679411000000895.jpg  
  inflating: /content/images/n0679411000000897.jpg  
  inflating: /content/images/n0679411000000898.jpg  
  inflating: /content/images/n0679411000000899.jpg  
  inflating: /content/images/n0679411000000900.jpg  
  inflating: /content/images/n0679411000000901.jpg  
  inflating: /content/images/n0679411000000902.jpg  
  inflating: /content/images/n0679411000000904.jpg  
  inflating: /content/images/n0679411000000905.jpg  
  inflating: /content/images/n0679411000000909.jpg  
  inflating: /content/images/n0679411000000912.jpg  
  inflating: /content/images/n0679411000000913.jpg  
  inf

In [None]:
ia.seed(1)
def Data_Aug(samples):
    randomaug = iaa.Sometimes(
        0.5,
        iaa.OneOf([
            iaa.Sequential([
                iaa.AllChannelsCLAHE(clip_limit=(1, 10)),
                iaa.Affine(rotate=(-5, 5))
            ]),
            iaa.Sequential([
                iaa.Affine(translate_percent={
                    "x": (-0.2, 0.2),
                    "y": (-0.2, 0.2)
                }),
                iaa.Multiply((0.7, 1.3)),
                iaa.AdditiveLaplaceNoise(scale=0.03 * 255, per_channel=True),
                iaa.Fliplr(0.3),
                iaa.Flipud(0.3)
            ]),
            iaa.Sequential([
                iaa.Affine(rotate=(-10, 10)),
                iaa.Multiply((0.7, 1.3)),
                iaa.AdditiveLaplaceNoise(scale=0.03 * 255, per_channel=True),
                iaa.Fliplr(0.3),
                iaa.Flipud(0.3)
            ]),
            iaa.Sequential([
                iaa.PerspectiveTransform(scale=(0, 0.15)),
                iaa.Multiply((0.7, 1.3)),
                iaa.AdditiveLaplaceNoise(scale=0.03 * 255, per_channel=True),
                iaa.Fliplr(0.3),
                iaa.Flipud(0.3)
            ])
        ]))


    return randomaug.augment_images(samples)

def resize_images(imgs, hight, width):
    seq = iaa.Resize({"height": hight, "width": width})
    aug = seq.augment_images(imgs)
    return aug

In [None]:
#load dataset
def load_data(image_dir, scv_dir):
  data = pd.read_csv(scv_dir)
  data_x = []
  data_y = []
  for name,label in zip(data['filename'],data['label']):
    img = keras.preprocessing.image.load_img(os.path.join(image_dir, name),target_size=(84,84))
    img = keras.preprocessing.image.img_to_array(img,dtype='uint8')
    data_x.append(img)
    data_y.append(label)
  data_x = np.array(data_x)
  data_y = np.array(data_y)
  return data_x, data_y
class minidata(keras.utils.Sequence):
    def __init__(self, data_x, data_y, batch_size, train):
      self.data_x = data_x
      self.data_y = data_y
      self.train = train
      self.batch_size = batch_size

    def __len__(self):
      return int(np.ceil(len(self.data_y) / self.batch_size))

    def __getitem__(self, idx):
      batch_x = self.data_x[idx * self.batch_size:(idx + 1) * self.batch_size]
                            
      batch_y = self.data_y[idx * self.batch_size:(idx + 1) * self.batch_size]
      if self.train == True:
          batch_x = Data_Aug(batch_x)

      batch_x = np.clip(batch_x / 255.0, 0, 1).astype("float32")
      batch_y = keras.utils.to_categorical(batch_y, 100)
      return batch_x, batch_y

In [None]:
#save the best model during training 
class CustomEarlyStoppingAndSave(keras.callbacks.Callback):
    def __init__(self, save_path, patience=0):
        super(CustomEarlyStoppingAndSave, self).__init__()
        #self.patience = patience
        self.save_path = save_path
        self.best_weights = None

    def on_train_begin(self, logs=None):
        #self.wait = 0
        self.stopped_epoch = 0
        #self.best_val_distance_loss = np.Inf
        self.best_val_acc = 0

    def on_epoch_end(self, epoch, logs=None):
        #val_distance_loss = logs.get('val_MMD_loss')
        val_acc = logs.get('val_accuracy') 

        if  np.greater(val_acc, self.best_val_acc):
            #self.best_val_distance_loss = val_distance_loss
            self.best_val_acc = val_acc
            #self.wait = 0
            # Record the best weights if current results is better (less).
            self.best_weights = self.model.get_weights()
            self.stopped_epoch = epoch
        #else:
            #self.wait += 1
            #if self.wait >= self.patience:
                #self.stopped_epoch = epoch
                #self.model.stop_training = True
                #print("Restoring model weights from the end of the best epoch.")
                #self.model.set_weights(self.best_weights)

    def on_train_end(self, logs=None):
        self.model.set_weights(self.best_weights)
        if self.stopped_epoch >= 0:
            print("Epoch %05d: early stopping and save the model" %
                  (self.stopped_epoch + 1))
        self.model.save(self.save_path)

In [None]:
def creat_ResNet(input_shape, num_classes=100, num_filters = 64, random_seed=None):
  inputs = keras.Input(shape=input_shape)
  y = keras.layers.Conv2D(num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(inputs)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  x = keras.layers.Conv2D(num_filters, (1, 1), kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(inputs)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.add([x, y])
  x = keras.layers.LeakyReLU(alpha=0.1)(x)
  x = keras.layers.MaxPooling2D((2,2))(x)

  y = keras.layers.Conv2D(2*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(2*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(2*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  x = keras.layers.Conv2D(2*num_filters, (1, 1), kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.add([x, y])
  x = keras.layers.LeakyReLU(alpha=0.1)(x)
  x = keras.layers.MaxPooling2D((2,2))(x) 

  y = keras.layers.Conv2D(4*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(4*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(4*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  x = keras.layers.Conv2D(4*num_filters, (1, 1), kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.add([x, y])
  x = keras.layers.LeakyReLU(alpha=0.1)(x)
  x = keras.layers.MaxPooling2D((2,2))(x) 

  y = keras.layers.Conv2D(8*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(8*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  y = keras.layers.Conv2D(8*num_filters, (3, 3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(y)
  y = keras.layers.BatchNormalization()(y)
  y = keras.layers.LeakyReLU(alpha=0.1)(y)
  x = keras.layers.Conv2D(8*num_filters, (1, 1), kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.add([x, y])
  x = keras.layers.LeakyReLU(alpha=0.1)(x)
  last_acti = keras.layers.MaxPooling2D((2,2))(x) 

  x = keras.layers.GlobalAveragePooling2D()(last_acti)
  outputs = keras.layers.Dense(num_classes, activation='softmax', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  model = keras.Model(inputs=inputs, outputs=outputs)
  return model

In [None]:
def create_AlexNet(input_shape, num_classes=10, random_seed=None):
  inputs = keras.Input(shape=input_shape)
  x = keras.layers.Conv2D(96, (3,3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(inputs)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.MaxPooling2D(pool_size=(3,3),strides=(2,2),padding='same')(x)
  
  x = keras.layers.Conv2D(256, (3,3), padding='same', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x) 
  x = keras.layers.MaxPooling2D(pool_size=(3,3),strides=(2,2),padding='same')(x)
  
  x = keras.layers.Conv2D(384, (3,3), padding='same', activation='relu', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x) 
  x = keras.layers.Conv2D(384, (3,3), padding='same', activation='relu', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x) 
  x = keras.layers.Conv2D(256, (3,3), padding='same', activation='relu', kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  feature_maps = keras.layers.MaxPooling2D(pool_size=(3,3),strides=(2,2),padding='same')(x)

  x = keras.layers.Flatten()(feature_maps)
  #x = keras.layers.Dense(256,activation='relu')(x)
  #x = keras.layers.Dropout(0.5)(x)
  x = keras.layers.Dense(128,activation='relu')(x)
  x = keras.layers.Dropout(0.5)(x)
  outputs = keras.layers.Dense(num_classes, activation='softmax')(x)
  model = keras.Model(inputs=inputs, outputs=outputs)
  return model

In [None]:
def create_VGGNet(input_shape, num_classes,  random_seed=None):
  inputs = keras.Input(shape=input_shape)
  x = keras.layers.Conv2D(64, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(inputs)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(64, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.MaxPooling2D((2, 2))(x)
  x = keras.layers.Dropout(0.2)(x)

  x = keras.layers.Conv2D(128, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(128, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.MaxPooling2D((2, 2))(x)
  x = keras.layers.Dropout(0.3)(x)

  x = keras.layers.Conv2D(256, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(256, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(256, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.MaxPooling2D((2, 2))(x)
  x = keras.layers.Dropout(0.4)(x)


    
    
  x = keras.layers.Conv2D(512, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(512, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(512, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.MaxPooling2D((2, 2))(x)
  x = keras.layers.Dropout(0.5)(x)
  
  

  '''  
  x = keras.layers.Conv2D(512, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(512, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Conv2D(512, (3, 3), padding='same',  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.MaxPooling2D((2, 2))(x)
  x = keras.layers.Dropout(0.5)(x) 
  '''
 

  #x = keras.layers.GlobalAveragePooling2D()(x)
  x = keras.layers.Flatten()(x)
  x = keras.layers.Dense(256,  kernel_initializer=keras.initializers.HeNormal(seed=random_seed))(x)
  x = keras.layers.BatchNormalization()(x)
  x = keras.layers.Activation('relu')(x)
  x = keras.layers.Dropout(0.5)(x)
  outputs = keras.layers.Dense(num_classes, activation='softmax')(x)
  return keras.Model(inputs=inputs, outputs=outputs)

In [None]:
#load dataset to our programm
train_dir = '/content/drive/MyDrive/miniimagenet/new_train.csv'
val_dir = '/content/drive/MyDrive/miniimagenet/new_val.csv'
test_dir = '/content/drive/MyDrive/miniimagenet/new_test.csv'
image_dir = '/content/images'
x_train, y_train = load_data(image_dir, train_dir)
x_val, y_val = load_data(image_dir, val_dir)
x_test, y_test = load_data(image_dir, test_dir)
_, x_val, _, y_val = model_selection.train_test_split(
    x_val,
    y_val,
    test_size=2000,
    random_state=0,
    stratify=y_val)
#select training samples
train_index = []
random.seed(0)
for i in range(100):
    train_index += (random.sample(list(np.where(y_train == i)[0]), 100))
random.shuffle(train_index)
sampled_x_train = x_train[train_index]
sampled_y_train = y_train[train_index]
del train_index

In [None]:
#construct training model
input_size = (84, 84, 3)
model = creat_ResNet(input_shape=input_size, num_classes=100, num_filters = 64, random_seed=1)
model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3),
                    loss=['categorical_crossentropy'],
                    metrics=['accuracy'])

train_sequence = minidata(sampled_x_train, sampled_y_train, batch_size=10, train=True)
validation_sequence = minidata(x_val, y_val, batch_size=10, train=False)


history = model.fit(
    train_sequence,
    epochs=200,
    validation_data=validation_sequence,
    #validation_steps=12,
    callbacks=CustomEarlyStoppingAndSave(
        save_path='100_samples_seed_fix.h5',
        patience=10))

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

In [None]:
test_sequence = minidata(x_test, y_test, batch_size=10, train=False)

In [None]:
evaluate1 = model.evaluate(test_sequence)
evaluate1



[5.866496562957764, 0.3942500054836273]

In [None]:
model_train = keras.models.load_model('100_samples_seed_fix.h5')
evaluate2 = model_train.evaluate(test_sequence)
evaluate2



[5.866496562957764, 0.3942500054836273]