In [1]:
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import random
import pandas as pd

from keras.datasets import mnist
from keras.models import Model, Sequential
from keras.layers import Input, Conv2D, Flatten, Dense, Dropout, Lambda, Add, Subtract, MaxPooling2D
from keras.optimizers import RMSprop
from keras import backend as K
from keras.regularizers import l2
from numpy.random import permutation
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt

import warnings
warnings.filterwarnings('ignore')

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [132]:
np.random.seed(12345)

In [133]:
plt.rcParams["figure.figsize"] = (14,8)

In [134]:
num_classes = 10
epochs = 10

class_1 = 1
class_2 = 1
n_same_class_samples = 5000
n_diff_class_samples = 1500  # 3000
n_hybrid_class_samples = 1750

classes_to_train = [0,1,2,3,4,5,6,7,8,9]

In [135]:
def generate_example_pairs(data, indices, class_1, class_2):
    class_1_indices = indices[class_1]
    class_2_indices = indices[class_2]

    image_0 = data[np.random.choice(class_1_indices)]
    image_1 = data[np.random.choice(class_2_indices)]
    # return [image_0, image_1]
    return [image_0.reshape(28,28,1), image_1.reshape(28,28,1)]


def generate_examples(data, indices, class_1, class_2):
    same_classes = [generate_example_pairs(data, indices, class_1, class_1) for _ in range(n_same_class_samples)]
    different_classes = [generate_example_pairs(data, indices,
                                                       np.random.choice(list(set(classes_to_train) - {class_1})),
                                                       np.random.choice(list(set(classes_to_train) - {class_2})))
                                for _ in range(n_diff_class_samples)]
    hybrid_classes = [generate_example_pairs(data, indices, class_1, np.random.choice(list(set(classes_to_train) - {class_2}))) for _ in range(n_hybrid_class_samples)] + \
                     [generate_example_pairs(data, indices, class_2, np.random.choice(list(set(classes_to_train) - {class_1}))) for _ in range(n_hybrid_class_samples)]

    images = same_classes + different_classes + hybrid_classes
    labels = [1] * n_same_class_samples + [0] * (n_diff_class_samples + n_hybrid_class_samples + n_hybrid_class_samples)
    return np.array(images), np.array(labels)

In [136]:
# https://keras.io/examples/mnist_siamese/
def create_base_network(input_shape):
    input = Input(shape=input_shape)
    x = Flatten()(input)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    return Model(input, x)

In [137]:
def create_conv_network(input_shape):
    convnet = Sequential()
    convnet.add(Conv2D(32,(3,3),padding="same",activation='relu',input_shape=input_shape))
    convnet.add(MaxPooling2D())
    convnet.add(Conv2D(64,(7,7), padding="same", activation='relu'))
    convnet.add(MaxPooling2D())
    convnet.add(Conv2D(64,(4,4), padding = "same", activation='relu'))
    convnet.add(Flatten())
    convnet.add(Dense(64,activation="sigmoid"))
    return convnet

In [138]:
def contrastive_loss(y_true, y_pred):
    '''Contrastive loss from Hadsell-et-al.'06
    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf
    '''
    margin = 1
    square_pred = K.square(y_pred)
    margin_square = K.square(K.maximum(margin - y_pred, 0))
    #   return K.mean(y_true * square_pred + (1-y_true) * margin_square)
    return K.mean(y_true * margin_square + (1 - y_true) * square_pred)

In [139]:
def accuracy(y_true, y_pred):
    return K.mean(K.equal(y_true, K.cast(y_pred > 0.5, y_true.dtype)))

