### Setup

In [1]:
import tensorflow as tf
import numpy as np
from functools import partial

# Hyperparameters shared by the cells below.
n_inputs = 28 * 28  # length of one flattened image row (see the reshape below)
n_hidden_neurons = 100
learning_rate, momentum = 0.01, 0.95

# Load MNIST through Keras; flatten every image to n_inputs float32 values
# divided by 255, and store the labels as int32.
(X_train_raw, y_train_raw), (X_test_raw, y_test_raw) = tf.keras.datasets.mnist.load_data()
X_train_raw = X_train_raw.reshape(-1, n_inputs).astype(np.float32) / 255
X_test_raw = X_test_raw.reshape(-1, n_inputs).astype(np.float32) / 255
y_train_raw, y_test_raw = y_train_raw.astype(np.int32), y_test_raw.astype(np.int32)

# The first 55000 training rows become X_train / y_train; the rest become X_val / y_val.
X_train, X_val = X_train_raw[:55000], X_train_raw[55000:]
y_train, y_val = y_train_raw[:55000], y_train_raw[55000:]

def generate_comparison_training_batch(batch_size, images, labels):
    """Build a shuffled batch of image pairs tagged "same label" or "different label".

    ``batch_size // 2`` pairs (or one more, with probability 0.5, when
    ``batch_size`` is odd) join two distinct images whose labels are equal and
    get the target ``[1]``. The remaining pairs join two images whose labels
    differ and get the target ``[0]``. Pairs are found by rejection sampling
    of random index pairs drawn from NumPy's global random state.

    Args:
        batch_size: Total number of pairs to return.
        images: Indexable collection of images; ``images[i]`` is paired with
            ``labels[i]``.
        labels: Indexable collection of labels aligned with ``images``.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays. ``X`` stacks the pairs along axis 0
        with the two images of each pair along axis 1; ``y`` holds one
        ``[0]``/``[1]`` target per pair, in the same shuffled order as ``X``.

    Raises:
        ValueError: If same-label pairs are requested but no label occurs at
            least twice, or different-label pairs are requested but fewer than
            two distinct labels exist. Without this check the sampling loops
            below would never terminate.
    """
    n_same = batch_size // 2
    n_diff = batch_size - n_same
    # For odd batch sizes, randomly decide which kind of pair gets the extra slot.
    if n_same != n_diff and np.random.rand() > 0.5:
        n_same, n_diff = n_diff, n_same

    # Fail fast instead of spinning forever in the rejection-sampling loops.
    # Only labels that can actually be indexed (index < len(images)) are counted.
    _, label_counts = np.unique(np.asarray(labels)[:len(images)], return_counts=True)
    if n_same > 0 and (label_counts.size == 0 or label_counts.max() < 2):
        raise ValueError("cannot build same-label pairs: no label occurs at least twice")
    if n_diff > 0 and label_counts.size < 2:
        raise ValueError("cannot build different-label pairs: fewer than two distinct labels")

    pairs, targets = [], []
    # Positive pairs: two different images carrying the same label.
    while len(pairs) < n_same:
        idx_a, idx_b = np.random.randint(0, len(images), 2)
        if idx_a != idx_b and labels[idx_a] == labels[idx_b]:
            pairs.append(np.array([images[idx_a], images[idx_b]]))
            targets.append([1])
    # Negative pairs: fill the rest of the batch with images whose labels differ.
    while len(pairs) < batch_size:
        idx_a, idx_b = np.random.randint(0, len(images), 2)
        if labels[idx_a] != labels[idx_b]:
            pairs.append(np.array([images[idx_a], images[idx_b]]))
            targets.append([0])

    # Shuffle so positives and negatives are interleaved.
    shuffle = np.random.permutation(batch_size)
    return np.array(pairs)[shuffle], np.array(targets)[shuffle]


  return f(*args, **kwds)
  from ._conv import register_converters as _register_converters


In [2]:
# Default kernel initializer for the dense layers built in this notebook.
he_init = tf.contrib.layers.variance_scaling_initializer()


def dnn(inputs, n_layers=5, n_neurons=100, activation=tf.nn.elu, initializer=he_init, batch_norm_momentum=None, name="", dropout_rate=None):
    """Stack ``n_layers`` dense layers, applying ``activation`` after each one.

    Layer ``k`` (1-based) is created with the op name ``"<name>_hidden<k>"`` and
    its activation with ``"<name>_hidden<k>_output"``.

    Args:
        inputs: Tensor fed to the first dense layer.
        n_layers: Number of dense layers to stack.
        n_neurons: Number of units in every layer.
        activation: Callable taking ``(tensor, name=...)``, applied after each layer.
        initializer: Kernel initializer passed to every dense layer.
        batch_norm_momentum: Accepted but not used by this implementation.
        name: Prefix for the layer and activation op names.
        dropout_rate: Accepted but not used by this implementation.

    Returns:
        The output tensor of the last activation (``inputs`` itself when
        ``n_layers`` is 0).
    """
    hidden = inputs
    for layer_number in range(1, n_layers + 1):
        pre_activation = tf.layers.dense(
            hidden,
            n_neurons,
            kernel_initializer=initializer,
            name="%s_hidden%d" % (name, layer_number),
        )
        hidden = activation(pre_activation, name="%s_hidden%d_output" % (name, layer_number))
    return hidden

