<img src="siamese_network.png" style="width:953px;height:600px;float:middle">
以上為siamese network的計算流程<br>

### 請解壓縮data.rar，取得本程式之數據

In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from keras.preprocessing.image import ImageDataGenerator
import os
from PIL import Image

Using TensorFlow backend.


In [2]:
examples_num = 20
img_width , img_height , channels = 28 , 28 , 1
way_num = 60
base_num = 5
input_num = 7

In [3]:
# Load Dataset
def load_data(root_dir):
    img_dirs = []
    for folder in os.listdir(root_dir):
        for character_folder in os.listdir(root_dir + '/{}'.format(folder)):
            img_dirs.append(os.path.join(root_dir , folder , character_folder))

    dataset = np.zeros([len(img_dirs) , examples_num , img_height , img_width] , dtype = np.float32)
    for i , folder in enumerate(img_dirs):
        for j , file in enumerate(os.listdir(folder)):
            imagePath = os.path.join(folder , file)
            image = Image.open(imagePath).resize((img_width , img_height))
            values = 1. - np.array(image , np.float32)
            dataset[i , j , : , :] = values

    return dataset

train_dataset = load_data('./data/training_image')
test_dataset = load_data('./data/testing_image')

In [4]:
aug = ImageDataGenerator(rotation_range = 25,
                         width_shift_range = 0.1,
                         height_shift_range = 0.1 ,
                         shear_range = 0.2 ,
                         zoom_range = 0.2,
                         fill_mode = 'nearest')

def get_batch(data , way_num = 60 , input_num = 7 , base_num = 5):

    class_indices = np.random.choice(np.arange(len(data)) ,
                                     size = way_num ,
                                     replace = False)

    base_image_set = None
    input_image_set = None
    input_label_set = None
    for i , class_index in enumerate(class_indices):

        class_image = np.reshape(data[class_index] , [20 , 28 , 28 , 1])
        augmentation_data = aug.flow(np.reshape(class_image , [-1 , 28 , 28 , 1]) , np.arange(20))
        class_image = augmentation_data[0][0]

        sample_index = np.random.choice(np.arange(20) ,
                                        size = input_num + base_num ,
                                        replace = False)

        # base_image → [1 , 5 , 28 , 28 , 1]
        base_index = sample_index[:base_num]
        base_image = np.reshape(class_image[base_index] , [1 , base_num , 28 , 28 , 1])
        if base_image_set is None:
            base_image_set = base_image
        else:
            # base_image_set → [60 , 5 , 28 , 28 , 1] (執行迴圈60次，沿著axis 0，將base_image堆疊60次)
            # 總共60個class，每1個class有5張image
            base_image_set = np.concatenate([base_image_set , base_image] , axis = 0)

        input_index = sample_index[-input_num:]
        # input_image → [7 , 28 , 28 , 1]
        input_image = np.reshape(class_image[input_index] , [1 , input_num , 28 , 28 , 1])
        if input_image_set is None:
            input_image_set = input_image
        else:
            # input_image_set → [60 , 7 , 28 , 28 , 1] (執行迴圈60次，沿著axis 0，將input_image堆疊60次)
            # 總共60個class，每1個class有7張image
            input_image_set = np.concatenate([input_image_set , input_image] , axis = 0)

        # input_label → [1 , 7 , 60]
        # class 0的input_label[: , : , 0] = 1
        # class 為1的input_label[: , : , 1] = 1
        #                   ...
        # class 為59的input_label[: , : , 60] = 1
        input_label = np.zeros([1 , input_num , way_num])
        input_label[: , : , i] = 1
        if input_label_set is None:
            input_label_set = input_label
        else:
            # input_label_set → [60 , 7 , 60] (執行迴圈60次，沿著axis 0，將input_label堆疊60次)
            # 總共60個class，每1個class含有相同的7個相同的label，而每1個label的one-hot的深度當然為60
            input_label_set = np.concatenate([input_label_set , input_label] , axis = 0)

    return base_image_set , input_image_set , input_label_set

