# VGG16 with Gabor filters

In [None]:
from keras.applications.vgg16 import VGG16
# Settings for the stock VGG16 classifier; the trailing comments keep the
# alternative values that were tried.
classes = 10
input_shape = (224, 224, 3)  # (224, 224, 16)
weights = None  # 'imagenet'

model = VGG16(
    include_top=True,
    weights=weights,
    input_tensor=None,
    input_shape=input_shape,
    pooling=None,
    classes=classes,
)

In [None]:
# Print the layer-by-layer summary of the current `model`.
model.summary()

In [None]:
from keras.layers import InputLayer
# Experimental attempt to give the network a 32-channel input.
# NOTE(review): popping from `model.layers` presumably only edits a Python
# list and does not rewire the model graph — confirm.
model.layers.pop(0)
# NOTE(review): InputLayer(...) returns a layer object rather than a tensor,
# so calling `model` on it (and passing it to Model) looks unlikely to work as
# written — TODO confirm; keras.layers.Input is the usual tensor factory.
gabor_input = InputLayer(input_shape=(224, 224, 32))
gabor_output = model(gabor_input)
# NOTE(review): `Model` is not imported in this cell; it has to be in scope
# already from another cell for this line to run.
gabor_model = Model(gabor_input, gabor_output)

In [None]:
import numpy as np
import cv2
from keras import backend as K
from keras.layers import Lambda
from keras import Model
import tensorflow as tf

def get_kernel_tensor(ksize, sigmas, thetas, lambdas, gammas, psis):
    """Build a bank of Gabor kernels stacked into one convolution kernel tensor.

    One kernel is created with ``cv2.getGaborKernel`` for every combination of
    the supplied parameter values (sigma varies slowest, psi fastest).

    Args:
        ksize: Kernel size passed to ``cv2.getGaborKernel``, e.g. ``(25, 25)``.
        sigmas: Sequence of ``sigma`` values, e.g. ``[2, 4]``.
        thetas: Sequence of ``theta`` values, e.g.
            ``np.linspace(0, 2*np.pi, 8, endpoint=False)``.
        lambdas: Sequence of ``lambd`` values, e.g. ``[2, 4, 8, 16]``.
        gammas: Sequence of ``gamma`` values, e.g. ``[0.5]``.
        psis: Sequence of ``psi`` values, e.g. ``[np.pi/2]``.

    Returns:
        Backend tensor made by giving each kernel a trailing singleton axis and
        stacking all kernels along a new last axis, i.e. laid out as
        [filter_height, filter_width, 1, n_kernels].
    """
    # Function-scope import keeps this notebook cell self-contained.
    import itertools

    gabors = []
    # The product covers *every* parameter list, including ``psis``; the
    # previous hand-computed kernel count left ``psis`` out and tripped its
    # own assertion whenever more than one phase offset was requested.
    for sigma, theta, lambd, gamma, psi in itertools.product(
            sigmas, thetas, lambdas, gammas, psis):
        params = {'ksize': ksize, 'sigma': sigma, 'theta': theta,
                  'lambd': lambd, 'gamma': gamma, 'psi': psi}
        gf = cv2.getGaborKernel(**params, ktype=cv2.CV_32F)
        # Trailing axis acts as the single input channel of the conv kernel.
        gabors.append(K.expand_dims(gf, -1))

    n_kernels = len(gabors)
    print(f"Created {n_kernels} kernels.")
    return K.stack(gabors, axis=-1)
#     print(img.shape)
#     print(gft.shape)

# Parameter grid for the Gabor filter bank: one kernel per combination.
ksize = (25, 25)
gammas = [0.5]
psis = [np.pi/2]
lambdas = [2, 4, 8, 16]
sigmas = [2, 4]
thetas = np.linspace(0, 2*np.pi, 8, endpoint=False)  # [0, np.pi/4, np.pi/2, np.pi*3/4]

gft = get_kernel_tensor(
    ksize=ksize,
    sigmas=sigmas,
    thetas=thetas,
    lambdas=lambdas,
    gammas=gammas,
    psis=psis,
)

