In [33]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

from sklearn.tree import DecisionTreeClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import numpy as np
import pickle

import matplotlib.pyplot as plt
from tensorflow.keras import layers, models
from tensorflow.keras.utils import to_categorical
from tensorflow import keras
from keras import models
from keras.models import Sequential
from keras.optimizers import Adam
import tensorflow as tf
import os
import copy

In [34]:
# --- Optimiser / training schedule ---
learning_rate, weight_decay = 0.001, 0.0001
batch_size, num_epochs = 128, 5

# --- Data ---
num_classes = 10
input_shape = (32, 32, 3)  # shape handed to layers.Input
image_size = 72            # side length the images are resized to before patching

# --- Transformer encoder ---
num_heads = 4
projection_dim = 64
transformer_layers = 8
# Widths of the two Dense layers inside every transformer block's MLP.
transformer_units = [2 * projection_dim, projection_dim]

# --- Classification head ---
# NOTE(review): 1048 may be a typo for 1024 -- confirm before changing.
mlp_head_units = [2048, 1048]

In [35]:
def mlp(x, hidden_units, dropout_rate):
    """Stack one GELU Dense layer plus Dropout per entry of ``hidden_units``.

    Args:
        x: Input tensor the first Dense layer is applied to.
        hidden_units: Iterable of layer widths, applied in order.
        dropout_rate: Rate passed to every Dropout layer.

    Returns:
        The output of the last Dropout layer, or ``x`` unchanged when
        ``hidden_units`` is empty.
    """
    activations = x
    for width in hidden_units:
        hidden = layers.Dense(width, activation=tf.nn.gelu)(activations)
        activations = layers.Dropout(dropout_rate)(hidden)
    return activations

