## Installation

In [None]:
!python -m pip install keras_cv datasets transformers tensorboard tensorflow ipywidgets opencv-python tensorflow-datasets scikit-learn
!git-lfs --version

Log in to Hugging Face (only needed the first time)

In [None]:
from huggingface_hub import notebook_login

# Start the interactive Hugging Face Hub login flow for this notebook.
notebook_login()


Show that the GPU is being used

In [None]:
from tensorflow.config.experimental import list_physical_devices
# An empty list here means TensorFlow does not see any GPU.
print(list_physical_devices('GPU'))

# Hub identifier of the pretrained ViT checkpoint used by the later cells
# (image processor, base model, output directory and Hub repo name).
model_id = "google/vit-base-patch16-224-in21k"

Now create the dataset; this is also where the data augmentation is defined

In [None]:
from transformers import ViTImageProcessor
from datasets import load_dataset
from tensorflow import device
from tensorflow import keras
from tensorflow.keras import layers
from keras_cv.layers import RandAugment
import tensorflow as tf

# RandAugment parameters: how many augmentations are applied to each image
# and how strong they are.
num_layers = 2
magnitude = 0.15

# Load the image processor that belongs to the pretrained checkpoint
# (`model_id` is defined in an earlier cell).
image_processor = ViTImageProcessor.from_pretrained(model_id)

# Create the RandAugment layer used on the training split only.
rand_augment = RandAugment(
    # NOTE(review): assumes the processor normalises pixels into [-1, 1] — confirm
    # against the checkpoint's image_mean / image_std.
    value_range=[-1,1],
    augmentations_per_image=num_layers,
    magnitude=magnitude,
)


def transform(batch):
    """Preprocess a batch of examples without augmentation.

    Runs every image in ``batch["image"]`` through ``image_processor`` and
    attaches ``batch["label"]`` under the ``"labels"`` key.

    Returns:
        The processor output (TensorFlow tensors) with an added ``"labels"`` entry.
    """
    images = list(batch["image"])
    encoded = image_processor(images, return_tensors="tf")
    encoded["labels"] = batch["label"]
    return encoded

def augment(batch):
    """Preprocess a batch of examples and apply RandAugment to the images.

    The image processor output is indexed channels-first here (the model input
    is declared as ``(3, 224, 224)``), while ``rand_augment`` is fed
    channels-last images. The pixel values are therefore permuted
    NCHW -> NHWC before augmentation and NHWC -> NCHW afterwards.

    The previous permutation ``[0, 3, 2, 1]`` also swapped height and width,
    so the augmentations were applied to transposed images.

    Returns:
        The processor output with augmented ``"pixel_values"`` and an added
        ``"labels"`` entry taken from ``batch["label"]``.
    """
    inputs = image_processor([x for x in batch["image"]], return_tensors="tf")
    # (batch, channels, height, width) -> (batch, height, width, channels)
    channels_last = tf.transpose(inputs["pixel_values"], perm=[0, 2, 3, 1])
    # The augmentation is pinned to the CPU, as in the original pipeline.
    with device('/cpu:0'):
        augmented = rand_augment(channels_last)
    # (batch, height, width, channels) -> (batch, channels, height, width)
    inputs["pixel_values"] = tf.transpose(augmented, perm=[0, 3, 1, 2])
    inputs["labels"] = batch["label"]
    return inputs

# Fixed seed so the train / validation / final-test partition is the same on
# every run. Without it the held-out test set changes on each kernel restart
# and results are not comparable between runs.
SPLIT_SEED = 42

dataset = load_dataset("streetview_images_cropped", data_dir="./")

# Fractions used for the two successive splits below. `eval_size` is applied
# to what remains after the final-test split has been removed.
eval_size=.15
test_size=.05

# First split: hold out the final test set (no augmentation).
dataset = dataset["train"].shuffle(seed=SPLIT_SEED).train_test_split(test_size=test_size, seed=SPLIT_SEED)
dataset_final_test = dataset['test'].with_transform(transform)