def recall_m(y_true, y_pred):
    y_pred = K.cast(y_pred > 0.5, y_true.dtype)

    true_positives = (K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = (K.round(K.clip(y_true, 0, 1)))
    recall = K.mean(K.equal(true_positives, possible_positives))
    return recall

def precision_m(y_true, y_pred):
    y_pred = K.cast(y_pred > 0.5, y_true.dtype)
    true_positives = (K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = (K.round(K.clip(y_pred, 0, 1)))
    precision = K.mean(K.equal(true_positives, predicted_positives))
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2 * ((precision * recall) / (precision + recall + K.epsilon()))

In [140]:
def compute_accuracy(y_true, y_pred, threshold = 0.5):
    pred = y_pred.ravel() > threshold
    return np.mean(pred == y_true)

In [141]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
input_shape = (28,28, 1)

# Class to Class SM

In [142]:
def C2C_SN(c1, c2):
    # create training+test positive and negative pairs
    training_digit_indices = {i:np.where(y_train == i)[0] for i in classes_to_train}
    training_pairs, training_label = generate_examples(x_train, training_digit_indices, c1, c2)

    # Use the classes that are being trained upon from the test data as the validation
    testing_digit_indices = {i:np.where(y_test == i)[0] for i in classes_to_train}
    testing_pairs, testing_label = generate_examples(x_test, testing_digit_indices, c1, c2)

    # network definition
    base_network = create_conv_network(input_shape)

    #base_network.summary()
    input_a = Input(shape=input_shape)
    input_b = Input(shape=input_shape) 

    processed_a = base_network(input_a)
    processed_b = base_network(input_b)

    # c2c lower model
    subtracted = Subtract()([processed_a, processed_b])
    x = Lambda(lambda val: abs(val))(subtracted)
    x = Dense(128, activation='relu')(x)
    x = Dense(128, activation='relu')(x)
    x = Dense(128, activation='relu')(x)
    x = Dense(128, activation='relu')(x)
    out = Dense(1, activation="sigmoid")(x)
    model = Model([input_a, input_b], out)

    rms = RMSprop()
    model.compile(loss=contrastive_loss, optimizer=rms, metrics=[accuracy, recall_m, precision_m, f1_m])
    model.fit([training_pairs[:,0], training_pairs[:,1]], training_label,
              batch_size=128,
              epochs=epochs,
              validation_data=([testing_pairs[:, 0], testing_pairs[:, 1]], testing_label))
    
    return model

In [143]:
pretrained_c2c_models = {class_1: C2C_SN(class_1, class_1) for class_1 in classes_to_train}

Train on 10000 samples, validate on 10000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train on 10000 samples, validate on 10000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train on 10000 samples, validate on 10000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train on 10000 samples, validate on 10000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train on 10000 samples, validate on 10000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train on 10000 samples, validate on 10000 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Train on 10000 samples, validate o

In [144]:
attributes = pd.read_csv("mnist_attributes.csv")

In [145]:
digit_to_attribute_mapping = {digit : attributes[attributes.Digit == digit].values[0][1:] for digit in [0,1,2,3,4,5,6,7,8,9]}

In [146]:
len_attributes = len(attributes.columns) - 1

# Zero Shot Learning

In [154]:
def generate_example_pairs(data, indices, class_1, class_2):
    class_1_indices = indices[class_1]
    class_2_indices = indices[class_2]

    image_0 = data[np.random.choice(class_1_indices)]
    image_1 = data[np.random.choice(class_2_indices)]
    # return [image_0, image_1]
    return [image_0.reshape(28,28,1), image_1.reshape(28,28,1)]


def generate_examples(data, indices, labels):
    same_classes = [generate_example_pairs(data, indices, label, label) for label in labels]
    attributes = [digit_to_attribute_mapping[label] for label in labels]
    return np.array(same_classes), np.array(attributes)

In [38]:
def create_intermediate_model(model, input_shape):
    input_a = Input(shape=input_shape)
    input_b = Input(shape=input_shape) 

    tmp0 = Sequential()
    tmp0.add(model.layers[2])
    tmp0.trainable = False

    tmp1 = Sequential()
    tmp1.add(model.layers[2])
    tmp1.trainable = False

    processed_a = tmp0(input_a)
    processed_b = tmp1(input_b)

    out = Add()([processed_a, processed_b])
    return Model([input_a, input_b], out)

In [39]:
intermediate_models = {label: create_intermediate_model(model, input_shape) for label, model in pretrained_c2c_models.items()}

In [152]:
labels = np.random.choice(classes_to_train, size = 20000)

In [155]:
digit_indices = {i:np.where(y_train == i)[0] for i in classes_to_train}
pairs, attributes = generate_examples(x_train, digit_indices, labels)

In [156]:
zsl_training_classes = [0,1,2,3,4,5]
zsl_testing_classes = [6,7,8,9]
intermediate_representations_train, intermediate_representations_test = [],[]
attributes_train, attributes_test = [],[]
labels_train, labels_test = [],[]

for image_pairs, attribute, label in zip(pairs, attributes, labels):
    if label in zsl_training_classes:
        intermediate_representations_train.append(intermediate_models[label].predict([image_pairs[0].reshape(1,28,28,1), image_pairs[1].reshape(1,28,28,1)]))
        labels_train.append(label)
        attributes_train.append(attribute)
    elif label in zsl_testing_classes:
        intermediate_representations_test.append(intermediate_models[label].predict([image_pairs[0].reshape(1,28,28,1), image_pairs[1].reshape(1,28,28,1)]))
        labels_test.append(label)
        attributes_test.append(attribute)
        
train_validation_split = 0.8
intermediate_representations_train = np.array(intermediate_representations_train)
n_validation_examples = int(train_validation_split * len(intermediate_representations_train))

intermediate_representations_validation = intermediate_representations_train[:n_validation_examples]
intermediate_representations_train = intermediate_representations_train[n_validation_examples:]
intermediate_representations_test = np.array(intermediate_representations_test)

attributes_train = np.array(attributes_train)
attributes_validation = attributes_train[:n_validation_examples]
attributes_train = attributes_train[n_validation_examples:]
attributes_test = np.array(attributes_test)

labels_train = np.array(labels_train)
labels_validation = labels_train[:n_validation_examples]
labels_train = labels_train[n_validation_examples:]
labels_test = np.array(labels_test)

In [157]:
input_a = Input(shape = (1, 128))

out=Dense(7, activation='sigmoid')(input_a)
model = Model(input_a, out)

rms = RMSprop()
model.compile(loss = 'mean_squared_error', optimizer=rms)
model.fit(intermediate_representations_train, attributes_train.reshape(-1, 1,len_attributes),
              batch_size=128,
              epochs=10,
              validation_data=(intermediate_representations_validation, attributes_validation.reshape(-1, 1, len_attributes)))

Train on 2415 samples, validate on 9656 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.callbacks.History at 0x7f13d6fa38d0>

# Testing

In [158]:
def euclidean(a, b):
    return np.sqrt(np.sum((a-b)**2))

In [159]:
top_1 = 0
top_2 = 0

for representation, true_label in zip(intermediate_representations_test, labels_test):
    predicted_attribute = model.predict(representation.reshape(1,1,128))
    distances_from_attributes = {class_label: euclidean(predicted_attribute,digit_to_attribute_mapping[class_label]) for class_label in zsl_testing_classes}
    sorted_distances = sorted(distances_from_attributes.items(), key = lambda el: el[1])
    prediction = sorted_distances[0][0]
    if prediction == true_label:
        top_1 += 1
        top_2 += 1
    else:
        second_best = sorted_distances[1][0]
        if second_best == true_label:
            top_2 += 1 

In [160]:
top_1/len(intermediate_representations_test)

0.2445453398915374

In [161]:
top_2/len(intermediate_representations_test)

0.4942615714465885

# Testing on validation set

In [162]:
top_1 = 0
top_2 = 0

for representation, true_label in zip(intermediate_representations_validation, labels_validation):
    predicted_attribute = model.predict(representation.reshape(1,1,128))
    distances_from_attributes = {class_label: euclidean(predicted_attribute,digit_to_attribute_mapping[class_label]) for class_label in zsl_training_classes}
    sorted_distances = sorted(distances_from_attributes.items(), key = lambda el: el[1])
    prediction = sorted_distances[0][0]
    if prediction == true_label:
        top_1 += 1
        top_2 += 1
    else:
        second_best = sorted_distances[1][0]
        if second_best == true_label:
            top_2 += 1 

In [163]:
top_1/len(intermediate_representations_validation)

1.0

In [164]:
top_2/len(intermediate_representations_validation)

1.0