<a href="https://colab.research.google.com/github/eroj333/learning-cv-ml/blob/master/SNN/Offline%20Triplet%20Mining.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import keras as k
import keras.backend as K
import numpy as np 
from keras.layers import *
from keras.models import Sequential, Model
from keras.regularizers import l2
import matplotlib.pyplot as plt
from keras.optimizers import Adam, Adadelta
from keras.callbacks import ModelCheckpoint, EarlyStopping
import os 
import cv2
import pickle

In [None]:
# Root folder of the training images (Windows path). Every backslash is
# escaped explicitly so that no sequence such as '\h' relies on Python
# passing unknown escape sequences through unchanged.
path = 'E:\\hackathon\\trainset\\'

In [None]:
def process_data(path, image_size=(60, 60)):
    """Load every ``.jpg`` below *path* as a normalised grayscale tensor.

    The directory tree is walked recursively. Each image is converted to
    grayscale, resized, and labelled with the name of the directory that
    directly contains it. Files that cannot be read or converted are
    reported on stdout and skipped.

    Args:
        path: root directory to search.
        image_size: ``(width, height)`` every image is resized to.

    Returns:
        ``(X_train, train_labels)``: a float32 array of shape
        ``(num_images, height, width, 1)`` scaled to ``[0, 1]``, and an array
        holding one directory-name label per image.
    """
    width, height = image_size
    images = []
    labels = []
    for root, _dirs, files in os.walk(path):
        for file_name in files:
            if not file_name.endswith(".jpg"):
                continue
            img_abs_path = os.path.abspath(os.path.join(root, file_name))

            # cv2.imread reports an unreadable file by returning None
            # instead of raising, so check for it explicitly.
            image = cv2.imread(img_abs_path)
            if image is None:
                print(img_abs_path,'image reading error')
                continue
            try:
                gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                rez_img = cv2.resize(gray_image, (width, height))
            except cv2.error:
                # Only OpenCV failures are skipped; anything else (for
                # example KeyboardInterrupt) is allowed to propagate.
                print(img_abs_path,'image reading error')
                continue

            images.append(rez_img)
            # The label is the name of the folder holding the image.
            labels.append(os.path.basename(os.path.dirname(img_abs_path)))

    # Add the trailing channel axis and scale pixel values to [0, 1].
    X_train = np.array(images).reshape(len(images), height, width, 1)
    X_train = X_train.astype('float32')
    X_train /= 255
    train_labels = np.array(labels)

    return X_train, train_labels
                

In [None]:
# Read the whole image tree once; the arrays are reused by every later cell.
x_train_master, y_train_master = process_data(path)

In [None]:
# Display the shape of the image tensor; process_data reshapes it to
# (num_images, 60, 60, 1).
x_train_master.shape

In [None]:
# Make sure the labels are a flat 1-D array before they are compared
# element-wise in the cells below.
y_train_master = y_train_master.flatten()

In [None]:
# Triplets drawn per class for the training set (passed to generate_triplets).
num_train_per_class = 5
# Number of epochs passed to fit() when training the siamese model.
n_epochs = 200

