In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.optim.lr_scheduler import _LRScheduler
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

from sklearn import decomposition
from sklearn import manifold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np

import copy
from collections import namedtuple
import os
import random
import shutil
import time

In [None]:
os.environ['KAGGLE_USERNAME'] = 'comsci125teerapong'
os.environ['KAGGLE_KEY'] = 'c4095229aa2a02465c0b05b9152063c6'

!pip install kaggle
!kaggle competitions download -c super-ai-engineer-2021-house-grade-classification

# datasets.utils.extract_archive('/content/super-ai-engineer-2021-house-grade-classification.zip')

In [None]:
# Extract the downloaded competition archive. Later cells read
# /content/train.csv, /content/train/ and /content/test/, so this presumably
# runs with /content as the working directory -- TODO confirm.
!unzip /content/super-ai-engineer-2021-house-grade-classification.zip

In [None]:
import pandas as pd 

# Training labels table; later cells use its `image_name` and `class` columns.
address_df = pd.read_csv('/content/train.csv')
address_df

In [None]:
# Histogram of the numeric columns of the raw label table (before resampling).
address_df.hist()

In [None]:
# Resample each class to a fixed size, drawing with replacement:
# 1500 rows for class 0 and 1000 rows for each of classes 1-5.
samples_per_class = {0: 1500, 1: 1000, 2: 1000, 3: 1000, 4: 1000, 5: 1000}
class_train = pd.concat(
    [
        address_df[address_df['class'] == label].sample(
            n=n_rows, replace=True, random_state=1
        )
        for label, n_rows in samples_per_class.items()
    ],
    axis=0,
)

In [None]:
# Histogram of the numeric columns after per-class resampling.
class_train.hist()

In [None]:
# class_train is already the DataFrame returned by pd.concat above; this
# re-wrap does not change its contents.
class_train = pd.DataFrame(class_train)
class_train 

In [None]:
# Rename the label column "class" -> "Class" so it can be read with attribute
# access (`class` is a Python keyword).
class_train = class_train.rename(columns={"class": "Class"})

In [None]:
# Display the resampled table after the column rename.
class_train

In [None]:
# File names of the resampled training images, in row order.
name_img = class_train.image_name.tolist()
name_img

In [None]:
# Class labels of the resampled rows; index-aligned with name_img.
class_img = class_train.Class.tolist()
class_img

In [None]:
# Sanity check: number of resampled rows labelled as class 0.
class_img.count(0)

In [None]:
# Create one output folder per class label (0-5) under /content/data/train.
# os.makedirs creates the parent folders as needed and, with exist_ok=True,
# makes the cell safe to re-run.
for _label in range(6):
    os.makedirs(f'/content/data/train/{_label}', exist_ok=True)

In [None]:
from PIL import Image 
import PIL  

# Copy every resampled training image into the folder of its class
# (/content/data/train/<class>/<class>_<row>.jpg), converting it to RGB.
# The row index is part of the file name, so rows that were drawn more than
# once by the resampling become separate files.
train_src_dir = '/content/train/'
valid_labels = range(6)  # only classes 0-5 have an output folder

for i, (img_name, img_class) in enumerate(zip(name_img, class_img)):
    if img_class not in valid_labels:
        # Rows with any other label are skipped.
        continue
    out_path = f"/content/data/train/{int(img_class)}/{img_class}_{i}.jpg"
    # The context manager closes the source file as soon as the copy is saved.
    with Image.open(train_src_dir + img_name) as picture:
        picture.convert('RGB').save(out_path)


In [None]:
# Delete the extracted original training images now that the copies live under
# /content/data/train (presumably to free disk space).
!rm -rf /content/train

In [None]:
import os, random, cv2
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import tensorflow.keras.backend as K 
import tensorflow as tf; print(tf.__version__)
from tensorflow.keras import layers 
from tensorflow import keras 

import matplotlib.pyplot as plt
from numpy.random import rand 
import matplotlib.cm as cm
import numpy as np 

# Best-effort GPU setup: on-demand memory growth plus XLA JIT. A failure (for
# example no visible GPU, which makes physical_devices[0] raise IndexError) is
# reported and the notebook carries on, instead of being silently swallowed.
physical_devices = tf.config.list_physical_devices('GPU')
try: 
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    tf.config.optimizer.set_jit(True)
except Exception as exc:
    print(f'GPU configuration skipped: {exc!r}')

# Seed TensorFlow, NumPy and Python's `random` with the same value.
seed = 786
tf.random.set_seed(seed)
np.random.seed(seed)
random.seed(seed)

In [None]:
# import pathlib

# dataset_url = "https://drive.google.com/file/d/1-8OXLkK7Hpq917cwvIRUKUhtxYZf-2Ua/view?usp=sharing"
# data_dir = tf.keras.utils.get_file('train', origin=dataset_url, untar=True)
# data_dir = pathlib.Path(data_dir)
# print(data_dir)
# image_count = len(list(data_dir.glob('*/*.jpg')))
# print('Total Samples: ', image_count)

In [None]:
# Run configuration.
epochs       = 7 
img_size     = 384
batch_size   = 32
class_number = 6
use_cut_mix  = True
AUTOTUNE = tf.data.AUTOTUNE

from tensorflow import keras

# Hold out part of the image folder for validation. Both calls must use the
# same validation_split and seed so that the two subsets are disjoint.
# Previously both datasets loaded the full folder, so "validation" accuracy
# was measured on the training images.
# NOTE(review): the folder was filled from a table resampled with replacement,
# so copies of one source image can still land on both sides of the split.
# Splitting before the resampling would give a cleaner estimate.
validation_fraction = 0.2

train_ds = keras.utils.image_dataset_from_directory(
    directory='/content/data/train',
    label_mode='categorical',
    batch_size=batch_size,
    image_size=(img_size, img_size),
    validation_split=validation_fraction,
    subset='training',
    seed=seed)
validation_ds = keras.utils.image_dataset_from_directory(
    directory='/content/data/train',
    label_mode='categorical',
    batch_size=batch_size,
    image_size=(img_size, img_size),
    validation_split=validation_fraction,
    subset='validation',
    seed=seed)

tcls_names, vcls_names = train_ds.class_names , validation_ds.class_names
tcls_names, vcls_names

