In [2]:
!pip install einops








In [22]:
#구현하는 모델에서 쓰이는 모든 activation함수는 정의하여 드린 GELU 함수를 사용해야함.
#MultiHeadAttention에서 Head로 나눌때, 이미지를 patch로자른후 sequence로 만들때 Rearrange함수를 사용하면 편리함.(사용하지 않으셔도 됩니다)
#CIFAR10에 대한 test accuracy가 60프로 이상인 ViT모델을 만드시오.
import tensorflow as tf
from einops.layers.tensorflow import Rearrange
from tensorflow.keras.activations import gelu
GELU = lambda x : gelu(x)
import math

In [17]:
#논문[1]에서 설명하는 MultiHeadAttention을 만들어라.
class MultiHeadedAttention(tf.keras.Model):
    #dimension - 모델의 dimension(MHA를 거친 후의 dimension)
    def __init__(self, dimension, heads=8):
        super(MultiHeadedAttention, self).__init__()
        ############Write your code Here############
        self.heads = heads
        self.scale = dimension ** -0.5

        self.mlp_in = tf.keras.layers.Dense(dimension * 3, use_bias=False)
        self.mlp_out = tf.keras.layers.Dense(dimension)

        self.rearrange_attention = Rearrange(
            'b n (qkv h d) -> qkv b h n d', qkv=3, h=self.heads)
        self.rearrange_output = Rearrange('b h n d -> b n (h d)')
        
        
        ############################################
    def call(self, inputs):
        output = None
        ############Write your code Here############
        query_key_value = self.mlp_in(inputs)
        query_key_value = self.rearrange_attention(query_key_value)

        query = query_key_value[0]
        key = query_key_value[1]
        value = query_key_value[2]

        dot_product = tf.einsum('bhid,bhjd->bhij', query, key) * self.scale
        attention = tf.nn.softmax(dot_product, axis=-1)

        output = tf.einsum('bhij,bhjd->bhid', attention, value)
        output = self.rearrange_output(output)
        output = self.mlp_out(output)
        
        
        
        
        ############################################
        return output

#인자로 받은 residual_function을 사용하여 real_function값을 return하여주는 Class를 만들어라.(call함수 참고)
class ResidualBlock(tf.keras.Model):
    def __init__(self, residual_function):
        super(ResidualBlock, self).__init__()
        ############Write your code Here############
        self.residual_function = residual_function
        ############################################

    def call(self, inputs):
        return self.residual_function(inputs) + inputs

#인자로 받은 normfunction에 들어가기전에 LayerNormalization을 해주는 Class를 만들어라.(call함수 참고)
class NormalizationBlock(tf.keras.Model):
    def __init__(self, norm_function, epsilon=1e-5):
        super(NormalizationBlock, self).__init__()
        ############Write your code Here############
        self.norm_function = norm_function
        self.normalize = tf.keras.layers.LayerNormalization(epsilon=epsilon)
        ############################################

    def call(self, inputs):
        return self.norm_function(self.normalize(inputs))

#논문[1]에서의 MLPBlock을 만들어라.
class MLPBlock(tf.keras.Model):
    #output_dimension - MLPBlock의 output dimension
    #hidden_dimension - MLPBlock의 hidden layer dimension
    def __init__(self, output_dimension, hidden_dimension):
        super(MLPBlock, self).__init__()
        ############Write your code Here############
        self.mlp_1 = tf.keras.layers.Dense(hidden_dimension)
        self.mlp_2 = tf.keras.layers.Dense(output_dimension)

        ############################################

    def call(self, inputs):
        output = None
        ############Write your code Here############
        y = self.mlp_1(inputs)
        y = GELU(y)
        y = self.mlp_2(y)
        output = GELU(y)
        ############################################
        return output

#논문[1]을 읽고 TransformerEncoder를 위에서 정의한 class들을 사용하여 만들어라.
class TransformerEncoder(tf.keras.Model):
    #dimension - 모델의 dimension(MHA를 거친 후의 dimension), heads - MHA에서 head의 개수
    #depth - encoder layer의 개수, mlp_dimension - MLP block의 hidden layer의 dimension
    def __init__(self, dimension, depth, heads, mlp_dimension): 
        super(TransformerEncoder, self).__init__()
        layers_ = []
        for _ in range(depth):
            ############Write your code Here############
            layers_ += [
                ResidualBlock(
                    NormalizationBlock(
                        dimension,
                        MultiHeadedAttention(
                            dimension, heads=heads
                        )
                    )
                ),
                ResidualBlock(
                    NormalizationBlock(
                        dimension,
                        MLPBlock(dimension, mlp_dimension)
                    )
                )
            ]
            ############################################
        self.layers_ = tf.keras.Sequential(layers_)

    def call(self, inputs):
        return self.layers_(inputs)

