In [None]:
!pip install transformers datasets tensorflow_datasets opencv-python

In [None]:
import keras.optimizers
import numpy as np
%matplotlib inline
from transformers import AutoImageProcessor, ViTImageProcessor
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
import os
import cv2
import datasets
from transformers import DefaultDataCollator
from transformers import TFViTForImageClassification, create_optimizer, TFCvtForImageClassification
from transformers import CvtConfig, CvtModel
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sn


# Report how many GPU devices TensorFlow lists for this runtime.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [None]:
def create_image_folder_dataset(root_path):
    """Create a `datasets.Dataset` from an image-folder layout.

    Expected layout: ``root_path/<class_name>/<image file>``. Every visible
    sub-directory of ``root_path`` is one class.

    Class names are sorted, so a given set of folder names always maps to the
    same label ids. This matters because the function is called separately for
    the train and test folders, and `os.listdir` gives no ordering guarantee.
    Hidden entries (names starting with ".", e.g. ``.ipynb_checkpoints`` or
    ``.DS_Store``) and stray files directly under ``root_path`` are ignored.

    Args:
        root_path: Directory containing one sub-directory per class.

    Returns:
        A dataset with an ``img`` column (`datasets.Image`, built from file
        paths) and a ``label`` column (`ClassLabel` over the sorted class names).
    """
    # Class names are the visible sub-directories, in a deterministic order.
    class_names = sorted(
        entry
        for entry in os.listdir(root_path)
        if not entry.startswith(".") and os.path.isdir(os.path.join(root_path, entry))
    )
    # Defines the `datasets` features.
    features = datasets.Features({
        "img": datasets.Image(),
        "label": datasets.features.ClassLabel(names=class_names),
    })
    # Parallel lists: image path and the class folder it was found in.
    img_data_files = []
    label_data_files = []
    for class_name in class_names:
        class_dir = os.path.join(root_path, class_name)
        for file_name in sorted(os.listdir(class_dir)):
            file_path = os.path.join(class_dir, file_name)
            # Skip hidden files and nested directories; only regular files are images.
            if file_name.startswith(".") or not os.path.isfile(file_path):
                continue
            img_data_files.append(file_path)
            label_data_files.append(class_name)
    return datasets.Dataset.from_dict(
        {"img": img_data_files, "label": label_data_files},
        features=features,
    )


# Index the training image folder and keep the class names exposed by its `label` feature.
TRAIN_DIR = "drive/MyDrive/project1/train"
train_imgs = create_image_folder_dataset(TRAIN_DIR)
img_class_labels = train_imgs.features["label"].names

In [None]:
# Checkpoint that is fine-tuned below.
model_id = "google/vit-base-patch16-224-in21k"
#model_id = "microsoft/cvt-13"
# Image processor belonging to the checkpoint (not referenced by the Keras pipelines in this cell).
feature_extractor = ViTImageProcessor.from_pretrained(model_id)
#feature_extractor.size = {"shortest_edge":512,}

# Training pipeline: resize -> centre crop -> random 300x300 crop -> brightness/contrast
# jitter -> rescale by 1/255 -> resize to 224x224 -> random flip and rotation.
# learn more about data augmentation here: https://www.tensorflow.org/tutorials/images/data_augmentation
data_augmentation = keras.Sequential(
    [
        layers.Resizing(1024, 1024),
        layers.CenterCrop(900, 900),
        # Stable layer path (instead of `layers.experimental.preprocessing`), matching
        # the other `layers.Random*` layers used in this pipeline.
        layers.RandomCrop(300, 300),
        # Brightness/contrast run before rescaling; assumes inputs are on the 0-255 scale.
        layers.RandomBrightness(factor=0.2),
        layers.RandomContrast(factor=0.2),

        layers.Rescaling(1/255),
        layers.Resizing(224, 224),
        #layers.RandomZoom(height_factor=(0,0.15), width_factor=(0,0.15), fill_mode="constant", ),
        layers.RandomFlip("horizontal_and_vertical"),

        layers.RandomRotation(factor=0.2, fill_mode="constant", fill_value=0),
    ],
    name="data_augmentation",
)

