In [1]:
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Add, GlobalAveragePooling2D
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense, Lambda, BatchNormalization
from tensorflow.keras.utils import plot_model
import numpy as np
import random
import matplotlib.pyplot as plt
import os

In [104]:
resnet_base=tf.keras.applications.ResNet50V2(include_top=False,
                                             weights='imagenet',
                                            input_shape=(224,224,3))

In [12]:
resnet_base.summary()

Model: "resnet50v2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['input_3[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                                                                 
                                                                                         

In [17]:
layers=resnet_base.layers
for i,layer in enumerate(layers):
    print(i,layer.name)

0 input_3
1 conv1_pad
2 conv1_conv
3 pool1_pad
4 pool1_pool
5 conv2_block1_preact_bn
6 conv2_block1_preact_relu
7 conv2_block1_1_conv
8 conv2_block1_1_bn
9 conv2_block1_1_relu
10 conv2_block1_2_pad
11 conv2_block1_2_conv
12 conv2_block1_2_bn
13 conv2_block1_2_relu
14 conv2_block1_0_conv
15 conv2_block1_3_conv
16 conv2_block1_out
17 conv2_block2_preact_bn
18 conv2_block2_preact_relu
19 conv2_block2_1_conv
20 conv2_block2_1_bn
21 conv2_block2_1_relu
22 conv2_block2_2_pad
23 conv2_block2_2_conv
24 conv2_block2_2_bn
25 conv2_block2_2_relu
26 conv2_block2_3_conv
27 conv2_block2_out
28 conv2_block3_preact_bn
29 conv2_block3_preact_relu
30 conv2_block3_1_conv
31 conv2_block3_1_bn
32 conv2_block3_1_relu
33 conv2_block3_2_pad
34 conv2_block3_2_conv
35 conv2_block3_2_bn
36 conv2_block3_2_relu
37 max_pooling2d_6
38 conv2_block3_3_conv
39 conv2_block3_out
40 conv3_block1_preact_bn
41 conv3_block1_preact_relu
42 conv3_block1_1_conv
43 conv3_block1_1_bn
44 conv3_block1_1_relu
45 conv3_block1_2_pad
4

In [105]:
resnet_base.trainable=True
layers=resnet_base.layers
for layer in layers[:98]:
    layer.trainable=False


In [106]:
# define model
inputs=Input(shape=(224,224,3))
x=resnet_base(inputs)
x=GlobalAveragePooling2D()(x)
x = Flatten()(x)
x = Dropout(0.25)(x)
x= Dense(512,activation='relu')(x)
x = Dropout(0.25)(x)
x= Dense(128,activation='relu')(x)
x = Dropout(0.25)(x)
x = Dense(32, use_bias=False)(x)
outputs = Lambda(lambda x: tf.math.l2_normalize(x,axis=-1))(x)
model = tf.keras.Model(inputs,outputs)


In [107]:
def _pairwise_distances(embeddings, squared=False):
    """Compute the 2D matrix of distances between all the embeddings.

    Args:
        embeddings: tensor of shape (batch_size, embed_dim)
        squared: Boolean. If true, output is the pairwise squared euclidean distance matrix.
                 If false, output is the pairwise euclidean distance matrix.

    Returns:
        pairwise_distances: tensor of shape (batch_size, batch_size)
    """
    # Get the dot product between all embeddings
    # shape (batch_size, batch_size)
    dot_product = tf.matmul(embeddings, tf.transpose(embeddings))

    # Get squared L2 norm for each embedding. We can just take the diagonal of `dot_product`.
    # This also provides more numerical stability (the diagonal of the result will be exactly 0).
    # shape (batch_size,)
    square_norm = tf.linalg.diag_part(dot_product)

    # Compute the pairwise distance matrix as we have:
    # ||a - b||^2 = ||a||^2  - 2 <a, b> + ||b||^2
    # shape (batch_size, batch_size)
    distances = tf.expand_dims(square_norm, 0) - 2.0 * dot_product + tf.expand_dims(square_norm, 1)

    # Because of computation errors, some distances might be negative so we put everything >= 0.0
    distances = tf.maximum(distances, 0.0)

    if not squared:
        # Because the gradient of sqrt is infinite when distances == 0.0 (ex: on the diagonal)
        # we need to add a small epsilon where distances == 0.0
        mask = tf.cast(tf.equal(distances, 0.0),dtype=tf.float32)
        distances = distances + mask * 1e-16

        distances = tf.sqrt(distances)

        # Correct the epsilon added: set the distances on the mask to be exactly 0.0
        distances = distances * (1.0 - mask)
    
    # REGULARIZATION: ADD THE DOT PRODUCT TERM
    return distances, dot_product