#논문[2]를 읽고 ViT모델을 위에서 정의한 class들을 사용하여 만들어라.
class ImageTransformer(tf.keras.Model):
    #image_size - 이미지의 W==H의 크기(int), patch_size - 이미지를 쪼갤 patch의 크기(int)
    #n_classes - 최종 class의 개수, batch_size - 배치사이즈
    #dimension - 모델의 dimension(MHA를 거친 후의 dimension), depth - encoder layer의 개수
    #heads - MHA에서 head의 개수, mlp_dimension - MLP block의 hidden layer의 dimension
    #channel - input image에 대한 channel의 수
    def __init__(
            self, image_size, patch_size, n_classes, batch_size,
            dimension, depth, heads, mlp_dimension, channels=3):
        super(ImageTransformer, self).__init__()
        assert image_size % patch_size == 0, 'invalid patch size for image size'

        num_patches = (image_size // patch_size) ** 2
        self.patch_size = patch_size
        self.dimension = dimension
        self.batch_size = batch_size

        self.positional_embedding = self.add_weight(
            "position_embeddings", shape=[num_patches + 1, dimension],
            initializer=tf.keras.initializers.RandomNormal(), dtype=tf.float32
        )
        self.classification_token = self.add_weight(
            "classification_token", shape=[1, 1, dimension],
            initializer=tf.keras.initializers.RandomNormal(), dtype=tf.float32
        )
        ############Write your code Here############
        self.embedding_mlp = tf.keras.layers.Dense(dimension)
        self.rearrange = Rearrange(
            'b c (h p1) (w p2) -> b (h w) (p1 p2 c)',
            p1=self.patch_size, p2=self.patch_size
        )
        self.transformer = TransformerEncoder(dimension, depth, heads, mlp_dimension)
        self.classification_identity = tf.identity
        self.mlp_1 = tf.keras.layers.Dense(mlp_dimension)
        self.den = tf.keras.layers.Dense(n_classes)
        ############################################
    @tf.function
    def call(self, inputs):
        output = None
        ############Write your code Here############
        shapes = tf.shape(inputs)
        y = self.rearrange(inputs)
        y = self.embedding_mlp(y)
        cls_tokens = tf.broadcast_to(
            self.classification_token,
            (shapes[0], 1, self.dimension)
        )
        y = tf.concat((cls_tokens, inputs), axis=1)
        y += self.positional_embedding
        y = self.transformer(y)
        y = self.classification_identity(y[:, 0])
        y = self.mlp_1(y)
        y = GELU(y)
        
        output = self.den(y)
        ############################################
        return output
    
    
#     def compile(self, loss_fn, **kwargs):
#         super(ImageTransformer, self).compile(**kwargs)
#         self.loss_fn = loss_fn
#         self.train_accuracy = tf.keras.metrics.Accuracy('training_accuracy', dtype=tf.float32)

    
#     def train_step(self, data):
#         def _step(inputs):
#             X, Y = inputs
#             with tf.GradientTape() as tape:
#                 logits = self(X, training=True)
#                 num_labels = tf.shape(logits)[-1]
#                 label_mask = tf.math.logical_not(Y < 0)
#                 label_mask = tf.reshape(label_mask, (-1,))
#                 logits = tf.reshape(logits, (-1, num_labels))
#                 logits_masked = tf.boolean_mask(logits, label_mask)
#                 label_ids = tf.reshape(Y, (-1,))
#                 label_ids_masked = tf.boolean_mask(label_ids, label_mask)
#                 cross_entropy = self.loss_fn(label_ids_masked, logits_masked)
#                 loss = tf.reduce_sum(cross_entropy) * (1.0 / self.batch_size)
#                 y_pred = tf.argmax(tf.nn.softmax(logits, axis=-1), axis=-1)
#                 self.train_accuracy.update_state(tf.squeeze(Y), y_pred)
#             grads = tape.gradient(loss, self.trainable_variables)
#             self.optimizer.apply_gradients(
#                 list(zip(grads, self.trainable_variables)))
#             return cross_entropy

#         total_loss = self.distribute_strategy.reduce(
#             tf.distribute.ReduceOp.SUM,
#             self.distribute_strategy.run(_step, args=(data,)), axis=0
#         )
#         mean_loss = total_loss / self.batch_size
#         return {
#             'total_loss': total_loss,
#             'mean_loss': mean_loss,
#             'train_accuracy': self.train_accuracy.result()
#         }



In [23]:
from tensorflow.keras import datasets
# Download and prepare the CIFAR10 dataset
(train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
# Normalize pixel values to be between 0 and 1
############Write your code Here############
train_images = train_images / 255.
test_images = test_images / 255.
############################################
# Make image shape (BS, H, W, C) to (BS, C, H, W)
############Write your code Here############
(height, width, channels), _ = train_images.shape[1:],train_labels.shape[1:]
train_images = tf.cast(train_images.reshape((-1, channels, height, width)), dtype=tf.float32)
test_images = tf.cast(test_images.reshape((-1, channels, height, width)), dtype=tf.float32)

train_dataset, test_dataset = (tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(train_images),
                                                    tf.data.Dataset.from_tensor_slices(train_labels))),
            tf.data.Dataset.zip((tf.data.Dataset.from_tensor_slices(test_images),
                                 tf.data.Dataset.from_tensor_slices(test_labels))))

