In [1]:
from IPython.display import display, Image
from sklearn.utils import shuffle

from keras.optimizers import Adam
from keras.regularizers import l2
from keras.layers import Input, Conv1D, BatchNormalization, Activation, MaxPooling1D, Flatten, Dense, Lambda, Dropout
from keras.models import Sequential, Model, load_model, model_from_json
from keras.utils import plot_model
from keras import backend as K

import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np

import itertools
import pickle
import keras
import time
import os

Using TensorFlow backend.


In [2]:
# experiment switches
BALANCED = False  # when True, every vehicle is down-sampled to the size of the smallest class
model_dir = os.path.join(os.getcwd(), "models")  # directory used by save_model / load_model

# Seed numpy's global RNG so that the random vehicle split and the randomly
# drawn training / evaluation pairs are repeatable after "Restart & Run All".
# NOTE: this seeds numpy only; backend-level randomness (e.g. dropout) is not covered.
SEED = 42
np.random.seed(SEED)

## Utilities

In [3]:
def plot_confusion_matrix(cm, class_labels, model_name, cmap=plt.cm.Blues):
    """Plot a row-normalized confusion matrix and save it as a PNG and a CSV.

    The figure is written to ``<cwd>/images/<model_name>_confusion_matrix.png``
    and the normalized matrix to ``<cwd>/stats/<model_name>_confusion_matrix.txt``;
    both directories are created when missing.

    Args:
        cm: 2-D array-like of counts; rows are true labels, columns are
            predicted labels.
        class_labels: one tick label per class, in matrix order.
        model_name: prefix used for the two output file names.
        cmap: matplotlib colormap used to draw the matrix.
    """
    # exist_ok avoids the check-then-create race of an explicit os.path.exists test
    diagram_dir = os.path.join(os.getcwd(), "images")
    os.makedirs(diagram_dir, exist_ok=True)

    stats_dir = os.path.join(os.getcwd(), "stats")
    os.makedirs(stats_dir, exist_ok=True)

    # create the figure
    fig = plt.figure()
    fig.set_size_inches(18, 15)

    # labels
    plt.ylabel("True label")
    plt.xlabel("Predicted label")

    # tick marks: exactly one tick per label (an extra tick leaves the tick
    # and label counts mismatched)
    tick_marks = np.arange(len(class_labels))
    plt.xticks(tick_marks, class_labels, rotation=45)
    plt.yticks(tick_marks, class_labels)

    # normalization: each row is divided by its total; a row without any
    # samples stays all-zero instead of turning into NaN
    cm = np.asarray(cm, dtype="float")
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm = np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals != 0)
    cm = np.around(cm, 6)

    # numbers: white text on dark cells, black text on light cells
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, "{:.4f}".format(cm[i, j]), horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    # color bar
    plt.imshow(cm, interpolation="nearest", cmap=cmap)
    plt.colorbar()

    # save and show
    # NOTE: this changes numpy's global float print format for the rest of the session
    np.set_printoptions(formatter={"float": lambda x: "{0:0.4f}".format(x)})
    plt.savefig(os.path.join(diagram_dir, model_name + "_confusion_matrix.png"), dpi=100)
    pd.DataFrame(cm).to_csv(os.path.join(stats_dir, model_name + "_confusion_matrix.txt"), header=None, index=None)
    plt.show(block=False)
    

def graph_history(loss_history, acc_history, n_iterations, interval):
    """Plot training loss and validation accuracy against the training iteration.

    Loss is drawn on the left y-axis and accuracy on a twin right y-axis. A
    dashed vertical line marks the evaluation point with the lowest loss.

    Args:
        loss_history: one loss value per evaluation point.
        acc_history: one accuracy value per evaluation point.
        n_iterations: total number of training iterations.
        interval: number of iterations between two evaluation points.
    """
    # x positions: interval, 2 * interval, ..., one per evaluation point
    n_points = n_iterations // interval
    x_values = interval * np.arange(1, n_points + 1)

    # two y-axes sharing one x-axis
    fig, loss_axis = plt.subplots()
    acc_axis = loss_axis.twinx()

    loss_axis.plot(x_values, loss_history, label="Loss", color="c")
    acc_axis.plot(x_values, acc_history, label="Val. Accuracy", color="m")

    # dashed marker at the iteration with the lowest loss
    best_x = interval * (np.argmin(loss_history) + 1)
    loss_axis.axvline(best_x, color="k", linestyle="--")

    # axis titles and legend
    loss_axis.set_xlabel("Training Iteration")
    loss_axis.set_ylabel("Loss")
    acc_axis.set_ylabel("Accuracy")
    fig.legend()

    # plt.savefig(os.path.join(os.getcwd(), "images", "history.png"), dpi=100)
    plt.show()

    