In [5]:
input_image = tf.placeholder(tf.float32 , [way_num , input_num , 28 , 28 , 1])
input_image_ = tf.reshape(input_image , [-1 , 28 , 28 , 1])

base_image = tf.placeholder(tf.float32 , [way_num , base_num , 28 , 28 , 1])
base_image_ = tf.reshape(base_image , [-1 , 28 , 28 , 1])

y_true = tf.placeholder(tf.float32 , [way_num , input_num , way_num])
y_true_ = tf.reshape(y_true , [-1 , way_num])

on_train = tf.placeholder(tf.bool)

In [6]:
def batch_norm_layer_part1(inputs , on_train , convolution):
    # the dimension you wanna normalize, here [0] for batch
    # for image, you wanna do [0 , 1 , 2] for [batch , height , width] but not channel
    if convolution:
        fc_mean , fc_var = tf.nn.moments(inputs , axes = [0 , 1 , 2] , name = 'mean_var')
    else:
        fc_mean , fc_var = tf.nn.moments(inputs , axes = [0] , name = 'mean_var')

    ema = tf.train.ExponentialMovingAverage(decay = 0.99)
    ema_apply_op = ema.apply([fc_mean , fc_var])
    mean = tf.cond(on_train , lambda : fc_mean , lambda : ema.average(fc_mean))
    var = tf.cond(on_train , lambda : fc_var , lambda : ema.average(fc_var))
    return mean , var , ema_apply_op

def batch_norm_layer_part2(inputs , mean , var):
    initializer = tf.contrib.layers.xavier_initializer()
    scale = tf.get_variable(initializer = tf.ones([1 , inputs.shape[-1].value]) , name = 'scale')
    shift = tf.get_variable(initializer = tf.zeros([1 , inputs.shape[-1].value]) , name = 'shift')
    temp = (inputs - mean) / tf.sqrt(var + 1e-8)
    outputs = tf.multiply(temp , scale) + shift
    return outputs

In [7]:
def feature_extractor(input_ , reuse = False):
    
    #------------------------------conv_layer------------------------------#
    ema_list = []
    for i in range(1 , 6):
        with tf.variable_scope('conv{}'.format(i)) as scope:
            if reuse : tf.get_variable_scope().reuse_variables()
            conv_output = tf.contrib.layers.conv2d(input_ , 64 , [3 , 3] ,
                                                   activation_fn = tf.nn.relu , padding='SAME' ,
                                                   weights_initializer = tf.contrib.layers.xavier_initializer_conv2d())
    
        with tf.variable_scope('conv{}_compute_mean_var'.format(i)):
            conv_mean , conv_var , conv_ema = batch_norm_layer_part1(conv_output , on_train , True)
    
        with tf.variable_scope(scope):
            conv_bn = batch_norm_layer_part2(conv_output , conv_mean , conv_var)
            conv_pooling = tf.contrib.layers.max_pool2d(conv_bn , [2 , 2] , padding = 'SAME')

        input_ = conv_pooling
        ema_list.append(conv_ema)

    update_ema = tf.group(ema_list)
    #------------------------------conv_layer------------------------------#
    

    #------------------------------flatten_layer------------------------------#
    with tf.variable_scope('flatten'):
        output_flatten = tf.contrib.layers.flatten(input_)
    #------------------------------flatten_layer------------------------------#
        
    return output_flatten , update_ema

In [8]:
with tf.variable_scope('feature_extractor'):
    # embedding_input → [60 * 7 , 64]
    embedding_input , ema_input = feature_extractor(input_image_ , reuse = False)
    # embedding_input → [60 , 60 * 7 , 64]
    embedding_input_copy = tf.tile(tf.expand_dims(embedding_input , axis = 0) , [way_num , 1 , 1])

    # embedding_base → [60 , 5 , 64]    
    embedding_base , ema_base = feature_extractor(base_image_ , reuse = True)
    embedding_base = tf.reshape(embedding_base , [-1 , base_num , embedding_base.shape[-1].value])
    # embedding_base_mean → [60 , 64]  
    embedding_base_mean = tf.reduce_mean(embedding_base , axis = 1)
    # embedding_base_copy → [60 , 60 * 7 , 64] 
    embedding_base_copy = tf.tile(tf.expand_dims(embedding_base_mean , axis = 1) , [1 , way_num * input_num , 1])

    update_ema = tf.group([ema_input , ema_base])