In [108]:
def _get_triplet_mask(labels):
    """Return a 3D mask where mask[a, p, n] is True iff the triplet (a, p, n) is valid.
    A triplet (i, j, k) is valid if:
        - i, j, k are distinct
        - labels[i] == labels[j] and labels[i] != labels[k]
    Args:
        labels: tf.int32 `Tensor` with shape [batch_size]
    """
    # Check that i, j and k are distinct
    indices_equal = tf.cast(tf.eye(tf.shape(labels)[0]), tf.bool)
    indices_not_equal = tf.logical_not(indices_equal)
    i_not_equal_j = tf.expand_dims(indices_not_equal, 2)
    i_not_equal_k = tf.expand_dims(indices_not_equal, 1)
    j_not_equal_k = tf.expand_dims(indices_not_equal, 0)

    distinct_indices = tf.logical_and(tf.logical_and(i_not_equal_j, i_not_equal_k), j_not_equal_k)


    # Check if labels[i] == labels[j] and labels[i] != labels[k]
    label_equal = tf.equal(tf.expand_dims(labels, 0), tf.expand_dims(labels, 1))
    i_equal_j = tf.expand_dims(label_equal, 2)
    i_equal_k = tf.expand_dims(label_equal, 1)

    valid_labels = tf.logical_and(i_equal_j, tf.logical_not(i_equal_k))

    # Combine the two masks
    mask = tf.logical_and(distinct_indices, valid_labels)

    return mask

In [109]:
def batch_all_triplet_loss(labels, embeddings, margin=0.2,alpha=0, squared=False):
    """Build the triplet loss over a batch of embeddings.

    We generate all the valid triplets and average the loss over the positive ones.

    Args:
        labels: labels of the batch, of size (batch_size,)
        embeddings: tensor of shape (batch_size, embed_dim)
        margin: margin for triplet loss
        squared: Boolean. If true, output is the pairwise squared euclidean distance matrix.
                 If false, output is the pairwise euclidean distance matrix.

    Returns:
        triplet_loss: scalar tensor containing the triplet loss
    """
    # Get the pairwise distance matrix
    pairwise_dist,dot_product = _pairwise_distances(embeddings, squared=squared)
    
    #REGULARIZATION: compute regulazation term so  that dot_pro_expand[i,j,k] contains embed_i(T)*emb_k
    dot_pro_expand=tf.reshape(tf.tile(dot_product,(1,labels.shape[0])),[labels.shape[0],labels.shape[0],labels.shape[0]])

    anchor_positive_dist = tf.expand_dims(pairwise_dist, 2)
    anchor_negative_dist = tf.expand_dims(pairwise_dist, 1)

    # Compute a 3D tensor of size (batch_size, batch_size, batch_size)
    # triplet_loss[i, j, k] will contain the triplet loss of anchor=i, positive=j, negative=k
    # Uses broadcasting where the 1st argument has shape (batch_size, batch_size, 1)
    # and the 2nd (batch_size, 1, batch_size)
    triplet_loss = anchor_positive_dist - anchor_negative_dist + margin

    # Put to zero the invalid triplets
    # (where label(a) != label(p) or label(n) == label(a) or a == p)
    mask = _get_triplet_mask(labels)
    mask = tf.cast(mask,dtype=tf.float32)
    triplet_loss = tf.multiply(mask, triplet_loss)
    
    #REGULARIZATION:put to zero to invalid triplets
    regularizer=tf.multiply(mask,dot_pro_expand)
    M1=tf.math.square(regularizer)
    M2=tf.maximum(M1-(1/32),0)

    # Remove negative losses (i.e. the easy triplets)
    triplet_loss = tf.maximum(triplet_loss, 0.0)
    #REGULARIZATION: add regularization term
    triplet_loss=triplet_loss+alpha*(M1+M2)

    # Count number of positive triplets (where triplet_loss > 0)
    valid_triplets = tf.cast(tf.greater(triplet_loss, 1e-16),dtype=tf.float32)
    num_positive_triplets = tf.reduce_sum(valid_triplets)
    num_valid_triplets = tf.reduce_sum(mask)
    fraction_positive_triplets = num_positive_triplets / (num_valid_triplets + 1e-16)

    # Get final mean triplet loss over the positive valid triplets
    triplet_loss = tf.reduce_sum(triplet_loss) / (num_positive_triplets + 1e-16)

    return triplet_loss