def save_model(model, model_name):
    """Write a model's architecture and weights into ``model_dir``.

    Two files are produced: ``<model_name>.json`` (the output of
    ``model.to_json()``) and ``<model_name>_weights.h5`` (written by
    ``model.save_weights``). ``model_dir`` is created when missing.

    Args:
        model: object providing ``to_json()`` and ``save_weights(path)``.
        model_name: base name (without extension) for both files.
    """
    # exist_ok=True replaces the racy "check, then create" sequence
    os.makedirs(model_dir, exist_ok=True)

    # save architecture
    with open(os.path.join(model_dir, model_name + ".json"), "w") as f:
        f.write(model.to_json())

    # save weights
    model.save_weights(os.path.join(model_dir, model_name + "_weights.h5"))

def load_model(model_name):
    """Rebuild a model from the files written by ``save_model``.

    Reads ``<model_name>.json`` from ``model_dir``, compiles the model with
    categorical cross-entropy / Adam / accuracy, then loads
    ``<model_name>_weights.h5``.

    NOTE: this definition shadows the ``load_model`` imported from
    ``keras.models`` at the top of the notebook.

    Args:
        model_name: base name (without extension) of the saved files.

    Returns:
        The compiled model with its weights loaded, or ``None`` when
        ``model_dir`` does not exist.
    """
    if not os.path.exists(model_dir):
        # nothing has been saved yet; callers receive None
        return None

    # load the architecture; the context manager closes the file even when
    # parsing the JSON raises
    with open(os.path.join(model_dir, model_name + ".json")) as json_file:
        model = model_from_json(json_file.read())

    model.compile(loss="categorical_crossentropy", optimizer="Adam", metrics=["accuracy"])

    # load the weights and return the model
    model.load_weights(os.path.join(model_dir, model_name + "_weights.h5"))
    return model

## Load data

In [4]:
# location of the snapshot CSV, relative to the working directory
filename = "data/snapshots/unfiltered_snapshots.csv"

print("Loading snapshots...", end=" ")

# read only the header first, so that every column can be given an explicit
# integer dtype for the real load
col_names = pd.read_csv(filename, nrows=0).columns
types_dict = dict.fromkeys(["Vehicle", "Capture"], int)
types_dict.update(dict.fromkeys((name for name in col_names if name not in types_dict), int))

# load the csv
df = pd.read_csv(filename, dtype=types_dict)
print("done!")

# preview up to five randomly drawn rows
display(df.sample(frac=0.0001).head(5))

Loading snapshots... done!


Unnamed: 0,Vehicle,Capture,Data 1,Data 2,Data 3,Data 4,Data 5,Data 6,Data 7,Data 8,...,Data 1015,Data 1016,Data 1017,Data 1018,Data 1019,Data 1020,Data 1021,Data 1022,Data 1023,Data 1024
115565,3,14,0,0,0,0,13,176,216,50,...,80,10,109,0,0,9,0,0,14,96
293329,109,109,76,32,0,36,109,0,0,84,...,0,84,0,1,0,0,0,19,242,27
283066,105,105,39,178,38,212,39,188,0,0,...,0,184,0,0,0,0,0,0,0,0
56658,3,8,22,0,127,226,128,8,30,128,...,38,30,58,30,229,32,0,96,228,255
13759,3,4,255,0,255,255,0,0,128,0,...,150,0,127,226,80,0,34,176,123,168


In [5]:
# keep only Stone's vehicles, i.e. rows whose vehicle ID is above 100
df = df.loc[df["Vehicle"] > 100]

## Split into known/unknown sets

In [6]:
# draw 75% of the vehicle IDs (without repetition) to act as the "known" set
all_vehicles = df["Vehicle"].unique()
known_vehicles = np.random.choice(all_vehicles, size=int(len(all_vehicles) * 0.75), replace=False)

