In [1]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Load IPython's autoreload extension and use mode 2, which re-imports all
# modules before each cell runs so edits to imported project code (e.g. `src`)
# are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
from keras.layers import Input, Lambda, BatchNormalization, Conv2D, Reshape, Dense,\
                         Dropout, Activation, Flatten, LeakyReLU, Add, MaxPooling2D,\
                         GlobalMaxPooling2D, Subtract
from keras.losses import categorical_crossentropy
from keras.models import Model
from keras import backend as K
from keras.optimizers import Adam

from pathlib import Path
import numpy as np
from src.data.dataset import load_ferg
from src import PROJECT_ROOT
import os
# Expose only GPU 0 to this process.
# NOTE(review): this is set after the keras imports above; presumably that is
# still early enough because the GPU is not initialised at import time — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = '0'

Using TensorFlow backend.


In [3]:
# Load the dataset through the project loader. load_data() yields a train and a
# test split, each a triple (x, y, p): inputs plus two label sets.
# NOTE(review): presumably y and p are already one-hot encoded, since they are
# later fed to a categorical cross-entropy loss — confirm against load_ferg.
loader = load_ferg()
(x_train, y_train, p_train), (x_test, y_test, p_test) = loader.load_data()
# Number of classes for the y labels and for the p labels, in that order.
num_y, num_p = loader.get_num_classes()
# Shape of a single sample, taken from the first training example.
input_shape = x_train[0].shape
print(f'{num_y}, {num_p}, {input_shape}')

7, 6, (64, 64, 3)


In [4]:
def empty_loss(y_true, y_pred):
    """Pass-through loss: ignore the targets and return the prediction as-is.

    Args:
        y_true: Unused; present only to match the ``loss(y_true, y_pred)``
            call signature.
        y_pred: Value handed back unchanged.

    Returns:
        ``y_pred`` itself (the same object, not a copy).
    """
    passthrough = y_pred
    return passthrough
def make_trainable(net, val):
    """Set the ``trainable`` flag on a network and on each of its layers.

    Args:
        net: Object with a ``trainable`` attribute and an iterable ``layers``
            attribute (e.g. a Keras model).
        val: Value assigned to every ``trainable`` flag.
    """
    # Flag the container first, then every layer it reports.
    setattr(net, 'trainable', val)
    for layer in net.layers:
        setattr(layer, 'trainable', val)
def show_model(model):
    """Print a model's summary and metric names between two 80-dash rulers.

    Args:
        model: Object exposing ``summary()`` (which prints its own report)
            and a ``metrics_names`` attribute, e.g. a compiled Keras model.
    """
    ruler = '-' * 80
    print(ruler)
    # summary() writes the table itself and returns None, so wrapping it in
    # print() would append a stray "None" line after the table.
    model.summary()
    print(model.metrics_names)
    print(ruler)

In [14]:
def build_decoder(num_classes, feature_dim=128):
    """Build a single-layer softmax classifier over a flat feature vector.

    Args:
        num_classes: Number of output units of the softmax layer.
        feature_dim: Length of the input feature vector.

    Returns:
        A Keras ``Model`` mapping a ``(feature_dim,)`` input to
        ``num_classes`` softmax outputs.
    """
    features = Input((feature_dim,))
    class_probs = Dense(num_classes, activation='softmax')(features)
    return Model(features, class_probs)
def build_encoder(input_shape, z_dim=128):
    """Build a three-stage convolutional encoder ending in a global max pool.

    Each stage is Conv2D (8x8 kernel, 'SAME' padding) -> BatchNormalization ->
    LeakyReLU(0.2) -> 2x2 max pooling. The filter count grows stage by stage
    as ``z_dim / 4``, ``z_dim / 2``, ``z_dim`` (each truncated to int).

    Args:
        input_shape: Shape of one input sample, passed straight to ``Input``.
        z_dim: Filter count of the last convolution stage.

    Returns:
        A Keras ``Model`` from the input tensor to the globally max-pooled
        output of the last stage.
    """
    kernel = (8, 8)
    inputs = Input(input_shape)
    feat = inputs
    # Divisors 4, 2, 1 reproduce 2**(2-i) for i = 0, 1, 2.
    for divisor in (4, 2, 1):
        feat = Conv2D(int(z_dim / divisor), kernel_size=kernel, padding='SAME')(feat)
        feat = BatchNormalization()(feat)
        feat = LeakyReLU(0.2)(feat)
        feat = MaxPooling2D((2, 2))(feat)
    pooled = GlobalMaxPooling2D()(feat)
    return Model(inputs, pooled)