# Second split: training set (augmented) and validation set (not augmented).
dataset = dataset["train"].train_test_split(test_size=eval_size, seed=SPLIT_SEED)
dataset['train'] = dataset['train'].with_transform(augment)
dataset['test'] = dataset['test'].with_transform(transform)
processed_dataset = dataset

Specify hyperparameters

In [None]:
from huggingface_hub import HfFolder
import tensorflow as tf

# Class names and the two lookup tables between class index and class name.
# Note that the ids are stored as strings in `id2label` and therefore also
# as string values in `label2id`.
class_labels = processed_dataset['train'].features["label"].names
num_images_train = processed_dataset['train'].num_rows
id2label = {str(i): label for i, label in enumerate(class_labels)}
label2id = {v: k for k, v in id2label.items()}

# Training hyperparameters consumed by the optimizer, dataset and fit cells.
num_train_epochs = 20
train_batch_size = 32
eval_batch_size = 32
learning_rate = 6e-5
adam_beta1 = 0.85 # 0.9
adam_beta2 = 0.95 # 0.999
weight_decay_rate=0.01
num_warmup_steps=20
# Local output directory: the part of `model_id` after the organisation prefix.
output_dir=model_id.split("/")[1]
# Token stored by a previous Hugging Face login.
hub_token = HfFolder.get_token()
hub_model_id = f'dl-au-tamas-jedrek/{model_id.split("/")[1]}-street-view'


In [None]:
import json

# Read the nested {label: {label: distance}} mapping from disk.
with open("data/distances.json", "r") as distance_file:
    distances = json.load(distance_file)

# Turn the mapping into a dense matrix whose rows and columns are indexed by
# class id (looked up through label2id).
import numpy as np
num_classes = len(label2id)
mat_distances = np.zeros((num_classes, num_classes))
for row_label, row in distances.items():
    for col_label, value in row.items():
        mat_distances[int(label2id[row_label]), int(label2id[col_label])] = value
mat_distances

Get model, specify loss and metrics

In [None]:
import tensorflow as tf
from transformers import TFViTModel

# Load the pretrained ViT backbone; attention maps are requested so they can
# be visualised later in the notebook.
base_model = TFViTModel.from_pretrained(model_id, output_attentions = True)
# Channels-first image input.
pixel_values = tf.keras.layers.Input(shape=(3,224,224), name='pixel_values', dtype='float32')
# First element of the backbone output — presumably the per-token hidden states.
vit = base_model.vit(pixel_values)[0]
# Classification head on the token at index 0 (presumably the CLS token).
# The head ends in a softmax, so the model outputs probabilities, not logits.
classifier = tf.keras.layers.Dense(len(class_labels), activation='softmax', name='outputs')(vit[:, 0, :])
model = tf.keras.Model(inputs=pixel_values, outputs=classifier)

In [None]:
import tensorflow as tf

# Class-to-class distance matrix as a float32 tensor, indexed by class id.
tensor_distances = tf.convert_to_tensor(mat_distances, dtype=tf.float32)
def calculate_all_distance(y_true, y_pred):
    """Expected distance between the true class and the predicted distribution.

    For every sample, the row of ``tensor_distances`` that belongs to the true
    label is weighted by the predicted class probabilities and summed.

    The previous implementation gathered rows ``0 .. batch_size - 1`` instead
    of the rows of the true labels, so the result did not depend on ``y_true``.

    Args:
        y_true: Integer class ids, any shape that flattens to ``(batch,)``.
        y_pred: Per-class scores of shape ``(batch, num_classes)``.

    Returns:
        A tensor of shape ``(batch,)`` with one distance per sample.
    """
    # Labels may arrive as floats and/or with a trailing unit dimension.
    label_ids = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
    indexed_distances = tf.gather(tensor_distances, label_ids)
    multiplied = tf.multiply(y_pred, indexed_distances)
    dist = tf.reduce_sum(multiplied, axis=1)
    return dist
def calculate_best_distance(y_true, y_pred):
    """Distance between the true class and the highest-scoring predicted class.

    Args:
        y_true: Integer class ids; must have the same integer dtype as the
            ``tf.argmax`` result so the two can be stacked.
        y_pred: Per-class scores of shape ``(batch, num_classes)``.

    Returns:
        One entry of ``tensor_distances`` per sample.
    """
    flat_true = tf.reshape(y_true, [-1])
    top_class = tf.argmax(y_pred, axis=1)
    # Each row is a (true id, predicted id) pair used as a 2-D index.
    pairs = tf.stack((flat_true, top_class), axis=1)
    return tf.gather_nd(tensor_distances, pairs)