def gabor_filter(x, kernel_tensor=None):
    """Convert ``x`` to grayscale and convolve it with a fixed kernel bank.

    Args:
        x: Input tensor of shape [batch, in_height, in_width, in_channels];
            it is passed through ``tf.image.rgb_to_grayscale`` first.
        kernel_tensor: Kernel tensor of shape
            [filter_height, filter_width, in_channels, out_channels].
            Required despite the ``None`` default, which only exists so the
            value can be supplied by keyword (e.g. via ``Lambda(arguments=...)``).

    Returns:
        Result of ``K.conv2d`` with ``padding='same'``.

    Raises:
        ValueError: If ``kernel_tensor`` is not supplied.
    """
    if kernel_tensor is None:
        # Fail early with a clear message instead of an opaque backend error.
        raise ValueError("gabor_filter requires a kernel_tensor")
    grey = tf.image.rgb_to_grayscale(x)
    return K.conv2d(grey, kernel_tensor, padding='same')

def lambda_output_shape(input_shape):
    """Return ``input_shape`` unchanged.

    NOTE(review): presumably meant as the ``output_shape`` callback of the
    Gabor ``Lambda`` layer; an identity shape would only be right if the
    convolution keeps the channel count — confirm before using it.
    """
    output_shape = input_shape
    return output_shape

# gabor_layer = Lambda(gabor_filter, mask=None, arguments={'kernel_tensor': gft})  #, output_shape=lambda_output_shape)

In [None]:
# Rebuild the network with a fixed Gabor Lambda layer feeding straight into
# the third layer onwards, i.e. the layer at index 1 is skipped.
layers = list(model.layers)
input_layer = layers[0]
gabor_layer = Lambda(gabor_filter, arguments={'kernel_tensor': gft})

x = gabor_layer(input_layer.output)
for layer in layers[2:]:
    x = layer(x)

GaborNet = Model(inputs=input_layer.input, outputs=x)
GaborNet.summary()

In [None]:
import os
import sys
import argparse
import json

import numpy as np
from matplotlib import pyplot as plt
from tqdm import tqdm
import keras
from keras import backend as K
from keras import activations, initializers, regularizers, constraints, metrics
from keras.datasets import cifar10
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential, Model
from keras.layers import (Dense, Dropout, Activation, Flatten, Reshape, Layer,
                          BatchNormalization, LocallyConnected2D,
                          ZeroPadding2D, Conv2D, MaxPooling2D, Conv2DTranspose,
                          GaussianNoise, UpSampling2D, Input)
from keras.engine import Layer, InputSpec
from keras.utils import conv_utils, multi_gpu_model
from keras.legacy import interfaces

from keras.layers import Lambda
import tensorflow as tf
import cv2
from keras.applications.vgg16 import VGG16


def get_kernel_tensor(ksize, sigmas, thetas, lambdas, gammas, psis):
    """Build a bank of Gabor kernels stacked into one convolution kernel tensor.

    One kernel is created with ``cv2.getGaborKernel`` for every combination of
    the supplied parameter values (sigma varies slowest, psi fastest).

    Args:
        ksize: Kernel size passed to ``cv2.getGaborKernel``.
        sigmas: Sequence of ``sigma`` values.
        thetas: Sequence of ``theta`` values.
        lambdas: Sequence of ``lambd`` values.
        gammas: Sequence of ``gamma`` values.
        psis: Sequence of ``psi`` values.

    Returns:
        Backend tensor made by giving each kernel a trailing singleton axis and
        stacking all kernels along a new last axis, i.e. laid out as
        [filter_height, filter_width, 1, n_kernels].
    """
    # Function-scope import keeps this script cell self-contained.
    import itertools

    gabors = []
    # The product covers *every* parameter list, including ``psis``; the
    # previous hand-computed kernel count left ``psis`` out and tripped its
    # own assertion whenever more than one phase offset was requested.
    for sigma, theta, lambd, gamma, psi in itertools.product(
            sigmas, thetas, lambdas, gammas, psis):
        params = {'ksize': ksize, 'sigma': sigma, 'theta': theta,
                  'lambd': lambd, 'gamma': gamma, 'psi': psi}
        gf = cv2.getGaborKernel(**params, ktype=cv2.CV_32F)
        # Trailing axis acts as the single input channel of the conv kernel.
        gabors.append(K.expand_dims(gf, -1))

    n_kernels = len(gabors)
    print(f"Created {n_kernels} kernels.")
    return K.stack(gabors, axis=-1)