### Create DNN to Detect Same Digit

In [3]:
tf.reset_default_graph()

# float32 matches the dtype of the MNIST arrays prepared in the setup cell and
# of the classifier graph built later under the same "DNN_A" layer names, so
# the variables saved from this graph have the dtype that graph expects.
X = tf.placeholder(tf.float32, (None, 2, n_inputs), name="X")
y = tf.placeholder(tf.int32, [None, 1], name="y")
# Split each pair into its two images: one tensor per position on axis 1.
X_A, X_B = tf.unstack(X, axis=1)

# DNN: one tower per image, concatenated and fed to a small head with one logit.
dnn_A = dnn(X_A, name="DNN_A")
dnn_B = dnn(X_B, name="DNN_B")
dnn_AB = tf.concat([dnn_A, dnn_B], 1)
hidden_AB = tf.layers.dense(dnn_AB, 10, name="hidden_AB", kernel_initializer=he_init, activation=tf.nn.elu)
logits = tf.layers.dense(hidden_AB, 1, name="logits", kernel_initializer=he_init)
y_proba = tf.sigmoid(logits)

# Loss: the labels are cast to the logits' dtype, as the cross-entropy op requires.
xentropy = tf.nn.sigmoid_cross_entropy_with_logits(labels=tf.cast(y, tf.float32), logits=logits)
loss = tf.reduce_mean(xentropy, name="loss")

# Train
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=momentum, use_nesterov=True)
training_op = optimizer.minimize(loss, name="training_op")

# Eval: predict 1 when the sigmoid output is at least 0.5, then compare with y.
y_pred = tf.cast(tf.greater_equal(y_proba, 0.5), tf.int32)
accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred, y), tf.float32), name="accuracy")

init = tf.global_variables_initializer()
saver = tf.train.Saver()

In [4]:
# Fixed evaluation set: as many image pairs as there are test images, drawn from the test split.
X_test1, y_test1 = generate_comparison_training_batch(
    batch_size=len(X_test_raw),
    images=X_test_raw,
    labels=y_test_raw,
)

In [5]:
n_epochs = 100
batch_size = 500

with tf.Session() as sess:
    sess.run(init)
    for epoch in range(n_epochs):
        # One "epoch" is len(X_train) // batch_size freshly sampled pair batches.
        for step in range(len(X_train) // batch_size):
            X_batch, y_batch = generate_comparison_training_batch(batch_size, X_train, y_train)
            loss_train, _ = sess.run([loss, training_op], feed_dict={X: X_batch, y: y_batch})
        # The reported loss is the loss of the last batch of the epoch.
        print("[TRAIN]\t{}\tLoss: {:.6f}".format(epoch, loss_train))
        # Evaluate on the fixed test pairs every tenth epoch.
        if epoch % 10 == 0:
            accuracy_test = accuracy.eval(feed_dict={X: X_test1, y: y_test1})
            print("[TEST]\t{}\tAccuracy: {:.2f}%".format(epoch, accuracy_test * 100))

    # Persist the trained variables once all epochs have run.
    saver.save(sess, "./my_mnist_comparison_model.ckpt")

[TRAIN]	0	Loss: 0.420595
[TEST]	0	Accuracy: 79.63%
[TRAIN]	1	Loss: 0.343084
[TRAIN]	2	Loss: 0.289761
[TRAIN]	3	Loss: 0.288865
[TRAIN]	4	Loss: 0.221917
[TRAIN]	5	Loss: 0.206437
[TRAIN]	6	Loss: 0.187823
[TRAIN]	7	Loss: 0.215581
[TRAIN]	8	Loss: 0.153604
[TRAIN]	9	Loss: 0.187798
[TRAIN]	10	Loss: 0.176235
[TEST]	10	Accuracy: 94.42%
[TRAIN]	11	Loss: 0.120984
[TRAIN]	12	Loss: 0.145719
[TRAIN]	13	Loss: 0.135987
[TRAIN]	14	Loss: 0.099689
[TRAIN]	15	Loss: 0.105212
[TRAIN]	16	Loss: 0.103685
[TRAIN]	17	Loss: 0.090757
[TRAIN]	18	Loss: 0.097564
[TRAIN]	19	Loss: 0.076935
[TRAIN]	20	Loss: 0.084564
[TEST]	20	Accuracy: 96.18%
[TRAIN]	21	Loss: 0.134126
[TRAIN]	22	Loss: 0.076028
[TRAIN]	23	Loss: 0.082661
[TRAIN]	24	Loss: 0.099912
[TRAIN]	25	Loss: 0.066036
[TRAIN]	26	Loss: 0.082318
[TRAIN]	27	Loss: 0.074621
[TRAIN]	28	Loss: 0.048043
[TRAIN]	29	Loss: 0.071715
[TRAIN]	30	Loss: 0.094824
[TEST]	30	Accuracy: 96.75%
[TRAIN]	31	Loss: 0.072406
[TRAIN]	32	Loss: 0.047032
[TRAIN]	33	Loss: 0.088157
[TRAIN]	34	Loss: 0.

In [6]:
tf.reset_default_graph()

X = tf.placeholder(tf.float32, (None, n_inputs), name="X")
# `(None)` is just None (not a tuple), which leaves the placeholder's shape
# completely unspecified; `[None]` declares the intended rank-1 label vector.
y = tf.placeholder(tf.int32, [None], name="y")

# Hidden stack built under the same "DNN_A" layer names as the comparison model.
dnn_outputs = dnn(X, name="DNN_A")
# Disabled variant: block gradients from flowing into the hidden stack.
# frozen_outputs = tf.stop_gradient(dnn_outputs)
logits = tf.layers.dense(dnn_outputs, 10, name="cls_logits", kernel_initializer=he_init)

# Loss
xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits)
loss = tf.reduce_mean(xentropy, name="cls_loss")