In [None]:
def generate_triplets(dataset, label, sample_per_class=10, classes=None):
    """Sample (anchor, positive, negative) triplets for every class.

    For each class, ``sample_per_class`` anchors and positives are drawn
    independently, with replacement, from the samples carrying that label.
    The same number of negatives is drawn from all remaining samples.
    Anchor and positive are drawn independently, so a pair may occasionally
    be the same sample.

    The class list is derived from ``label`` itself rather than from a
    directory listing, so the function depends only on its arguments.

    Args:
        dataset: array of samples, indexed along axis 0.
        label: array of class labels aligned with ``dataset``.
        sample_per_class: number of triplets drawn per class.
        classes: optional iterable of class labels to sample for. Defaults
            to the distinct values found in ``label``.

    Returns:
        ``(x, y)`` where ``x`` is ``[anchors, positives, negatives]`` and
        ``y`` holds the three matching label arrays. Classes that have no
        positive or no negative sample are skipped; ``(None, None)`` is
        returned when no triplet could be formed at all.
    """
    if classes is None:
        classes = np.unique(label)

    x_parts = ([], [], [])
    y_parts = ([], [], [])
    for cls in classes:
        pos_indices = np.nonzero(label == cls)[0]
        neg_indices = np.nonzero(label != cls)[0]
        if pos_indices.size == 0 or neg_indices.size == 0:
            # A triplet needs at least one positive and one negative.
            continue

        # Draw order (anchor, positive, negative) is kept fixed so a seeded
        # run stays reproducible.
        picks = (
            np.random.choice(pos_indices, sample_per_class, replace=True),
            np.random.choice(pos_indices, sample_per_class, replace=True),
            np.random.choice(neg_indices, sample_per_class, replace=True),
        )
        for x_bucket, y_bucket, chosen in zip(x_parts, y_parts, picks):
            x_bucket.append(dataset[chosen])
            y_bucket.append(label[chosen].flatten())

    if not x_parts[0]:
        return None, None

    # Stack once at the end instead of re-copying the arrays for every class.
    x = [np.vstack(parts) for parts in x_parts]
    y = [np.hstack(parts) for parts in y_parts]
    return x, y

In [None]:
# Build a first triplet set using the default sample_per_class (10); it is
# used for the visual check below.
train_x, train_y = generate_triplets(x_train_master, y_train_master)

In [None]:
# Display the shape of the anchor array (first element of the triplet list).
train_x[0].shape

In [None]:
def visualize_data(data, n):
  """Plot up to *n* randomly chosen triplets, one triplet per row.

  Args:
    data: sequence ``(anchors, positives, negatives)`` of equally long image
      arrays.
    n: maximum number of triplets to show; capped at the number available.

  Columns are anchor, positive, negative. Nothing is drawn when there is no
  triplet to show.
  """
  anc, pos, neg = data
  n = min(len(anc), n)
  if n <= 0:
    # plt.subplots cannot build a grid with zero rows.
    return

  random_choices = np.random.choice(len(anc), n, replace=False)
  # squeeze=False keeps `ax` two-dimensional even for a single row, so the
  # ax[row, col] indexing below also works when n == 1. The figure height
  # grows with the row count (4 units per row, i.e. 40 for ten rows).
  _fig, ax = plt.subplots(n, 3, figsize=(10, 4 * n), squeeze=False)
  for row, ch in enumerate(random_choices):
    for col, images in enumerate((anc, pos, neg)):
      ax[row, col].imshow(np.squeeze(images[ch]))
      ax[row, col].set_axis_off()

In [None]:
# Eyeball up to 10 randomly chosen (anchor, positive, negative) rows.
visualize_data(train_x, 10)

In [None]:
# Re-sample the training triplets with num_train_per_class draws per class;
# this replaces the triplet set generated earlier.
train_x, train_y = generate_triplets(x_train_master, y_train_master, num_train_per_class)
#test_x, test_y = generate_triplets(x_test_master, y_test_master,5)