In [None]:
# Preview: the first eight images of one training batch on a 3x3 grid.
plt.figure(figsize=(20, 20))
for images, labels in train_ds.take(1):
    print(images.shape, labels.shape)

    for slot in range(1, 9):
        ax = plt.subplot(3, 3, slot)
        pixels = images[slot - 1].numpy().astype("uint8")
        plt.imshow(pixels)
        # plt.title(tcls_names[labels[i]])
        plt.axis("off")

In [None]:
class MixUp(layers.Layer):
    """Batch-level MixUp: blend each sample with a random sample of the same batch.

    Args:
        num_classes: number of classes; used to one-hot encode rank-1 labels
            and to reshape the mixed labels.
        batch_size: number of samples handled per call (the loop in `call`
            runs exactly this many times).
        mixup_prob: probability that a given sample is mixed at all.
    """

    def __init__(self, num_classes, batch_size, mixup_prob=0.88, **kwargs):
        super().__init__(**kwargs)
        self.prob = mixup_prob 
        self.batch_size = batch_size
        self.num_classes = num_classes
    
    def call(self, batch_inputs, training=None):
        """Return `[mixed_images, mixed_labels]` for `batch_inputs = [images, labels]`.

        `training` is accepted but not read by this method.
        NOTE(review): samples are indexed from 0 to batch_size - 1, so a batch
        with fewer than `batch_size` samples looks like it would be indexed
        out of range -- confirm how the last batch is handled.
        """
        bs_images = batch_inputs[0] # ALL Image Samples 
        bs_labels = batch_inputs[1] # ALL Label Samples 
        
        # Meta info 
        _, height, width, channel = bs_images.get_shape().as_list()
        mixup_images = []; mixup_labels = []
    
        for j in range(self.batch_size):
            # Choose Random Image to MixUp with
            k = tf.cast( tf.random.uniform([], 0, tf.cast(self.batch_size, tf.float32)), tf.int32)

            # Do MixUp with PROBABILITY Defined Above
            p = tf.cast( tf.random.uniform([], 0, 1) <= self.prob, tf.float32)

            # It's beta dist with alpha=1.0
            # (p == 0 forces a == 0, which leaves sample j unchanged)
            a = tf.random.uniform([], 0, 1)*p

            # Do MixUp 2 Images 
            img_x = bs_images[j]
            img_y = bs_images[k]
            mixup_images.append( (1-a) * img_x + a * img_y )

            # Do MixUp 2 Labels 
            # Rank-1 labels are one-hot encoded first; otherwise they are used as given.
            if len(bs_labels.shape) == 1:
                lbs_x = tf.one_hot(bs_labels[j], self.num_classes)
                lbs_y = tf.one_hot(bs_labels[k], self.num_classes)
            else:
                lbs_x = bs_labels[j]
                lbs_y = bs_labels[k]
                
            lbs_x = tf.cast(lbs_x, tf.float32)
            lbs_y = tf.cast(lbs_y, tf.float32)
            mixup_labels.append( (1-a) * lbs_x + a * lbs_y )

        # Reshape 
        mixup_images = tf.reshape(tf.stack(mixup_images), (-1, height, width, channel))
        mixup_labels = tf.reshape(tf.stack(mixup_labels), (-1, self.num_classes))
        return [mixup_images, mixup_labels]

