In [None]:
import os
import random
import time
from typing import Final

import cv2
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import confusion_matrix

import utils

In [None]:
# Number of training epochs; also embedded in the saved model's file name.
EPOCHS: Final[int] = 50

In [None]:
# Load the image folder as a batched tf.data pipeline of (images, labels).
dataset_dir = os.path.join(utils.DATA_DIR, 'dataset')
target_size = (utils.IMG_HEIGHT, utils.IMG_WIDTH)

dataset: tf.data.Dataset = tf.keras.preprocessing.image_dataset_from_directory(
    dataset_dir,
    batch_size=utils.BATCH_SIZE,
    image_size=target_size,
    shuffle=True,
)

#### Explore the dataset

In [None]:
# Class names as reported by the dataset loader (three are expected by the
# assertion in the next cell); the bare expression displays them.
class_names: list[str] = dataset.class_names
class_names

In [None]:
# Potato dataset batches:
# - 2152 images at 32 images per batch -> 67 full batches plus one final
#   batch of 8 images, i.e. 68 batches in total.

assert len(class_names) == 3, f"expected 3 classes, found {len(class_names)}"
assert len(dataset) == 68, f"expected 68 batches, found {len(dataset)}"

In [None]:
# Number of full 32-image batches in the 2152-image dataset.
2152 // 32

In [None]:
# Images left over for the final, partial batch.
2152 % 32

In [None]:
"""Explore One of the Batches, Note:
- press q to skip to the next image.
"""

# NOTE(review): exploration loop left disabled; presumably utils.show_image
# opens a viewer that waits for a key press (see "press q" above) — confirm
# before re-enabling it in a Restart & Run All workflow.
# for images_batch, label_batch in dataset.take(1):
#     assert images_batch.shape == (32, 256, 256, 3)
#     assert images_batch[0].shape == (256, 256, 3)
#     assert images_batch[0].numpy().astype("uint8").max() == 255
#     assert label_batch.shape == (32,)
#     for image, label in zip(images_batch, label_batch):
#         utils.show_image(
#             image.numpy().astype("uint8"), 
#             str(class_names[label])
#         )

# 📝 Start Training


#### 1. Preprocessing
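- ~80% of the data is used for training: 54 of the 68 batches (target: 1721 images)
- ~10% of the data is used for validation: 6 of the 68 batches (target: 215 images)
- the remaining batches are used for testing: 8 of the 68 batches (target: 216 images)
- the split is made on whole batches, so the actual image counts differ slightly from these targets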
<hr/>
-- total: 2152 images

In [None]:
# Number of batches (not images) in the dataset, as a NumPy scalar.
dataset.cardinality().numpy()

In [None]:
def get_dataset_partitions(
        dataset: tf.data.Dataset,
        train_split: float=0.8,
        validation_split: float=0.1,
        test_split: float=0.1,
        shuffle: bool=True,
        seed: "int | None"=None
    ) -> tuple[tf.data.Dataset, tf.data.Dataset, tf.data.Dataset]:
    """Split a dataset into train, validation and test partitions.

    The split is positional: the first ``train_split`` share of the elements
    goes to training, the next ``validation_split`` share to validation and
    everything that is left to testing. Sizes are counted in dataset elements
    (batches, when the dataset is batched).

    Args:
        dataset: Finite dataset with a known cardinality.
        train_split: Fraction of elements for the training partition.
        validation_split: Fraction of elements for the validation partition.
        test_split: Kept for interface compatibility. It is not used to size
            the test partition, which always receives the remainder so that no
            element is dropped by rounding.
        shuffle: Shuffle the elements once before splitting.
        seed: Seed for that shuffle. A random one is drawn when omitted.

    Returns:
        ``(train_dataset, validation_dataset, test_dataset)``.

    Raises:
        ValueError: If a split fraction is outside ``[0, 1]``, if the train and
            validation fractions add up to more than 1, or if the dataset
            cardinality is infinite or unknown.
    """
    for split_name, fraction in (
        ("train_split", train_split),
        ("validation_split", validation_split),
        ("test_split", test_split),
    ):
        if not 0.0 <= fraction <= 1.0:
            raise ValueError(f"{split_name} must be within [0, 1], got {fraction}")
    # Small tolerance so that e.g. 0.7 + 0.3 is not rejected by float rounding.
    if train_split + validation_split > 1.0 + 1e-9:
        raise ValueError("train_split + validation_split must not exceed 1")

    DATASET_SIZE: Final[int] = int(dataset.cardinality().numpy())
    if DATASET_SIZE < 0:
        # Negative values are tf.data's markers for infinite/unknown cardinality.
        raise ValueError("dataset cardinality is infinite or unknown; cannot split by fraction")

    if shuffle and DATASET_SIZE > 0:
        if seed is None:
            seed = random.randint(0, 10_000)
        # take()/skip() only give disjoint partitions if every pass over the
        # dataset yields the same elements in the same order:
        # - cache() freezes the source after its first complete pass, which
        #   matters when the source reorders itself on each iteration (note
        #   that it keeps the elements in memory);
        # - reshuffle_each_iteration=False keeps this shuffle's order fixed
        #   (the default reshuffles on every iteration, which lets elements
        #   move between the partitions from one pass to the next).
        # The buffer spans the whole dataset, so the source is read to the end
        # on the first pass and the cache is complete afterwards.
        dataset = dataset.cache().shuffle(
            buffer_size=DATASET_SIZE,
            seed=seed,
            reshuffle_each_iteration=False
        )
    # NOTE(review): with shuffle=False nothing here stabilises a source that
    # reorders itself per iteration — confirm the caller's dataset is stable.

    train_size = int(train_split * DATASET_SIZE)
    val_size = int(validation_split * DATASET_SIZE)

    train_dataset = dataset.take(train_size)
    validation_dataset = dataset.skip(train_size).take(val_size)
    # Remainder, so rounding never discards elements.
    test_dataset = dataset.skip(train_size + val_size)

    utils.log_to_file(
        f"Train size: {len(train_dataset)}, Validation size: {len(validation_dataset)}, Test size: {len(test_dataset)}"
    )

    return train_dataset, validation_dataset, test_dataset

In [None]:
# Split with the default fractions (0.8 train / 0.1 validation / rest test).
train_dataset, validation_dataset, test_dataset = get_dataset_partitions(dataset)

In [None]:
def _tune_pipeline(partition: tf.data.Dataset, shuffle: bool = False) -> tf.data.Dataset:
    """Cache a partition, optionally shuffle it, and prefetch upcoming elements.

    Args:
        partition: Dataset to optimise.
        shuffle: Shuffle the cached elements with a buffer that covers the
            whole partition. Only useful for training data.

    Returns:
        The cached (and possibly shuffled) dataset with prefetching enabled.
    """
    partition = partition.cache()
    if shuffle:
        # Shuffle after cache() so the order can still vary between passes.
        partition = partition.shuffle(buffer_size=partition.cardinality())
    return partition.prefetch(buffer_size=tf.data.AUTOTUNE)


# Only the training data is shuffled: evaluation metrics do not depend on the
# order of the samples, so shuffling validation/test data is wasted work.
train_dataset = _tune_pipeline(train_dataset, shuffle=True)
validation_dataset = _tune_pipeline(validation_dataset)
test_dataset = _tune_pipeline(test_dataset)

In [None]:
preprocessing_layers = tf.keras.layers.experimental.preprocessing

# Normalisation: resize to the working resolution, then scale pixels by 1/255.
resize_rescale = tf.keras.Sequential()
resize_rescale.add(preprocessing_layers.Resizing(height=utils.IMG_HEIGHT, width=utils.IMG_WIDTH))
resize_rescale.add(preprocessing_layers.Rescaling(scale=1.0/255))

# Augmentation: random flips along both axes followed by a random rotation.
data_augmentation = tf.keras.Sequential()
data_augmentation.add(preprocessing_layers.RandomFlip(mode="horizontal_and_vertical"))
data_augmentation.add(preprocessing_layers.RandomRotation(factor=0.2))

#### 2. Training

In [None]:
# Build shape for the network. The leading None leaves the batch dimension
# unspecified, so the model is not tied to utils.BATCH_SIZE (the last batch of
# an epoch is smaller, and predictions are also made on other batch sizes).
input_shape = (None, utils.IMG_HEIGHT, utils.IMG_WIDTH, utils.CHANNELS)

# One output unit per class reported by the dataset loader.
n_classes = len(dataset.class_names)

# Six identical "3x3 convolution + 2x2 max-pooling" stages; only the number of
# filters of the first stage differs.
conv_filters = (32, 64, 64, 64, 64, 64)
feature_extractor = []
for n_filters in conv_filters:
    feature_extractor.append(
        tf.keras.layers.Conv2D(filters=n_filters, kernel_size=(3, 3), activation='relu')
    )
    feature_extractor.append(tf.keras.layers.MaxPooling2D(pool_size=(2, 2)))

model = tf.keras.Sequential([
    # Preprocessing is part of the model, so it is applied to every input.
    resize_rescale,
    data_augmentation,
    *feature_extractor,
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(units=64, activation='relu'),
    tf.keras.layers.Dense(units=n_classes, activation='softmax')
])

# The input shape is declared once, here. It is deliberately not passed to the
# first Conv2D as well: that layer is not the model's first layer, and its
# `input_shape` argument must not contain a batch dimension.
model.build(input_shape=input_shape)

In [None]:
# model.summary()

In [None]:
# Adam with its default hyper-parameters. The sparse cross-entropy loss takes
# integer class labels and, with its defaults, probability outputs (softmax).
adam_optimizer = tf.keras.optimizers.Adam()
sparse_crossentropy = tf.keras.losses.SparseCategoricalCrossentropy()

model.compile(optimizer=adam_optimizer, loss=sparse_crossentropy, metrics=['accuracy'])

In [None]:
utils.log_to_file(f"Training model for {EPOCHS} epochs.")

# perf_counter() is a monotonic clock meant for measuring durations; unlike
# time.time() it is not affected by system clock adjustments during training.
start_time = time.perf_counter()
history = model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=validation_dataset
)
elapsed_seconds = time.perf_counter() - start_time

utils.log_to_file(f"Training took {elapsed_seconds:,.2f} seconds.")

In [None]:
# Per-epoch metric series recorded by model.fit().
training_log = history.history

accuracy_history, val_accuracy_history = training_log['accuracy'], training_log['val_accuracy']
loss_history, val_loss_history = training_log['loss'], training_log['val_loss']

In [None]:
# Use the number of recorded epochs for the x-axis, so the plot also works when
# the history holds fewer entries than EPOCHS (e.g. training stopped early).
epochs_run = range(len(accuracy_history))

fig, (accuracy_ax, loss_ax) = plt.subplots(1, 2, figsize=(8, 8))

accuracy_ax.plot(epochs_run, accuracy_history, label='Training Accuracy')
accuracy_ax.plot(epochs_run, val_accuracy_history, label='Validation Accuracy')
accuracy_ax.legend(loc='lower right')
accuracy_ax.set_title('Training and Validation Accuracy')
accuracy_ax.set_xlabel('Epoch')

loss_ax.plot(epochs_run, loss_history, label='Training Loss')
loss_ax.plot(epochs_run, val_loss_history, label='Validation Loss')
loss_ax.legend(loc='upper right')
loss_ax.set_title('Training and Validation Loss')
loss_ax.set_xlabel('Epoch')

# savefig() does not create missing directories.
output_dir = os.path.join(utils.DATA_DIR, "output")
os.makedirs(output_dir, exist_ok=True)
fig.savefig(os.path.join(output_dir, "accuracy.png"))

In [None]:
# Score the model on the held-out test partition: [loss, accuracy].
test_scores = model.evaluate(test_dataset)
loss, accuracy = test_scores

In [None]:
# Record the test metrics; accuracy is logged as a percentage.
evaluation_summary = f"Loss: {loss:,.4f}, Accuracy: {accuracy*100:,.2f}%"
utils.log_to_file(evaluation_summary)

In [None]:
# The file name carries the epoch count and the test accuracy (in percent).
# NOTE(review): which path suffixes model.save() accepts depends on the
# installed Keras version — confirm that ".model" is supported there.
model_filename = f"potato_model_{EPOCHS}_{accuracy*100:.2f}.model"
model_path = os.path.join(utils.DATA_DIR, 'models', model_filename)
model.save(model_path)

#### Confusion Matrix

In [None]:
# Collect predicted and true class ids for the whole test partition.
y_pred = []
y_true = []

for images_batch, label_batch in test_dataset:
    # One predict() call per batch instead of one per image: same predictions,
    # far fewer calls. verbose=0 keeps the progress output from flooding the cell.
    class_probabilities = model.predict(images_batch, verbose=0)
    y_pred.extend(np.argmax(class_probabilities, axis=1))
    y_true.extend(label_batch.numpy())

# Passing the label ids explicitly keeps the matrix square over all classes,
# even if one of them does not occur in the test partition.
cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(class_names)))

In [None]:
# The diagonal of the confusion matrix holds the correctly classified samples.
total_predictions = len(y_true)
correct_predictions = np.trace(cm)
wrong_count = total_predictions - correct_predictions

utils.log_to_file(f"Number of wrong predictions: {wrong_count:,} / {total_predictions:,}")
utils.log_to_file(f"Accuracy: {(correct_predictions / total_predictions)*100:.2f}%")

In [None]:
# Display labels for the plots. They replace the loader's class names, so the
# order here must follow the integer label order used by the dataset —
# NOTE(review): confirm against dataset.class_names.
class_names = ['Early Blight', 'Late Blight', 'Healthy']
assert len(class_names) == len(dataset.class_names), (
    "display labels must cover every class of the dataset"
)

In [None]:
# Explicit figure/axes instead of the implicit pyplot state.
fig, ax = plt.subplots(figsize=(8, 8))
sns.heatmap(cm, annot=True, fmt='g', xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
ax.set_title("Confusion Matrix")

# savefig() does not create missing directories.
output_dir = os.path.join(utils.DATA_DIR, "output")
os.makedirs(output_dir, exist_ok=True)
fig.savefig(os.path.join(output_dir, "confusion_matrix.png"))

In [None]:
# Collect (image, true label, predicted label) for every misclassified test image.
wrong_predictions = []
for images_batch, label_batch in test_dataset:
    # Predict each batch once; the original ran the model twice per image
    # (once to filter, once more to store the prediction).
    predicted_labels = np.argmax(model.predict(images_batch, verbose=0), axis=1)
    images = images_batch.numpy().astype("uint8")
    true_labels = label_batch.numpy()
    wrong_predictions.extend(
        (image, true_label, predicted_label)
        for image, true_label, predicted_label in zip(images, true_labels, predicted_labels)
        if predicted_label != true_label
    )

print(len(wrong_predictions))

In [None]:
# NOTE(review): viewer loop left disabled; presumably utils.show_image blocks
# until a key is pressed — confirm before re-enabling it in a Run All workflow.
# # show wrong predictions using utils.show_image
# for image, true_label, predicted_label in wrong_predictions:
#     utils.show_image(
#         image, 
#         f"True: {class_names[true_label]}, Predicted: {class_names[predicted_label]}"
#     )