# optionally balance the dataset by down-sampling every vehicle to the size
# of the smallest one, then pick the dataframe to work with
if BALANCED:
    min_size = df["Vehicle"].value_counts().min()
    balanced_df = df.groupby("Vehicle", group_keys=False).apply(pd.DataFrame.sample, n=min_size)
    samples = balanced_df
else:
    samples = df

# rows of known vehicles go to one frame, all remaining rows to the other
known_df = samples.loc[samples["Vehicle"].isin(known_vehicles)]
unknown_df = samples.loc[~samples["Vehicle"].isin(known_vehicles)]

# keep the vehicle IDs in the order in which they occur in each frame
known_vehicles = known_df["Vehicle"].unique()
unknown_vehicles = unknown_df["Vehicle"].unique()

print("Known vehicles   :", known_vehicles)
print("Unknown vehicles :", unknown_vehicles)

Known vehicles   : [101 102 103 104 106 107 108 110]
Unknown vehicles : [105 109 111]


In [7]:
# targets: the vehicle ID of every snapshot
known_y = known_df["Vehicle"]
unknown_y = unknown_df["Vehicle"]

# inputs: every column except the two bookkeeping columns, divided by 255,
# with a trailing axis appended (axis=2) for the 1-D convolutions
known_x = np.expand_dims(known_df.drop(columns=["Vehicle", "Capture"]) / 255, axis=2)
unknown_x = np.expand_dims(unknown_df.drop(columns=["Vehicle", "Capture"]) / 255, axis=2)

In [8]:
# restructure the known vehicles: train_x[k] holds all samples of known_vehicles[k]
print("Training set")
train_x = []

for vehicle in known_vehicles:
    # a boolean mask selects every row of this vehicle in one vectorized step
    # (replaces a per-row Python loop followed by np.stack, and no longer
    # overwrites the notebook-level `samples` dataframe)
    train_x.append(known_x[np.asarray(known_y == vehicle)])
    print(train_x[-1].shape)

# restructure the unknown vehicles: test_x[k] holds all samples of unknown_vehicles[k]
print("\nTesting set")
test_x = []

for vehicle in unknown_vehicles:
    test_x.append(unknown_x[np.asarray(unknown_y == vehicle)])
    print(test_x[-1].shape)

Training set
(4115, 1024, 1)
(1856, 1024, 1)
(1812, 1024, 1)
(2221, 1024, 1)
(1709, 1024, 1)
(2220, 1024, 1)
(3041, 1024, 1)
(2980, 1024, 1)

Testing set
(3806, 1024, 1)
(2564, 1024, 1)
(2101, 1024, 1)


## Build a one-shot batch

