In [19]:
import os
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from patchify import patchify
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping

In [56]:
""" Hyperparameters """
hp = {}
hp["image_size"] = 256
hp["num_channels"] = 3
hp["patch_size"] = 16
hp["num_patches"] = (hp["image_size"]**2) // (hp["patch_size"]**2)
hp["flat_patches_shape"] = (hp["num_patches"], hp["patch_size"]*hp["patch_size"]*hp["num_channels"])

hp["batch_size"] = 16
hp["lr"] = 1e-4
hp["num_epochs"] = 5
hp["num_classes"] = 10
hp["class_names"] = ['Bacterial_Spot',
 'Curl_Virus',
 'Early_Blight',
 'Healthy',
 'Late_Blight',
 'Leaf_Mold',
 'Mosaic_Virus',
 'Septoria_Leaf_Spot',
 'Spider_Mites',
 'Target_Spot']

# cannot handle this hyperparameter 
# hp["num_layers"] = 12
# hp["hidden_dim"] = 768
# hp["mlp_dim"] = 3072
# hp["num_heads"] = 12
# hp["dropout_rate"] = 0.1

hp["num_layers"] = 6
hp["hidden_dim"] = 256
hp["mlp_dim"] = 1024
hp["num_heads"] = 6
hp["dropout_rate"] = 0.1


In [21]:
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

In [22]:
# split the images into train,validate and test. 

def load_data(path, split=0.1):
    images = shuffle(glob(os.path.join(path, "*", "*.jpg")))

    split_size = int(len(images) * split)
    train_x, valid_x = train_test_split(images, test_size=split_size, random_state=42)
    train_x, test_x = train_test_split(train_x, test_size=split_size, random_state=42)

    return train_x, valid_x, test_x

In [23]:
def process_image_label(path):
    """ Reading images """
    path = path.decode()
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    image = cv2.resize(image, (hp["image_size"], hp["image_size"]))
    

    """ Preprocessing to patches """
    patch_shape = (hp["patch_size"], hp["patch_size"], hp["num_channels"])
    patches = patchify(image, patch_shape, hp["patch_size"])


    patches = np.reshape(patches, hp["flat_patches_shape"])
    patches = patches.astype(np.float32)

    """ Label """
    class_name = os.path.basename(os.path.dirname(path))
    class_idx = hp["class_names"].index(class_name)
    class_idx = np.array(class_idx, dtype=np.int32)

    return patches, class_idx


In [24]:
def parse(path):
    patches, labels = tf.numpy_function(process_image_label, [path], [tf.float32, tf.int32])
    labels = tf.one_hot(labels, hp["num_classes"])

    patches.set_shape(hp["flat_patches_shape"])
    labels.set_shape(hp["num_classes"])

    return patches, labels

In [25]:
def tf_dataset(images, batch=32):
    ds = tf.data.Dataset.from_tensor_slices((images))
    ds = ds.map(parse).batch(batch).prefetch(8)
    return ds

In [26]:
# Seeding
np.random.seed(42)
tf.random.set_seed(42)

In [27]:
""" Directory for storing files """
create_dir("files")

In [28]:
""" Paths """
dataset_path = "../Dataset"
model_path = os.path.join("files", "model.keras")
csv_path = os.path.join("files", "log.csv")

# to verify the location of dataset
print(os.listdir(dataset_path)) 

['Bacterial_Spot', 'Curl_Virus', 'Early_Blight', 'Healthy', 'Late_Blight', 'Leaf_Mold', 'Mosaic_Virus', 'Septoria_Leaf_Spot', 'Spider_Mites', 'Target_Spot']


In [29]:
 """ Dataset """
train_x, valid_x, test_x = load_data(dataset_path)
print(f"Train: {len(train_x)} - Valid: {len(valid_x)} - Test: {len(test_x)}")

Train: 12809 - Valid: 1600 - Test: 1600


In [31]:
train_ds = tf_dataset(train_x, batch=hp["batch_size"])
valid_ds = tf_dataset(valid_x, batch=hp["batch_size"])

# to verify the shape of the image in train dataset
for x,y in train_ds:
    print(x.shape,y.shape) # (patchSize,totalPatches,totalPatches*channel)
    break
    

(16, 256, 768) (16, 10)


In [32]:
""" Assigning class weight as the dataset is imbalanced """

' Assigning class weight as the dataset is imbalanced '

In [33]:
from sklearn.utils.class_weight import compute_class_weight

In [40]:
class_labels = np.unique(train_labels)
print(class_labels)

[0 1 2 3 4 5 6 7 8 9]


In [43]:
# compute class weight
train_labels = []
for path in train_x:
    class_name = os.path.basename(os.path.dirname(path))
    class_idx = hp["class_names"].index(class_name)
    train_labels.append(class_idx)

# Calculate class weights
# class_weights = compute_class_weight("balanced", np.unique(train_labels), train_labels)
class_weights = compute_class_weight(
                                        class_weight = "balanced",
                                        classes = class_labels,
                                        y = train_labels                                                 
                                    )
class_weight_dict = dict(zip(class_labels, class_weights))

# Print computed class weights
print("Computed Class Weights:", class_weight_dict)

Computed Class Weights: {0: 0.7499414519906323, 1: 0.4984046692607004, 2: 1.5755227552275524, 3: 1.009377462568952, 4: 0.8274547803617571, 5: 1.6721932114882507, 6: 4.342033898305084, 7: 0.913623395149786, 8: 0.9681783824640967, 9: 1.1487892376681614}


In [None]:
"""Model Implementation"""

In [44]:
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