In [110]:
def accuracy_triplet(labels,embeddings,margin=0.2,squared=False):
    # Get the pairwise distance matrix
    pairwise_dist,dot_product = _pairwise_distances(embeddings, squared=squared)
    
    anchor_positive_dist = tf.expand_dims(pairwise_dist, 2)
    anchor_negative_dist = tf.expand_dims(pairwise_dist, 1)
    
    triplet_loss = anchor_positive_dist - anchor_negative_dist + margin

    # Put to zero the invalid triplets
    # (where label(a) != label(p) or label(n) == label(a) or a == p)
    mask = _get_triplet_mask(labels)
    mask = tf.cast(mask,dtype=tf.float32)
    triplet_loss = tf.multiply(mask, triplet_loss)
    #triplet_loss = tf.maximum(triplet_loss, 0.0)
    
    #TRIPLET WITH LOSS<0
    
    negative_triplets=tf.cast(tf.less(triplet_loss,0),tf.float32)
    num_negative_triplets=tf.math.reduce_sum(negative_triplets)
    num_valid_triplets = tf.reduce_sum(mask)
    return num_negative_triplets/num_valid_triplets
    

## build the dataset

In [111]:
data_dir='0'
dataset=tf.keras.preprocessing.image_dataset_from_directory(directory=data_dir,
                                                           image_size=(224,224),
                                                           batch_size=64,
                                                           color_mode='rgb',
                                                           shuffle=True)

Found 352 files belonging to 32 classes.


In [112]:
def process_img(img,label):
    img=tf.keras.applications.resnet_v2.preprocess_input(img)
    img=tf.image.random_brightness(img,0.2)
    img=tf.image.random_saturation(img,2,5)
    img=tf.image.random_flip_left_right(img)
    return img,label

In [113]:
dataset=dataset.map(process_img)

In [114]:
optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001)
loss_fn=batch_all_triplet_loss
num_epochs=20

In [95]:
model=tf.keras.models.load_model('resnet/resnet_50v2_emb_32_margin_02_alpha_03_20_epoch.h5')



In [97]:
for epoch in range(num_epochs):
    running_loss=[]
    running_acc=[]
    for x_batch,y_batch in dataset:
        with tf.GradientTape() as tape:
            embeddings=model(x_batch,training=True)
            loss=batch_all_triplet_loss(y_batch,embeddings,margin=0.2,alpha=0.5)
            acc=accuracy_triplet(y_batch,embeddings,margin=0.2)
        gradients=tape.gradient(loss,model.trainable_weights)
        optimizer.apply_gradients(zip(gradients,model.trainable_weights))
        running_loss.append(loss)
        running_acc.append(acc)
    #calculate mean loss
    print(f'Loss at epoch {epoch}: ',np.mean(running_loss))
    print(f'Accuracy at epoch {epoch}: ',np.mean(running_acc))
    

Loss at epoch 0:  0.064292625
Accuracy at epoch 0:  0.86004716
Loss at epoch 1:  0.058241505
Accuracy at epoch 1:  0.85692936
Loss at epoch 2:  0.057082552
Accuracy at epoch 2:  0.83918315
Loss at epoch 3:  0.055276573
Accuracy at epoch 3:  0.86459965
Loss at epoch 4:  0.05034845
Accuracy at epoch 4:  0.8717963
Loss at epoch 5:  0.04862778
Accuracy at epoch 5:  0.89900666
Loss at epoch 6:  0.049111098
Accuracy at epoch 6:  0.88254625
Loss at epoch 7:  0.044388894
Accuracy at epoch 7:  0.90381306
Loss at epoch 8:  0.043723863
Accuracy at epoch 8:  0.9050712
Loss at epoch 9:  0.039893907
Accuracy at epoch 9:  0.9219176
Loss at epoch 10:  0.037842467
Accuracy at epoch 10:  0.92454356
Loss at epoch 11:  0.039614573
Accuracy at epoch 11:  0.92839164
Loss at epoch 12:  0.036539074
Accuracy at epoch 12:  0.9342897
Loss at epoch 13:  0.03318927
Accuracy at epoch 13:  0.95339274
Loss at epoch 14:  0.033800725
Accuracy at epoch 14:  0.9462933
Loss at epoch 15:  0.034880582
Accuracy at epoch 15: 

In [98]:
model.save('resnet/resnet_50v2_emb_32_margin_02_alpha_03_40_epoch.h5')



In [69]:
def create_triples(digit_indices):
    '''Positive and negative triple creation.
    Each triple include two images of same label (anchor and postive) and one image of different label.
    '''
    
    classes=len(digit_indices)
    triples = []
    
    # n is min length of an array of indices with a specific value (0->9)
    n = min([len(digit_indices[d]) for d in range(classes)]) - 1
    
    #digit_indices[d]: contains all indices of images of the same label
    for d in range(classes):
        for i in range(n):
            z1, z2 = digit_indices[d][i], digit_indices[d][i + 1]
           
            inc = random.randrange(1, 32)
            dn = (d + inc) % 32
            z3 = digit_indices[dn][i]
            triples += [[z1,z2,z3]]
            
    return np.array(triples)

