In [1]:
import tensorflow as tf
import numpy as np
from tqdm import tqdm
from matplotlib import pyplot as plt

from itertools import product

In [2]:
# gpu 설정
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

1 Physical GPUs, 1 Logical GPUs


### Load datasets

In [3]:
# Download the mnist dataset using keras
data_train, data_test = tf.keras.datasets.mnist.load_data()

In [4]:
# Parse images and labels
(images_train, labels_train) = data_train
(images_test, labels_test) = data_test

In [5]:
X = np.expand_dims(images_train, -1)
y = labels_train

In [6]:
class CustomDataset(tf.keras.utils.Sequence) :
    def __init__(self, x, y, batch_size) :
        super().__init__()
        x = x.astype('float32')
        y = y.astype('float32')
        
        self.x = [x[i:i+batch_size] for i in range(0, len(x), batch_size)]
        self.y = [y[i:i+batch_size] for i in range(0, len(y), batch_size)]
#         self.x = np.array_split(x, len(x) // batch_size)
#         self.y = np.array_split(y, len(y) // batch_size)
        self.batch_size = batch_size
    
    def __getitem__(self, i):
        return self.x[i], self.y[i]
    
    def __len__(self):
        return len(self.x)

In [7]:
dataset = CustomDataset(X, y, 128)

### CNN

In [8]:
from tensorflow.keras.layers import Layer

In [9]:
from tensorflow.keras import Model

In [10]:
class tf_cnn_module(Model):
    def __init__(self, input_channel, output_channel, kernel_size):
        super().__init__()
        
        self.output_channel = output_channel
        self.kernel_size = (kernel_size, kernel_size)
        
        self.weight = tf.random.uniform([output_channel, *self.kernel_size, input_channel])
        self.bias = tf.random.uniform([output_channel])
        
        self.pad_size = kernel_size // 2
        
        self.n_samples = None
        self.width = None
        self.height = None
        
        self.pad = padding = [[0, 0], [1, 1], [1, 1], [0, 0]]
        
    def _add_pad(self, img):
        n_samples, width, height, channel = img.shape
        self.n_samples, self.width, self.height = n_samples, width, height
        return tf.pad(img, self.pad)

    def _slicing(self, img):
        n_samples, width, height, channel = img.shape

        # product는 뒤부터 작동
        stack = [img[:,
                    w:w+self.kernel_size[0],
                    h:h+self.kernel_size[0],
                    :] for w, h in product(range(width - self.kernel_size[0] + 1), 
                                    range(width - self.kernel_size[0] + 1))
                ]

        return tf.stack(stack, -1 )

    def _weight_product(self, features) :
        stack = []
        for n in range(self.output_channel) : # 8 -> self.output_channel
            w = tf.expand_dims(tf.expand_dims(self.weight[n], axis=0), axis = -1)
            stack.append(tf.math.multiply(w, features))
            
        return tf.reshape(tf.reduce_sum(tf.stack(stack, -1), [1,2,3]), 
                          [self.n_samples, self.width, self.height, self.output_channel]
                         ) + self.bias
    
    def __call__(self, img):
        pad_img = self._add_pad(img)
        sliced_img = self._slicing(pad_img)
        feature = self._weight_product(sliced_img)
        return feature

In [11]:
class CNN(Model):
    def __init__(self, channel_lists, kernel_size, n_class = 10):
        super().__init__()
        
        self.channel_lists = channel_lists
            
        layers = [tf_cnn_module(i, o, kernel_size) for i, o in zip(channel_lists, channel_lists[1:])]
        self.cnn_layers = layers
        
        batch = [tf.keras.layers.BatchNormalization() for _ in channel_lists[1:]]
        self.batch_layers = batch
        
        self.output_layers = tf.keras.layers.Dense(n_class)
        self.a = tf.keras.layers.ReLU()
        self.d = tf.keras.layers.Dropout(0.3)
    def __call__(self, img) : 
        x  = img
        for l, b in zip(self.cnn_layers, self.batch_layers) :
            x = tf.transpose(l(x), perm = [0,3,1,2])
            x = tf.transpose(b(x), perm = [0,2,3,1])
            x = self.a(x)
            x = self.d(x)

        x = tf.math.reduce_max(tf.reshape(x, [img.shape[0], -1, self.channel_lists[-1]]), 1)

#         return self.output_layers(x)
#         return self.softmax(self.output_layers(x))
        return tf.nn.softmax(self.output_layers(x), -1)
    
    def layer_maps(self, img):
        maps = []
        x = img
        for l,b in zip(self.cnn_layers, self.batch_layers):
            x = tf.transpose(l(x), perm = [0,3,1,2])
            maps.append(x)
            x = tf.transpose(b(x), perm = [0,2,3,1])
            x = self.d(x)
        return maps

In [12]:
cnn_model = CNN(
    [1, 8, 16, 32], 3
)

In [13]:
class tf_trainer:
    def __init__(self, model) :
        self.model = model
        self.optimizer = tf.keras.optimizers.Adam()
        self.loss_fn = tf.keras.losses.CategoricalCrossentropy()
#         self.loss_fn = tf.nn.softmax_cross_entropy_with_logits()
    def __training_batch_step__(self, batch) :
        x, y = batch
        with tf.GradientTape() as t:
            y = np.eye(10)[y.astype(int)]
            y_hat = self.model(x)
            loss = self.loss_fn(y, y_hat)
        grad = t.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grad, self.model.trainable_variables))
        
        return loss
    
    def __training_epoch__(self, epoch_numb) :
        loss_lists = []
        TQ = tqdm(dataset)
        
        for n, batch in enumerate(TQ, 1):
            loss_lists.append(self.__training_batch_step__(batch))
            TQ.set_description_str(f'Epoch : {epoch_numb}')
            TQ.set_postfix_str(f'Loss : {sum(loss_lists) / n:.5}')
        return sum(loss_lists) / n
    
    def fit(self, loop_numb) :
        history = dict(
        loss = []
        )
        
        for n in range(loop_numb):
            history['loss'].append(
                self.__training_epoch__(n)
            )
        
        return history

In [14]:
trainer = tf_trainer(cnn_model)

In [15]:
history = trainer.fit(10)

Epoch : 0: 100%|██████████| 469/469 [05:32<00:00,  1.41it/s, Loss : 14.518]
Epoch : 1: 100%|██████████| 469/469 [05:28<00:00,  1.43it/s, Loss : 14.518]
Epoch : 2: 100%|██████████| 469/469 [05:28<00:00,  1.43it/s, Loss : 14.518]
Epoch : 3: 100%|██████████| 469/469 [05:28<00:00,  1.43it/s, Loss : 14.518]
Epoch : 4: 100%|██████████| 469/469 [05:29<00:00,  1.42it/s, Loss : 14.518]
Epoch : 5: 100%|██████████| 469/469 [05:31<00:00,  1.42it/s, Loss : 14.518]
Epoch : 6: 100%|██████████| 469/469 [05:30<00:00,  1.42it/s, Loss : 14.518]
Epoch : 7: 100%|██████████| 469/469 [05:33<00:00,  1.41it/s, Loss : 14.518]
Epoch : 8: 100%|██████████| 469/469 [05:31<00:00,  1.41it/s, Loss : 14.518]
Epoch : 9: 100%|██████████| 469/469 [05:30<00:00,  1.42it/s, Loss : 14.518]