def gabor_filter(x, kernel_tensor=None):
    """Convolve ``x`` with a fixed kernel bank (no grayscale conversion).

    Args:
        x: Input tensor of shape [batch, in_height, in_width, in_channels].
            No colour conversion is applied here, so its channel count has to
            match the kernel's ``in_channels`` already.
        kernel_tensor: Kernel tensor of shape
            [filter_height, filter_width, in_channels, out_channels].
            Required despite the ``None`` default, which only exists so the
            value can be supplied by keyword (e.g. via ``Lambda(arguments=...)``).

    Returns:
        Result of ``K.conv2d`` with ``padding='same'``.

    Raises:
        ValueError: If ``kernel_tensor`` is not supplied.
    """
    if kernel_tensor is None:
        # Fail early with a clear message instead of an opaque backend error.
        raise ValueError("gabor_filter requires a kernel_tensor")
    # x = tf.image.rgb_to_grayscale(x)
    return K.conv2d(x, kernel_tensor, padding='same')


def lambda_output_shape(input_shape):
    """Return ``input_shape`` unchanged.

    NOTE(review): presumably meant as the ``output_shape`` callback of the
    Gabor ``Lambda`` layer; an identity shape would only be right if the
    convolution keeps the channel count — confirm before using it.
    """
    output_shape = input_shape
    return output_shape


# --- Model ------------------------------------------------------------------
weights = None  # 'imagenet'
input_shape = (32, 32, 1)  # (224, 224, 3)  # (224, 224, 16)
num_classes = 10
# Falsy trains from scratch; otherwise the value is joined onto save_dir and
# loaded as a weights file.
pretrained_model = False
n_gpus = 1

# --- Data -------------------------------------------------------------------
data_set = 'pixel'
data_root = '/workspace/data/pixel/small'  # TODO: Pass in
stimulus_set = 'jitter'  # 'static'  # 'set_32_32'
noise_types = ['Original', 'Salt-and-pepper', 'Additive', 'Single-pixel']
# When True the image files are re-read and the cached .npy arrays rewritten.
fresh_data = True
save_dir = os.path.join(os.getcwd(), 'results')  # TODO: /workspace/results

# --- Training ---------------------------------------------------------------
task = 'classification'
data_augmentation = False
batch_size = 64
epochs = 20

# --- Gabor filter parameters --------------------------------------------------
ksize = (25, 25)
gammas = [0.5]
psis = [np.pi/2]
lambdas = [8, 16, 32, 64]
sigmas = [2, 4]
thetas = np.linspace(0, 2*np.pi, 8, endpoint=False)  # [0, np.pi/4, np.pi/2, np.pi*3/4]

