In [None]:
import os
import cv2
import h5py
import time
import imgaug as ia
import imgaug.augmenters as iaa
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split

# Restrict TensorFlow to the first CUDA device.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
cpus = tf.config.experimental.list_physical_devices(device_type='CPU')

# Alternative (disabled): let GPU memory grow on demand instead of a fixed cap.
# for gpu in gpus:
#     tf.config.experimental.set_memory_growth(device=gpu, enable=True)

if gpus:
    try:
        # Cap TensorFlow's allocation on the first GPU (memory_limit is in MB).
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=8192)])
    except RuntimeError as err:
        # Raised when the GPU has already been initialised, e.g. when this
        # cell is re-run in a live kernel; the earlier configuration stays.
        print('GPU configuration not applied: {}'.format(err))
else:
    # Without this guard gpus[0] raises IndexError on CPU-only machines.
    print('No GPU visible to TensorFlow; running on CPU.')

In [None]:
from model.models import CNN, MLP
from layers.utils import get_custom_objects
from layers import densenet, stn, inception_v3, anonymous_net, xception, vgg, resnet, resnet_v2, resnext, mobilenet, mobilenet_v2

In [None]:
# Width of the classifier's final Dense layer and length of the label->name table.
NUM_CLASSES = 257
# Images are resized to IMAGE_SIZE x IMAGE_SIZE; also the model's input size.
IMAGE_SIZE = 224
# Root directory holding one sub-directory per class, named "<label>.<name>".
DATA_DIR = 'data/256_ObjectCategories/'
# HDF5 cache of the decoded dataset; the file name embeds IMAGE_SIZE.
CACHE_FILE = 'data/dataset_{}.h5'.format(IMAGE_SIZE)
# random_state passed to train_test_split.
SEED = 1127
# Path the model is saved to (and reloaded from).
MODEL_FILE = 'data/model.keras'

In [None]:
class H5Cacher():
    """HDF5 data cacher, usable as a decorator.

    Decorating a zero-argument function that returns a dict of array-like
    values makes it load the dict from ``path`` when that file exists, and
    otherwise compute it once and store it there.
    """

    def __init__(self, path):
        """Remember the location of the HDF5 cache file."""
        self.path = path

    def load(self):
        """Read every top-level entry of the cache file into memory.

        Returns:
            dict mapping each key of the file to its contents (read with ``[:]``).

        Raises:
            Whatever ``h5py.File`` raises when the file cannot be opened.
        """
        # The context manager closes the file on every path. The previous
        # try/finally referenced `h5f` even when opening had failed, which
        # replaced the real error with an UnboundLocalError.
        with h5py.File(self.path, 'r') as h5f:
            return {key: h5f[key][:] for key in h5f.keys()}

    def dump(self, data):
        """Write every item of ``data`` to the cache file.

        The data is written to a temporary sibling file and moved into place
        only after a complete write, so an interrupted dump never leaves a
        truncated cache that a later run would load as valid.
        """
        tmp_path = '{}.tmp'.format(self.path)
        try:
            with h5py.File(tmp_path, 'w') as h5f:
                for key, value in data.items():
                    h5f[key] = value
            os.replace(tmp_path, self.path)
        finally:
            # Only still present if the write or the rename failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def __call__(self, get_data):
        """Wrap ``get_data`` so its result is cached at ``self.path``."""
        from functools import wraps

        @wraps(get_data)  # keep the wrapped function's name and docstring
        def get_data_wrapper():
            if os.path.isfile(self.path):
                print('data << {}'.format(self.path))
                data = self.load()
            else:
                data = get_data()
                print('data >> {}'.format(self.path))
                self.dump(data)
            return data
        return get_data_wrapper

@H5Cacher(CACHE_FILE)
def read_images():
    """Load every ``.jpg`` under DATA_DIR as a resized RGB image with its label.

    Class directories are expected to be named ``<label>.<name>`` where
    ``<label>`` is a 1-based integer; the stored label is ``<label> - 1``.
    Directories whose name does not start with an integer prefix (for
    example ``.ipynb_checkpoints``) are skipped instead of crashing.

    Returns:
        dict with ``'X'`` (array of the resized images) and ``'Y'`` (array of
        the matching 0-based labels), in sorted directory/file order.
    """
    # Sort so the sample order, and therefore the seeded train/test split,
    # does not depend on the filesystem's listing order.
    dir_names = sorted(os.listdir(DATA_DIR))
    X = []
    Y = []
    for i, dir_name in enumerate(dir_names):
        class_dir = os.path.join(DATA_DIR, dir_name)
        if not os.path.isdir(class_dir):
            continue
        # partition() tolerates names with no dot or with several dots,
        # unlike unpacking the result of split('.').
        label_text, _, _ = dir_name.partition('.')
        if not label_text.isdecimal():
            continue
        label = int(label_text) - 1
        print("{}/{} {} ".format(i, len(dir_names), dir_name), end='\r')
        for img_name in sorted(os.listdir(class_dir)):
            img_path = os.path.join(class_dir, img_name)
            if not os.path.isfile(img_path):
                continue
            if not img_path.endswith('.jpg'):
                continue
            X.append(read_and_resize(img_path, shape=(IMAGE_SIZE, IMAGE_SIZE)))
            Y.append(label)

    return {
        'X': np.array(X),
        'Y': np.array(Y)
    }