In [None]:
# build a set of [n] sample pairs
# for half of the pairs, the samples come from the same class
# for the other half, the samples come from different classes
def make_batch(n, samples):
    """Build one training batch of [n] sample pairs for the siamese network.

    The first ``n // 2`` pairs combine samples of two different vehicles
    (target 0); the remaining pairs combine two samples of the same vehicle
    (target 1).

    Args:
        n: number of pairs in the batch.
        samples: sequence with one array per vehicle, where ``samples[v][k]``
            is the k-th sample of vehicle ``v``. All samples must share one shape.

    Returns:
        ``(x, y)`` where ``x`` is a list of two arrays of shape
        ``(n,) + sample_shape`` (left and right halves of the pairs) and ``y``
        is an array of shape ``(n,)`` holding the 0/1 targets.

    Raises:
        ValueError: if "different vehicle" pairs are required but fewer than
            two vehicles are available.
    """
    n_vehicles = len(samples)

    if n // 2 > 0 and n_vehicles < 2:
        raise ValueError("make_batch needs at least two vehicles to build 'different' pairs")

    # take the sample shape from the data instead of hard-coding (1024, 1)
    sample_shape = np.shape(samples[0][0])

    # initialize two empty arrays for the input batch
    x = [np.zeros((n,) + sample_shape) for _ in range(2)]

    # randomly select the left vehicle of every pair: distinct vehicles while
    # the batch fits, with repetition once the batch exceeds the vehicle count
    vehicles = np.random.choice(n_vehicles, size=(n,), replace=n > n_vehicles)

    # for each sample pair...
    for i in range(n):
        # select the left vehicle and one of its samples
        left_vehicle = vehicles[i]
        left_sample = np.random.randint(0, len(samples[left_vehicle]))
        x[0][i] = samples[left_vehicle][left_sample]

        # the second half of the pairs uses the same vehicle on both sides
        if i >= n // 2:
            right_vehicle = left_vehicle

        # the first half uses a different vehicle: a non-zero offset modulo
        # the vehicle count can never land on the left vehicle again
        else:
            right_vehicle = (left_vehicle + np.random.randint(1, n_vehicles)) % n_vehicles

        # select the sample of the right vehicle and copy it into the batch
        right_sample = np.random.randint(0, len(samples[right_vehicle]))
        x[1][i] = samples[right_vehicle][right_sample]

    # first half of the targets is [0] ("different class"),
    # second half is [1] ("same class")
    y = np.zeros((n,))
    y[n // 2:] = 1

    return x, y

## Test one-shot learning

In [None]:
# make a set of [n] sample pairs for one [n]-way one-shot task
# one pair consists of two samples from the same vehicle
# the remaining pairs consist of samples from different vehicles
def make_oneshot_task(n, samples):
    """Build one [n]-way one-shot task.

    A single test sample is paired with one support sample from each of [n]
    randomly chosen vehicles. Exactly one support sample belongs to the test
    sample's vehicle (and is a different sample than the test sample).

    Args:
        n: number of vehicles (and pairs) in the task; must not exceed
            ``len(samples)`` because vehicles are drawn without replacement.
        samples: sequence with one array per vehicle, where ``samples[v][k]``
            is the k-th sample of vehicle ``v``. The vehicle chosen as the
            test vehicle needs at least two samples.

    Returns:
        ``(pairs, targets)`` where ``pairs`` is
        ``[test_samples, support_samples]`` (the test sample repeated [n]
        times and the [n] support samples) and ``targets`` holds 1 for the
        same-vehicle pair and 0 elsewhere; all three are shuffled with the
        same permutation.
    """
    n_vehicles = len(samples)

    # randomly select [n] distinct vehicles; the first one is the test vehicle
    vehicles = np.random.choice(range(n_vehicles), size=(n,), replace=False)
    test_vehicle = vehicles[0]
    n_examples = len(samples[test_vehicle])

    # two different sample indices of the test vehicle: one for the test
    # sample, one for its matching support sample
    test_sample, support_sample = np.random.choice(n_examples, replace=False, size=(2,))

    # [n] copies of the test sample (this is the anchor)
    test_samples = np.asarray([samples[test_vehicle][test_sample]] * n)

    # one random sample from each of the [n] selected vehicles
    support_samples = np.stack([
        samples[v][np.random.randint(0, len(samples[v]))] for v in vehicles
    ])

    # replace the test vehicle's random support sample with the pre-selected
    # one, which guarantees that it differs from the test sample
    support_samples[0] = samples[test_vehicle][support_sample]

    # the first pair is the only one whose samples share a vehicle
    targets = np.zeros((n,))
    targets[0] = 1

    # shuffle all three arrays with the same permutation
    targets, test_samples, support_samples = shuffle(targets, test_samples, support_samples)

    # we want a set of sample pairs, not two sets of samples
    pairs = [test_samples, support_samples]
    return pairs, targets

In [None]:
# evaluate [k] instances of [n]-way one-shot learning
def test_oneshot_model(n, k, model, samples):
    print("Evaluating model on {} random {}-way one-shot learning tasks...".format(k,n))
    
    # count the number of correct predictions
    n_correct = 0
    
    # for each task...
    for i in range(k):
        # generate test set and predict outputs
        inputs, targets = make_oneshot_task(n, samples)
        preds = model.predict(inputs)
        
        # check prediction
        if np.argmax(preds) == np.argmax(targets):
            n_correct += 1
    
    # output stats
    percent_correct = 100. * n_correct / k
    print("Average {}-way one-shot learning accuracy: {:.2f}%".format(n, percent_correct))
    return percent_correct

## Build siamese neural network

In [None]:
def initialize_weights(size, dtype=None):
    """Draw initial kernel weights from a normal distribution (mean 0.0, std 0.01).

    Args:
        size: shape of the array to generate.
        dtype: accepted so that the function can be called as
            ``initializer(shape, dtype=...)``, the calling convention Keras
            documents for custom initializers. It is not used; the returned
            array always has numpy's default float dtype.

    Returns:
        numpy array of shape ``size``.
    """
    return np.random.normal(loc=0.0, scale=1e-2, size=size)

def initialize_bias(size, dtype=None):
    """Draw initial bias values from a normal distribution (mean 0.5, std 0.01).

    Args:
        size: shape of the array to generate.
        dtype: accepted so that the function can be called as
            ``initializer(shape, dtype=...)``, the calling convention Keras
            documents for custom initializers. It is not used; the returned
            array always has numpy's default float dtype.

    Returns:
        numpy array of shape ``size``.
    """
    return np.random.normal(loc=0.5, scale=1e-2, size=size)

def build_siamese_model(filters, input_shape):
    """Build and compile the siamese network.

    Both inputs are encoded by the same convolutional sub-network. The
    element-wise absolute difference of the two encodings feeds a single
    sigmoid unit that outputs the similarity score.

    Args:
        filters: filter counts of the encoder. ``filters[0]`` and
            ``filters[1]`` each create a block with one convolution; every
            later entry creates a block with three convolutions. Each block
            ends with batch normalization, ReLU and max pooling.
        input_shape: shape of one sample, e.g. ``(1024, 1)``.

    Returns:
        The compiled two-input Keras model (binary cross-entropy, Adam with
        learning rate 6e-5).
    """
    # siamese networks have two inputs
    left_input = Input(shape=input_shape)
    right_input = Input(shape=input_shape)

    # build the shared encoder
    model = Sequential()
    model.add(Conv1D(filters[0], 4, input_shape=input_shape, padding="same",
                     kernel_initializer=initialize_weights, kernel_regularizer=l2(2e-4)))
    model.add(BatchNormalization())
    model.add(Activation("relu"))
    model.add(MaxPooling1D())

    # Second single-convolution block. It is appended to the SAME Sequential:
    # re-creating `model` at this point silently discarded the filters[0]
    # block above, so filters[0] never took part in the network.
    model.add(Conv1D(filters[1], 4, padding="same",
                     kernel_initializer=initialize_weights, kernel_regularizer=l2(2e-4)))
    model.add(BatchNormalization())
    model.add(Activation("relu"))
    model.add(MaxPooling1D())

    # remaining blocks: three convolutions each
    for i in filters[2:]:
        for _ in range(3):
            model.add(Conv1D(i, 4, padding="same", kernel_initializer=initialize_weights,
                             bias_initializer=initialize_bias, kernel_regularizer=l2(2e-4)))
        model.add(BatchNormalization())
        model.add(Activation("relu"))
        model.add(MaxPooling1D())

    # dense encoding head
    model.add(Flatten())
    model.add(Dense(512, activation="sigmoid", kernel_initializer=initialize_weights,
                    bias_initializer=initialize_bias, kernel_regularizer=l2(1e-3)))
    model.add(Dropout(0.2))

    # generate the encodings for the two samples (same weights for both)
    left_encoding = model(left_input)
    right_encoding = model(right_input)

    # add a custom layer to compute the absolute difference between the encodings
    l1_layer = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))
    l1_distance = l1_layer([left_encoding, right_encoding])

    # add a dense layer to compute the similarity score
    prediction = Dense(1, activation="sigmoid", bias_initializer=initialize_bias)(l1_distance)

    # connect the inputs to the outputs
    siamese_net = Model(inputs=[left_input, right_input], outputs=prediction)

    # compile the model
    siamese_net.compile(loss="binary_crossentropy", optimizer=Adam(lr=6e-5))

    # return the model
    return siamese_net