for noise_type in noise_types:
    # Every noise type gets its own run, named after the data set, stimulus
    # set and noise type; results are written below save_dir under that name.
    model_name = f"{data_set}_{stimulus_set}_{noise_type}"
    model_path = os.path.join(save_dir, model_name)
    print(model_name)

    # Make sure the results directory exists before anything is written.
    os.makedirs(save_dir, exist_ok=True)


    # if data_set == 'cifar10':
    #     (x_train, y_train), (x_test, y_test) = cifar10.load_data()
    #     x_train = np.mean(x_train, 3, keepdims=True)  # Average over RGB channels
    #     x_test = np.mean(x_test, 3, keepdims=True)  # Average over RGB channels
    #     x_train = x_train.astype('float32')
    #     x_test = x_test.astype('float32')
    #     x_train /= 255
    #     x_test /= 255

    #     # Convert class vectors to binary class matrices.
    #     y_train = keras.utils.to_categorical(y_train, num_classes)
    #     y_test = keras.utils.to_categorical(y_test, num_classes)

    # elif data_set == 'pixel':


    test_conditions = ['Same', 'Diff', 'NoPix']

    # Sub-directory of the stimulus set that holds each noise type's images.
    noise_dirs = {
        'Original': 'orig',
        'Salt-and-pepper': 'salt_n_pepper',
        'Additive': 'uniform',
        'Single-pixel': 'single_pixel',
    }
    if noise_type not in noise_dirs:
        sys.exit(f"Unknown noise type requested: {noise_type}")
    data_path = os.path.join(data_root, stimulus_set, noise_dirs[noise_type])

    def load_images(path):
        """Load every image below ``path`` as a single-channel float32 array.

        ``path`` must contain one sub-directory per category, each holding
        that category's image files. All images are expected to share the
        height and width of the first image of the first category.

        Args:
            path: Directory whose immediate sub-directories are the categories.

        Returns:
            Tuple ``(image_set, X, y)``: ``image_set`` maps each category name
            to its sorted file names, ``X`` has shape
            ``(n_images, height, width, 1)`` and ``y`` is the one-hot label
            matrix of shape ``(n_images, n_categories)``. ``X`` and ``y`` are
            shuffled with the same random permutation.

        Raises:
            FileNotFoundError: If ``path`` has no category sub-directories or
                the first category contains no files.
        """
        # Only immediate sub-directories are categories; files lying directly
        # in ``path`` (e.g. cached .npy arrays) and deeper nesting are ignored.
        categories = sorted(
            entry for entry in os.listdir(path)
            if os.path.isdir(os.path.join(path, entry)))
        if not categories:
            raise FileNotFoundError(f"No category directories found in {path}")

        image_set = {}
        for cat in categories:
            cat_dir = os.path.join(path, cat)
            image_set[cat] = sorted(
                name for name in os.listdir(cat_dir)
                if not os.path.isdir(os.path.join(cat_dir, name)))

        if not image_set[categories[0]]:
            raise FileNotFoundError(
                f"No images found in {os.path.join(path, categories[0])}")

        n_images = sum(len(files) for files in image_set.values())
        image_dims = plt.imread(os.path.join(path, categories[0],
                                image_set[categories[0]][0])).shape

        print(image_dims)
        X = np.zeros((n_images, image_dims[0], image_dims[1], 1), dtype='float32')
        y = np.zeros((n_images, len(categories)), dtype=int)

        tally = 0
        for c, (cat, files) in enumerate(tqdm(image_set.items(), desc=path)):
            for i, image in enumerate(files):
                cimg = plt.imread(os.path.join(path, cat, image))
                if cimg.ndim == 3:
                    # plt.imread returns channels in RGB(A) order, so the RGB
                    # conversion code is the one that weights them correctly.
                    cimg = cv2.cvtColor(cimg, cv2.COLOR_RGB2GRAY)
                X[i+tally] = np.expand_dims(cimg, axis=-1)
            # One-hot encode: column c marks membership of category c.
            y[tally:tally+len(files), c] = 1
            tally += len(files)

        shuffle = np.random.permutation(y.shape[0])

        return image_set, X[shuffle], y[shuffle]

    train_path = os.path.join(data_path, 'train')
    x_train_file = os.path.join(train_path, 'x_train.npy')
    y_train_file = os.path.join(train_path, 'y_train.npy')

    # Cached arrays are reused only when they exist and no fresh load is wanted.
    if not fresh_data and os.path.isfile(x_train_file):
        print(f'Loading {data_set} data arrays.')
        x_train = np.load(x_train_file)
        y_train = np.load(y_train_file)
        # Category count is taken from the sub-directories of the train folder.
        cat_dirs = []
        for entry in os.listdir(train_path):
            entry_path = os.path.join(train_path, entry)
            if os.path.isdir(entry_path):
                cat_dirs.append(entry_path)
        assert num_classes == len(cat_dirs)
    else:
        print(f'Loading {data_set} image files.')
        train_images, x_train, y_train = load_images(train_path)
        print(train_images.keys())
        assert num_classes == len(train_images)
        np.save(x_train_file, x_train)
        np.save(y_train_file, y_train)

    # One (x, y) pair per test condition, in the order of test_conditions.
    test_sets = []
    for test_cond in test_conditions:
        test_path = os.path.join(data_path, f"test_{test_cond.lower()}")
        x_test_file = os.path.join(test_path, 'x_test.npy')
        y_test_file = os.path.join(test_path, 'y_test.npy')
        if not fresh_data and os.path.isfile(x_test_file):
            x_test = np.load(x_test_file)
            y_test = np.load(y_test_file)
        else:
            test_images, x_test, y_test = load_images(test_path)
            print(test_images.keys())
            assert num_classes == len(test_images)
            np.save(x_test_file, x_test)
            np.save(y_test_file, y_test)
        test_sets.append((x_test, y_test))

    # The "NoPix" set is the default test set used for the learning curves.
    test_cond = "NoPix"
    x_test, y_test = test_sets[test_conditions.index(test_cond)]

    # Summarise stimuli
    print('x_train shape:', x_train.shape)
    for count, label in ((x_train.shape[0], 'train samples'),
                         (x_test.shape[0], 'test samples'),
                         (y_train.shape[1], 'training categories'),
                         (y_test.shape[1], 'testing categories')):
        print(count, label)


    # Stock VGG16 supplies the layers that follow the Gabor stage.
    model = VGG16(
        include_top=True,
        weights=weights,
        input_tensor=None,
        input_shape=input_shape,
        pooling=None,
        classes=num_classes,
    )

    # Fixed (non-trainable) Gabor filter bank.
    gft = get_kernel_tensor(ksize, sigmas, thetas, lambdas, gammas, psis)

    # New graph: fresh input -> Gabor Lambda -> VGG16 layers from index 2 on,
    # i.e. the original input layer and the layer after it are both replaced.
    layers = list(model.layers)
    inp = Input(shape=x_train[0].shape)
    gabor_layer = Lambda(gabor_filter, arguments={'kernel_tensor': gft})
    x = gabor_layer(inp)
    for layer in layers[2:]:
        x = layer(x)

    model = Model(inputs=inp, outputs=x)
    # model.summary()


    # Optionally start from previously saved weights and train only the
    # non-convolutional layers.
    # NOTE(review): `pretrained_model` is used both as a flag and as a path
    # component, so it presumably has to be a file name string (not True)
    # when enabled — confirm.
    if pretrained_model:
        # Load weights from saved model
        pretrained_model_path = os.path.join(save_dir, pretrained_model)
        # by_name=True matches weights to layers by layer name.
        model.load_weights(pretrained_model_path, by_name=True)

        # Freeze weights in convolutional layers during training
        for layer in model.layers:
            if isinstance(layer, keras.layers.convolutional.Conv2D):
                print(f"Freezing layer: {layer.name}")
                layer.trainable = False

    # Wrap the model for multi-GPU training only when more than one GPU is requested.
    if n_gpus > 1:
        model = multi_gpu_model(model, gpus=n_gpus)

    # initiate RMSprop optimizer
    opt = keras.optimizers.rmsprop(lr=0.0001, decay=1e-6)

    # Compile the model last before training for all changes to take effect
    # (the trainable flags set above must be in place before compile()).
    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])

    model.summary()

    if data_augmentation:
        print('Using data augmentation.')
        # Only small shifts and horizontal flips are enabled; every other
        # transformation is explicitly switched off.
        augmentation_options = dict(
            featurewise_center=False,
            samplewise_center=False,
            featurewise_std_normalization=False,
            samplewise_std_normalization=False,
            zca_whitening=False,
            rotation_range=0,
            width_shift_range=0.1,
            height_shift_range=0.1,
            horizontal_flip=True,
            vertical_flip=False,
        )
        datagen = ImageDataGenerator(**augmentation_options)
        datagen.fit(x_train)

        if task == 'classification':
            batches = datagen.flow(x_train, y_train, batch_size=batch_size)
            # Enough steps to cover the whole training set once per epoch.
            steps = int(np.ceil(x_train.shape[0] / float(batch_size)))
            hist = model.fit_generator(batches,
                                       steps_per_epoch=steps,
                                       epochs=epochs,
                                       validation_data=(x_test, y_test),
                                       workers=4)
    else:
        print('Not using data augmentation.')
        if task == 'classification':
            hist = model.fit(
                x_train,
                y_train,
                batch_size=batch_size,
                epochs=epochs,
                validation_data=(x_test, y_test),
                shuffle=True,
            )

    print('History', hist.history)

    # Save the training history (the model itself is not written, see below).
    os.makedirs(save_dir, exist_ok=True)
    # model_name = 'SAVED'+'_'+model_name
    model_path = os.path.join(save_dir, model_name)
    # NOTE(review): model.save is commented out, so nothing is written to
    # model_path even though the final message below reports it.
    # model.save(model_path)
    for suffix, key in (('VALACC', 'val_acc'), ('ACC', 'acc'),
                        ('VALLOSS', 'val_loss'), ('LOSS', 'loss')):
        np.save(os.path.join(save_dir, f'{model_name}_{suffix}.npy'),
                hist.history[key])

    if data_set == 'pixel':
        # Final loss/accuracy on each test condition with the trained model.
        cond_acc = {}
        cond_loss = {}
        for test_cond, (x_test, y_test) in zip(test_conditions, test_sets):
            loss, val_acc = model.evaluate(x=x_test, y=y_test, batch_size=batch_size)
            # Plain floats keep json.dump working even if the backend hands
            # back numpy scalars (e.g. float32), which json cannot serialise.
            cond_acc[test_cond] = float(val_acc)
            cond_loss[test_cond] = float(loss)
        print("Saving metrics: ", model.metrics_names)
        with open(os.path.join(save_dir, f'{model_name}_CONDVALACC.json'), "w") as jf:
            json.dump(cond_acc, jf)
        with open(os.path.join(save_dir, f'{model_name}_CONDVALLOSS.json'), "w") as jf:
            json.dump(cond_loss, jf)

    print(f'Saved trained model at {model_path}')