# Evaluation pipeline: same resize/centre crop, a 450x450 crop, rescale, resize to 224x224.
# NOTE(review): this pipeline also contains a RandomCrop, so evaluation inputs may not be
# deterministic; presumably intended as test-time augmentation for the ensemble cell. Confirm.
data_resizing = keras.Sequential(
    [
        layers.Resizing(1024, 1024),
        layers.CenterCrop(900, 900),
        layers.RandomCrop(450, 450),
        layers.Rescaling(1/255),
        layers.Resizing(224, 224),
    ],
    name="data_resizing",
)
def load_ben_color(image, sigmaX=10):
    """Sharpen local detail by subtracting a Gaussian-blurred copy of the image.

    Computes ``4 * image - 4 * blur(image) + 128`` via `cv2.addWeighted`.

    Args:
        image: Image array accepted by OpenCV (presumably H x W x C uint8; confirm
            against the caller).
        sigmaX: Sigma passed to `cv2.GaussianBlur`; the kernel size is given as
            ``(0, 0)``. Defaults to 10, the value that used to be hard-coded.

    Returns:
        The filtered image returned by `cv2.addWeighted`.
    """
    blurred = cv2.GaussianBlur(image, (0, 0), sigmaX)
    return cv2.addWeighted(image, 4, blurred, -4, 128)
def _preprocess_batch(examples, pipeline):
    """Run every image of a batch through `load_ben_color` and a Keras pipeline.

    Args:
        examples: Batch dict with an ``img`` list and a ``label`` list.
        pipeline: Callable applied to each filtered image (one of the Keras
            `Sequential` preprocessing pipelines).

    Returns:
        Dict with ``pixel_values`` as one channels-first array and ``labels``
        passed through unchanged.
    """
    pixel_values = [pipeline(load_ben_color(np.array(img))) for img in examples["img"]]
    # The stacked images are channels-last, (N, H, W, C). Move channels to axis 1 with a
    # real transpose: `swapaxes(1, 3)` would also swap height and width, i.e. transpose
    # every image.
    batch = np.transpose(np.array(pixel_values), (0, 3, 1, 2))
    return {"pixel_values": batch, "labels": examples["label"]}


# use keras image data augmentation processing
def augmentation(examples):
    """Batch transform for training: `data_augmentation` pipeline, channels-first output."""
    return _preprocess_batch(examples, data_augmentation)


# basic processing (`data_resizing` pipeline)
def process(examples):
    """Batch transform for evaluation: `data_resizing` pipeline, channels-first output."""
    return _preprocess_batch(examples, data_resizing)

In [None]:
test_size = .1
# Fixed seed so the train/validation split is the same on every Restart & Run All.
split_seed = 42
train_val_set = train_imgs.train_test_split(test_size=test_size, seed=split_seed)
# Held-out part goes through `process`, training part through `augmentation`.
train_val_set["test"] = train_val_set["test"].with_transform(process)
train_val_set["train"] = train_val_set["train"].with_transform(augmentation)

# `TFViTForImageClassification` is already imported in the notebook's import cell.
labels = train_val_set['train'].features['label'].names

# Load the pretrained checkpoint with a classification head sized for our classes.
model = TFViTForImageClassification.from_pretrained(
    model_id,
    num_labels=len(labels),
    id2label={str(i): c for i, c in enumerate(labels)},
    label2id={c: str(i) for i, c in enumerate(labels)}
)

#model = TFCvtForImageClassification.from_pretrained(
#    model_id,
#    )
#model.classifier = tf.keras.layers.Dense(5)
#model.num_labels = 5

In [None]:
# Training hyperparameters.
num_train_epochs = 5
train_batch_size = 32
eval_batch_size = 32
learning_rate = 0.00003
weight_decay_rate = 0.01
num_warmup_steps = 0
# Names derived from the checkpoint id (the part after the "/").
output_dir = model_id.split("/")[1]
hub_model_id = f'{model_id.split("/")[1]}-eyes'
fp16 = True
data_collator = DefaultDataCollator(return_tensors="tf")

