In [1]:
import numpy as np
import pandas as pd

import glob, random, os, warnings
import matplotlib.pyplot as plt
from datetime import datetime
from tqdm import tqdm
import seaborn as sns
from time import time
import time as t
import shutil
import cv2

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay


import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras.layers import *
from tensorflow.keras.preprocessing.image import ImageDataGenerator

#To get the same results in different environments
def seed_everything(seed = 0):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    os.environ['TF_DETERMINISTIC_OPS'] = "True"
    os.environ["TF_DISABLE_SEGMENT_REDUCTION_OP_DETERMINISM_EXCEPTIONS"] = "True"

seed_everything()
warnings.filterwarnings('ignore')


print('TensorFlow Version ' + tf.__version__)

TensorFlow Version 2.11.0


In [2]:
# from tensorflow.keras import layers
# from tensorflow import keras
# class ConvolutionalTokenizer(layers.Layer):
#     """
#     Creates Convolutional Tokens of images for feeding to Transformer Encoder.
#     """
#     def __init__(self,kernel_size=3,stride=1,padding=1,pooling_kernel_size=3,pooling_stride=2,conv_layers=2,num_output_channels=[64, 128],**kwargs,):
#         super(ConvolutionalTokenizer, self).__init__(**kwargs)
        
#         # Creating a Sequential Keras Model for Tokenizing images
#         self.conv_model = keras.Sequential()
#         # Created the required number of convolutional layer
#         for i in range(conv_layers):
#             # Adding a conv2d layer with ReLU activation as suggested by authors
#             self.conv_model.add(layers.Conv2D(num_output_channels[i],kernel_size,stride,padding="valid",use_bias=False,activation="relu",kernel_initializer="he_normal"))
#             # Zero Padding
#             self.conv_model.add(layers.ZeroPadding2D(padding))
#             # Pooling over the image with 3x3 kernel having padding='same' and stride=2   
#             self.conv_model.add(layers.MaxPool2D(pooling_kernel_size, pooling_stride, "same"))

#     def call(self, images):
#         # Reshaping the outputs by flattening them
#         outputs = self.conv_model(images)
#         Flattened = tf.reshape(
#             outputs,
#             (-1, tf.shape(outputs)[1] * tf.shape(outputs)[2], tf.shape(outputs)[3]),
#         )
#         return Flattened

#     # Adding Learnable Positional Embeddings
#     def pos_embeddings(self, image_size):
#         inp = tf.ones((1, image_size, image_size, 1))
#         out = self.call(inp)
#         seq_len = tf.shape(out)[1]
#         projection_dim = tf.shape(out)[-1]

#         embed_layer = layers.Embedding(
#             input_dim=seq_len, output_dim=projection_dim
#         )
#         return embed_layer, seq_len

from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf

class ConvolutionalTokenizer(layers.Layer):
    """
    Creates Convolutional Tokens of images for feeding to Transformer Encoder.
    """
    def __init__(self, kernel_size=3, stride=1, padding=1, pooling_kernel_size=3, pooling_stride=2,
                 conv_layers=2, num_output_channels=[64, 128], **kwargs):
        super(ConvolutionalTokenizer, self).__init__(**kwargs)
        
        # Creating a Sequential Keras Model for Tokenizing images
        self.conv_model = keras.Sequential()
        # Creating the required number of convolutional layers
        for i in range(conv_layers):
            # Adding a Conv2D layer with ReLU activation as suggested by authors
            self.conv_model.add(layers.Conv2D(num_output_channels[i], kernel_size, stride,
                                              padding="valid", use_bias=False, activation="relu",
                                              kernel_initializer="he_normal"))
            # Zero Padding
            self.conv_model.add(layers.ZeroPadding2D(padding))
            # Pooling over the image with 3x3 kernel, padding='same' and stride=2   
            self.conv_model.add(layers.MaxPool2D(pooling_kernel_size, pooling_stride, "same"))

    def call(self, images):
        # Pass RGB images through the convolutional model
        outputs = self.conv_model(images)
        # Flatten the output
        flattened = tf.reshape(outputs,
                               (-1, tf.shape(outputs)[1] * tf.shape(outputs)[2], tf.shape(outputs)[3]))
        return flattened

    def pos_embeddings(self, image_size):
        # Ensure the input has 3 channels for RGB
        inp = tf.ones((1, image_size, image_size, 3))  # RGB shape
        out = self.call(inp)
        seq_len = tf.shape(out)[1]
        projection_dim = tf.shape(out)[-1]

        # Define the positional embedding layer
        embed_layer = layers.Embedding(input_dim=seq_len, output_dim=projection_dim)
        return embed_layer, seq_len