# Plot performance

In [None]:
# Load accuracy scores and plot

%matplotlib inline
import json
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns

# Defaults
# filter_size = 9
# retina_layers = 2
# vvs_width = 32

# data_set = 'pixel'
# stimulus_set = 'static'
n_epochs = 20
num_trials = 1
noise_types = ['Original', 'Salt-and-pepper', 'Additive', 'Single-pixel']  # 'Original'

rows = []       # one row per trial / noise type / evaluation set / epoch
test_rows = []  # one row per trial / noise type / test condition

for trial in range(1, 1+num_trials):
    for noise_type in noise_types:
        model_name = f"{data_set}_{stimulus_set}_{noise_type}"

        # Per-epoch training history saved by the training run.
        history_files = (f'{model_name}_ACC.npy', f'{model_name}_VALACC.npy',
                         f'{model_name}_LOSS.npy', f'{model_name}_VALLOSS.npy')
        acc_scores, valacc_scores, loss, valloss = [
            np.load(os.path.join('results', fname)) for fname in history_files]

        # Final metrics for each test condition.
        with open(os.path.join('results', f'{model_name}_CONDVALACC.json'), "r") as jf:
            cond_acc = json.load(jf)
        with open(os.path.join('results', f'{model_name}_CONDVALLOSS.json'), "r") as jf:
            cond_loss = json.load(jf)

        test_rows.extend(
            {'Trial': trial, 'Noise Type': noise_type, 'Condition': condition,
             'Loss': cond_loss[condition], 'Accuracy': cond_acc[condition]}
            for condition in test_conditions)

        # For every epoch, the testing row precedes the training row.
        curves = (('Testing', valloss, valacc_scores),
                  ('Training', loss, acc_scores))
        for epoch in range(n_epochs):
            for evaluation, losses, accuracies in curves:
                rows.append({'Trial': trial, 'Noise Type': noise_type,
                             'Evaluation': evaluation, 'Epoch': epoch+1,
                             'Loss': losses[epoch],
                             'Accuracy': accuracies[epoch]})