def get_name_list():
    """Build the table that maps a 0-based class label to its class name.

    Names come from the ``<label>.<name>`` sub-directories of DATA_DIR.
    Directories without an integer prefix are ignored.

    Returns:
        list of NUM_CLASSES names, indexed by ``label - 1``.

    Raises:
        AssertionError: if any of the NUM_CLASSES labels has no directory.
    """
    label_to_name = [None] * NUM_CLASSES
    for dir_name in os.listdir(DATA_DIR):
        if not os.path.isdir(os.path.join(DATA_DIR, dir_name)):
            continue
        # partition() keeps any further dots inside the name instead of
        # failing to unpack, and copes with names that have no dot at all.
        label_text, _, name = dir_name.partition('.')
        if not label_text.isdecimal():
            continue
        label_to_name[int(label_text) - 1] = name
    missing = [index + 1 for index, name in enumerate(label_to_name) if not name]
    assert not missing, 'no class directory found for labels: {}'.format(missing)
    return label_to_name

def read_and_resize(img_path, shape=(224, 224)):
    """Read an image file, convert it from BGR to RGB and resize it.

    Args:
        img_path: path of the image file to read.
        shape: target size, passed to ``cv2.resize`` as its ``dsize`` argument.

    Returns:
        The resized RGB image.

    Raises:
        IOError: if OpenCV cannot read or decode the file.
    """
    bgr = cv2.imread(img_path)
    # cv2.imread signals failure by returning None rather than raising;
    # fail here with the offending path instead of inside cvtColor.
    if bgr is None:
        raise IOError('Cannot read image: {}'.format(img_path))
    return cv2.resize(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB), shape)

def batch_loader(X, y, batch_size=64):
    """Yield shuffled ``(X_batch, y_batch)`` pairs covering the data once.

    A fresh random permutation of the sample indices is drawn from NumPy's
    global random state when iteration starts; consecutive slices of
    ``batch_size`` indices then select each batch. The last batch is smaller
    when the number of samples is not a multiple of ``batch_size``.
    """
    n_samples = X.shape[0]
    shuffled = np.random.permutation(n_samples)
    for start in range(0, n_samples, batch_size):
        chosen = shuffled[start: start + batch_size]
        yield X[chosen], y[chosen]
        
def change_range(X):
    """Map values from [0, 255] to [-1, 1], returned as float32.

    Only this fixed rescaling is applied; no mean or standard-deviation
    normalisation is performed.
    """
    scaled = np.divide(X, 127.5, dtype=np.float32)
    return scaled - 1

In [None]:
# Wrap an augmenter in iaa.Sometimes with probability 0.5.
sometimes = lambda aug: iaa.Sometimes(0.5, aug)

# Augmentation pipeline applied to each training batch.
seq = iaa.Sequential(
    [
        # apply the following augmenters to most images
        iaa.Fliplr(0.5), # horizontally flip 50% of all images
        # crop or pad by -5% to +10% of the image height/width
        # (NOTE(review): in imgaug negative percent = crop, positive = pad; confirm)
        sometimes(iaa.CropAndPad(
            percent=(-0.05, 0.1),
            pad_mode=ia.ALL,
            pad_cval=(0, 255)
        )),
        sometimes(iaa.Affine(
            scale={"x": (0.9, 1.1), "y": (0.9, 1.1)}, # scale images to 90-110% of their size, individually per axis
            translate_percent={"x": (-0.1, 0.1), "y": (-0.1, 0.1)}, # translate by -10 to +10 percent (per axis)
            rotate=(-10, 10), # rotate by -10 to +10 degrees
            shear=(-3, 3), # shear by -3 to +3 degrees
            order=[0, 1], # use nearest neighbour or bilinear interpolation (fast)
            cval=(0, 255), # if mode is constant, use a cval between 0 and 255
            mode=ia.ALL # use any of the available warping modes
        )),
        # exactly one of the three blur variants is applied
        iaa.OneOf([
            iaa.GaussianBlur((0, 3.0)), # blur images with a sigma between 0 and 3.0
            iaa.AverageBlur(k=(2, 7)), # blur image using local means with kernel sizes between 2 and 7
            iaa.MedianBlur(k=(3, 7)), # blur image using local medians with kernel sizes between 3 and 7
        ]),
        # exactly one of the two dropout variants is applied
        iaa.OneOf([
            iaa.Dropout((0.01, 0.1), per_channel=0.5), # randomly remove 1% to 10% of the pixels
            iaa.CoarseDropout((0.03, 0.15), size_percent=(0.02, 0.05), per_channel=0.2),
        ]),
        # additive Gaussian noise with a standard deviation of up to 5% of 255
        sometimes(iaa.AdditiveGaussianNoise(loc=0, scale=(0.0, 0.05*255), per_channel=0.5)),
        # resize to IMAGE_SIZE x IMAGE_SIZE
        iaa.Resize({"height":IMAGE_SIZE, "width":IMAGE_SIZE})
    ],
    # the augmenters above are applied in random order
    random_order=True
)