with tf.variable_scope('euclidean_distance'):
    distance = tf.reduce_sum(tf.pow(embedding_base_copy - embedding_input_copy , 2) , axis = -1)
    distance = tf.transpose(distance , [1 , 0])
    distance = tf.sqrt(tf.maximum(distance , 1e-9))

    
with tf.variable_scope('contrastive_loss'):
    margin = 4
    decision_similarity = y_true_
    similarity = tf.multiply(decision_similarity , tf.square(distance))
    dissimilarity = tf.multiply(1 - decision_similarity , -tf.square(distance))
    contrastive_loss = tf.reduce_mean(similarity + dissimilarity)
    
    
with tf.variable_scope('optimizer'):
    train_op = tf.train.AdamOptimizer(1e-3).minimize(contrastive_loss)
    
    
with tf.variable_scope('accuracy'):    
    correct = tf.equal(tf.argmin(distance , 1) , tf.argmax(y_true_ , 1))
    correct = tf.cast(correct , tf.float32)
    accuracy = tf.reduce_mean(correct)

The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
Please use `layer.__call__` method instead.
Instructions for updating:
Use keras.layers.flatten instead.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [9]:
sess = tf.Session()
sess.run(tf.global_variables_initializer())

embedding_vector_list = []
for batch_i in range(0 , 3200):
    base_image_batch , input_image_batch , input_label_batch = get_batch(train_dataset)

    _  , _ , train_loss , train_accuracy = sess.run([train_op , update_ema , contrastive_loss , accuracy] ,
                                                     feed_dict = {input_image : input_image_batch ,
                                                                  y_true : input_label_batch ,
                                                                  base_image : base_image_batch ,
                                                                  on_train : True})
    
    if batch_i % 160 == 0:
        print('=' * 30)
        print('batch_i : {}'.format(batch_i))
        print('training_loss : {:.4f}'.format(train_loss))
        print('training_accuracy : {:.2%}'.format(train_accuracy))

        base_image_test , input_image_test , input_label_test = get_batch(test_dataset)
        test_loss , test_accuracy =  sess.run([contrastive_loss , accuracy] ,
                                              feed_dict = {input_image : input_image_test ,
                                                           y_true : input_label_test ,
                                                           base_image : base_image_test ,
                                                           on_train : False})
        print('*' * 30)
        print('testing_loss : {:.4f}'.format(test_loss))
        print('testing_accuracy : {:.2%}\n'.format(test_accuracy))

batch_i : 0
training_loss : -121.8945
training_accuracy : 6.67%
******************************
testing_loss : -425789587456.0000
testing_accuracy : 6.19%

batch_i : 160
training_loss : -635.0869
training_accuracy : 9.05%
******************************
testing_loss : -301.0881
testing_accuracy : 6.43%

batch_i : 320
training_loss : -840.5331
training_accuracy : 5.48%
******************************
testing_loss : -375.4400
testing_accuracy : 7.38%

batch_i : 480
training_loss : -1146.9990
training_accuracy : 7.14%
******************************
testing_loss : -438.3100
testing_accuracy : 5.00%

batch_i : 640
training_loss : -1483.4766
training_accuracy : 7.62%
******************************
testing_loss : -663.0653
testing_accuracy : 7.86%

batch_i : 800
training_loss : -1896.4108
training_accuracy : 4.29%
******************************
testing_loss : -369.8025
testing_accuracy : 5.24%

batch_i : 960
training_loss : -2291.2598
training_accuracy : 5.24%
******************************
test