## Train and test one-shot learning

In [None]:
# hyperparameters
interval = 200  # interval for evaluating on one-shot tasks
batch_size = len(known_vehicles)  # number of sample pairs in each training batch
n_iterations = 25000  # number of training iterations
n_vehicles = len(unknown_vehicles)  # number of classes for each one-shot test
n_tasks = 250  # number of one-shot tasks to use for validation

# different model configurations
filter_sets = [
    [32, 32, 64, 32, 128, 64, 256, 128, 512],
]

# for each configuration...
for filters in filter_sets:
    # build the model
    print(filters)
    model = build_siamese_model(filters=filters, input_shape=(1024,1))
    model.summary()

    # train the model
    # how long does this take?
    start_time = time.time()

    print("Training model")
    print("-" * 50)

    # per-configuration state: histories and the best validation accuracy are
    # reset here so that one configuration is never compared against the
    # results of a previous one
    loss_history = []
    acc_history = []
    best_acc = -1  # best validation accuracy during training

    # for each training iteration...
    for i in range(1, n_iterations + 1):
        # make a training batch and train model using this batch
        inputs, targets = make_batch(batch_size, train_x)
        loss = model.train_on_batch(inputs, targets)

        # if it's time to evaluate the model...
        if i % interval == 0:
            # output some stats
            print("\n{}\n".format("-" * 20))
            print("Time for {} iterations: {:.2f} mins".format(i, (time.time() - start_time) / 60.))
            print("Train loss: {:.6f}".format(loss))

            # save model
            # model.save_weights(os.path.join(model_dir, "weights.{}.h5".format(i)))

            # evaluate model on one-shot tasks built from the unknown vehicles
            val_acc = test_oneshot_model(n_vehicles, n_tasks, model, test_x)
            loss_history.append(loss)
            acc_history.append(val_acc)

            # update best accuracy
            if val_acc >= best_acc:
                print("Current best: {:.2f}; previous best: {:.2f}".format(val_acc, best_acc))
                best_acc = val_acc

    print("Elapsed time: {:.2f} minutes".format((time.time() - start_time) / 60.))

    # show the training loss and accuracy over time
    graph_history(loss_history, acc_history, n_iterations, interval)

    # stats for the evaluation point with the lowest training loss
    best_iteration = interval * (np.argmin(loss_history) + 1)
    print("Best iteration :", best_iteration)
    print("Training loss  :", np.min(loss_history))
    print("Val. accuracy  :", acc_history[np.argmin(loss_history)])