In [3]:
def mlp(x, hidden_units, dropout):
    """
    Creates A Feed Forward Network`
    
    Args:
        hidden_units: Number of hidden units in MLP
        dropout: The Rate of dropout which is to be applied.
    """
    for units in hidden_units:
        x = layers.Dense(units, activation=tf.nn.gelu)(x)
        x = layers.Dropout(dropout)(x)
    return x

In [4]:
def Transformer_Encoder(L,embedded_patches,num_heads,projection_dim,transformer_units):
    """
    Transformer Encoder Block
    
    Args: 
        L: number of transformer_layers
        
        embedded_patches: Patches from the Convolutional Tokenizer block
        
        num_heads: Number of Attention Heads
        
        projection_dim: Size of each attention head for query and key
        
        transformer_units: hidden units of MLP
    """
    
    
    # Iterating over the number of transformer layers
    for i in range(L):
        # Normalizing the input patches
        norm = layers.LayerNormalization(epsilon=1e-5)(embedded_patches)
        # Feeding to MHA
        attention_output = layers.MultiHeadAttention(num_heads=num_heads, key_dim=projection_dim, dropout=0.1)(norm,norm)
        # Shortcut skip connection
        skip1 = layers.Add()([attention_output, embedded_patches])
        # Normalizing 
        norm2= layers.LayerNormalization(epsilon=1e-5)(skip1)
        
        # Feed Forward MLP
        ffn = mlp(norm2, hidden_units=transformer_units, dropout=0.1)

        # Shortcut skip connection
        embedded_patches = layers.Add()([ffn, skip1])
        
        return embedded_patches

In [5]:
def SeqPool(trans_enco_out):
    """
    Sequence Pooling block.
    
    Args: 
        trans_enco_out: Takes in the Output of transformer encoder block 
    
    Returns:
        A 1xD output to be fed to final classifier
    """
    # Normalizing the output of transformer enocder layer
    normalized = layers.LayerNormalization(epsilon=1e-5)(trans_enco_out)
    # Adding a linear layer
    linear=layers.Dense(1)(normalized)
    # Applying Softmax to the linear layer
    soft = tf.nn.softmax(linear, axis=1)
    # Multiplying the softmax of linear layer with the normalized output of orignal output of the transformer encoder block
    mult = tf.matmul(soft, normalized, transpose_a=True)
    # Squeezing the dimensions
    seq_pool_output = tf.squeeze(mult, -2)

    return seq_pool_output

In [6]:
def CompactConvolutionalTransformer(image_size=224, num_classes=2, input_shape=(224, 224, 3),
                                    projection_dim=128, num_heads=2, L=2, transformer_units=[128, 128]):
    """
    CCT model for RGB input
    
    Args:
        image_size: size of image
        num_classes: Number of classes of output
        input_shape: shape of image (updated for RGB)
        projection_dim: Size of each attention head for query and key
        num_heads: Number of heads of MHA
        L: Number of transformer encoder layers
    
    Returns:
        CCT Model
    """
    
    inputs = layers.Input(input_shape)  # Now expects RGB images with 3 channels
    
    # Convolutional Tokenization Block
    conv_tokenizer = ConvolutionalTokenizer()
    embedded_patches = conv_tokenizer(inputs)

    # Adding positional embedding
    pos_embed, seq_length = conv_tokenizer.pos_embeddings(image_size)
    positions = tf.range(start=0, limit=seq_length, delta=1)
    position_embeddings = pos_embed(positions)
    embedded_patches += position_embeddings
    
    # Transformer Encoder Block with Sequence Pooling
    embedded_patches = Transformer_Encoder(L, embedded_patches, num_heads=num_heads,
                                           projection_dim=projection_dim, transformer_units=transformer_units)
    
    # Sequence Pooling on the output of the transformer encoder
    sequence_pooling = SeqPool(embedded_patches)
    
    # Dense layer for classification
    output = layers.Dense(num_classes, activation='softmax')(sequence_pooling)
    
    model = keras.Model(inputs=inputs, outputs=output)
    return model

In [7]:
cct = CompactConvolutionalTransformer()
print(cct.summary())

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 convolutional_tokenizer (Convo  (None, 3136, 128)   75456       ['input_1[0][0]']                
 lutionalTokenizer)                                                                               
                                                                                                  
 tf.__operators__.add (TFOpLamb  (None, 3136, 128)   0           ['convolutional_tokenizer[0][0]']
 da)                                                                                          

In [8]:
from tensorflow.keras.models import load_model
weights_path = r'E:\Projects\Content-moderation\weights\CCTmodel (1).hdf5'
cct.load_weights(weights_path)


print("HDF5 model weights loaded successfully!")

HDF5 model weights loaded successfully!