############################################
class Trainer:

    def __init__(self, model, model_config, train_dataset, train_dataset_len, test_dataset, test_dataset_len, config):
        self.train_dataset = train_dataset.batch(config.batch_size)
        self.train_dataset_len = train_dataset_len
        self.test_dataset = test_dataset
        self.test_dataset_len = None
        self.test_dist_dataset = None
        if self.test_dataset:
            self.test_dataset = test_dataset.batch(config.batch_size)
            self.test_dataset_len = test_dataset_len
        self.config = config
        self.tokens = 0
        self.strategy = tf.distribute.OneDeviceStrategy("GPU:0")
        if len(tf.config.list_physical_devices('GPU')) > 1:
            self.strategy = tf.distribute.MirroredStrategy()

        with self.strategy.scope():
            self.model = model(**model_config)
            self.optimizer = tf.keras.optimizers.Adam(learning_rate=config.learning_rate)
            self.cce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True,reduction=tf.keras.losses.Reduction.NONE)
            self.train_dist_dataset = self.strategy.experimental_distribute_dataset(self.train_dataset)
            if self.test_dataset:
                self.test_dist_dataset = self.strategy.experimental_distribute_dataset(self.test_dataset)

    def save_checkpoints(self):
        if self.config.ckpt_path is not None:
            self.model.save_weights(self.config.ckpt_path)


    def train(self):

        train_loss_metric = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
        test_loss_metric = tf.keras.metrics.Mean('testing_loss', dtype=tf.float32)

        train_accuracy = tf.keras.metrics.Accuracy('training_accuracy', dtype=tf.float32)
        test_accuracy = tf.keras.metrics.Accuracy('testing_accuracy', dtype=tf.float32)

        @tf.function
        def train_step(dist_inputs):

            def step_fn(inputs):

                X, Y = inputs

                with tf.GradientTape() as tape:
                # training=True is only needed if there are layers with different
                # behavior during training versus inference (e.g. Dropout).
                    logits = self.model(X,training=True)
                    num_labels = tf.shape(logits)[-1]
                    label_mask = tf.math.logical_not(Y < 0)
                    label_mask = tf.reshape(label_mask,(-1,))
                    logits = tf.reshape(logits,(-1,num_labels))
                    logits_masked = tf.boolean_mask(logits,label_mask)
                    label_ids = tf.reshape(Y,(-1,))
                    label_ids_masked = tf.boolean_mask(label_ids,label_mask)
                    cross_entropy = self.cce(label_ids_masked, logits_masked)
                    loss = tf.reduce_sum(cross_entropy) * (1.0 / self.config.batch_size)
                    y_pred = tf.argmax(tf.nn.softmax(logits,axis=-1),axis=-1)
                    train_accuracy.update_state(tf.squeeze(Y),y_pred)

                grads = tape.gradient(loss, self.model.trainable_variables)
                self.optimizer.apply_gradients(list(zip(grads, self.model.trainable_variables)))
                return cross_entropy

            per_example_losses = self.strategy.run(step_fn, args=(dist_inputs,))
            sum_loss = self.strategy.reduce(tf.distribute.ReduceOp.SUM, per_example_losses, axis=0)
            mean_loss = sum_loss / self.config.batch_size
            return mean_loss

        @tf.function
        def test_step(dist_inputs):

            def step_fn(inputs):

                X, Y = inputs
                # training=True is only needed if there are layers with different
                # behavior during training versus inference (e.g. Dropout).
                logits = self.model(X,training=False)
                num_labels = tf.shape(logits)[-1]
                label_mask = tf.math.logical_not(Y < 0)
                label_mask = tf.reshape(label_mask,(-1,))
                logits = tf.reshape(logits,(-1,num_labels))
                logits_masked = tf.boolean_mask(logits,label_mask)
                label_ids = tf.reshape(Y,(-1,))
                label_ids_masked = tf.boolean_mask(label_ids,label_mask)
                cross_entropy = self.cce(label_ids_masked, logits_masked)
                loss = tf.reduce_sum(cross_entropy) * (1.0 / self.config.batch_size)
                y_pred = tf.argmax(tf.nn.softmax(logits,axis=-1),axis=-1)
                test_accuracy.update_state(tf.squeeze(Y),y_pred)

                return cross_entropy

            per_example_losses = self.strategy.run(step_fn, args=(dist_inputs,))
            sum_loss = self.strategy.reduce(tf.distribute.ReduceOp.SUM, per_example_losses, axis=0)
            mean_loss = sum_loss / self.config.batch_size
            return mean_loss

        train_pb_max_len = math.ceil(float(self.train_dataset_len)/float(self.config.batch_size))
        test_pb_max_len = math.ceil(float(self.test_dataset_len)/float(self.config.batch_size)) if self.test_dataset else None

        epoch_bar = master_bar(range(self.config.max_epochs))
        with self.strategy.scope():
            for epoch in epoch_bar:
                for inputs in progress_bar(self.train_dist_dataset,total=train_pb_max_len,parent=epoch_bar):
                    loss = train_step(inputs)
                    self.tokens += tf.reduce_sum(tf.cast(inputs[1]>=0,tf.int32)).numpy()
                    train_loss_metric(loss)
                    epoch_bar.child.comment = f'training loss : {train_loss_metric.result()}'
                print(f"epoch {epoch+1}: train loss {train_loss_metric.result():.5f}. train accuracy {train_accuracy.result():.5f}")
                train_loss_metric.reset_states()
                train_accuracy.reset_states()

                if self.test_dist_dataset:
                    for inputs in progress_bar(self.test_dist_dataset,total=test_pb_max_len,parent=epoch_bar):
                        loss = test_step(inputs)
                        test_loss_metric(loss)
                        epoch_bar.child.comment = f'testing loss : {test_loss_metric.result()}'
                    print(f"epoch {epoch+1}: test loss {test_loss_metric.result():.5f}. test accuracy {test_accuracy.result():.5f}")
                    test_loss_metric.reset_states()
                    test_accuracy.reset_states()

                self.save_checkpoints()