scores = pd.DataFrame(rows, columns=['Trial', 'Noise Type', 'Evaluation', 'Epoch', 'Loss', 'Accuracy'])
test_scores = pd.DataFrame(test_rows, columns=['Trial', 'Noise Type', 'Condition', 'Loss', 'Accuracy'])
# scores
# test_scores

In [None]:
# Display the per-condition test results table.
test_scores

In [None]:
# Learning curves: accuracy against epoch, one panel per noise type and one
# line per evaluation set. Use y='Loss' to plot the loss curves instead.
g = sns.relplot(
    data=scores,
    kind='line',
    x='Epoch',
    y='Accuracy',
    hue='Evaluation',
    col='Noise Type',
)

In [None]:
# Bar chart of final accuracy per test condition, one panel per noise type.
# catplot is a figure-level function that creates its own figure, so a
# preceding plt.figure(figsize=...) only produced an extra empty figure;
# the intended 16x6 size is applied to the grid's own figure instead.
# g = sns.catplot(x="Condition", y="Accuracy", hue="Retina Out Width", row="VVS Layers", col="Noise Type", kind="bar", data=test_scores)
g = sns.catplot(x="Condition", y="Accuracy", col="Noise Type", kind="bar", data=test_scores)
fig = g.fig
fig.set_size_inches(16, 6)
# g = sns.catplot(x="Condition", y="Loss", hue="Retina Out Width", row="VVS Layers", col="Noise Type", kind="bar", data=test_scores)