def customLoss(y_true, y_pred):
    """Per-sample loss: the squared result of ``calculate_all_distance``, divided by 500."""
    expected_distance = calculate_all_distance(y_true, y_pred)
    squared = expected_distance ** 2
    return squared / 500

#loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# The distance-based loss above is the one passed to model.compile().
loss = customLoss

def best_distance(y_true, y_pred):
    """Metric: batch mean of ``calculate_best_distance`` with labels cast to int64."""
    labels = tf.cast(y_true, tf.int64)
    return tf.reduce_mean(calculate_best_distance(labels, y_pred))

def all_distance(y_true, y_pred):
    """Metric: batch mean of ``calculate_all_distance`` with labels cast to int64."""
    labels = tf.cast(y_true, tf.int64)
    return tf.reduce_mean(calculate_all_distance(labels, y_pred))


In [None]:
from transformers import create_optimizer

# Create the optimizer with weight decay and its learning-rate schedule.
# The schedule counts optimizer steps (one per batch), not images, so the
# total is (batches per epoch) * epochs. Using the image count here made the
# schedule roughly `train_batch_size` times too long, so the learning rate
# barely decayed during training.
# NOTE(review): floor division assumes the training tf.data pipeline drops the
# last partial batch — confirm; use ceil division otherwise.
steps_per_epoch = max(1, num_images_train // train_batch_size)
num_train_steps = steps_per_epoch * num_train_epochs
optimizer, lr_schedule = create_optimizer(
    init_lr=learning_rate,
    adam_beta1=adam_beta1,
    adam_beta2=adam_beta2,
    num_train_steps=num_train_steps,
    weight_decay_rate=weight_decay_rate,
    num_warmup_steps=num_warmup_steps,

)
def get_lr_metric(optimizer):
    """Return a Keras-metric-compatible function that reports the optimizer's current learning rate."""
    def lr(y_true, y_pred):
        # The metric arguments are ignored; only the optimizer state is read.
        # NOTE(review): `_decayed_lr` is a private optimizer method and may not
        # exist in every TensorFlow / Keras version — confirm.
        return optimizer._decayed_lr(tf.float32)
    return lr
lr_metric = get_lr_metric(optimizer)

# Metrics reported during training and evaluation: accuracy and top-3
# accuracy on sparse (integer) labels, the two distance metrics defined
# earlier, and the current learning rate.
metrics=[
    tf.keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
    best_distance,
    all_distance,
    tf.keras.metrics.SparseTopKCategoricalAccuracy(3, name="top-3-accuracy"),
    lr_metric,
]

In [None]:
# NOTE(review): `model.layers[0]` is presumably the Input layer of this
# functional model, in which case this flag does not affect the ViT weights —
# confirm whether `model.layers[1]` (the backbone) was intended.
model.layers[0].trainable = True
model.summary()
# Compile with the custom distance loss and the metrics defined earlier.
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

Transform dataset for training

In [None]:
#tf_train_dataset = model.prepare_tf_dataset(processed_dataset['train'], batch_size=train_batch_size, shuffle=True)
#tf_eval_dataset = model.prepare_tf_dataset(processed_dataset['test'], batch_size=eval_batch_size, shuffle=True)
#tf_test_dataset = model.prepare_tf_dataset(dataset_final_test, batch_size=eval_batch_size, shuffle=True)

from transformers import DefaultDataCollator

data_collator = DefaultDataCollator(return_tensors="tf")


def _as_tf_dataset(split, batch_size):
    """Convert one dataset split to a shuffled, batched tf.data pipeline of (pixel_values, labels)."""
    return split.to_tf_dataset(
        columns=['pixel_values'],
        label_cols=["labels"],
        shuffle=True,
        batch_size=batch_size,
        collate_fn=data_collator,
    )


# Same conversion for all three splits; only the batch size differs.
tf_train_dataset = _as_tf_dataset(processed_dataset["train"], train_batch_size)
tf_eval_dataset = _as_tf_dataset(processed_dataset["test"], eval_batch_size)
tf_test_dataset = _as_tf_dataset(dataset_final_test, eval_batch_size)

Uncomment and run to display sample training images

In [None]:
#import matplotlib.pyplot as plt

#sample_images, sample_labels = next(iter(tf_train_dataset))
#plt.figure(figsize=(10, 10))
#for i, image in enumerate(sample_images[:9]):
#    ax = plt.subplot(3, 3, i + 1)
#    transposed = tf.transpose(image)
#    plt.imshow(transposed.numpy())
#    plt.axis("off")

Push the model and its TensorBoard logs to the Hub after every epoch

In [None]:
import os
from transformers.keras_callbacks import PushToHubCallback
from tensorflow.keras.callbacks import TensorBoard as TensorboardCallback, EarlyStopping, Callback
from huggingface_hub import push_to_hub_keras

# TensorBoard event files are written here and uploaded together with the model.
log_dir = os.path.join(output_dir, "logs")
class CustomPushToHubCallback(Callback):
    """Keras callback that uploads the model being trained to the Hub after each epoch."""

    def on_epoch_end(self, epoch, logs=None):
        # Use the model this callback is attached to (`self.model`, set by
        # Keras) instead of whatever the global name `model` currently refers to.
        push_to_hub_keras(self.model, hub_model_id, log_dir=log_dir)

# Callbacks passed to model.fit(): TensorBoard logging and the per-epoch Hub upload.
callbacks = [
    TensorboardCallback(log_dir=log_dir),
    CustomPushToHubCallback(),
]
#callbacks.append(EarlyStopping(monitor="val_accuracy",patience=1))
#callbacks.append(PushToHubCallback(
#    output_dir,
#    hub_model_id=hub_model_id,
#    hub_token=hub_token,
#))

Train model

In [None]:
from transformers import logging as transformers_logging
# Raise transformers' log level to INFO for the training run.
transformers_logging.set_verbosity_info()
# Train on the augmented training split and validate on the un-augmented
# validation split after every epoch; the returned object is kept in
# `train_results`.
train_results = model.fit(
    tf_train_dataset,
    validation_data=tf_eval_dataset,
    callbacks=callbacks,
    epochs=num_train_epochs,
    verbose=1
)

## Attention heatmap

### Create partial model to display

In [None]:
# Index of the layer whose output is exposed; in `model`, index 1 is
# presumably the ViT backbone that follows the Input layer — confirm with
# model.summary().
layer_index = 1  # Attention layer index 
# Sub-model that maps the same input to that layer's output instead of the
# classifier output.
attention_model = tf.keras.Model(inputs=model.input, outputs=model.get_layer(index=layer_index).output)

### Prepare an image

In [None]:
one_image = tf_test_dataset.take(1)

# Pull a single batch from the test set. `single_image` keeps the WHOLE batch
# (it is fed to the attention model as-is); only its first image is displayed.
for image_batch, label_batch in one_image:
    single_image = image_batch
    single_label = label_batch[0]
    print(f"Loaded {single_label}")


# Channels-first (C, H, W) -> channels-last (H, W, C) for plotting. A single
# explicit permutation replaces the previous full transpose + horizontal flip
# + 90 degree rotation, which produced the same orientation.
preprocessed_image = tf.transpose(single_image[0], perm=[1, 2, 0])
# De-normalize with the statistics of the processor that normalised the image.
# The previous code used ImageNet mean/std, which only gives correct colours
# if the checkpoint's processor happens to use those values.
image_mean = tf.constant(image_processor.image_mean, dtype=tf.float32)
image_std = tf.constant(image_processor.image_std, dtype=tf.float32)
preprocessed_img_orig = (preprocessed_image * image_std) + image_mean
# Clip to the displayable [0, 1] range and convert to NumPy for matplotlib.
preprocessed_img_orig = tf.clip_by_value(preprocessed_img_orig, 0.0, 1.0).numpy()

### Get attention scores

In [None]:
# Run the whole batch through the partial model.
result = attention_model(single_image)
# Attention outputs of the backbone — presumably one tensor per transformer
# layer; the plotting cell below indexes element 0.
attention_score = result.attentions

### Display attention heatmap

In [None]:
import numpy as np
import cv2
import matplotlib.pyplot as plt

def attention_heatmap(attention_scores, image, model_type="dino", patch_size=16, image_size=224):
    """Build per-head attention maps of the CLS token, upsampled to image size.

    Only the first sample of the batch is used.

    Args:
        attention_scores: Attention tensor of one layer; indexed as
            ``[sample, head, query_token, key_token]`` (presumably shaped
            ``(batch, heads, tokens, tokens)``).
        image: Unused; kept for backward compatibility with existing callers.
        model_type: If it contains ``"distilled"``, two leading special tokens
            are skipped instead of one.
        patch_size: Side length of one patch in pixels.
        image_size: Side length of the (square) input image in pixels.

    Returns:
        A tensor of shape ``(image_size, image_size, num_heads)``.
    """
    num_tokens = 2 if "distilled" in model_type else 1

    # Number of patches along each side of the image.
    w_featmap = image_size // patch_size
    h_featmap = image_size // patch_size

    # Attention paid BY the CLS token (query index 0) to every patch token.
    # The previous code used query index 1, i.e. the first image patch.
    attentions = attention_scores[0, :, 0, num_tokens:].numpy()
    # Derive the head count from the data instead of hard-coding 12.
    num_heads = attentions.shape[0]

    # Reshape the attention scores to resemble mini patches, heads last.
    attentions = attentions.reshape(num_heads, w_featmap, h_featmap)
    attentions = attentions.transpose((1, 2, 0))

    # Resize the patch grid to the full image resolution.
    attentions = tf.image.resize(
        attentions, size=(image_size, image_size)
    )
    return attentions

# Generate the attention heatmaps.
# Generate the attention heatmaps from the first attention tensor.
attentions = attention_heatmap(attention_score[0], preprocessed_img_orig)

# Plot the maps: one panel per attention head, overlaid on the input image.
fig, axes = plt.subplots(nrows=3, ncols=4, figsize=(13, 13))
# Heads are on the LAST axis of `attentions`; len(attentions) would be the
# image height and therefore never limit the loop.
num_heads = attentions.shape[-1]
img_count = 0

for i in range(3):
    for j in range(4):
        if img_count < num_heads:
            axes[i, j].imshow(preprocessed_img_orig)
            axes[i, j].imshow(attentions[..., img_count], cmap="inferno", alpha=0.5)
            axes[i, j].title.set_text(f"Attention head: {img_count}")
            axes[i, j].axis("off")
            img_count += 1

# Show the plain input image for reference.
plt.figure(figsize=(10, 10))
ax = plt.subplot(3, 3, 1)
plt.imshow(preprocessed_img_orig)
plt.axis("off")

## Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

# Collect predicted and true class ids batch by batch, then concatenate once.
batch_predictions = []
batch_labels = []

for x, y in tf_test_dataset:
    # `model` ends in a softmax Dense layer, so predict() already returns an
    # array of class probabilities. It has no `.logits` attribute and needs
    # no second softmax.
    probabilities = model.predict(x, verbose=0)
    # Predicted label = class with the highest probability.
    batch_predictions.append(np.argmax(probabilities, axis=-1))
    batch_labels.append(y.numpy().reshape(-1))

if batch_predictions:
    predictions = np.concatenate(batch_predictions)
    true_labels = np.concatenate(batch_labels)
else:
    # Empty test set: keep integer dtype so the label lookup below still works.
    predictions = np.array([], dtype=int)
    true_labels = np.array([], dtype=int)

# Pin the matrix to ALL class ids so it always matches `display_labels`, even
# when some classes do not occur in the test split.
cm = confusion_matrix(true_labels, predictions, labels=np.arange(len(class_labels)))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_labels)
# High DPI and a tiny font so a matrix with many classes stays readable.
plt.rcParams['figure.dpi'] = 600
plt.rcParams['font.size'] = 1

disp.plot()
plt.show()