In [None]:
def triplet_loss(inputs, dist='sqeuclidean', margin='maxplus', alpha=1.0):
    """Return the mean triplet loss over a batch of embeddings.

    Args:
        inputs: sequence ``(anchor, positive, negative)`` of embedding tensors.
        dist: ``'sqeuclidean'`` sums the squared differences over the last
            axis; ``'euclidean'`` additionally takes the square root. Any
            other value leaves the per-dimension squared differences
            un-reduced.
        margin: ``'maxplus'`` applies the hinge ``max(0, alpha + d_pos - d_neg)``;
            ``'softplus'`` applies ``log(1 + exp(d_pos - d_neg))``. Any other
            value leaves the raw difference ``d_pos - d_neg``.
        alpha: margin added inside the hinge when ``margin='maxplus'``.
            The default of 1.0 matches the constant used previously.

    Returns:
        A scalar tensor: the mean of the per-sample loss.
    """
    anchor, positive, negative = inputs
    positive_distance = K.square(anchor - positive)
    negative_distance = K.square(anchor - negative)
    if dist in ('euclidean', 'sqeuclidean'):
        positive_distance = K.sum(positive_distance, axis=-1, keepdims=True)
        negative_distance = K.sum(negative_distance, axis=-1, keepdims=True)
        if dist == 'euclidean':
            # sqrt has an unbounded gradient at 0, which is reached whenever
            # two embeddings coincide (e.g. anchor and positive are the same
            # image). Clamping to epsilon keeps the gradient finite.
            positive_distance = K.sqrt(K.maximum(positive_distance, K.epsilon()))
            negative_distance = K.sqrt(K.maximum(negative_distance, K.epsilon()))
    loss = positive_distance - negative_distance
    if margin == 'maxplus':
        loss = K.maximum(0.0, alpha + loss)
    elif margin == 'softplus':
        # Same function as log(1 + exp(loss)), but without overflowing for
        # large positive values.
        loss = K.softplus(loss)
    return K.mean(loss)

def get_embedding_model(input_shape, embedding_dim):
    """Build the network that maps one image to an embedding vector.

    The input is flattened and passed through two ReLU dense layers of width
    ``8 * embedding_dim`` and ``2 * embedding_dim``, followed by a dense
    layer of width ``embedding_dim`` with no activation argument.

    Args:
        input_shape: shape of a single input sample.
        embedding_dim: size of the output embedding.

    Returns:
        A Keras ``Model`` from the input tensor to the embedding tensor.
    """
    image_in = Input(shape=input_shape)
    hidden = Flatten()(image_in)
    # Two hidden layers, narrowing towards the embedding size.
    for width_factor in (8, 2):
        hidden = Dense(embedding_dim * width_factor, activation="relu")(hidden)
    embedding = Dense(embedding_dim)(hidden)
    return Model(image_in, embedding)
        

def get_siamese_model(input_shape, triplet_margin=.3, embedding_dim=50):
    """Build the triplet training model around one shared embedding network.

    Three inputs (anchor, positive, negative) are passed through the same
    embedding model, and the triplet loss on the three embeddings is
    attached to the wrapper via ``add_loss``.

    Args:
        input_shape: shape of a single input image.
        triplet_margin: accepted for interface compatibility.
            NOTE(review): this value is never read in the body, so it has no
            effect on the loss.
        embedding_dim: size of the embedding produced for each image.

    Returns:
        ``(embedding_model, siamese_triplet)``: the shared single-image
        embedding model and the three-input model used for training.
    """
    # One input tensor per triplet role.
    triplet_inputs = [
        Input(input_shape, name=input_name)
        for input_name in ("anchor_input", "positive_input", "negative_input")
    ]

    # A single dense embedding network, applied to all three inputs so the
    # weights are shared.
    encoder = get_embedding_model(input_shape, embedding_dim)
    embeddings = [encoder(branch) for branch in triplet_inputs]

    trainer = Model(inputs=triplet_inputs, outputs=embeddings)

    # The loss depends only on the three embeddings, so it is registered on
    # the model itself.
    trainer.add_loss(triplet_loss(embeddings, dist='euclidean', margin='maxplus'))

    return encoder, trainer

# Siamese NN trained on all classes

In [None]:
def shuffle_triplets(inputs, labels):
  """Apply one random permutation to all triplet arrays and their labels.

  Args:
    inputs: ``[anchors, positives, negatives]`` arrays of equal length.
    labels: the three label arrays aligned with ``inputs``.

  Returns:
    ``([anchors, positives, negatives], [labels...])`` reordered with the
    same permutation, so row *i* still describes one triplet.
  """
  anchors, positives, negatives = inputs
  anchor_lbl, positive_lbl, negative_lbl = labels

  count = anchors.shape[0]
  order = np.random.choice(count, count, replace=False)

  shuffled_inputs = [part[order] for part in (anchors, positives, negatives)]
  shuffled_labels = [part[order] for part in (anchor_lbl, positive_lbl, negative_lbl)]
  return shuffled_inputs, shuffled_labels

