In [185]:
import tensorflow as tf
import numpy as np

In [186]:
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.optimizers import * 
from tensorflow.keras.losses import * 

In [203]:
def softmin(r, i, j, gamma=0.01):
    if (i-1, j) in r:
        a = -1.0 * r[(i-1, j)]   / gamma
    else:
        return tf.constant(float('inf'))
    if (i-1,j-1) in r:
        b = -1.0 * r[(i-1, j-1)] / gamma
    else:
        return tf.constant(float('inf'))
    if (i,j-1) in r:
        c = -1.0 * r[(i,   j-1)] / gamma
    else:
        return tf.constant(float('inf'))
    max_val = tf.math.maximum(a, tf.math.maximum(b, c))
    tmp  = tf.exp(a - max_val)
    tmp += tf.exp(b - max_val)
    tmp += tf.exp(c - max_val)
    return -gamma * (tf.math.log(tmp) + max_val)
    
    
def dtw(x, y, gamma=0.01):
    n = x.shape[0]
    m = y.shape[0]
    band = int(((n + m) / 2) * 0.1)
    r = {}
    r[(0,0)] = tf.constant(0.0, dtype=tf.float32)
    w = max(band, abs(n - m)) + 2

    for i in range(1, n + 1):
        for j in range(max(1, i - w), min(m + 1, i + w)):
            dist = tf.sqrt(tf.reduce_sum(tf.square(x[i - 1, :] - y[j - 1, :])))
            mini = softmin(r, i, j, gamma)   
            dp   = dist + mini
            r[(i, j)] = dist + mini         
    return r[(n, m)]


class TripletLoss(Loss):

    def __init__(self, margin, latent):
        super().__init__()
        self.margin = margin
        self.latent = latent

    def call(self, y_true, y_pred):
        y_pred = tf.convert_to_tensor(y_pred)
        anchor = y_pred[:, :, 0:self.latent]
        pos    = y_pred[:, :, self.latent:2 * self.latent]
        neg    = y_pred[:, :, 2 * self.latent:3 * self.latent]
        pos_dist   = dtw(anchor[0, :, :], pos[0, :, :])         
        neg_dist   = dtw(anchor[0, :, :], neg[0, :, :])        
        basic_loss = tf.add(tf.subtract(pos_dist, neg_dist), self.margin)  
        loss       = tf.reduce_sum(tf.maximum(basic_loss, 0.0))   
        print("Done")
        return loss

    
def encoder(in_shape, latent_dim, conv_params):
    """
    A LSTM stack on top of a convolutional layer pooled in time.

    See Figure [KOH4] Figure 2.

    :param in_shape: the input shape to the model
    :param latent_dim: embedding size of the model
    :param conv_params: (conv_w, conv_h, filters)

    :returns: a keras model
    """
    kernel_size = (conv_params[0], conv_params[1])
    n_filters = conv_params[2]
    dft_dim = in_shape[1]
    inp = Input(in_shape)
    loc = Conv2D(n_filters, strides = (1, 1), kernel_size=kernel_size, activation='relu', padding='same')(inp) 
    loc = MaxPool2D(pool_size=(1, dft_dim))(loc) 
    loc = Reshape((in_shape[0], n_filters))(loc) 
    x   = Bidirectional(LSTM(latent_dim, return_sequences=True))(loc)
    return Model(inputs =[inp], outputs=[x])

In [204]:
in_pos = Input((32, 12, 1))
in_neg = Input((32, 12, 1))
in_anc = Input((32, 12, 1))

e = encoder((32, 12, 1), 128, (8,8,64))

out_pos = e(in_pos)
out_neg = e(in_neg)
out_anc = e(in_anc)
conc    = Concatenate()([out_anc, out_neg, out_pos])

model = Model(inputs=[in_anc, in_pos, in_neg], outputs=conc)  
triplet_loss = TripletLoss(0.1, 128)
model.compile(optimizer = 'adam', loss = triplet_loss)

In [None]:
a = np.random.uniform(0, 1, (1, 32, 12, 1))
p = np.random.uniform(0, 1,(1, 32, 12, 1))
z = np.random.uniform(0, 1, (1, 32, 12, 1))
loss = model.train_on_batch(x=[a, p, z], y=np.zeros((1, 32, 768 )))

Done
Done


In [None]:
print(loss)