def build_classifier(input_shape, encoder, decoder):
    """Chain an encoder and a decoder into one end-to-end model.

    Args:
        input_shape: Shape of one input sample, passed straight to ``Input``.
        encoder: Callable model applied to the input tensor.
        decoder: Callable model applied to the encoder's output.

    Returns:
        A Keras ``Model`` from the input tensor to ``decoder(encoder(input))``.
        The given encoder and decoder are called directly, not copied.
    """
    inputs = Input(input_shape)
    return Model(inputs, decoder(encoder(inputs)))
def evaluate_encoder(train_data, test_data, feature_dim, num_classes, batch_size=256, num_epochs=20):
    """Train a fresh softmax probe on precomputed features and score it.

    A new decoder from ``build_decoder`` is fitted on the training features
    and validated on the test features after every epoch.

    Args:
        train_data: Pair ``(features, labels)`` used for fitting.
        test_data: Pair ``(features, labels)`` used as validation data.
        feature_dim: Length of each feature vector (the decoder's input size).
        num_classes: Number of classes the decoder predicts.
        batch_size: Mini-batch size passed to ``fit``.
        num_epochs: Number of epochs passed to ``fit``.

    Returns:
        The highest validation accuracy reached over all epochs.
    """
    decoder = build_decoder(num_classes, feature_dim)
    x_train, y_train = train_data
    x_test, y_test = test_data
    decoder.compile(optimizer=Adam(1e-3), loss='categorical_crossentropy', metrics=['accuracy'])
    history = decoder.fit(x=x_train, y=y_train, epochs=num_epochs, batch_size=batch_size,
                          validation_data=(x_test, y_test), verbose=1)
    # The history key for validation accuracy depends on the Keras release:
    # older versions report 'val_acc', newer ones 'val_accuracy'.
    logged = history.history
    val_key = 'val_acc' if 'val_acc' in logged else 'val_accuracy'
    return np.max(logged[val_key])
def train(train_data, test_data, input_shape, num_y, num_p, num_epochs=20, batch_size=128, dry_run=False,
          load_weights=False, model_path='./best_model.h5'):
    """Train (or load) the y-classifier, then probe its encoder's features.

    Builds an encoder and a softmax decoder, joins them into a classifier and
    either loads its weights from ``model_path`` or fits it and saves the
    weights there. The encoder's output features are then passed to
    ``evaluate_encoder`` and the resulting best validation accuracy is printed.

    Args:
        train_data: Triple ``(x, y, p)`` for training.
        test_data: Triple ``(x, y, p)`` for validation.
        input_shape: Shape of one input sample.
        num_y: Number of classes in the y labels.
        num_p: Number of classes in the p labels. Currently unused; only the
            commented-out alternative probe below needs it.
        num_epochs: Epochs for fitting the classifier (ignored when
            ``load_weights`` is true).
        batch_size: Mini-batch size for fitting the classifier.
        dry_run: If true, only print the three model summaries and return.
        load_weights: If true, load weights from ``model_path`` instead of
            training.
        model_path: File the classifier weights are loaded from or saved to.

    Returns:
        None. The probe accuracy is printed, not returned.
    """
    x_train, y_train, p_train = train_data
    x_test, y_test, p_test = test_data
    encoder = build_encoder(input_shape)
    decoder_y = build_decoder(num_y)
    model = build_classifier(input_shape, encoder, decoder_y)
    model.compile(optimizer=Adam(1e-3), loss='categorical_crossentropy', metrics=['accuracy'])
    if dry_run:
        for net in (encoder, decoder_y, model):
            show_model(net)
        return
    if load_weights:
        model.load_weights(model_path)
    else:
        model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=num_epochs, batch_size=batch_size)
        model.save_weights(model_path)
    # Probe the encoder's features. The encoder shares its weights with
    # `model`, so it reflects the loaded/trained state. Using model.predict
    # here would feed the classifier's softmax output to the probe instead.
    z_train = encoder.predict(x_train)
    z_test = encoder.predict(x_test)
    # Alternative probe: predict the p labels from the same features.
    #acc = evaluate_encoder((z_train, p_train), (z_test, p_test), feature_dim=z_train.shape[1], num_classes=num_p)
    acc = evaluate_encoder((z_train, y_train), (z_test, y_test), feature_dim=z_train.shape[1], num_classes=num_y)
    print(f'{acc}')
# With load_weights=True, train() loads weights from its default model_path
# ('./best_model.h5') and skips fitting the classifier, so num_epochs=5 has no
# effect on this call.
train((x_train, y_train, p_train), (x_test, y_test, p_test), input_shape, num_y, num_p, num_epochs=5, load_weights=True, dry_run=False)

Train on 47401 samples, validate on 8365 samples
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
1.0