In [70]:
def processed_for_test(image_path):
    image = tf.keras.utils.load_img(image_path)
    input_arr = tf.keras.utils.img_to_array(image)
    input_arr=input_arr/255.0
    input_arr=tf.image.resize(input_arr,(224,224))
    input_arr = np.array([input_arr])  # Convert single image to a batch.
    predictions = model.predict(input_arr)
    return predictions

In [71]:
def compute_distance(emb_1,emb_2):
    dist=tf.math.subtract(emb_1,emb_2)
    dist=tf.math.square(dist)
    dist=tf.math.reduce_sum(dist)
    dist=tf.math.sqrt(dist)
    return dist

In [72]:
#initiate the list that store all image paths
import os
folder='0'
sub=os.listdir(folder)
cat_indices=[0 for i in range(len(sub))]
sub=os.listdir(folder)
for i in range(len(sub)):
    dir=os.path.join(folder,sub[i])
    image_list=os.listdir(dir)
    img_path_list=[os.path.join(dir,fname) for fname in image_list]
    cat_indices[i]=img_path_list

In [73]:
testset=create_triples(cat_indices)

In [82]:
model=tf.keras.models.load_model('resnet/resnet_50v2_emb_32_margin_02_alpha_05_20_epoch.h5')



In [99]:
count=0
for i in range(100):
    anchor=processed_for_test(testset[i][0])
    pos=processed_for_test(testset[i][1])
    neg=processed_for_test(testset[i][2])
    pos_dist=compute_distance(anchor,pos)
    neg_dist=compute_distance(anchor,neg)
    # print("At triplet",i)
    # print("post dis:",pos_dist)
    # print('neg dist:',neg_dist)
    if neg_dist-pos_dist>=0.2:
        count+=1
    else:
        print("Wrong at triplet",i)
        print(f"pos_dist: {pos_dist}")
        print(f"neg_dist: {neg_dist}")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!")

In [100]:
count

100

In [102]:
model=tf.keras.models.load_model('resnet/resnet_50v2_emb_32_margin_02_alpha_05_40_epoch.h5')



In [103]:
count=0
for i in range(50):
    anchor=processed_for_test(testset[i][0])
    pos=processed_for_test(testset[i][1])
    neg=processed_for_test(testset[i][2])
    pos_dist=compute_distance(anchor,pos)
    neg_dist=compute_distance(anchor,neg)
    print("At triplet",i)
    print("post dis:",pos_dist)
    print('neg dist:',neg_dist)
    # if neg_dist-pos_dist>=0.2:
    #     count+=1
    # else:
    #     print("Wrong at triplet",i)
    #     print(f"pos_dist: {pos_dist}")
    #     print(f"neg_dist: {neg_dist}")
    #     print("!!!!!!!!!!!!!!!!!!!!!!!!!")

At triplet 0
post dis: tf.Tensor(0.3052278, shape=(), dtype=float32)
neg dist: tf.Tensor(1.5500395, shape=(), dtype=float32)
At triplet 1
post dis: tf.Tensor(0.4026877, shape=(), dtype=float32)
neg dist: tf.Tensor(1.4345411, shape=(), dtype=float32)
At triplet 2
post dis: tf.Tensor(0.30101714, shape=(), dtype=float32)
neg dist: tf.Tensor(1.319791, shape=(), dtype=float32)
At triplet 3
post dis: tf.Tensor(0.34418467, shape=(), dtype=float32)
neg dist: tf.Tensor(1.2312732, shape=(), dtype=float32)
At triplet 4
post dis: tf.Tensor(0.32187292, shape=(), dtype=float32)
neg dist: tf.Tensor(1.3663648, shape=(), dtype=float32)
At triplet 5
post dis: tf.Tensor(0.36797768, shape=(), dtype=float32)
neg dist: tf.Tensor(1.3462706, shape=(), dtype=float32)
At triplet 6
post dis: tf.Tensor(0.25719932, shape=(), dtype=float32)
neg dist: tf.Tensor(1.225972, shape=(), dtype=float32)
At triplet 7
post dis: tf.Tensor(0.3849241, shape=(), dtype=float32)
neg dist: tf.Tensor(1.48154, shape=(), dtype=float32)

In [None]:
def calculate_average(sub_dir):
    data_dir=sub_dir
    dataset=tf.keras.preprocessing.image_dataset_from_directory(directory=data_dir,
                                                           image_size=(224,224),
                                                           batch_size=64,
                                                           color_mode='rgb',
                                                           )
    dataset=dataset.map(process_img)