In [36]:
class Patches(layers.Layer):
    """Split a batch of images into square patches of side ``patch_size``.

    Patches are extracted with stride equal to the patch size and "VALID"
    padding, then flattened so the output has shape
    ``(batch, num_patches, patch_dims)``.

    Args:
        patch_size: Side length, in pixels, of each square patch.
        **kwargs: Standard ``keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...). Forwarding them is required for the layer to be
            rebuilt from its config.
    """

    def __init__(self, patch_size, **kwargs):
        super().__init__(**kwargs)
        self.patch_size = patch_size

    def call(self, images):
        # Dynamic batch dimension; named so it does not shadow the
        # notebook-level ``batch_size`` hyperparameter.
        num_images = tf.shape(images)[0]
        patches = tf.image.extract_patches(
            images=images,
            sizes=[1, self.patch_size, self.patch_size, 1],
            strides=[1, self.patch_size, self.patch_size, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # The last axis holds the flattened content of one patch.
        patch_dims = patches.shape[-1]
        return tf.reshape(patches, [num_images, -1, patch_dims])

    def get_config(self):
        """Return the layer config so models using this layer can be serialised."""
        config = super().get_config()
        config.update({"patch_size": self.patch_size})
        return config

In [37]:
# plt.figure(figsize=(4,4))
# image = x_train[np.random.choice(range(x_train.shape[0]))]
# plt.imshow(image.astype("uint8"))
# plt.axis("off")

# resized_image = tf.image.resize(
#     tf.convert_to_tensor([image]), size = (image_size, image_size)
# )
# patches = Patches(patch_size)(resized_image) 
# print(f"Patch size : {patch_size} X {patch_size}")
# print(f"patches per image :{patches.shape[1]}")
# print(f"Elements per Patch: {patches.shape[-1]}")

# n = int(np.sqrt(patches.shape[1]))
# plt.figure(figsize = (4,4))
# for i,patch in enumerate(patches[0]):
#     ax = plt.subplot(n, n, i+1)
#     patch_img = tf.reshape(patch, (patch_size, patch_size, 3))
#     plt.imshow(patch_img.numpy().astype("uint8"))
#     plt.axis("off")

In [38]:
class PatchEncoder(layers.Layer):
    """Project flattened patches linearly and add a learned position embedding.

    Args:
        num_patches: Number of patches per image; also the size of the
            position-embedding table.
        projection_dim: Output width of both the Dense projection and the
            position embedding.
        **kwargs: Standard ``keras.layers.Layer`` arguments (``name``,
            ``dtype``, ...). Forwarding them is required for the layer to be
            rebuilt from its config.
    """

    def __init__(self, num_patches, projection_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_patches = num_patches
        # Kept on the instance so get_config can report it.
        self.projection_dim = projection_dim
        self.projection = layers.Dense(units=projection_dim)
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        # One index per patch position: 0 .. num_patches - 1.
        positions = tf.range(start=0, limit=self.num_patches, delta=1)
        # The embedding has no batch axis; the addition broadcasts it over the batch.
        return self.projection(patch) + self.position_embedding(positions)

    def get_config(self):
        """Return the layer config so models using this layer can be serialised."""
        config = super().get_config()
        config.update(
            {
                "num_patches": self.num_patches,
                "projection_dim": self.projection_dim,
            }
        )
        return config
        

In [39]:
def create_vit_classifier(input_patch_size, input_data_augmentation):
    """Build an uncompiled Vision Transformer classifier.

    The model preprocesses the input, splits it into patches, encodes the
    patches, runs ``transformer_layers`` pre-norm transformer blocks and ends
    with an MLP head that outputs ``num_classes`` logits.

    Args:
        input_patch_size: Side length of the square patches.
        input_data_augmentation: Callable (e.g. a ``keras.Sequential``)
            applied to the input tensor before patch extraction.

    Returns:
        A ``keras.Model`` mapping images of shape ``input_shape`` to logits.
    """
    inputs = layers.Input(shape=input_shape)
    # Preprocess / augment the raw images.
    augmented = input_data_augmentation(inputs)
    # Create patches.
    patches = Patches(input_patch_size)(augmented)

    # Derive the patch count from the tensor that is actually patched, so the
    # position embedding always matches the output of Patches. Fall back to
    # the global image_size when the static spatial shape is unknown.
    height, width = augmented.shape[1], augmented.shape[2]
    if height is not None and width is not None:
        num_patches = (height // input_patch_size) * (width // input_patch_size)
    else:
        num_patches = (image_size // input_patch_size) ** 2
    # Encode patches.
    encoded_patches = PatchEncoder(num_patches, projection_dim)(patches)

    # Stack of transformer blocks.
    for _ in range(transformer_layers):
        # Layer normalisation 1.
        x1 = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
        # Multi-head self-attention.
        attention_output = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=projection_dim,
            dropout=0.1,
        )(x1, x1)
        # Skip connection 1.
        x2 = layers.Add()([attention_output, encoded_patches])
        # Layer normalisation 2.
        x3 = layers.LayerNormalization(epsilon=1e-6)(x2)
        # MLP.
        x3 = mlp(x3, hidden_units=transformer_units, dropout_rate=0.1)
        # Skip connection 2: add the MLP output to the pre-norm residual x2.
        # Previously x2 was overwritten by the MLP output, so the residual
        # path carried the layer-normalised tensor instead.
        encoded_patches = layers.Add()([x3, x2])

    # Flatten the patch sequence into one feature vector per image.
    representation = layers.LayerNormalization(epsilon=1e-6)(encoded_patches)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.5)(representation)

    # MLP head.
    features = mlp(representation, hidden_units=mlp_head_units, dropout_rate=0.5)
    # Classify outputs (raw logits, no softmax).
    logits = layers.Dense(num_classes)(features)
    # Create the Keras model.
    return keras.Model(inputs=inputs, outputs=logits)

In [40]:
# # combined
# X = np.concatenate((x_train, x_test), axis=0)
# y = np.concatenate((y_train, y_test), axis=0)

# print("Shapes during training:")
# print("x_train shape:", X.shape)
# print("y_train shape:", y.shape)

In [41]:
# Per-fold metric accumulators; each name is bound to its own empty list.
train_accuracy_list, test_accuracy_list = [], []
test_precision_list, test_recall_list = [], []

In [42]:
# Prints a fixed string only; it assigns nothing and no other cell reads from it.
print("hello")

hello


In [48]:
# Candidate patch sizes tried by the hyperparameter search.
# Every value divides 72 (the image_size set in the config cell) evenly.
patch_size_configs = [4, 6, 8, 12, 18]

In [49]:
def build_ViT_model(input_patch_size, input_data_augmentation):
    """Create a ViT classifier and compile it for sparse-label training.

    Args:
        input_patch_size: Patch side length forwarded to create_vit_classifier.
        input_data_augmentation: Preprocessing callable forwarded to
            create_vit_classifier.

    Returns:
        The compiled model. It uses AdamW with the notebook's learning_rate
        and weight_decay, a sparse categorical cross-entropy loss on logits,
        and a single metric named "accuracy".
    """
    vit = create_vit_classifier(input_patch_size, input_data_augmentation)
    vit.compile(
        optimizer=tfa.optimizers.AdamW(
            learning_rate=learning_rate,
            weight_decay=weight_decay,
        ),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy(name="accuracy")],
    )
    return vit

In [50]:
def get_processed_dataset(dir, labels_path='train_labels.npy'):
    """Load an image array and its label array from ``.npy`` files.

    Args:
        dir: Path of the ``.npy`` file holding the images. The name shadows
            the ``dir`` builtin but is kept so existing keyword calls still work.
        labels_path: Path of the ``.npy`` file holding the labels. Defaults to
            ``'train_labels.npy'``, the file this function always read before
            the parameter existed.

    Returns:
        Tuple ``(images, labels)`` of NumPy arrays.

    Raises:
        ValueError: If the two arrays have a different number of samples
            along the first axis.
    """
    train_npy = np.load(dir)
    label_npy = np.load(labels_path)

    # Fail here rather than deep inside a training loop with an opaque
    # indexing or shape error.
    if len(train_npy) != len(label_npy):
        raise ValueError(
            f"Sample count mismatch: {dir} has {len(train_npy)} samples, "
            f"{labels_path} has {len(label_npy)} labels"
        )

    return train_npy, label_npy

In [51]:
# File names of the .npy image arrays; ORIGIN is the one passed to
# get_processed_dataset by the cross-validation cell.
# NOTE(review): the names suggest CIFAR-10 with increasing noise levels -- confirm.
ORIGIN = 'origin_cifar10.npy'
SMALL = 'small_noise_cifar10.npy'
MEDIUM = 'medium_noise_cifar10.npy'
LARGE = 'large_noise_cifar10.npy'

In [None]:
# Nested cross-validation over patch_size_configs.
#   * inner loop: for every candidate patch size, train on 5 inner folds and
#     keep the patch size with the highest mean validation accuracy;
#   * outer loop: retrain with that patch size on the whole outer-train split
#     and record the accuracy on the held-out outer fold.
outer_cv = KFold(n_splits=5, shuffle=True, random_state=42)
best_params = {}
performance_results = []

x_combined, y_combined = get_processed_dataset(ORIGIN)

data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
    ],
    name="data_augmentation",
)
# NOTE(review): the normalisation statistics are computed on the full dataset,
# including samples that later act as validation folds -- confirm this is acceptable.
data_augmentation.layers[0].adapt(x_combined)

