In [None]:
import os
import tensorflow.keras.backend as K
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import GlobalAveragePooling2D
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.layers import Lambda

import ipynb.fs
from ipynb.fs.full.graphNN import anchor_pairs, positive_pairs
from ipynb.fs.full.graph_features import build_subgraph

# Seed TensorFlow's global random generator with a fixed value (0).
tf.random.set_seed(0)

# anchor_pairs() and positive_pairs() are imported from the graphNN notebook.
# Each is expected to return a dictionary keyed by subgraph label whose values
# are tensors; create_pairs() indexes both dictionaries with labels drawn from
# build_subgraph().
anchor_tensor = anchor_pairs()
positive_tensor = positive_pairs()
#_, subgraph_class = build_subgraph()

In [None]:
def create_pairs(num_pairs=15, seed_offset=0):
    """Build similar and dissimilar tensor pairs for the siamese network.

    For every index ``i`` in ``range(num_pairs)`` the NumPy global RNG is
    re-seeded with ``seed_offset + i`` and two subgraph labels are drawn from
    the label list returned by ``build_subgraph()``: first the anchor label,
    then (after removing that label from a copy of the list) a second label.
    Two entries are appended per index:

    * ``[anchor, positive]`` with label ``[1]``, both taken from the anchor
      label's entries in ``anchor_tensor`` / ``positive_tensor``;
    * ``[anchor, other_anchor]`` with label ``[0]``, where ``other_anchor`` is
      the second label's entry in ``anchor_tensor``.

    Every tensor gets a trailing axis of size 1 via ``tf.expand_dims``.

    Args:
        num_pairs: Number of anchor draws; the result holds ``2 * num_pairs``
            pairs. Defaults to 15, the value previously hard-coded.
        seed_offset: Added to the loop index before seeding NumPy. The default
            of 0 reproduces the original draws; a different offset yields a
            different, still deterministic, selection.

    Returns:
        Tuple ``(pairs, labels)`` of NumPy arrays built from the two lists.
        ``labels`` has one ``[1]`` or ``[0]`` row per pair.

    Note:
        This re-seeds NumPy's *global* RNG on every iteration, which is
        visible to any other code using ``np.random`` afterwards.
    """
    _, sub_class = build_subgraph()
    imagePairs = []
    labelPairs = []

    for i in range(num_pairs):
        # Work on a copy: the anchor label is removed below so that the second
        # draw cannot pick the same list entry again.
        subgraph_class = sub_class.copy()

        np.random.seed(seed_offset + i)
        idx_sub = np.random.choice(subgraph_class)
        print(idx_sub)
        anchor_sub = tf.expand_dims(anchor_tensor[idx_sub], axis=-1)
        positive_sub = tf.expand_dims(positive_tensor[idx_sub], axis=-1)

        subgraph_class.remove(idx_sub)
        idx_diss = np.random.choice(subgraph_class)
        print(idx_diss)
        diss_sub1 = tf.expand_dims(anchor_tensor[idx_diss], axis=-1)

        # Similar pair (label 1) followed by dissimilar pair (label 0).
        imagePairs.append([anchor_sub, positive_sub])
        labelPairs.append([1])

        imagePairs.append([anchor_sub, diss_sub1])
        labelPairs.append([0])

    return (np.array(imagePairs), np.array(labelPairs))

In [None]:
def euclidean_distance(vecs):
    """Return the Euclidean distance between two batches of vectors.

    Args:
        vecs: A pair ``(imgA, imgB)`` of tensors that can be subtracted
            element-wise.

    Returns:
        Tensor of distances reduced over axis 1, with that axis kept as
        size 1 (``keepdims=True``).
    """
    first, second = vecs
    squared_diff = K.square(first - second)
    sum_squared = K.sum(squared_diff, axis=1, keepdims=True)
    # Floor the sum at K.epsilon() so the square root is never taken of a
    # value smaller than that.
    floored = K.maximum(sum_squared, K.epsilon())
    return K.sqrt(floored)

In [None]:
# Default embedding size; model_compile() also uses it as the input height/width.
latent_dim = 32

def siamese_model(input_shape, embeddingDim = latent_dim):
    """Build the convolutional encoder shared by both siamese branches.

    Architecture: two stacked stages of Conv2D(10, 3x3, same, relu) ->
    MaxPooling2D(2x2) -> Dropout(0.2), followed by global average pooling and
    a linear Dense projection to ``embeddingDim`` units.

    Args:
        input_shape: Shape of a single input sample (without the batch axis).
        embeddingDim: Number of units in the final Dense layer.

    Returns:
        A ``Model`` mapping an input of ``input_shape`` to an ``embeddingDim``
        vector.

    Note:
        Calls ``tf.keras.backend.clear_session()`` first, which resets Keras'
        global state. Create any other Keras tensors/layers that must coexist
        with this model *after* calling this function.
    """
    tf.keras.backend.clear_session()

    inputs = Input(input_shape)

    # Stage 1
    x = Conv2D(10, (3, 3), padding = "same", activation = "relu")(inputs)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.2)(x)

    # Stage 2 consumes the output of stage 1. The previous code fed `inputs`
    # here again, which silently discarded stage 1 from the model graph.
    x = Conv2D(10, (3, 3), padding = "same", activation = "relu")(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.2)(x)

    pooling = GlobalAveragePooling2D()(x)
    outputs = Dense(embeddingDim)(pooling)
    model = Model(inputs, outputs)

    return model