In [45]:
class ClassToken(Layer):
    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(
            initial_value = w_init(shape=(1, 1, input_shape[-1]), dtype=tf.float32),
            trainable = True
        )

    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        hidden_dim = self.w.shape[-1]

        cls = tf.broadcast_to(self.w, [batch_size, 1, hidden_dim])
        cls = tf.cast(cls, dtype=inputs.dtype)
        return cls

In [49]:
def mlp(x, cf):
    x = Dense(hp["mlp_dim"], activation="gelu")(x)
    x = Dropout(hp["dropout_rate"])(x)
    x = Dense(hp["hidden_dim"])(x)
    x = Dropout(hp["dropout_rate"])(x)
    return x

In [50]:
def transformer_encoder(x, cf):
    skip_1 = x
    x = LayerNormalization()(x)
    x = MultiHeadAttention(
        num_heads=hp["num_heads"], key_dim=hp["hidden_dim"]
    )(x, x)
    x = Add()([x, skip_1])

    skip_2 = x
    x = LayerNormalization()(x)
    x = mlp(x, cf)
    x = Add()([x, skip_2])

    return x

In [51]:
def ViT(hp):
    """ Inputs """
    input_shape = (hp["num_patches"], hp["patch_size"]*hp["patch_size"]*hp["num_channels"])
    inputs = Input(input_shape)     ## (None, 256, 3072)
    

    """ Patch + Position Embeddings """
    patch_embed = Dense(hp["hidden_dim"])(inputs)   ## (None, 256, 768)

    positions = tf.range(start=0, limit=hp["num_patches"], delta=1)
    pos_embed = Embedding(input_dim=hp["num_patches"], output_dim=hp["hidden_dim"])(positions) ## (256, 768)
    embed = patch_embed + pos_embed ## (None, 256, 768)

    """ Adding Class Token """
    token = ClassToken()(embed)
    x = Concatenate(axis=1)([token, embed]) ## (None, 257, 768)

    for _ in range(hp["num_layers"]):
        x = transformer_encoder(x, hp)

    """ Classification Head """
    x = LayerNormalization()(x)     ## (None, 257, 768)
    x = x[:, 0, :]
    x = Dense(hp["num_classes"], activation="softmax")(x)

    model = Model(inputs, x)
    return model

In [52]:
model = ViT(hp)




In [53]:
model.summary()

In [61]:
 model.compile(
        loss="categorical_crossentropy",
        optimizer=tf.keras.optimizers.Adam(hp["lr"], clipvalue=1.0),
        metrics=["acc"]
    )

In [62]:
 callbacks = [
        ModelCheckpoint(model_path, monitor='val_loss', verbose=1, save_best_only=True, save_freq='epoch'),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-10, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=False),
    ]

In [63]:
history = model.fit(
        train_ds,
        epochs=hp["num_epochs"],
        validation_data=valid_ds,
        class_weight=class_weight_dict,
        callbacks=callbacks
    )

Epoch 1/5
[1m801/801[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - acc: 0.3647 - loss: 1.8472
Epoch 1: val_loss improved from inf to 0.72798, saving model to files\model.keras
[1m801/801[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2841s[0m 4s/step - acc: 0.3649 - loss: 1.8466 - val_acc: 0.7369 - val_loss: 0.7280 - learning_rate: 1.0000e-04
Epoch 2/5
[1m801/801[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - acc: 0.7698 - loss: 0.6312
Epoch 2: val_loss improved from 0.72798 to 0.54634, saving model to files\model.keras
[1m801/801[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2761s[0m 3s/step - acc: 0.7698 - loss: 0.6311 - val_acc: 0.8037 - val_loss: 0.5463 - learning_rate: 1.0000e-04
Epoch 3/5
[1m801/801[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3s/step - acc: 0.8383 - loss: 0.4407
Epoch 3: val_loss improved from 0.54634 to 0.38162, saving model to files\model.keras
[1m801/801[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2762s

In [None]:
""" Validate and Test"""

In [64]:
valid_ds = tf_dataset(valid_x, batch=hp["batch_size"])

In [65]:
test_ds = tf_dataset(test_x, batch=hp["batch_size"])

In [66]:
model.load_weights(model_path)
model.compile(
        loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
        optimizer=tf.keras.optimizers.Adam(hp["lr"]),
        metrics=["acc"]
)

In [67]:
model.evaluate(test_ds)

[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m104s[0m 1s/step - acc: 0.8988 - loss: 0.3111


[0.3310690224170685, 0.8849999904632568]

In [68]:
from sklearn.metrics import classification_report, confusion_matrix

In [69]:
# Assuming test_ds yields batches of (x, y) tuples
test_predictions = []
test_true_labels = []

for test_images, test_labels in test_ds:
    # Predict the labels for the test images
    predictions = model.predict(test_images)

    # Convert predictions to class labels (assuming one-hot encoding)
    predicted_labels = np.argmax(predictions, axis=1)

    # Convert true labels to class labels
    true_labels = np.argmax(test_labels.numpy(), axis=1)

    # Append to lists for the entire test set
    test_predictions.extend(predicted_labels)
    test_true_labels.extend(true_labels)

# Convert lists to NumPy arrays
test_predictions = np.array(test_predictions)
test_true_labels = np.array(test_true_labels)

# Calculate the classification report
report = classification_report(test_true_labels, test_predictions, zero_division=1)
print("Classification Report:\n", report)

# Calculate the confusion matrix
conf_matrix = confusion_matrix(test_true_labels, test_predictions)
print("\nConfusion Matrix:\n", conf_matrix)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 883ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 924ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 899ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 892ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 875ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 892ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 949ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 970ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 966ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 953ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 979ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 968ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1