# Train
optimizer = tf.train.MomentumOptimizer(learning_rate=learning_rate, momentum=momentum, use_nesterov=True)
training_op = optimizer.minimize(loss, name="cls_training_op")

# Eval: a prediction is correct when the true label has the highest logit.
correct = tf.cast(tf.nn.in_top_k(logits, y, 1), tf.float32)
accuracy = tf.reduce_mean(correct, name="cls_accuracy")

init = tf.global_variables_initializer()
# Disabled variant: a saver restricted to the "DNN_A" variables, for restoring
# them from a previously saved checkpoint.
# dnn_A_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope="DNN_A")
# restore_saver = tf.train.Saver(var_list={var.op.name: var for var in dnn_A_vars})
new_saver = tf.train.Saver()

In [8]:
n_epochs = 100
batch_size = 50

with tf.Session() as sess:
    sess.run(init)
    # Disabled: start from the variables saved by the comparison model.
#     restore_saver.restore(sess, "./my_mnist_comparison_model.ckpt")
    for epoch in range(n_epochs):
        # Reshuffle the X_val / y_val rows every epoch and walk them in mini-batches.
        shuffled_rows = np.random.permutation(len(X_val))
        for batch_rows in np.array_split(shuffled_rows, len(X_val) // batch_size):
            X_batch = X_val[batch_rows]
            y_batch = y_val[batch_rows]
            loss_train, _ = sess.run([loss, training_op], feed_dict={X: X_batch, y: y_batch})
        # The reported loss is the loss of the last mini-batch of the epoch.
        print("[TRAIN]\t{}\tLoss: {:.6f}".format(epoch, loss_train))
        # Evaluate on the full test split every tenth epoch.
        if epoch % 10 == 0:
            accuracy_test = accuracy.eval(feed_dict={X: X_test_raw, y: y_test_raw})
            print("[TEST]\t{}\tAccuracy: {:.2f}%".format(epoch, accuracy_test * 100))
    new_saver.save(sess, "./my_mnist_recognize_model.ckpt")
        

[TRAIN]	0	Loss: 0.448841
[TEST]	0	Accuracy: 90.10%
[TRAIN]	1	Loss: 0.321422
[TRAIN]	2	Loss: 0.178332
[TRAIN]	3	Loss: 0.089012
[TRAIN]	4	Loss: 0.047456
[TRAIN]	5	Loss: 0.080921
[TRAIN]	6	Loss: 0.017963
[TRAIN]	7	Loss: 0.017015
[TRAIN]	8	Loss: 0.003149
[TRAIN]	9	Loss: 0.004140
[TRAIN]	10	Loss: 0.000383
[TEST]	10	Accuracy: 93.72%
[TRAIN]	11	Loss: 0.000930
[TRAIN]	12	Loss: 0.000932
[TRAIN]	13	Loss: 0.000502
[TRAIN]	14	Loss: 0.000451
[TRAIN]	15	Loss: 0.000121
[TRAIN]	16	Loss: 0.000940
[TRAIN]	17	Loss: 0.000351
[TRAIN]	18	Loss: 0.000066
[TRAIN]	19	Loss: 0.000634
[TRAIN]	20	Loss: 0.000294
[TEST]	20	Accuracy: 93.80%
[TRAIN]	21	Loss: 0.000334
[TRAIN]	22	Loss: 0.000304
[TRAIN]	23	Loss: 0.000207
[TRAIN]	24	Loss: 0.001060
[TRAIN]	25	Loss: 0.000312
[TRAIN]	26	Loss: 0.000253
[TRAIN]	27	Loss: 0.000125
[TRAIN]	28	Loss: 0.000095
[TRAIN]	29	Loss: 0.000173
[TRAIN]	30	Loss: 0.000109
[TEST]	30	Accuracy: 93.85%
[TRAIN]	31	Loss: 0.000267
[TRAIN]	32	Loss: 0.000253
[TRAIN]	33	Loss: 0.000095
[TRAIN]	34	Loss: 0.