In [None]:
# Number of passes the training loop makes over the training set.
num_epoch = 1000
# Samples per batch for both the training and the evaluation loops.
batch_size = 16
# Initial learning rate handed to the InverseTimeDecay schedule.
learning_rate = 1e-4
# Number of epochs that make up one decay period (decay_steps) of the schedule.
num_epoch_for_lr_decay = 5

In [None]:
# Classifier built on the Keras MobileNetV2 backbone. weights=None means no
# pretrained weights are loaded; include_top=False drops the stock head.
# NOTE(review): `model` is rebound by the next model-building cell.
base_model = tf.keras.applications.MobileNetV2(
    input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3),
    weights=None,
    include_top=False,
)
inputs = tf.keras.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
features = base_model(inputs)
# Head: global average pooling -> Dense(NUM_CLASSES) -> softmax probabilities.
pooled = tf.keras.layers.GlobalAveragePooling2D()(features)
logits = tf.keras.layers.Dense(NUM_CLASSES)(pooled)
outputs = tf.keras.layers.Softmax()(logits)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.summary()

In [None]:
# Same classifier head on top of the project's own MobileNetV2 layer.
# This rebinds `model`, replacing any model built in an earlier cell.
inputs = tf.keras.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
features = mobilenet_v2.MobileNetV2()(inputs)
# Head: global average pooling -> Dense(NUM_CLASSES) -> softmax probabilities.
pooled = tf.keras.layers.GlobalAveragePooling2D()(features)
logits = tf.keras.layers.Dense(NUM_CLASSES)(pooled)
outputs = tf.keras.layers.Softmax()(logits)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

model.summary()

In [None]:
# Save the current model to MODEL_FILE and immediately load it back, passing
# the project's custom objects so custom layers can be deserialised.
# NOTE(review): this overwrites whatever is already stored at MODEL_FILE,
# including a checkpoint written by an earlier training run; confirm intended.
model.save(MODEL_FILE)
model = tf.keras.models.load_model(MODEL_FILE, custom_objects=get_custom_objects())

# Load Data

In [None]:
# Load the (cached) dataset. Pixel values are left as stored here; rescaling
# with change_range happens per batch inside the training/evaluation loops.
data_set = read_images()
X = data_set['X']
y = data_set['Y']

# Hold out 5% of the samples for testing, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.05, random_state=SEED)
train_size = len(X_train)
test_size = len(X_test)

# Label -> class-name table.
name_list = get_name_list()

In [None]:
# Optimizer steps per epoch. batch_loader also yields the final, smaller
# batch, so round up instead of flooring; max(1, ...) keeps decay_steps
# non-zero when the training set is smaller than one batch.
step_per_epoch = max(1, (train_size + batch_size - 1) // batch_size)

# Inverse-time decay: lr = learning_rate / (1 + decay_rate * step / decay_steps),
# i.e. with decay_rate=1 the rate is halved after num_epoch_for_lr_decay epochs.
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
    learning_rate,
    decay_steps=step_per_epoch*num_epoch_for_lr_decay,
    decay_rate=1,
    staircase=False)
optimizer = tf.keras.optimizers.Adam(lr_schedule)
# The model ends in a Softmax layer, so the loss consumes probabilities
# together with integer class labels.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()

# Running metrics; the training loop resets them at the start of every epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

# Training

In [None]:
@tf.function
def train_on_batch(X_batch, y_batch):
    """Run one optimisation step on a batch and update the training metrics.

    Uses the module-level ``model``, ``loss_object`` and ``optimizer``, and
    records the result in ``train_loss`` / ``train_accuracy``.

    Returns:
        The batch loss.
    """
    with tf.GradientTape() as tape:
        predictions = model(X_batch, training=True)
        batch_loss = tf.reduce_mean(
            loss_object(y_true=y_batch, y_pred=predictions))
    gradients = tape.gradient(batch_loss, model.trainable_variables)
    optimizer.apply_gradients(
        grads_and_vars=zip(gradients, model.trainable_variables))

    train_loss(batch_loss)
    train_accuracy(y_batch, predictions)
    return batch_loss