In [None]:
# from keras.layers import InputLayer
# Experimental attempt to drop the first convolution and feed a 32-channel
# input instead.
# NOTE(review): popping from `model.layers` presumably only edits a Python
# list and does not rewire the model graph — confirm.
model.layers.pop(1)  # First convolutional layer
# NOTE(review): InputLayer(...) returns a layer object rather than a tensor,
# and its import is commented out in this cell, so the lines below look
# unlikely to run as written — TODO confirm.
gabor_input = InputLayer(input_shape=(224, 224, 32))
gabor_output = model(gabor_input)
gabor_model = Model(gabor_input, gabor_output)

In [None]:
from matplotlib import pyplot as plt
# import cv2
# from keras import backend as K
# import numpy as np

# Run the Gabor filter bank over a single test image.
image_path = "/workspace/data/Lenna.png"
# img = image.load_img(image_path)
img = plt.imread(image_path)
print(img.shape)
# plt.imread returns channels in RGB(A) order, so the RGB conversion code is
# the one that applies the correct per-channel weights.
img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
print(type(img))
# Add a leading batch axis and a trailing channel axis: (H, W) -> (1, H, W, 1).
img = K.expand_dims(img, 0)
img = K.expand_dims(img, -1)

gabor_filter(img, kernel_tensor=gft)

In [None]:
import tensorflow as tf
# Entering the session itself (rather than only `.as_default()`) installs it
# as the default session and also closes it when the block ends, so its
# resources are released.
with tf.Session() as sess:
    # Show the response of the last kernel in the bank for the first image.
    plt.imshow(gabor_filter(img, kernel_tensor=gft).eval(session=sess)[0,:,:,-1])

In [None]:
# NOTE(review): `gabor_filters` is not defined in any visible cell (only
# `gabor_filter` and `gft` are) — presumably a typo or a leftover from an
# earlier session; confirm the intended name.
type(gabor_filters)

In [None]:
from keras.utils.vis_utils import plot_model
# Write a diagram of the current `model` to vgg.png.
# NOTE(review): plot_model presumably needs pydot/graphviz installed — confirm.
plot_model(model, to_file='vgg.png')