# converting our train dataset to tf.data.Dataset
tf_train_dataset = train_val_set["train"].to_tf_dataset(
    columns=['pixel_values'],
    label_cols=["labels"],
    shuffle=True,
    batch_size=train_batch_size,
    collate_fn=data_collator)

# converting our validation dataset to tf.data.Dataset
# (no shuffling: validation metrics do not depend on sample order)
tf_eval_dataset = train_val_set["test"].to_tf_dataset(
    columns=['pixel_values'],
    label_cols=["labels"],
    shuffle=False,
    batch_size=eval_batch_size,
    collate_fn=data_collator)

In [None]:
"""for x, y in tf_train_dataset:
    print(np.array(x[0]).min(), np.array(x[0]).max())
    test = np.array(x[0])
    plt.imshow(test.swapaxes(0,2))
    plt.show()"""

In [None]:
class model_per_epoch(keras.callbacks.Callback):
    """Checkpoint callback that tracks the epoch with the lowest validation loss.

    After each epoch the ``val_loss`` entry of ``logs`` is compared with the best
    value seen so far. On improvement the weights are kept in memory and written
    to ``<filepath>/<epoch>-<val_loss>.h5``. When ``save_best_only`` is false,
    the weights of every epoch are written. When it is true, the best weights are
    restored into the model at the end of training.

    Epoch numbers in file names are 1-based and the loss is formatted with two
    decimals.

    Args:
        model: Model whose weights are tracked and saved.
        filepath: Directory the ``.h5`` weight files are written to.
        save_best_only: If true, only improving epochs are saved and the best
            weights are restored when training ends.
    """

    def __init__(self, model, filepath, save_best_only):
        # Initialise the Keras base class first (it resets its own model reference),
        # then attach the model through the callback API.
        super().__init__()
        self.set_model(model)
        self.filepath = filepath
        self.save_best_only = save_best_only
        self.lowest_loss = np.inf
        # 0 means "no epoch has improved on the initial weights yet".
        self.best_epoch = 0
        self.best_weights = self.model.get_weights()

    def _weights_path(self, epoch, loss):
        """Return the weight-file path for a 1-based epoch number and its loss."""
        return os.path.join(self.filepath, f"{epoch}-{loss:.2f}.h5")

    def on_epoch_end(self, epoch, logs=None):
        """Record and save the weights when ``val_loss`` improves."""
        v_loss = (logs or {}).get('val_loss')
        if v_loss is None:
            # No validation data for this epoch: nothing to compare against.
            return
        v_loss = float(v_loss)
        if v_loss < self.lowest_loss:
            self.lowest_loss = v_loss
            self.best_weights = self.model.get_weights()
            self.best_epoch = epoch + 1
            self.model.save_weights(self._weights_path(self.best_epoch, self.lowest_loss))
        elif not self.save_best_only:
            # Non-improving epochs are saved too. Weights only, like the best-epoch
            # files: `.h5` files written here always have the same content type.
            self.model.save_weights(self._weights_path(epoch + 1, v_loss))

    def on_train_end(self, logs=None):
        """Restore and save the best weights when ``save_best_only`` is set."""
        if not self.save_best_only:
            return
        if self.best_epoch == 0:
            # val_loss never improved (e.g. missing or NaN); keep the current weights.
            print(' no epoch improved val_loss; model weights left unchanged')
            return
        self.model.set_weights(self.best_weights)
        self.model.save_weights(self._weights_path(self.best_epoch, self.lowest_loss))
        print(' model is returned with best weights from epoch ', self.best_epoch)

save_dir=r'drive/MyDrive/project1/'

In [None]:
# Optimizer and learning-rate schedule from transformers' `create_optimizer`,
# sized for `num_train_epochs` passes over the training batches.
num_train_steps = len(tf_train_dataset) * num_train_epochs
optimizer, lr_schedule = create_optimizer(
    init_lr=learning_rate,
    num_train_steps=num_train_steps,
    weight_decay_rate=weight_decay_rate,
    num_warmup_steps=num_warmup_steps,
)