for outer_train_idx, outer_val_idx in outer_cv.split(x_combined):
    X_outer_train, X_outer_val = x_combined[outer_train_idx], x_combined[outer_val_idx]
    y_outer_train, y_outer_val = y_combined[outer_train_idx], y_combined[outer_val_idx]

    best_score = -np.inf
    best_patch_size = None

    for patch_size in patch_size_configs:
        inner_cv = KFold(n_splits=5, shuffle=True, random_state=42)
        scores = []

        for inner_train_idx, inner_val_idx in inner_cv.split(X_outer_train):
            X_inner_train, X_inner_val = X_outer_train[inner_train_idx], X_outer_train[inner_val_idx]
            y_inner_train, y_inner_val = y_outer_train[inner_train_idx], y_outer_train[inner_val_idx]

            model = build_ViT_model(patch_size, data_augmentation)
            # Only validation_data is passed: combining it with
            # validation_split was redundant because the explicit
            # validation set takes precedence.
            history = model.fit(
                x=X_inner_train,
                y=y_inner_train,
                batch_size=batch_size,
                epochs=num_epochs,
                validation_data=(X_inner_val, y_inner_val),
            )

            # Score by accuracy. Index [0] of evaluate() is the loss, and
            # maximising it would select the worst configuration.
            inner_metrics = model.evaluate(
                X_inner_val, y_inner_val, verbose=0, return_dict=True
            )
            scores.append(inner_metrics["accuracy"])

        mean_score = np.mean(scores)

        if mean_score > best_score:
            best_score = mean_score
            best_patch_size = patch_size

    # Folds are identified by the first sample index of their validation split.
    best_params[outer_val_idx[0]] = best_patch_size
    # build_ViT_model requires the augmentation pipeline as second argument.
    final_model = build_ViT_model(best_patch_size, data_augmentation)
    final_model.fit(
        X_outer_train,
        y_outer_train,
        epochs=num_epochs,
        batch_size=batch_size,
        verbose=0,
    )
    final_metrics = final_model.evaluate(X_outer_val, y_outer_val, return_dict=True)
    final_performance = final_metrics["accuracy"]
    performance_results.append(final_performance)

