In [1]:
import tensorflow as tf
import numpy as np

In [2]:
class MNIST():
    def __init__(self, input_shape):
        super(MNIST, self).__init__()
        self.feature_extractor = tf.keras.models.Sequential([
            tf.keras.layers.Conv2D(filters=32, kernel_size=5,
                                   strides=1, input_shape=input_shape),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation('relu'),
            tf.keras.layers.MaxPooling2D(pool_size=2, strides=2),
            tf.keras.layers.Conv2D(filters=48, kernel_size=5, strides=1),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation('relu'),
            tf.keras.layers.MaxPooling2D(pool_size=2, strides=2),
            tf.keras.layers.Flatten()            
        ])
        
        self.label_predictor = tf.keras.models.Sequential([
            tf.keras.layers.Dense(100),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation('relu'),
            tf.keras.layers.Dense(100),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation('relu'),
            tf.keras.layers.Dense(10, activation='softmax')
        ])
        self.domain_predictor = tf.keras.models.Sequential([
            tf.keras.layers.Dense(100),
            tf.keras.layers.BatchNormalization(),
            tf.keras.layers.Activation('relu'),
            tf.keras.layers.Dense(1),
            tf.keras.layers.Activation('sigmoid')          
        ])
        self.path_1 = tf.keras.models.Sequential([
            self.feature_extractor,
            self.label_predictor
        ])
        self.path_2 = tf.keras.models.Sequential([
            self.feature_extractor,
            self.label_predictor
        ])
        
        
        self.loss = tf.keras.losses.SparseCategoricalCrossentropy()
        self.loss_2 = tf.keras.losses.SparseCategoricalCrossentropy()
        
        self.optimizer = tf.keras.optimizers.Adam()
        self.optimizer_2 = tf.keras.optimizers.Adam(learning_rate=0.001)
        
        self.train_loss = tf.keras.metrics.Mean()
        self.train_loss_2 = tf.keras.metrics.Mean()
        
        self.train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
        self.train_accuracy_2 = tf.keras.metrics.SparseCategoricalAccuracy()
        
        
        self.test_loss = tf.keras.metrics.Mean()
        self.test_loss_2 = tf.keras.metrics.Mean()
        self.test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
        self.test_accuracy_2 = tf.keras.metrics.SparseCategoricalAccuracy()

    @tf.function
    def train(self, x_train, y_train):
        with tf.GradientTape() as tape:
            y_pred = self.path_1(x_train)
            loss = self.loss(y_train, y_pred)
        gradients = tape.gradient(loss, self.path_1.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.path_1.trainable_variables))

        self.train_loss(loss)
        self.train_accuracy(y_train, y_pred)
        
        return
    
    @tf.function
    def train_2(self, x_train, y_train):
        with tf.GradientTape() as tape:
            y_pred = self.path_2(x_train)
            loss = self.loss_2(y_train, y_pred)
        gradients = tape.gradient(loss, self.path_2.trainable_variables)
        self.optimizer_2.apply_gradients(zip(gradients, self.path_2.trainable_variables))

        self.train_loss_2(loss)
        self.train_accuracy_2(y_train, y_pred)
        
        return
    
    @tf.function
    def test(self, x_test, y_test):
        y_pred = self.path_1(x_test)
        loss = self.loss(y_test, y_pred)

        self.test_loss(loss)
        self.test_accuracy(y_test, y_pred)
        
    @tf.function
    def test_2(self, x_test, y_test):
        y_pred = self.path_2(x_test)
        loss = self.loss_2(y_test, y_pred)

        self.test_loss_2(loss)
        self.test_accuracy_2(y_test, y_pred)

In [3]:
x_train = np.load('../data/mnist/x_train.npy')
y_train = np.load('../data/mnist/y_train.npy')

x_test = np.load('../data/mnist/x_test.npy')
y_test = np.load('../data/mnist/y_test.npy')

# (x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

In [4]:
x_train = x_train - 0.5
x_test = x_test - 0.5

In [5]:
x_train = tf.cast(x_train, tf.float32)
x_test = tf.cast(x_test, tf.float32)
y_train = tf.cast(y_train, tf.float32)
y_test = tf.cast(y_test, tf.float32)

In [6]:
x_train_2 = tf.concat([x_train, x_train + tf.random.normal(x_train.shape, mean=0.0, stddev=.1)], axis=0)
x_test_2 = tf.concat([x_test, x_test + tf.random.normal(x_test.shape, mean=0.0, stddev=.1)], axis=0)

In [7]:
y_train_2 = tf.concat([np.zeros(len(x_train)), np.ones(len(x_train))], axis=0)
y_test_2 = tf.concat([np.zeros(len(x_test)), np.ones(len(x_test))], axis=0)

In [8]:
train_ds = tf.data.Dataset.from_tensor_slices(
    (x_train, y_train)).shuffle(len(x_train)).batch(1000)
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(1000)

In [9]:
train_ds_2 = tf.data.Dataset.from_tensor_slices(
    (x_train_2, y_train_2)).shuffle(len(x_train_2)).batch(2000)
test_ds_2 = tf.data.Dataset.from_tensor_slices((x_test_2, y_test_2)).batch(2000)