In [None]:
# # hyperparameters
# interval = 200  # interval for evaluating on one-shot tasks
# batch_size = len(known_vehicles)  # number of image pairs in each training batch
# n_iterations = 25000  # number of training iterations
# n_vehicles = len(unknown_vehicles)  # number of classes for each one-shot test
# n_tasks = 250  # number of one-shot tasks to use for validation
# best_acc = -1  # best validation accuracy during training

In [None]:
# # how long does this take?
# start_time = time.time()

# print("Training model")
# print("-" * 50)

# loss_history = []
# acc_history = []

# # for each training iteration...
# for i in range(1, n_iterations + 1):
#     # make a training batch and train model using this batch
#     inputs, targets = make_batch(batch_size, train_x)
#     loss = model.train_on_batch(inputs, targets)

#     # if it's time to evaluate the model...
#     if i % interval == 0:
#         # output some stats
#         print("\n{}\n".format("-" * 20))
#         print("Time for {} iterations: {:.2f} mins".format(i, (time.time() - start_time) / 60.))
#         print("Train loss: {:.6f}".format(loss))

#         # save model
#         # model.save_weights(os.path.join(model_dir, "weights.{}.h5".format(i)))

#         # evaluate model
#         val_acc = test_oneshot_model(n_vehicles, n_tasks, model, test_x)
#         loss_history.append(loss)
#         acc_history.append(val_acc)

#         # update best accuracy
#         if val_acc >= best_acc:
#             print("Current best: {:.2f}; previous best: {:.2f}".format(val_acc, best_acc))
#             best_acc = val_acc

# print("Elapsed time: {:.2f} minutes".format((time.time() - start_time) / 60.))

In [None]:
# # show the model's information
# print(filters)
# model.summary()

# # show the training loss and accuracy over time
# graph_history(loss_history, acc_history, n_iterations, interval)

# # stats for best iteration-
# best_iteration = interval * (np.argmin(loss_history) + 1)
# print("Best iteration :", best_iteration)
# print("Training loss  :", np.min(loss_history))
# print("Val. accuracy  :", acc_history[np.argmin(loss_history)])