class TrainerConfig:
    # optimization parameters
    max_epochs = 10
    batch_size = 64
    learning_rate = 1e-3
    # checkpoint settings
    ckpt_path = None

    def __init__(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)                
model_config = {image_size":32,
                "patch_size":4,
                "num_classes":10,
                "dim":64,
                "depth":3,
                "heads":4,
                "mlp_dim":128}
tconf = TrainerConfig(max_epochs=10, batch_size=64, learning_rate=1e-3)
trainer = Trainer(ImageTransformer, model_config, train_dataset, len(train_images), test_dataset, len(test_images), tconf)

trainer.train()
#Initialize your model
#Initialize optimizer and loss and compile it to the model
############Write your code Here############
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
cross_entropy_loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
model = ImageTransformer(image_size=32, patch_size=4, n_classes=10, batch_size=64,
                         dimension=64, depth=3, heads=4, mlp_dimension=128)
model.compile(optimizer=optimizer, loss_fn=cross_entropy_loss)

############################################

#Train your model
############Write your code Here############
model.fit(train_dataset, batch_size=64, epochs=10)
############################################
print('==============Training Finished===============')

#Evaluate your test samples
accuracy = 0
############Write your code Here############

############################################

print('Test Accuracy :', accuracy)

NameError: name 'master_bar' is not defined