In [None]:
class CutMix(layers.Layer):
    """Batch-level CutMix: paste a random square of another sample into each sample.

    Args:
        num_classes: number of classes; used to one-hot encode rank-1 labels
            and to reshape the mixed labels.
        batch_size: number of samples handled per call (the loop in `call`
            runs exactly this many times).
        cutmix_prob: probability that a given sample receives a patch at all.
    """

    def __init__(self, num_classes, batch_size, cutmix_prob=0.88, **kwargs):
        super().__init__(**kwargs)
        self.prob = cutmix_prob 
        self.batch_size = batch_size
        self.num_classes = num_classes
        
    def call(self, batch_inputs, training=None):
        """Return `[cutmix_images, cutmix_labels]` for `batch_inputs = [images, labels]`.

        `training` is accepted but not read by this method. The image height
        is used as the size for both axes, so square images are assumed.
        NOTE(review): samples are indexed from 0 to batch_size - 1, so a batch
        with fewer than `batch_size` samples looks like it would be indexed
        out of range -- confirm how the last batch is handled.
        """
        bs_images = batch_inputs[0] # ALL Image Samples 
        bs_labels = batch_inputs[1] # ALL Label Samples 
        # Meta info 
        _, height, width, channel = bs_images.get_shape().as_list()
        img_size = height # TO DO: Support Non-Square Image 
        cutmix_images = []; cutmix_labels = []

        for j in range(self.batch_size):
            # Do CutMix with PROBABILITY Defined Above
            p = tf.cast( tf.random.uniform([],0, 1) <= self.prob, tf.int32)

            # Choose Random Image to CutMix with
            k = tf.cast( tf.random.uniform([], 0, self.batch_size), tf.int32)

            # Choose Random Location 
            x = tf.cast( tf.random.uniform([],0, img_size),tf.int32)
            y = tf.cast( tf.random.uniform([],0, img_size),tf.int32)

            # It's beta dist with alpha=1.0
            b = tf.random.uniform([], 0, 1) 

            # Patch side length; p == 0 gives w == 0, i.e. an empty patch.
            w = tf.cast(img_size * tf.math.sqrt(1-b), tf.int32) * p
            # Patch bounds centred on (x, y), clipped to the image.
            ya = tf.math.maximum(0,   y-w//2)
            yb = tf.math.minimum(img_size, y+w//2)
            xa = tf.math.maximum(0,   x-w//2)
            xb = tf.math.minimum(img_size, x+w//2)

            # Do CutMix 
            # Rows ya:yb are rebuilt as [left of j | patch of k | right of j],
            # then stacked between the untouched top and bottom rows of j.
            one    = bs_images[j, ya:yb, 0:xa,        :]
            two    = bs_images[k, ya:yb, xa:xb,       :]
            three  = bs_images[j, ya:yb, xb:img_size, :]
            middle = tf.concat([one, two, three], axis=1)
            img    = tf.concat([bs_images[j, 0:ya, :, :],
                                middle,
                                bs_images[j, yb:img_size, :, :]], axis=0)
            cutmix_images.append(img)

            # MAKE CUTMIX LABEL
            # NOTE(review): the label weight uses the unclipped side length w,
            # not the area of the clipped box actually pasted above.
            a = tf.cast(w * w / img_size / img_size, tf.float32)
            if len(bs_labels.shape) == 1:
                lab1 = tf.one_hot(bs_labels[j], self.num_classes)
                lab2 = tf.one_hot(bs_labels[k], self.num_classes)
            else:
                lab1 = bs_labels[j]
                lab2 = bs_labels[k]

            cutmix_labels.append((1-a)*lab1 + a*lab2)

        # Reshape 
        cutmix_images = tf.reshape(tf.stack(cutmix_images), (-1, height, width, channel))
        cutmix_labels = tf.reshape(tf.stack(cutmix_labels), (-1, self.num_classes))
        return [cutmix_images, cutmix_labels]

In [None]:
class RandomMixUpCutMix(layers.Layer):
    """Apply either CutMix or MixUp to a batch, chosen at random per call.

    With probability `switch_prob` the batch goes through CutMix, otherwise
    through MixUp. Outside training the inputs are returned untouched.

    The attributes were previously swapped (`mixup` held a CutMix layer and
    `cutmix` a MixUp layer). They now hold the layer their name says, and the
    branches below are arranged so the augmentation applied on each branch is
    the same as before.

    Args:
        num_classes: number of classes, forwarded to both sub-layers.
        batch_size: samples per batch, forwarded to both sub-layers.
        switch_prob: probability of taking the CutMix branch.
        mixup_prob: per-sample mixing probability inside MixUp.
        cutmix_prob: per-sample patch probability inside CutMix.
    """

    def __init__(self, num_classes, batch_size, switch_prob=0.10, mixup_prob=0.1, cutmix_prob=0.1, **kwargs):
        super().__init__(**kwargs)
        self.mixup_prob = mixup_prob   # mixup probability 
        self.cutmix_prob = cutmix_prob # cutmix probability 
        self.switch_prob = switch_prob # probability of choosing cutmix over mixup
        self.mixup  = MixUp(num_classes, batch_size=batch_size, mixup_prob=mixup_prob)
        self.cutmix = CutMix(num_classes, batch_size=batch_size, cutmix_prob=cutmix_prob)
        
    def call(self, batch_inputs, training=None):
        """Return `[images, labels]`, augmented only when `training` is truthy."""
        if not training:
            return batch_inputs

        bs_images = batch_inputs[0] # ALL Image Samples 
        bs_labels = batch_inputs[1] # ALL Label Samples 
        take_cutmix = tf.less(
            tf.random.uniform([], minval=0, maxval=1, dtype=tf.float32),
            tf.cast(self.switch_prob, tf.float32))
        return tf.cond(
            take_cutmix,
            lambda: self.cutmix([bs_images, bs_labels]),
            lambda: self.mixup([bs_images, bs_labels]))

In [None]:
# Augmentation pipeline intended for the training set. Only RandomZoom is
# active; the other layers are commented out. The `train_ds.map(...)` calls
# below are commented out as well, so in this notebook keras_aug is only used
# by the augmentation preview cell.
keras_aug = keras.Sequential(
     [ 
          # layers.RandomFlip("horizontal_and_vertical"),
          layers.RandomZoom(.2, .3)
          # layers.Rescaling(1./255),
          # layers.RandomRotation((0.1, 0.2), fill_mode="reflect")
        
    ]
)

# train_ds = train_ds.shuffle(10 * batch_size)
# train_ds = train_ds.map(lambda x, y: (keras_aug(x), y), num_parallel_calls=AUTOTUNE)
# train_ds = train_ds.map(lambda x, y: RandomMixUpCutMix(len(tcls_names), 
#                                                        batch_size)([x, y], training=True), 
#                         num_parallel_calls=AUTOTUNE)
# train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

In [None]:
# Preview five training batches: one figure per batch, showing its first
# eight images, each titled with its label vector (the datasets were built
# with label_mode='categorical').
for images, labels in train_ds.take(5):
    print(images.shape, labels.shape)
    plt.figure(figsize=(20, 20))
    for i in range(8):
        ax = plt.subplot(3, 3, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(labels[i].numpy())
        plt.axis("off")
    plt.show()

In [None]:
# def k_hot(x, y): 
#     return x, tf.one_hot(y, class_number)

# val_ds = validation_ds.map(k_hot) if use_cut_mix else val_ds
# val_ds = validation_ds.prefetch(buffer_size=AUTOTUNE)

# Preview nine independent augmentations of the first validation image.
plt.figure(figsize=(20, 20))
for images, labels in validation_ds.take(1):
    print(images.shape, labels.shape)
    # Only the first image is displayed, so augment just that one (kept as a
    # batch of size 1) rather than the whole batch on every iteration.
    first_image = images[:1]
    for i in range(9):
        augmented_images = keras_aug(first_image)
        ax = plt.subplot(3, 3, i + 1)
        # uint8 matches the other preview cells and the 0-255 pixel range.
        plt.imshow(augmented_images[0].numpy().astype("uint8"))
        plt.axis("off")

In [None]:
# Hyper-parameters of the Swin branch. They are read as module-level globals
# by HybridSwinTransformer.
patch_size      = (4,4)  # 4-by-4 sized patches
dropout_rate    = 0.5     # Dropout rate
num_heads       = 16       # Attention heads  #Test 24
embed_dim       = 64      # Embedding dimension
num_mlp         = 128     # MLP layer size
qkv_bias        = True    # Convert embedded patches to query, key, and values with a learnable additive value
window_size     = 2       # Size of attention window
shift_size      = 1       # Size of shifting window
image_dimension = 24      # Initial image size / Input size of the transformer model 

# Number of patches along each axis of the transformer input.
num_patch_x = image_dimension // patch_size[0]
num_patch_y = image_dimension // patch_size[1]

In [None]:
def window_partition(x, window_size):
    """Cut a 4-D feature map into non-overlapping square windows.

    `x` is unpacked as (batch, height, width, channels). The result has shape
    (-1, window_size, window_size, channels), windows listed row by row.
    """
    _, height, width, channels = x.shape
    rows = height // window_size
    cols = width // window_size
    grid = tf.reshape(
        x, shape=(-1, rows, window_size, cols, window_size, channels)
    )
    # Bring the two window-index axes next to each other before flattening.
    grid = tf.transpose(grid, (0, 1, 3, 2, 4, 5))
    return tf.reshape(grid, shape=(-1, window_size, window_size, channels))


def window_reverse(windows, window_size, height, width, channels):
    """Reassemble windows into a (-1, height, width, channels) feature map.

    `windows` is reshaped as a (rows, cols) grid of
    (window_size, window_size, channels) tiles, the layout that
    window_partition produces.
    """
    rows = height // window_size
    cols = width // window_size
    grid = tf.reshape(
        windows,
        shape=(-1, rows, cols, window_size, window_size, channels),
    )
    # Interleave grid rows with in-window rows, and grid cols with in-window cols.
    grid = tf.transpose(grid, perm=(0, 1, 3, 2, 4, 5))
    return tf.reshape(grid, shape=(-1, height, width, channels))


class DropPath(layers.Layer):
    """Stochastic depth: randomly zero the whole input of individual samples.

    During training each sample is kept with probability `1 - drop_prob` and
    kept samples are scaled by `1 / (1 - drop_prob)`. Outside training, or
    when `drop_prob` is None or 0, the input is returned unchanged.

    Args:
        drop_prob: probability of dropping a sample's path.
    """

    def __init__(self, drop_prob=None, **kwargs):
        super(DropPath, self).__init__(**kwargs)
        self.drop_prob = drop_prob

    def call(self, x, training=None):
        # Keras fills in `training` from the enclosing call when the caller
        # does not pass it. Without this guard paths were also dropped at
        # inference and validation, and drop_prob=None crashed on `1 - None`.
        if not training or not self.drop_prob:
            return x

        keep_prob = 1 - self.drop_prob
        input_shape = tf.shape(x)
        batch_size = input_shape[0]
        rank = x.shape.rank
        # One random draw per sample, broadcast over all remaining axes.
        shape = (batch_size,) + (1,) * (rank - 1)
        random_tensor = keep_prob + tf.random.uniform(shape, dtype=x.dtype)
        path_mask = tf.floor(random_tensor)  # 1 with probability keep_prob, else 0
        output = tf.math.divide(x, keep_prob) * path_mask
        return output

In [None]:
class WindowAttention(layers.Layer):
    """Multi-head self-attention inside one window, with a learned relative position bias.

    Args:
        dim: channel size of the input tokens (also the output size).
        window_size: pair (height, width) of the attention window.
        num_heads: number of attention heads.
        qkv_bias: whether the joint q/k/v projection has a bias term.
        dropout_rate: rate of the dropout layer, which is applied both to the
            attention weights and to the projected output.
    """

    def __init__(
        self, dim, window_size, num_heads, qkv_bias=True, dropout_rate=0.0, **kwargs
    ):
        super(WindowAttention, self).__init__(**kwargs)
        self.dim = dim
        self.window_size = window_size
        self.num_heads = num_heads
        self.scale = (dim // num_heads) ** -0.5
        self.qkv = layers.Dense(dim * 3, use_bias=qkv_bias)
        self.dropout = layers.Dropout(dropout_rate)
        self.proj = layers.Dense(dim)

    def build(self, input_shape):
        """Create the bias table and the index that maps token pairs into it."""
        # One table row per possible relative offset between two window positions.
        num_window_elements = (2 * self.window_size[0] - 1) * (
            2 * self.window_size[1] - 1
        )
        self.relative_position_bias_table = self.add_weight(
            shape=(num_window_elements, self.num_heads),
            initializer=tf.initializers.Zeros(),
            trainable=True,
        )
        coords_h = np.arange(self.window_size[0])
        coords_w = np.arange(self.window_size[1])
        coords_matrix = np.meshgrid(coords_h, coords_w, indexing="ij")
        coords = np.stack(coords_matrix)
        coords_flatten = coords.reshape(2, -1)
        # Pairwise (row, col) offsets between every two positions in the window.
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]
        relative_coords = relative_coords.transpose([1, 2, 0])
        # Shift the offsets to start at 0, then fold (row, col) into one index.
        relative_coords[:, :, 0] += self.window_size[0] - 1
        relative_coords[:, :, 1] += self.window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)

        self.relative_position_index = tf.Variable(
            initial_value=tf.convert_to_tensor(relative_position_index), trainable=False
        )

    def call(self, x, mask=None):
        """Attend within each window.

        `x` is unpacked as (windows, size, channels). When `mask` is given it
        is added to the attention logits before the softmax; its first
        dimension is read as the number of windows.
        """
        _, size, channels = x.shape
        head_dim = channels // self.num_heads
        x_qkv = self.qkv(x)
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, 3, self.num_heads, head_dim))
        # -> (3, windows, heads, size, head_dim), so q, k and v can be split off.
        x_qkv = tf.transpose(x_qkv, perm=(2, 0, 3, 1, 4))
        q, k, v = x_qkv[0], x_qkv[1], x_qkv[2]
        q = q * self.scale
        k = tf.transpose(k, perm=(0, 1, 3, 2))
        attn = q @ k

        # Look up the bias for every (query position, key position) pair.
        num_window_elements = self.window_size[0] * self.window_size[1]
        relative_position_index_flat = tf.reshape(
            self.relative_position_index, shape=(-1,)
        )
        relative_position_bias = tf.gather(
            self.relative_position_bias_table, relative_position_index_flat
        )
        relative_position_bias = tf.reshape(
            relative_position_bias, shape=(num_window_elements, num_window_elements, -1)
        )
        relative_position_bias = tf.transpose(relative_position_bias, perm=(2, 0, 1))
        attn = attn + tf.expand_dims(relative_position_bias, axis=0)

        if mask is not None:
            nW = mask.get_shape()[0]
            mask_float = tf.cast(
                tf.expand_dims(tf.expand_dims(mask, axis=1), axis=0), tf.float32
            )
            # Expose the window axis so the per-window mask broadcasts over
            # batch and heads, then fold it back.
            attn = (
                tf.reshape(attn, shape=(-1, nW, self.num_heads, size, size))
                + mask_float
            )
            attn = tf.reshape(attn, shape=(-1, self.num_heads, size, size))
            attn = keras.activations.softmax(attn, axis=-1)
        else:
            attn = keras.activations.softmax(attn, axis=-1)
        attn = self.dropout(attn)

        x_qkv = attn @ v
        # Merge the heads back into the channel axis.
        x_qkv = tf.transpose(x_qkv, perm=(0, 2, 1, 3))
        x_qkv = tf.reshape(x_qkv, shape=(-1, size, channels))
        x_qkv = self.proj(x_qkv)
        x_qkv = self.dropout(x_qkv)
        return x_qkv

In [None]:
class SwinTransformer(layers.Layer):
    """One Swin block: (shifted) window attention followed by an MLP, each with a residual.

    Args:
        dim: channel size of the tokens.
        num_patch: pair (height, width) of the token grid.
        num_heads: number of attention heads.
        window_size: side length of the square attention window.
        shift_size: cyclic shift applied before windowing (0 disables it).
        num_mlp: hidden size of the MLP.
        qkv_bias: forwarded to WindowAttention.
        dropout_rate: used for the attention dropout, the MLP dropouts and as
            the DropPath probability.
    """

    def __init__(
        self, 
        dim,
        num_patch,
        num_heads,
        window_size=7,
        shift_size=0,
        num_mlp=1024,
        qkv_bias=True,
        dropout_rate=0.0,
        **kwargs,
    ):
        super(SwinTransformer, self).__init__(**kwargs)

        self.dim = dim  # number of input dimensions
        self.num_patch = num_patch  # number of embedded patches
        self.num_heads = num_heads  # number of attention heads
        self.window_size = window_size  # size of window
        self.shift_size = shift_size  # size of window shift
        self.num_mlp = num_mlp  # number of MLP nodes

        self.norm1 = layers.LayerNormalization(epsilon=1e-5)
        self.attn = WindowAttention(
            dim,
            window_size=(self.window_size, self.window_size),
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            dropout_rate=dropout_rate,
        )
        self.drop_path = DropPath(dropout_rate)
        self.norm2 = layers.LayerNormalization(epsilon=1e-5)

        self.mlp = keras.Sequential(
            [
                layers.Dense(num_mlp),
                layers.Activation(keras.activations.gelu),
                layers.Dropout(dropout_rate),
                layers.Dense(dim),
                layers.Dropout(dropout_rate),
            ]
        )

        # If the token grid is smaller than the window, use one window over
        # the whole grid and disable shifting.
        # NOTE(review): self.attn was already constructed above with the
        # original window size, so it is not affected by this clamp.
        if min(self.num_patch) < self.window_size:
            self.shift_size = 0
            self.window_size = min(self.num_patch)

    def build(self, input_shape):
        """Precompute the attention mask used when the windows are shifted."""
        if self.shift_size == 0:
            self.attn_mask = None
        else:
            height, width = self.num_patch
            h_slices = (
                slice(0, -self.window_size),
                slice(-self.window_size, -self.shift_size),
                slice(-self.shift_size, None),
            )
            w_slices = (
                slice(0, -self.window_size),
                slice(-self.window_size, -self.shift_size),
                slice(-self.shift_size, None),
            )
            # Give each of the 3x3 regions its own id.
            mask_array = np.zeros((1, height, width, 1))
            count = 0
            for h in h_slices:
                for w in w_slices:
                    mask_array[:, h, w, :] = count
                    count += 1
            mask_array = tf.convert_to_tensor(mask_array)

            # mask array to windows
            mask_windows = window_partition(mask_array, self.window_size)
            mask_windows = tf.reshape(
                mask_windows, shape=[-1, self.window_size * self.window_size]
            )
            # Position pairs with different region ids get -100 (suppressed by
            # the softmax); pairs with equal ids get 0.
            attn_mask = tf.expand_dims(mask_windows, axis=1) - tf.expand_dims(
                mask_windows, axis=2
            )
            attn_mask = tf.where(attn_mask != 0, -100.0, attn_mask)
            attn_mask = tf.where(attn_mask == 0, 0.0, attn_mask)
            self.attn_mask = tf.Variable(initial_value=attn_mask, trainable=False)

    def call(self, x):
        """Apply the block to `x`, unpacked as (batch, tokens, channels); same shape out."""
        height, width = self.num_patch
        _, num_patches_before, channels = x.shape
        x_skip = x
        x = self.norm1(x)
        x = tf.reshape(x, shape=(-1, height, width, channels))
        # Cyclic shift so the windows straddle the previous window borders.
        if self.shift_size > 0:
            shifted_x = tf.roll(
                x, shift=[-self.shift_size, -self.shift_size], axis=[1, 2]
            )
        else:
            shifted_x = x

        x_windows = window_partition(shifted_x, self.window_size)
        x_windows = tf.reshape(
            x_windows, shape=(-1, self.window_size * self.window_size, channels)
        )
        attn_windows = self.attn(x_windows, mask=self.attn_mask)

        attn_windows = tf.reshape(
            attn_windows, shape=(-1, self.window_size, self.window_size, channels)
        )
        shifted_x = window_reverse(
            attn_windows, self.window_size, height, width, channels
        )
        # Undo the cyclic shift.
        if self.shift_size > 0:
            x = tf.roll(
                shifted_x, shift=[self.shift_size, self.shift_size], axis=[1, 2]
            )
        else:
            x = shifted_x

        x = tf.reshape(x, shape=(-1, height * width, channels))
        # First residual: attention branch.
        x = self.drop_path(x)
        x = x_skip + x
        # Second residual: MLP branch.
        x_skip = x
        x = self.norm2(x)
        x = self.mlp(x)
        x = self.drop_path(x)
        x = x_skip + x
        return x

In [None]:
class PatchExtract(layers.Layer):
    """Split an image-like tensor into non-overlapping patches and flatten the grid.

    Args:
        patch_size: pair giving the patch extent along the first and second
            spatial axis of the input.

    `call` returns a tensor of shape (batch, number_of_patches, patch_dim).
    """

    def __init__(self, patch_size, **kwargs):
        super(PatchExtract, self).__init__(**kwargs)
        self.patch_size_x = patch_size[0]
        # Previously this also read patch_size[0], ignoring the second entry.
        self.patch_size_y = patch_size[1]

    def call(self, images):
        batch_size = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=(1, self.patch_size_x, self.patch_size_y, 1),
            strides=(1, self.patch_size_x, self.patch_size_y, 1),
            rates=(1, 1, 1, 1),
            padding="VALID",
        )
        patch_dim = patches.shape[-1]
        # Use both grid dimensions so a non-square patch grid is flattened
        # correctly (identical to the old patch_num ** 2 when it is square).
        patch_rows = patches.shape[1]
        patch_cols = patches.shape[2]
        return tf.reshape(patches, (batch_size, patch_rows * patch_cols, patch_dim))


class PatchEmbedding(layers.Layer):
    """Project flattened patches to `embed_dim` and add a learned position embedding.

    Args:
        num_patch: total number of patches per sample (size of the position table).
        embed_dim: output channel size.
    """

    def __init__(self, num_patch, embed_dim, **kwargs):
        super(PatchEmbedding, self).__init__(**kwargs)
        self.num_patch = num_patch
        self.proj = layers.Dense(embed_dim)
        self.pos_embed = layers.Embedding(input_dim=num_patch, output_dim=embed_dim)

    def call(self, patch):
        # Positions 0..num_patch-1; their embeddings broadcast over the batch.
        pos = tf.range(start=0, limit=self.num_patch, delta=1)
        return self.proj(patch) + self.pos_embed(pos)


class PatchMerging(tf.keras.layers.Layer):
    """Merge each 2x2 neighbourhood of tokens and project it to `2 * embed_dim`.

    Args:
        num_patch: pair (height, width) of the incoming token grid.
        embed_dim: channel size used to size the output projection.

    `call` returns a tuple: the merged tokens, and the un-merged input
    reshaped to a 4-D feature map.
    """

    def __init__(self, num_patch, embed_dim):
        super(PatchMerging, self).__init__()
        self.num_patch = num_patch
        self.embed_dim = embed_dim
        self.linear_trans = layers.Dense(2 * embed_dim, use_bias=False)

    def call(self, x):
        height, width = self.num_patch
        _, _, C = x.get_shape().as_list()
        x = tf.reshape(x, shape=(-1, height, width, C))
        # Keep the 4-D map before merging; it is returned as the second output.
        feat_maps = x
     
        # The four members of every 2x2 neighbourhood, concatenated on channels.
        x0 = x[:, 0::2, 0::2, :]
        x1 = x[:, 1::2, 0::2, :]
        x2 = x[:, 0::2, 1::2, :]
        x3 = x[:, 1::2, 1::2, :]
        x = tf.concat((x0, x1, x2, x3), axis=-1)
        x = tf.reshape(x, shape=(-1, (height // 2) * (width // 2), 4 * C))
        return self.linear_trans(x), feat_maps

In [None]:
import numpy as np 
import tensorflow as tf 
from tensorflow import keras 
from tensorflow.keras import Model, Sequential, Input, layers, applications

In [None]:
class HybridSwinTransformer(keras.Model):
    """EfficientNetB0 backbone combined with a Swin transformer branch.

    An intermediate backbone feature map feeds the Swin branch; the final
    backbone output feeds a pooled CNN head. Both heads are concatenated and
    classified into `class_number` logits.

    The constructor takes no arguments: it reads the module-level globals
    img_size, patch_size, num_patch_x, num_patch_y, embed_dim, shift_size,
    num_heads, window_size, num_mlp, qkv_bias, dropout_rate and class_number.
    """

    def __init__(self):
        super(HybridSwinTransformer, self).__init__()
        # base models 
        self.inputx = keras.Input((img_size, img_size, 3), name='input_hybrids')
        base = applications.EfficientNetB0(
            include_top=False,
            # weights=BASE_WEIGHTS + ADV_PROB[0],
            input_tensor=self.inputx
        )
        
        # base model with compatible output which will be an input of transformer model 
        self.new_base = keras.Model(
            [base.inputs], 
            [
                base.get_layer('block6a_expand_activation').output, 
                base.output
            ], # output with 192 feat_maps
            name='efficientnet'
        )
        
        # stuff of swin transformers 
        self.patch_extract  = PatchExtract(patch_size)
        self.patch_embedds  = PatchEmbedding(num_patch_x * num_patch_y, embed_dim)
        self.patch_merging  = PatchMerging((num_patch_x, num_patch_y), embed_dim=embed_dim)
        
        # swin blocks containers 
        # One block per i in range(shift_size), with that block's shift set to i.
        # With shift_size == 1 this adds a single block whose shift is 0.
        self.swin_sequences = keras.Sequential(name='swin_blocks')
        for i in range(shift_size):
            self.swin_sequences.add(
                SwinTransformer(
                    dim=embed_dim,
                    num_patch=(num_patch_x, num_patch_y),
                    num_heads=num_heads,
                    window_size=window_size,
                    shift_size=i,
                    num_mlp=num_mlp,
                    qkv_bias=qkv_bias,
                    dropout_rate=dropout_rate
                )
            )
        
        # swin block's head
        self.swin_head = keras.Sequential(
            [
                layers.GlobalAveragePooling1D(),
                layers.AlphaDropout(0.5),
                layers.BatchNormalization(),
            ], name='swin_head'
        )
        
        # base model's (cnn model) head
        self.conv_head = keras.Sequential(
            [
                layers.GlobalAveragePooling2D(),
                layers.AlphaDropout(0.5),
            ], name='conv_head'
        )
        
        # classifier
        # Outputs raw logits (no activation).
        self.classifier = layers.Dense(class_number, dtype='float32')
        
        
    def call(self, inputs, training=None, **kwargs):
        """Forward pass.

        Returns only the logits when `training` is truthy. Otherwise returns
        the tuple (logits, final backbone feature map, Swin feature map).
        """
        # x: intermediate backbone features; base_gcam_top: final backbone output.
        x , base_gcam_top = self.new_base(inputs)
        x = self.patch_extract(x)
        x = self.patch_embedds(x)
        x = self.swin_sequences(x)
        x, swin_gcam_top = self.patch_merging(x)
        
        swin_top = self.swin_head(x)
        conv_top = self.conv_head(base_gcam_top)
        preds = self.classifier(tf.concat([swin_top, conv_top], axis=-1))
        
        if training: # training phase 
            return preds
        else: # inference phase
            return preds, base_gcam_top, swin_gcam_top

    def build_graph(self):
        """Wrap `call` in a functional keras.Model, used for plotting and summary."""
        x = keras.Input(shape=(img_size, img_size, 3))
        return keras.Model(inputs=[x], outputs=self.call(x))
    
# Build the model, run one dummy batch through it to create the weights, and
# print the shape of the logits.
keras.backend.clear_session()
model = HybridSwinTransformer()
print(model(tf.ones((2, img_size, img_size, 3)))[0].shape)

# Build the functional wrapper once and reuse it for the diagram and the
# summary, rather than constructing it twice.
graph_model = model.build_graph()
display(keras.utils.plot_model(graph_model, 
                               show_shapes=True,
                               show_layer_names=True, 
                               expand_nested=False))
graph_model.summary()

In [None]:
# tensorflow_addons provides the AdamW optimizer used when compiling the model.
!pip install tensorflow_addons

In [None]:
from tensorflow.keras import losses, optimizers , metrics
from tensorflow.keras import callbacks
from tensorflow_addons import optimizers as tfa_optimizers

# Multiply the learning rate by 0.3 when val_accuracy has not improved for 2 epochs.
rlr = callbacks.ReduceLROnPlateau(monitor="val_accuracy", factor=0.3, patience=2)
# Save only the weights, and only when val_accuracy reaches a new maximum.
ckp = callbacks.ModelCheckpoint('model.h5', 
                                monitor="val_accuracy", 
                                verbose=1,
                                save_best_only=True, 
                                save_weights_only=True,
                                mode="max")

In [None]:
# Pick the loss. The classifier outputs logits, hence from_logits=True.
# The datasets are built with label_mode='categorical' (one-hot labels), which
# matches the CategoricalCrossentropy branch.
# NOTE(review): the sparse branch expects integer labels, so switching
# use_cut_mix off would presumably also require changing the dataset
# label_mode -- confirm.
if use_cut_mix:
    loss_fn = losses.CategoricalCrossentropy(label_smoothing = 0.01, from_logits=True) 
else:
    loss_fn = losses.SparseCategoricalCrossentropy(from_logits=True) 

In [None]:
# Compile with AdamW from tensorflow_addons and track accuracy.
model.compile(
    loss=loss_fn,
    optimizer=tfa_optimizers.AdamW(learning_rate=1e-4, weight_decay=0.00001), 
    metrics=['accuracy']
)

# ISSUE: ValueError: Unable to create dataset (name already exists)
# [ugly workaround!]
# Append each weight's index to its handle name so that every name is unique.
for i in range(len(model.weights)):
    model.weights[i]._handle_name = model.weights[i].name +  str(i)

In [None]:
# training 
# NOTE(review): the epoch count is hardcoded to 12 here; the `epochs` constant
# from the configuration cell (7) is not used by this call.
# verbose=0 silences the per-epoch log (the checkpoint callback still prints).
history = model.fit(train_ds, 
                    epochs=12,
                    callbacks=[ckp, rlr], 
                    validation_data=validation_ds, 
                    verbose=0)

In [None]:
import cv2 as cv 
import numpy as np 
import matplotlib.pyplot as plt 
from tensorflow.keras import datasets , layers , models

In [None]:
# Predict a single test image and draw the predicted class index on it.
test_image_path = '/content/test/3cb7e6f8.jpg'
img = cv.imread(test_image_path)
if img is None:
    # cv.imread returns None, rather than raising, when the file cannot be read.
    raise FileNotFoundError(test_image_path)

# OpenCV loads images as BGR; convert to RGB for the model and for matplotlib.
imgre = cv.resize(img,(384,384))
imgre = cv.cvtColor(imgre, cv.COLOR_BGR2RGB)
img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

preds, base_top, swin_top = model.predict(np.array([imgre]))
# Convert to a plain int so the number itself is drawn below, not the tensor repr.
pred_index = int(tf.argmax(preds[0]))
# index = np.argmax(predict[0])
# classname = class_name[index]

cv.putText(img, f'{pred_index}', (20, 30), cv.FONT_HERSHEY_PLAIN, 2, (0,0,255),2)
plt.imshow(img)

In [None]:
# Sample submission file; its `Id` column lists the test images to predict.
submiss_name = pd.read_csv('/content/sample_submission.csv')
submiss_name

In [None]:
# Turn the Id column into a list of one-element lists ([[id], [id], ...]);
# later cells read each id as name_id[i][0].
name_id = submiss_name['Id']
name_id = pd.DataFrame(name_id)
name_id = name_id.values.tolist()
name_id

In [None]:
from PIL import Image
import glob
image_list = []
predict_list = []
x = 0  # number of test images read successfully (used to start at 1 and over-count)
# glob.glob('/content/test/*.jpg'): #assuming gif

# Predict every id listed in the sample submission, in order. Iterating over
# name_id itself removes the hardcoded row count.
for row in name_id:
    filename = '/content/test/' + row[0] + ".jpg"
    img = cv.imread(filename)
    if img is None:
        # Keep the output aligned with name_id: fall back to class 0, but say
        # so, since it would otherwise go unnoticed.
        print(f'WARNING: could not read {filename}; predicting class 0')
        predict_list.append(0)
        continue

    x = x + 1
    imgre = cv.resize(img, (384, 384))
    # OpenCV loads images as BGR; the model was trained on RGB images.
    imgre = cv.cvtColor(imgre, cv.COLOR_BGR2RGB)

    preds, base_top, swin_top = model.predict(np.array([imgre]))
    pred_index = tf.argmax(preds[0])
    predict_list.append(pred_index.numpy())



In [None]:
# Counter maintained by the prediction loop above.
x

In [None]:
# Number of predictions collected (one per processed id).
len(predict_list)

In [None]:
# Turn the predictions into a single column, one row per prediction. Using -1
# lets NumPy infer the row count (same result as the former hardcoded 1550).
# Note that this reuses the name `x`, which held the image counter before.
x = np.reshape(predict_list, (-1, 1))
x

In [None]:
import pandas as pd
# Single-column frame holding the predicted classes.
df_class = pd.DataFrame(x)
df_class

In [None]:
# Number of ids in the submission; should equal len(predict_list).
len(name_id)

In [None]:
import pandas as pd
# Single-column frame holding the image ids.
df_id = pd.DataFrame(name_id)
df_id

In [None]:
# Put ids and predictions side by side (aligned on the default row index).
data_pred = pd.concat([df_id, df_class], axis=1)

data_pred.columns = ['Id', 'Predicted']
data_pred

In [None]:
# Write the submission file, without the index column.
data_pred.to_csv('predict_test.csv', index=False)

In [None]:
# Mount Google Drive (Colab only) so the trained weights can be copied to it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Copy the checkpointed weights to Google Drive.
!cp /content/model.h5 /content/drive/MyDrive

In [None]:
##############################################################################

In [None]:
def plot_stuff(inputs, features_a, features_b):
    """Show three panels side by side: the input image and two feature images.

    `inputs` is divided by 255 and squeezed on axis 0 before display;
    `features_a` and `features_b` are passed to imshow as given.
    """
    plt.figure(figsize=(25, 25))

    panels = (
        (tf.squeeze(inputs/255, axis=0), 'Input'),
        (features_a, 'CNN'),
        (features_b, 'Hybrid-CNN-Transformer'),
    )
    for position, (picture, caption) in enumerate(panels, start=1):
        plt.subplot(1, 3, position)
        plt.axis('off')
        plt.imshow(picture)
        plt.title(caption)
    plt.show()

# ref: https://keras.io/examples/vision/grad_cam/
def get_img_array(img):
    """Convert `img` to an array and prepend a batch axis of size 1."""
    return np.expand_dims(keras.utils.img_to_array(img), axis=0)

# ref: https://keras.io/examples/vision/grad_cam/
def _gradcam_from_features(tape, class_channel, feature_map):
    """Grad-CAM heatmap of `class_channel` with respect to one 4-D feature map."""
    grads = tape.gradient(class_channel, feature_map)
    # Average the gradient over batch and spatial axes: one weight per channel.
    channel_weights = tf.reduce_mean(grads, axis=(0, 1, 2))
    weighted = feature_map[0] @ channel_weights[..., tf.newaxis]
    weighted = tf.squeeze(weighted)
    # Keep the positive evidence only, scaled by the map's maximum.
    normalised = tf.maximum(weighted, 0) / tf.math.reduce_max(weighted)
    return normalised.numpy()


def make_gradcam_heatmap(img_array, grad_model, pred_index=None):
    """Compute Grad-CAM heatmaps for both feature maps returned by `grad_model`.

    `grad_model(img_array)` must return (preds, base_top, swin_top). When
    `pred_index` is None the arg-max class of the first sample is explained.
    Returns (heatmap for base_top, heatmap for swin_top) as NumPy arrays.
    """
    # The tape is persistent because two separate gradients are taken from it.
    with tf.GradientTape(persistent=True) as tape:
        preds, base_top, swin_top = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    heatmap_a = _gradcam_from_features(tape, class_channel, base_top)
    heatmap_b = _gradcam_from_features(tape, class_channel, swin_top)
    return heatmap_a, heatmap_b

In [None]:
# Load the weights saved by the checkpoint callback.
model.load_weights('/content/model.h5')

# Prepare image
# Take the images of the first validation batch.
img_arrays = next(iter(validation_ds))[0]; print(img_arrays.shape)

# plot utils
for img_array in img_arrays:
    # Generate class activation heatmap
    img_array = get_img_array(img_array)
    cnn_heatmap, swin_heatmap = make_gradcam_heatmap(img_array, model) 
    print(cnn_heatmap.shape, cnn_heatmap.max(), cnn_heatmap.min())
    print(swin_heatmap.shape, swin_heatmap.max(), swin_heatmap.min())
    
    # Display heatmap
    plot_stuff(img_array, cnn_heatmap, swin_heatmap)

In [None]:
# ref: https://keras.io/examples/vision/grad_cam/
def save_and_display_gradcam(img, 
                             heatmap, 
                             target=None, 
                             pred=None,
                             cam_path="cam.jpg",  
                             alpha=0.6, 
                             plot=None):
    """Overlay a jet-coloured heatmap on an image and return the result.

    Args:
        img: image array whose first two dimensions are (height, width).
        heatmap: 2-D array with values in [0, 1].
        alpha: weight of the coloured heatmap in the overlay.
        target, pred, cam_path, plot: accepted for interface compatibility
            but not used; despite the function name, nothing is saved or shown.

    Returns:
        The superimposed image as returned by keras.utils.array_to_img.
    """
    # Rescale heatmap to a range 0-255
    heatmap = np.uint8(255 * heatmap)

    # Use jet colormap to colorize heatmap
    # plt.get_cmap is used because matplotlib.cm.get_cmap has been removed
    # from recent Matplotlib releases.
    jet = plt.get_cmap("jet") 

    # Use RGB values of the colormap
    jet_colors  = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Create an image with RGB colorized heatmap
    jet_heatmap = keras.utils.array_to_img(jet_heatmap)
    # PIL's resize takes (width, height) while array shapes are (height, width).
    # The two were swapped before, which only went unnoticed for square images.
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = keras.utils.img_to_array(jet_heatmap)

    # Superimpose the heatmap on original image
    superimposed_img = img + jet_heatmap * alpha
    superimposed_img = keras.utils.array_to_img(superimposed_img)
    return superimposed_img

In [None]:
# Overlay both Grad-CAM heatmaps on every image of the first validation batch.
samples, labels = next(iter(validation_ds))

for sample, label in zip(samples, labels):
    # preparing 
    # Add a batch axis of size 1.
    img_array = sample[tf.newaxis, ...] 
    
    # get heatmaps 
    heatmap_a, heatmap_b = make_gradcam_heatmap(img_array, model)
    
    # overlay heatmap and input sample 
    overaly_a = save_and_display_gradcam(sample, heatmap_a)
    overlay_b = save_and_display_gradcam(sample, heatmap_b)
    
    # plotting stuff 
    plot_stuff(img_array, overaly_a, overlay_b)

In [None]:
# Copy the checkpointed weights to Google Drive (same command as the earlier cell).
!cp /content/model.h5 /content/drive/MyDrive


In [None]:
################### Test  Model Load Weight  ############################

In [None]:
# Reload the weights from Google Drive and repeat the single-image prediction
# to check that the saved file restores the model.
model.load_weights('/content/drive/MyDrive/model.h5')

test_image_path = '/content/test/3cb7e6f8.jpg'
img = cv.imread(test_image_path)
if img is None:
    # cv.imread returns None, rather than raising, when the file cannot be read.
    raise FileNotFoundError(test_image_path)

# OpenCV loads images as BGR; convert to RGB for the model and for matplotlib.
imgre = cv.resize(img,(384,384))
imgre = cv.cvtColor(imgre, cv.COLOR_BGR2RGB)
img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

preds, base_top, swin_top = model.predict(np.array([imgre]))
# Convert to a plain int so the number itself is drawn below, not the tensor repr.
pred_index = int(tf.argmax(preds[0]))
# index = np.argmax(predict[0])
# classname = class_name[index]

cv.putText(img, f'{pred_index}', (20, 30), cv.FONT_HERSHEY_PLAIN, 2, (0,0,255),2)
plt.imshow(img)


In [None]:
##############################################################################3