In [None]:
def contrastiveLoss(y, y_preds, margin=1):
    """Contrastive loss on predicted pair distances.

    Args:
        y: Pair labels; cast to the dtype of ``y_preds``. A label of 1 selects
            the squared-distance term, a label of 0 selects the margin term.
        y_preds: Predicted distances.
        margin: Distance below which label-0 pairs are penalised.

    Returns:
        Scalar mean of ``y * d**2 + (1 - y) * max(margin - d, 0)**2``.
    """
    labels = tf.cast(y, y_preds.dtype)
    # Label-1 pairs are penalised by their squared distance.
    similar_term = labels * K.square(y_preds)
    # Label-0 pairs are penalised only while closer than `margin`.
    hinge = K.maximum(margin - y_preds, 0)
    dissimilar_term = (1 - labels) * K.square(hinge)
    return K.mean(similar_term + dissimilar_term)

In [None]:
def model_compile():
    """Build, compile and train the siamese distance model.

    Training data comes from ``create_pairs()``. Both inputs go through the
    same encoder instance returned by ``siamese_model()`` (shared weights);
    the model output is the Euclidean distance between the two embeddings and
    is trained with ``contrastiveLoss``.

    Returns:
        Tuple ``(model, history)``: the trained two-input Keras model and the
        ``History`` object returned by ``model.fit``.
    """
    training_pairs, training_labels = create_pairs()
    image_shape = (latent_dim, latent_dim, 1)
    epochs = 50

    # Build the shared encoder first: siamese_model() calls
    # tf.keras.backend.clear_session(), so the outer Input tensors are created
    # only after Keras' global state has been reset.
    model_build = siamese_model(image_shape)

    imageA = Input(shape = image_shape)
    imageB = Input(shape = image_shape)
    modelA = model_build(imageA)
    modelB = model_build(imageB)

    distance = Lambda(euclidean_distance)([modelA, modelB])
    model = Model(inputs=[imageA, imageB], outputs=distance)

    # Half of contrastiveLoss' default margin (1).
    threshold = 0.5

    def accuracy(y_true, y_pred):
        # The model outputs a distance, and label 1 marks a *similar* pair
        # (small distance). The built-in 'accuracy' metric thresholds the
        # output the other way round (large value -> class 1), so it would
        # report an inverted score here. The function keeps the name
        # `accuracy` so the history key is unchanged.
        y_true = tf.cast(y_true, y_pred.dtype)
        predicted_similar = tf.cast(y_pred < threshold, y_pred.dtype)
        matches = tf.cast(tf.equal(y_true, predicted_similar), y_pred.dtype)
        return tf.reduce_mean(matches, axis=-1)

    model.compile(loss=contrastiveLoss,
                  optimizer="adam",
                  metrics=[accuracy])
    history = model.fit(
        [training_pairs[:, 0], training_pairs[:, 1]], training_labels[:],
        steps_per_epoch=5,
        epochs = epochs)

    model.summary()

    return model, history

In [None]:
def make_predictions():
    """Train the siamese model, plot its loss curve and predict pair distances.

    Returns:
        The output of ``model.predict`` for the pairs returned by
        ``create_pairs()``.
    """
    import pandas as pd
    import matplotlib.pyplot as plt

    model, history = model_compile()

    # One row per epoch; only the `loss` column is plotted.
    history_df = pd.DataFrame(history.history)
    loss_curve = history_df.loss
    plt.plot(range(len(loss_curve)), loss_curve)
    plt.xlabel('Epoch')
    plt.ylabel('Contrastive Loss')
    plt.show()

    # NOTE(review): create_pairs() re-seeds NumPy with the loop index for
    # every pair, so this call appears to select the same pairs that
    # model_compile() trained on - confirm that predicting on the training
    # pairs is intended.
    test_pairs, _ = create_pairs()

    left_images = test_pairs[:, 0]
    right_images = test_pairs[:, 1]
    return model.predict([left_images, right_images])

In [None]:
def plot_prediction(result):
    """Print predicted distances side by side for each anchor.

    ``result`` is expected in the order produced by ``create_pairs()``:
    entries at even positions belong to anchor/positive pairs and entries at
    odd positions to the matching anchor/negative pairs. Only the first value
    of each entry is used.

    Args:
        result: Sequence (list or array) of per-pair predictions, each
            indexable with ``[0]``.

    Returns:
        None. One line is printed per complete (positive, negative) couple; a
        trailing unpaired entry is ignored instead of raising ``IndexError``.
    """
    positive_results = np.array([row[0] for row in result[0::2]])
    negative_results = np.array([row[0] for row in result[1::2]])

    for positive, negative in zip(positive_results, negative_results):
        print('Anchor/Positive: {} - Anchor/Negative: {}'.format(positive, negative))


# Driver call. It used to sit inside the function body, which made
# plot_prediction() call itself (and retrain) without end. The guard keeps the
# training run from starting when this notebook is imported as a module.
if __name__ == "__main__":
    plot_prediction(make_predictions())