# The model is compiled against logits, with integer class labels.
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# define metrics
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy(name="accuracy")
metrics = [accuracy_metric]

# Early stopping plus the best-weights checkpoint callback defined in this notebook.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=2)
best_checkpoint = model_per_epoch(model=model, filepath=save_dir, save_best_only=True)
callbacks = [early_stopping, best_checkpoint]

model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=metrics,
)

In [None]:
# Train for the number of epochs the learning-rate schedule was sized for
# (`num_train_steps = len(tf_train_dataset) * num_train_epochs`). Any further epochs
# would run past the end of that schedule.
history = model.fit(
    tf_train_dataset.prefetch(20),
    validation_data=tf_eval_dataset.prefetch(20),
    callbacks=callbacks,
    epochs=num_train_epochs,
)

In [None]:
# Index the held-out test folder.
test_folder_ds = create_image_folder_dataset("drive/MyDrive/project1/test")

# Label ids are positions in the class-name list, so the test folder must yield the
# same class names in the same order as training; otherwise the metrics are meaningless.
test_class_names = test_folder_ds.features["label"].names
if test_class_names != img_class_labels:
    raise ValueError(
        f"Test classes {test_class_names} do not match training classes {img_class_labels}"
    )

# Same preprocessing as validation, unshuffled so predictions line up with labels.
test_imgs = test_folder_ds.with_transform(process).to_tf_dataset(
    columns=['pixel_values'],
    label_cols=["labels"],
    shuffle=False,
    batch_size=1,
    collate_fn=data_collator)

In [None]:
# Store predicted class ids (argmax over the logits), not the raw model output:
# the ensemble cell reads `y_pred.shape` and writes `y_pred` into an array column.
y_pred = np.argmax(model.predict(test_imgs).logits, axis=1)

In [None]:
# Number of prediction passes that vote on each test sample (the first is `y_pred`).
n_passes = 10

# `y_pred` may be the raw `model.predict` output (with `.logits`), a logits array, or
# already a 1-D array of class ids. Normalise it to class ids either way.
first_pass = np.asarray(getattr(y_pred, "logits", y_pred))
if first_pass.ndim == 2:
    first_pass = first_pass.argmax(axis=1)

# One column of predicted class ids per pass.
y_pred_ensemble = np.empty((first_pass.shape[0], n_passes), dtype="int64")
y_pred_ensemble[:, 0] = first_pass
for i in range(1, n_passes):
    y_pred_ensemble[:, i] = np.argmax(model.predict(test_imgs).logits, axis=1)

In [None]:
# Majority vote per test sample: the most frequent class id in each row wins
# (ties go to the smallest class id, as `argmax` returns the first maximum).
vote_rows = y_pred_ensemble.astype("int64")
y_pred = np.array([np.bincount(votes).argmax() for votes in vote_rows])

In [None]:
# Materialise the test batches and collect the label part (index 1) of each one.
testset = list(test_imgs)
y = np.concatenate([batch[1] for batch in testset])

In [None]:
# Fix the label set so the matrix always has one row/column per class, even if a
# class is absent from both `y` and `y_pred`.
class_ids = list(range(len(img_class_labels)))
cm = confusion_matrix(y, y_pred, labels=class_ids)

# `confusion_matrix(y_true, y_pred)` puts true classes on rows and predicted classes
# on columns. Heatmap rows are the y-axis, so y is "True" and x is "Predicted".
# `fmt="d"` prints the counts as plain integers.
ax = sn.heatmap(
    cm,
    xticklabels=img_class_labels,
    yticklabels=img_class_labels,
    annot=True,
    fmt="d",
)
ax.set(xlabel="Predicted Class", ylabel="True Class")
plt.show()

In [None]:
# Write the model's current weights to a `.h5` file in the project directory.
model.save_weights("drive/MyDrive/project1/transformer_weights.h5")