@tf.function
def test_on_batch(X_batch, y_batch):
    """Evaluate one batch in inference mode and update the test metrics.

    Uses the module-level ``model`` and ``loss_object``, and records the
    result in ``test_loss`` / ``test_accuracy``.

    Returns:
        The batch loss.
    """
    predictions = model(X_batch, training=False)
    batch_loss = loss_object(y_batch, predictions)

    test_loss(batch_loss)
    test_accuracy(y_batch, predictions)
    return batch_loss

# Progress-line templates and batch totals do not change between epochs.
train_template = '[Training] Epoch {}, Batch {}/{}, Loss: {}, Accuracy: {:.2%} '
test_template = '[Testing] Epoch {}, Batch {}/{}, Loss: {}, Accuracy: {:.2%} '
epoch_template = 'Epoch {}, Loss: {}, Accuracy: {:.2%}, Test Loss: {}, Test Accuracy: {:.2%} '
train_batches = train_size // batch_size
test_batches = test_size // batch_size

for epoch in range(num_epoch):

    # Start every epoch with fresh running metrics.
    for metric in (train_loss, train_accuracy, test_loss, test_accuracy):
        metric.reset_states()

    # Training pass: augment the raw images, rescale to [-1, 1], then step.
    train_stream = batch_loader(X_train, y_train, batch_size=batch_size)
    for batch_index, (X_batch, y_batch) in enumerate(train_stream):
        X_batch = change_range(np.array(seq(images=X_batch)))
        loss = train_on_batch(X_batch, y_batch)
        print(train_template.format(epoch+1, batch_index, train_batches,
                                    loss, train_accuracy.result()),
              end='\r')

    # Evaluation pass: no augmentation, only rescaling.
    test_stream = batch_loader(X_test, y_test, batch_size=batch_size)
    for batch_index, (X_batch, y_batch) in enumerate(test_stream):
        loss = test_on_batch(change_range(X_batch), y_batch)
        print(test_template.format(epoch+1, batch_index, test_batches,
                                   loss, test_accuracy.result()),
              end='\r')

    print(epoch_template.format(epoch+1,
                                train_loss.result(),
                                train_accuracy.result(),
                                test_loss.result(),
                                test_accuracy.result()))

    # Checkpoint after every epoch.
    model.save(MODEL_FILE)

# Testing

In [None]:
# Measure accuracy over the held-out test set.
test_accuracy.reset_states()
for batch_index, (X_batch, y_batch) in enumerate(batch_loader(X_test, y_test, batch_size=batch_size)):
    X_batch_ori = X_batch  # un-normalised pixels, kept for the optional plots below
    X_batch = change_range(X_batch)
    # Explicit inference mode, consistent with test_on_batch.
    y_pred = model(X_batch, training=False)
    test_accuracy(y_batch, y_pred)
    # Predicted class index for every sample of the batch.
    y_pred_label = np.argmax(y_pred.numpy(), axis=-1)
    template = '[Testing] Batch {}/{}, Accuracy: {:.2%} '
    print(template.format(batch_index,
                          test_size // batch_size,
                          test_accuracy.result()),
          end='\r')
    # Optional visual check of individual predictions (disabled):
    # for i in range(X_batch.shape[0]):
    #     plt.imshow(X_batch_ori[i])
    #     plt.title('Real: {}, Predict: {}'.format(name_list[y_batch[i]], name_list[y_pred_label[i]]))
    #     plt.show()

template = 'Test Accuracy: {:.2%} '
print(template.format(test_accuracy.result()))

In [None]:
# Classify a single image from disk and show it with the predicted class.
# Alternative sample:
# img_path = 'data/TestSet/163.playing-card/Test_163_0009.jpg'
img_path = 'data/TestSet/096.hammock/Test_096_0002.jpg'
# Same read / BGR->RGB / resize path as the training data, at the model's input size.
img = read_and_resize(img_path, shape=(IMAGE_SIZE, IMAGE_SIZE))
X_batch_ori = np.array([img])
X_batch = change_range(X_batch_ori)
y_pred = model(X_batch, training=False)
y_pred_label = np.argmax(y_pred.numpy(), axis=-1)
plt.imshow(img)
plt.title(' Predict: {}'.format(name_list[y_pred_label[0]]))
plt.show()

# `transform_layer` is not defined anywhere in this notebook, so the
# visualisation below only runs when an earlier session has provided it.
# Without the guard the cell fails with NameError on Restart & Run All.
if 'transform_layer' in globals():
    X_batch_trans = transform_layer(X_batch)
    # Map the [-1, 1] output back to [0, 1] for display.
    plt.imshow((X_batch_trans[0] + 1)/2)
    plt.show()
    print(transform_layer.get_dense_weights())