average_performance = np.mean(performance_results)
print(f'Average Performance across all outer folds: {average_performance}')
print(f'Best Parameters for each fold: {best_params}')

Epoch 1/5

In [32]:
# kfold = KFold(n_splits=5, shuffle=True, random_state=42)

# for train_index, test_index in kfold.split(X):

#     x_train, x_test = X[train_index], X[test_index]
#     y_train, y_test = y[train_index], y[test_index]
    
#     ### data 
#     ### model
    
#     model = build_ViT_model(patch_size_config)

#     optimizer = tfa.optimizers.AdamW(
#         learning_rate=learning_rate, weight_decay=weight_decay
#     )

#     model.compile(
#         optimizer=optimizer,
#         loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
#         metrics=[
#             keras.metrics.SparseCategoricalAccuracy(name="accuracy")
#         ]
#     )

#     checkpoint_filepath = "./tmp/checkpoint"
#     checkpoint_callback = keras.callbacks.ModelCheckpoint(
#         checkpoint_filepath,
#         monitor="val_accuracy",
#         save_best_only=True,
#         save_weights_only=True
#     )

#     history = model.fit(
#         x=x_train,
#         y=y_train,
#         batch_size=batch_size,
#         epochs=num_epochs,
#         validation_split=0.1,
#         callbacks=[checkpoint_callback],
#     )
#     model.load_weights(checkpoint_filepath)
    
#     # train performance
#     train_accuracy = history.history['accuracy'][-1]
    
#     # test performance
#     out = model.predict(x_test)
#     probabilities = tf.nn.softmax(out).numpy()
#     predicted_classes = np.argmax(probabilities, axis=1)

#     test_accuracy = accuracy_score(y_test, predicted_classes)
#     test_precision = precision_score(y_test, predicted_classes, average='macro', zero_division=0) 
#     test_recall = recall_score(y_test, predicted_classes, average='macro', zero_division=0) 

#     train_accuracy_list.append(train_accuracy)
#     test_accuracy_list.append(test_accuracy)
#     test_precision_list.append(test_precision)
#     test_recall_list.append(test_recall)

#     print("train_accuracy", train_accuracy)
#     print("test_accuracy", test_accuracy)
#     print("test_precision", test_precision)
#     print("test_recall", test_recall)

Epoch 1/5
 50/338 [===>..........................] - ETA: 5:58 - loss: 2.6847 - accuracy: 0.1947

KeyboardInterrupt: 

In [14]:
# # mean
# average_accuracy = np.mean(train_accuracy_list)
# average_precision = np.mean(test_precision_list)
# average_recall = np.mean(test_recall_list)

# # print mean
# print(f"Average train Accuracy: {average_accuracy:.3f}")
# print(f"Average test Precision: {average_precision:.3f}")
# print(f"Average test Recall: {average_recall:.3f}")

Average train Accuracy: 0.483
Average test Precision: 0.085
Average test Recall: 0.140