In [10]:
model = MNIST(input_shape=(28, 28, 1))

In [11]:
# EPOCHS = 5

# for epoch in range(EPOCHS):
#     for images, labels in train_ds:
#         model.train(images, labels)

#     for test_images, test_labels in test_ds:
#         model.test(test_images, test_labels)

#     template = '에포크: {}, 손실: {}, 정확도: {}, 테스트 손실: {}, 테스트 정확도: {}'
#     print(template.format(epoch+1,
#                          model.train_loss.result(),
#                          model.train_accuracy.result()*100,
#                          model.test_loss.result(),
#                          model.test_accuracy.result()*100))

In [12]:
EPOCHS = 10

for epoch in range(EPOCHS):
    for (images, labels), (images_2, labels_2) in zip(train_ds, train_ds_2):
        model.train(images, labels)
#         model.train_2(images_2, labels_2)

    for (test_images, test_labels), (test_images_2, test_labels_2) in zip(test_ds, test_ds_2):
        model.test(test_images, test_labels)
#         model.test_2(test_images_2, test_labels_2)

    template = 'Epoch: {}\n' + \
    'L1: {:.4f}, Acc1: {:.2f}, L1 Test: {:.4f}, Acc1 Test: {:.2f}\n'+ \
    'L2: {:.4f}, Acc2: {:.2f}, L2 Test: {:.4f}, Acc2 Test: {:.2f}\n'
    
    print(template.format(epoch+1,
                         model.train_loss.result(),
                         model.train_accuracy.result()*100,
                         model.test_loss.result(),
                         model.test_accuracy.result()*100,
                         model.train_loss_2.result(),
                         model.train_accuracy_2.result()*100,
                         model.test_loss_2.result(),
                         model.test_accuracy_2.result()*100))

Epoch: 1
L1: 0.6989, Acc1: 81.08, L1 Test: 0.1660, Acc1 Test: 95.09
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 2
L1: 0.4150, Acc1: 88.60, L1 Test: 0.1244, Acc1 Test: 96.33
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 3
L1: 0.3042, Acc1: 91.58, L1 Test: 0.1029, Acc1 Test: 96.91
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 4
L1: 0.2439, Acc1: 93.20, L1 Test: 0.0891, Acc1 Test: 97.28
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 5
L1: 0.2057, Acc1: 94.24, L1 Test: 0.0796, Acc1 Test: 97.54
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 6
L1: 0.1787, Acc1: 94.97, L1 Test: 0.0729, Acc1 Test: 97.73
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 7
L1: 0.1586, Acc1: 95.53, L1 Test: 0.0683, Acc1 Test: 97.84
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test: 0.00

Epoch: 8
L1: 0.1431, Acc1: 95.96, L1 Test: 0.0649, Acc1 Test: 97.93
L2: 0.0000, Acc2: 0.00, L2 Test: 0.0000, Acc2 Test

In [None]:
for epoch in range(EPOCHS):
    for (images, labels), (images_2, labels_2) in zip(train_ds, train_ds_2):
        model.train(images, labels)
        model.train_2(images_2, labels_2)

    for (test_images, test_labels), (test_images_2, test_labels_2) in zip(test_ds, test_ds_2):
        model.test(test_images, test_labels)
        model.test_2(test_images_2, test_labels_2)

    template = 'Epoch: {}\n' + \
    'L1: {:.4f}, Acc1: {:.2f}, L1 Test: {:.4f}, Acc1 Test: {:.2f}\n'+ \
    'L2: {:.4f}, Acc2: {:.2f}, L2 Test: {:.4f}, Acc2 Test: {:.2f}\n'
    
    print(template.format(epoch+1,
                         model.train_loss.result(),
                         model.train_accuracy.result()*100,
                         model.test_loss.result(),
                         model.test_accuracy.result()*100,
                         model.train_loss_2.result(),
                         model.train_accuracy_2.result()*100,
                         model.test_loss_2.result(),
                         model.test_accuracy_2.result()*100))

In [None]:
for epoch in range(EPOCHS):
    for (images, labels), (images_2, labels_2) in zip(train_ds, train_ds_2):
        model.train(images, labels)
#         model.train_2(images_2, labels_2)

    for (test_images, test_labels), (test_images_2, test_labels_2) in zip(test_ds, test_ds_2):
        model.test(test_images, test_labels)
        model.test_2(test_images_2, test_labels_2)

    template = 'Epoch: {}\n' + \
    'L1: {:.4f}, Acc1: {:.2f}, L1 Test: {:.4f}, Acc1 Test: {:.2f}\n'+ \
    'L2: {:.4f}, Acc2: {:.2f}, L2 Test: {:.4f}, Acc2 Test: {:.2f}\n'
    
    print(template.format(epoch+1,
                         model.train_loss.result(),
                         model.train_accuracy.result()*100,
                         model.test_loss.result(),
                         model.test_accuracy.result()*100,
                         model.train_loss_2.result(),
                         model.train_accuracy_2.result()*100,
                         model.test_loss_2.result(),
                         model.test_accuracy_2.result()*100))

In [None]:
# class SVHN():

In [None]:
# class GTSRB():