In [None]:
# Permute the triplets (images and labels together) before training.
train_x, train_y = shuffle_triplets(train_x, train_y)

In [None]:
# Build the shared embedding network (150-d output) and its three-input
# training wrapper for 60x60 single-channel images.
# NOTE(review): get_siamese_model never reads triplet_margin, so the .3
# passed here has no effect.
embedding_model2, siamese_triplet2 = get_siamese_model((60,60,1), triplet_margin=.3, embedding_dim=150)
# loss=None because the triplet loss is attached inside get_siamese_model
# via add_loss().
siamese_triplet2.compile(loss=None, optimizer=Adam(0.0001))
# Only inputs are passed (no target array); 10% of the triplets are held out
# for validation.
history_s2 = siamese_triplet2.fit(x=train_x, shuffle=True, batch_size=1000,
                              validation_split=.1, epochs=n_epochs)

In [None]:
# Embed all anchor, positive and negative images in one predict() call;
# the rows are stacked in that order.
train_embeds = embedding_model2.predict(np.vstack((train_x[0], train_x[1], train_x[2])))

In [None]:
# Labels concatenated in the same anchor / positive / negative order as the
# rows of train_embeds.
target = np.hstack((train_y[0], train_y[1], train_y[2]))

In [None]:
from sklearn.neighbors import KNeighborsClassifier
def fit_nearest_neighbor(img_encoding, img_class, algorithm='ball_tree', n_neighbors=3):
    """Fit a k-nearest-neighbour classifier on embedding vectors.

    Args:
        img_encoding: array of embeddings, one row per image.
        img_class: class label for each row of ``img_encoding``.
        algorithm: neighbour-search algorithm passed to
            ``KNeighborsClassifier``.
        n_neighbors: number of neighbours consulted per prediction. The
            default of 3 matches the value that was previously fixed.

    Returns:
        The fitted ``KNeighborsClassifier``.
    """
    classifier = KNeighborsClassifier(n_neighbors=n_neighbors, algorithm=algorithm)
    classifier.fit(img_encoding, img_class)
    return classifier

In [None]:
# Fit the k-NN classifier on the triplet embeddings and their labels.
classifier2 = fit_nearest_neighbor(train_embeds, target)

In [None]:
# Embed the full image set, then predict a label for every image with the
# fitted k-NN classifier.
e2 = embedding_model2.predict(x_train_master)
op2 = classifier2.predict(e2)

In [None]:
# Fraction of images whose predicted label equals the stored label.
# Note: this is measured on x_train_master, the same array the training
# triplets were sampled from, so it is not a held-out accuracy.
(np.where(y_train_master == op2))[0].shape[0] / y_train_master.shape[0]

In [None]:
from sklearn.metrics import confusion_matrix
# Confusion matrix: stored labels are passed first, k-NN predictions second.
mat = confusion_matrix(y_train_master, op2)

In [None]:
# import seaborn as sns

In [None]:
# sns.heatmap(mat, annot=True)

In [None]:
# Display the dimensions of the confusion matrix.
mat.shape

In [None]:
# Save the fitted k-NN classifier.
# Binary mode is required because pickle writes bytes. The context manager
# makes sure the file is flushed and closed even if dump() raises.
with open('knnpickle_file', 'wb') as knnPickle:
    # Arguments are (object to serialise, destination file).
    pickle.dump(classifier2, knnPickle)

In [None]:
from keras.models import model_from_json

In [None]:
# Save the embedding network in two parts. First the architecture, as the
# JSON string produced by to_json(), written to model.json.
model_json = embedding_model2.to_json()
with open("model.json", "w") as json_file:
    json_file.write(model_json)
# Then the weights, written separately to model.h5.
embedding_model2.save_weights("model.h5")
print("Saved model to disk")