In [None]:
# Install into the running kernel's environment (%pip rather than !pip) and pin
# the versions recorded in this cell's saved output so re-runs are reproducible.
%pip install keras-pos-embd==0.12.0
%pip install tensorflow-addons==0.13.0

Collecting keras-pos-embd
  Downloading keras-pos-embd-0.12.0.tar.gz (6.0 kB)
Building wheels for collected packages: keras-pos-embd
  Building wheel for keras-pos-embd (setup.py) ... [?25l[?25hdone
  Created wheel for keras-pos-embd: filename=keras_pos_embd-0.12.0-py3-none-any.whl size=7470 sha256=dc3a22eef32d07d8ad17eeb7fbb90fb15e9c0493a9c482d5afd570d81aca251b
  Stored in directory: /root/.cache/pip/wheels/77/99/fd/dd98f4876c3ebbef7aab0dbfbd37bca41d7db37d3a28b2cb09
Successfully built keras-pos-embd
Installing collected packages: keras-pos-embd
Successfully installed keras-pos-embd-0.12.0
Collecting tensorflow-addons
  Downloading tensorflow_addons-0.13.0-cp37-cp37m-manylinux2010_x86_64.whl (679 kB)
[K     |████████████████████████████████| 679 kB 28.2 MB/s 
Installing collected packages: tensorflow-addons
Successfully installed tensorflow-addons-0.13.0


In [None]:
#importing libraries
from __future__ import print_function
from functools import reduce
import json
import os
import re
import tarfile
import tempfile
import keras
import keras.backend as K
import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.layers import merge, recurrent, Dense, Input, Dropout, TimeDistributed, concatenate, Layer
from keras.layers.embeddings import Embedding
from keras.layers.normalization import BatchNormalization
from keras.layers.wrappers import Bidirectional
from keras.models import Model
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.text import Tokenizer
from keras.regularizers import l2
from keras_pos_embd import TrigPosEmbedding
from keras.utils import np_utils
import numpy as np
from tensorflow.keras import layers
from tensorflow import keras
import tensorflow_addons as tfa
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
np.random.seed(1337)

### *Common functions*

In [None]:
# Load CIFAR-10 and one-hot encode the integer class labels.
num_classes = 10
input_shape = (32, 32, 3)
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
# Both label arrays go through the same to_categorical conversion.
y_train, y_test = (
    keras.utils.to_categorical(labels, num_classes) for labels in (y_train, y_test)
)
print(f"x_Train shape = {x_train.shape}, y_train shape = {y_train.shape}, x_test shape = {x_test.shape}, y_test shape = {y_test.shape}")

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
x_Train shape = (50000, 32, 32, 3), y_train shape = (50000, 10), x_test shape = (10000, 32, 32, 3), y_test shape = (10000, 10)


In [None]:
# --- Model hyperparameters ---
positional_emb = True  # whether positional embeddings are added to the token sequence
conv_layers = 2  # default number of convolutional stages in the tokenizer
projection_dim = 128

# Patch geometry.
# NOTE(review): num_patches is derived while image_size is 64, and image_size is
# reset to 32 at the end of this cell, so num_patches stays at 64 — confirm intended.
image_size = 64
patch_size = 8  # Size of the patches to be extracted from the input images.
num_patches = (image_size // patch_size) ** 2

num_heads = 2
transformer_units = [projection_dim] * 2
transformer_layers = 2
stochastic_depth_rate = 0.1

# --- Optimisation settings ---
learning_rate, weight_decay = 0.001, 0.0001
batch_size, num_epochs = 128, 30

# Final image side length; this is the value visible to later cells.
image_size = 32

In [None]:
# Multi-layer perceptron helper.
def mlp(x, hidden_units, dropout_rate):
    """Apply a stack of GELU Dense layers, each followed by Dropout.

    Args:
        x: Input passed to the first Dense layer.
        hidden_units: Iterable of layer widths; one Dense + Dropout pair is
            created per entry, in order.
        dropout_rate: Rate given to every Dropout layer.

    Returns:
        The output of the last Dropout layer, or ``x`` itself when
        ``hidden_units`` is empty.
    """
    out = x
    for width in hidden_units:
        hidden = layers.Dense(width, activation=tf.nn.gelu)(out)
        out = layers.Dropout(dropout_rate)(hidden)
    return out

In [None]:
class Patches(layers.Layer):
    """Layer that cuts images into square, non-overlapping patches.

    The stride equals the patch size in both spatial directions, and the
    result is reshaped to ``[batch, num_patches, values_per_patch]``.

    Args:
        patch_size: Side length of each square patch.
        num_patches: Number of patches per image; used as the second
            dimension of the reshape, so it must match what the extraction
            actually produces for the given image size.
    """

    def __init__(self, patch_size, num_patches):
        super().__init__()
        self.patch_size = patch_size
        self.num_patches = num_patches

    def call(self, images):
        """Return the flattened patches of ``images``."""
        # The same window is used for size and stride, so patches do not overlap.
        window = [1, self.patch_size, self.patch_size, 1]
        extracted = tf.image.extract_patches(
            images=images,
            sizes=window,
            strides=window,
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # The static last dimension holds the flattened values of one patch.
        values_per_patch = extracted.shape[-1]
        target_shape = [tf.shape(images)[0], self.num_patches, values_per_patch]
        return tf.reshape(extracted, target_shape)

In [None]:
class MLPMixerLayer(layers.Layer):
    """MLP-Mixer style block: one MLP across patches, one MLP across channels.

    Inputs are expected as ``[num_batches, num_patches, hidden_units]``.

    Args:
        num_patches: Length of the patch axis; also the width of the Dense
            layers in the first MLP and of the first Dense in the second MLP.
        hidden_units: Size of the last (channel) axis of the input. The second
            MLP projects back to this width so that its output can be added to
            the residual branch.
        dropout_rate: Rate of the Dropout layer that ends each MLP.
        *args, **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, num_patches, hidden_units, dropout_rate, *args, **kwargs):
        super(MLPMixerLayer, self).__init__(*args, **kwargs)

        self.mlp1 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                layers.Dense(units=num_patches),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        self.mlp2 = keras.Sequential(
            [
                layers.Dense(units=num_patches),
                tfa.layers.GELU(),
                # Project back to the input width given by the caller. This used
                # to read a module-level `embedding_dim`, which silently ignored
                # the `hidden_units` argument.
                layers.Dense(units=hidden_units),
                layers.Dropout(rate=dropout_rate),
            ]
        )
        # A single LayerNormalization instance is shared by both normalization
        # steps in call().
        self.normalize = layers.LayerNormalization(epsilon=1e-6)

    def call(self, inputs):
        """Apply both mixing MLPs, each with a skip connection."""
        # Apply layer normalization.
        x = self.normalize(inputs)
        # Transpose inputs from [num_batches, num_patches, hidden_units] to [num_batches, hidden_units, num_patches].
        x_channels = tf.linalg.matrix_transpose(x)
        # Apply mlp1 on each channel independently.
        mlp1_outputs = self.mlp1(x_channels)
        # Transpose mlp1_outputs from [num_batches, hidden_units, num_patches] to [num_batches, num_patches, hidden_units].
        mlp1_outputs = tf.linalg.matrix_transpose(mlp1_outputs)
        # Add skip connection.
        x = mlp1_outputs + inputs
        # Apply layer normalization.
        x_patches = self.normalize(x)
        # Apply mlp2 on each patch independently.
        mlp2_outputs = self.mlp2(x_patches)
        # Add skip connection.
        x = x + mlp2_outputs
        return x

In [None]:
# Referred from: github.com:rwightman/pytorch-image-models.
class StochasticDepth(layers.Layer):
    """Randomly zeroes whole samples of its input while training.

    Args:
        drop_prop: Probability of dropping a sample (stored as ``drop_prob``).
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, drop_prop, **kwargs):
        super().__init__(**kwargs)
        self.drop_prob = drop_prop

    def call(self, x, training=None):
        """Return ``x`` unchanged unless ``training`` is truthy."""
        if not training:
            return x

        survival = 1 - self.drop_prob
        # One random draw per sample: the mask has shape (batch, 1, ..., 1) and
        # broadcasts over every remaining axis of x.
        batch = tf.shape(x)[0]
        trailing_ones = (1,) * (len(tf.shape(x)) - 1)
        noise = survival + tf.random.uniform((batch,) + trailing_ones, 0, 1)
        # floor() turns the shifted uniform noise into a 0/1 keep mask.
        keep_mask = tf.floor(noise)
        # Kept samples are scaled by 1 / survival.
        return (x / survival) * keep_mask

In [None]:
# Data augmentation pipeline, applied inside the model to its input images.
augmentation = keras.Sequential(name="data_augmentation")
# Multiply pixel values by 1/255.
augmentation.add(layers.experimental.preprocessing.Rescaling(scale=1.0 / 255))
# Random crop to image_size x image_size.
augmentation.add(layers.experimental.preprocessing.RandomCrop(image_size, image_size))
# Random left-right flip.
augmentation.add(layers.experimental.preprocessing.RandomFlip("horizontal"))

### *Compact Convolutional Transformer (CCT)*

In [None]:
# Convolutional tokenization, as implemented by the Tokenizer layer below:
#   inputs => conv layers (patch embedding) => pooling => reshape into a sequence
#
# The assignments in this cell repeat the values already set in the earlier
# configuration cell.
positional_emb = True
conv_layers = 2
projection_dim = 128

num_heads = 2
transformer_units = [projection_dim] * 2
transformer_layers = 2
stochastic_depth_rate = 0.1

learning_rate, weight_decay = 0.001, 0.0001
batch_size, num_epochs = 128, 30
image_size = 32
class Tokenizer(layers.Layer):
    """Convolutional tokenizer: turns an image batch into a sequence of vectors.

    Every stage is ``Conv2D`` (ReLU, no bias, "valid" padding) followed by
    ``ZeroPadding2D`` and ``MaxPool2D`` ("same" padding). The resulting feature
    map is flattened over its two spatial axes in ``call``.

    Args:
        kernel_size: Convolution kernel size.
        stride: Convolution stride.
        padding: Amount passed to ``ZeroPadding2D`` after each convolution.
        pooling_kernel_size: Max-pooling window size.
        pooling_stride: Max-pooling stride.
        num_conv_layers: Number of conv/pad/pool stages to stack.
        num_output_channels: Filter count per stage; indexed by stage number,
            so it needs at least ``num_conv_layers`` entries.
        positional_emb: Whether ``positional_embedding`` should build an
            embedding layer.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(
        self,
        kernel_size=3,
        stride=1,
        padding=1,
        pooling_kernel_size=3,
        pooling_stride=2,
        num_conv_layers=conv_layers,
        num_output_channels=[64, 128],
        positional_emb=positional_emb,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.positional_emb = positional_emb

        # Stack the convolutional stages in a small Sequential model.
        self.conv_model = keras.Sequential()
        for stage in range(num_conv_layers):
            stage_layers = (
                layers.Conv2D(
                    num_output_channels[stage],
                    kernel_size,
                    stride,
                    padding="valid",
                    use_bias=False,
                    activation="relu",
                    kernel_initializer="he_normal",
                ),
                layers.ZeroPadding2D(padding),
                layers.MaxPool2D(pooling_kernel_size, pooling_stride, "same"),
            )
            for stage_layer in stage_layers:
                self.conv_model.add(stage_layer)

    def call(self, images):
        """Run the conv stack and flatten the spatial axes into one sequence axis."""
        feature_map = self.conv_model(images)
        dims = tf.shape(feature_map)
        # (batch, height * width, channels)
        sequence_shape = (-1, dims[1] * dims[2], dims[-1])
        return tf.reshape(feature_map, sequence_shape)

    def positional_embedding(self, image_size):
        """Build a learned positional-embedding layer sized for ``image_size``.

        A dummy all-ones image of shape ``(1, image_size, image_size, 3)`` is
        pushed through ``call`` to find the sequence length and feature width.

        Returns:
            ``(embed_layer, sequence_length)`` when positional embeddings are
            enabled, otherwise ``None``.
        """
        if not self.positional_emb:
            return None

        probe = self.call(tf.ones((1, image_size, image_size, 3)))
        sequence_length = tf.shape(probe)[1]
        feature_dim = tf.shape(probe)[-1]

        # NOTE: a keras_pos_embd.TrigPosEmbedding (MODE_EXPAND) variant wrapped
        # in a Sequential was previously left here as commented-out code.
        embed_layer = layers.Embedding(
            input_dim=sequence_length, output_dim=feature_dim
        )
        return embed_layer, sequence_length

In [None]:
#transformer with sequence pooling
embedding_dim = 256  # Number of hidden units.
def transformer(
    image_size=image_size,
    input_shape=input_shape,
    num_heads=num_heads,
    projection_dim=projection_dim,
    transformer_units=transformer_units,
):
    """Build the convolutional-tokenizer transformer classifier.

    Pipeline: input -> ``augmentation`` -> ``Tokenizer`` -> optional learned
    positional embedding -> ``transformer_layers`` attention blocks ->
    sequence pooling -> Dense classifier producing ``num_classes`` logits.

    Args:
        image_size: Image side length used to size the positional embedding.
        input_shape: Shape of one input image.
        num_heads: Number of attention heads per block.
        projection_dim: ``key_dim`` of each MultiHeadAttention layer.
        transformer_units: Unused at present; the per-block MLP that would
            consume it is disabled. Kept so existing callers keep working.

    Returns:
        An uncompiled ``keras.Model`` mapping images to class logits.

    Reads the module-level ``positional_emb``, ``transformer_layers`` and
    ``num_classes``.
    """
    inputs = layers.Input(input_shape)
    # Augmentation.
    augmented = augmentation(inputs)
    # Tokenization: encode the image as a sequence of feature vectors.
    tokens = Tokenizer()
    encoded_patches = tokens(augmented)

    # Add the positional embedding.
    if positional_emb:
        embed_layer, seq_len = tokens.positional_embedding(image_size)
        positions = tf.range(start=0, limit=seq_len, delta=1)
        positional_embeddings = embed_layer(positions)
        encoded_patches += positional_embeddings

    # Transformer blocks. Neither the feed-forward MLP
    # (mlp(x3, hidden_units=transformer_units, dropout_rate=0.1)) nor
    # StochasticDepth is applied inside the blocks at present.
    for _ in range(transformer_layers):
        # Layer normalization 1.
        x1 = layers.LayerNormalization(epsilon=1e-5)(encoded_patches)
        # Multi-head self-attention.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(x1, x1)
        # Skip connection 1.
        x2 = layers.Add()([attention_output, encoded_patches])
        # Layer normalization 2.
        x3 = layers.LayerNormalization(epsilon=1e-5)(x2)
        # Skip connection 2.
        encoded_patches = layers.Add()([x3, x2])

    # Sequence pooling: a Dense(1) score per token, softmax over the sequence
    # axis, then a weighted sum of the token representations.
    # (An earlier duplicate of this step, an unused MLPMixerLayer stack and an
    # unused stochastic-depth schedule were removed; none of them fed the output.)
    representation = layers.LayerNormalization(epsilon=1e-5)(encoded_patches)
    attention_weights = tf.nn.softmax(layers.Dense(1)(representation), axis=1)
    weighted_representation = tf.matmul(
        attention_weights, representation, transpose_a=True
    )
    weighted_representation = tf.squeeze(weighted_representation, -2)
    # Classify outputs.
    logits = layers.Dense(num_classes)(weighted_representation)
    # Create the Keras model.
    model = keras.Model(inputs=inputs, outputs=logits)
    return model



In [None]:
#model training

def training(model, x_train, y_train):
    """Compile, fit and evaluate ``model``; return the Keras training history.

    Args:
        model: Keras model that outputs logits.
        x_train: Training images.
        y_train: One-hot training labels.

    Returns:
        The ``History`` object returned by ``model.fit``.

    Reads the module-level ``learning_rate``, ``weight_decay``, ``batch_size``,
    ``num_epochs``, ``x_test`` and ``y_test``. The checkpoint callback stores
    the best weights by ``val_accuracy``, but they are not reloaded here, so the
    printed test metrics come from the weights at the end of training.
    """
    # Use the configured hyperparameters rather than hard-coded literals, so
    # changing the config cell actually affects the optimizer.
    optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )
    filepath = "/sample_data/tmp/checkpoint"
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.CategoricalCrossentropy(
            from_logits=True, label_smoothing=0.1
        ),
        metrics=[
            keras.metrics.CategoricalAccuracy(name="accuracy"),
            keras.metrics.TopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )
    checkpoint_callback = keras.callbacks.ModelCheckpoint(
        filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

    # The last 10% of the training data is held out for validation.
    history = model.fit(
        x=x_train,
        y=y_train,
        batch_size=batch_size,
        epochs=num_epochs,
        validation_split=0.1,
        callbacks=[checkpoint_callback],
    )
    _, accuracy, top_5_accuracy = model.evaluate(x_test, y_test)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")
    print(f"Test top 5 accuracy: {round(top_5_accuracy * 100, 2)}%")

    return history


In [None]:
# Build and train the model. The plotting cells read `history`, so the fit
# result is bound to that name; `train` is kept as an alias for the old name.
model = transformer()
history = training(model, x_train, y_train)
train = history

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30

In [None]:
# Plot training and validation loss per epoch.
for _metric, _legend in (("loss", "train_loss"), ("val_loss", "val_loss")):
    plt.plot(history.history[_metric], label=_legend)
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Train and Validation Losses Over Epochs", fontsize=14)
plt.legend()
plt.grid()
plt.show()

In [None]:
# Plot training and validation accuracy per epoch.
for _metric in ("accuracy", "val_accuracy"):
    plt.plot(history.history[_metric], label=_metric)
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.title("Train and Validation Accuracy Over Epochs", fontsize=14)
plt.legend()
plt.grid()
plt.show()