In [1]:
#import libraries
import pandas as pd
import numpy as numpy
import os
import warnings
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt


warnings.filterwarnings("ignore")

In [None]:

# Load the Plant Village dataset
dataset, info = tfds.load('plant_village', with_info=True, as_supervised=True)

# Extract the full dataset
full_dataset = dataset['train']

# Split the dataset into 80% training and 20% validation.
# The validation size is derived from the training size so that the two
# parts always add up to the whole dataset; truncating 0.8*n and 0.2*n
# independently can silently drop an example.
# NOTE(review): a take/skip split is only stable if the dataset yields its
# examples in the same order on every pass - confirm the TFDS read settings.
num_examples = info.splits['train'].num_examples
train_size = int(0.8 * num_examples)
val_size = num_examples - train_size

train_dataset = full_dataset.take(train_size)
val_dataset = full_dataset.skip(train_size).take(val_size)

# Print dataset information
print(info)


Downloading and preparing dataset 827.82 MiB (download: 827.82 MiB, generated: 815.37 MiB, total: 1.60 GiB) to /root/tensorflow_datasets/plant_village/1.0.2...


Dl Completed...: 0 url [00:00, ? url/s]

Dl Size...: 0 MiB [00:00, ? MiB/s]

Extraction completed...: 0 file [00:00, ? file/s]

In [None]:
# Class names as exposed by the dataset's label feature
label_feature = info.features['label']
diseases = label_feature.names
print("Disease categories:", diseases)

In [None]:
# Report how many class names the label feature defines
num_disease_classes = len(diseases)
print("Total disease classes are: {}".format(num_disease_classes))

In [None]:
# Collect the distinct plant names (in first-seen order) and count the
# classes whose status is anything other than 'healthy'.
# Every class name is split on '___' into a plant part and a status part.
split_names = [class_name.split('___') for class_name in diseases]
plants = list(dict.fromkeys(parts[0] for parts in split_names))
NumberOfDiseases = sum(1 for parts in split_names if parts[1] != 'healthy')

print("Unique plants:", plants)
print("Number of plants: {}".format(len(plants)))
print("Number of diseases (excluding healthy):", NumberOfDiseases)

In [None]:
# Tally how many images belong to each disease category.
# This walks the whole dataset once, so it can take a while.
int_to_name = info.features['label'].int2str
nums = {}
for _image, class_id in full_dataset:
    class_name = int_to_name(class_id.numpy())
    nums[class_name] = nums.get(class_name, 0) + 1

# Turn the tally into a one-column DataFrame indexed by class name
img_per_class = pd.DataFrame(
    list(nums.values()),
    index=list(nums.keys()),
    columns=["no. of images"],
)
print(img_per_class)

In [None]:

# Define colors for specific plants: every class of the same plant shares one
# colour, so the legend needs a single entry per plant instead of one per bar.
# The plant is the part of the class name before '___'.
class_names = list(nums.keys())
class_counts = list(nums.values())
plant_of_class = [class_name.split('___')[0] for class_name in class_names]
unique_plants = list(dict.fromkeys(plant_of_class))  # first-seen order

# 'tab20' offers 20 distinct colours; wrap around if there are more plants
palette = plt.get_cmap('tab20')
plant_colors = {
    plant: palette(position % palette.N)
    for position, plant in enumerate(unique_plants)
}
colors = [plant_colors[plant] for plant in plant_of_class]

# Plotting number of images available for each disease
plt.figure(figsize=(20, 5))
bars = plt.bar(range(len(class_names)), class_counts, color=colors)
plt.xlabel('Plants/Diseases', fontsize=10)
plt.ylabel('No of images available', fontsize=10)
plt.xticks(range(len(class_names)), class_names, fontsize=8, rotation=90)
plt.title('Images per each class of plant disease')

# Adding color labels for specific plants: use the first bar of each plant
# as the legend handle so the legend colour matches the bars.
legend_handles = {}
for bar, plant in zip(bars, plant_of_class):
    legend_handles.setdefault(plant, bar)
plt.legend(list(legend_handles.values()), list(legend_handles.keys()),
           loc='upper right', bbox_to_anchor=(1.1, 1.0), title='Plants', fontsize='small')

plt.show()

In [None]:
# Images available for training
# `nums` counts every image in the dataset, but only the first `train_size`
# examples are used for training; `val_size` examples are held out for
# validation. Report the training count, not the dataset total.
n_total = sum(nums.values())
n_train = train_size
print(f"There are {n_train} images for training")
print(f"There are {val_size} images for validation ({n_total} images in total)")

In [None]:
#Data Preparation for training
def preprocess_image(image, label):
    """Resize an image to 128x128 and scale its pixel values by 1/255.

    Args:
        image: image tensor accepted by `tf.image.resize`
            (presumably pixel values in [0, 255] - confirm against the dataset).
        label: label belonging to the image; returned unchanged.

    Returns:
        Tuple `(image, label)` with the image as a float32 tensor of spatial
        size 128x128.
    """
    target_size = (128, 128)
    resized = tf.image.resize(image, target_size)
    scaled = tf.cast(resized, tf.float32) / 255.0
    return scaled, label

batch_size = 32
buffer_size = 1000  # number of examples held in the shuffle buffer

# NOTE: this cell rebinds `train_dataset` / `val_dataset`; re-run the cell that
# creates the split before re-running this one, otherwise the data would be
# preprocessed and batched twice.

# Preprocess the training dataset.
# Shuffle before batching so batches are composed differently on each pass.
train_dataset = train_dataset.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
train_dataset = train_dataset.shuffle(buffer_size).batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Preprocess the validation dataset (no shuffling needed for evaluation)
val_dataset = val_dataset.map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Mapping from label numbers to class names
label_to_name = info.features['label'].int2str

def show_image(image, label):
    """Display a single image with its class name as the title.

    The pixel values are min-max scaled to [0, 1] for display. An image whose
    pixels are all equal has no value range to scale by; it is clipped to
    [0, 1] instead, which avoids a division by zero (NaN pixels).

    Args:
        image: one image tensor providing `.numpy()` (not a batch).
        label: the matching label tensor providing `.numpy()`; converted to
            a class name via `label_to_name`.
    """
    # Image tensor to a NumPy array
    pixels = image.numpy().astype("float32")
    lowest = pixels.min()
    value_range = pixels.max() - lowest
    if value_range > 0:
        pixels = (pixels - lowest) / value_range
    else:
        # Uniform image: nothing to stretch, just keep values displayable
        pixels = pixels.clip(0.0, 1.0)
    plt.imshow(pixels)
    plt.title("Label: " + label_to_name(label.numpy()))
    plt.axis("off")
    plt.show()

# Show the first image of each of the first few training batches
num_images_to_display = 5
sample_batches = train_dataset.take(num_images_to_display)
for batch_images, batch_labels in sample_batches:
    first_image, first_label = batch_images[0], batch_labels[0]
    show_image(first_image, first_label)



In [None]:
# Simple Residual Block
class SimpleResidualBlock(tf.keras.layers.Layer):
    """Two 3x3 'same'-padded convolutions with ReLUs plus an identity shortcut.

    The block computes `relu(conv2(relu(conv1(x)))) + x`.

    Args:
        filters: number of output channels of both convolutions. Because the
            input is added to the output, this must equal the number of
            channels of the input. Defaults to 3.
        **kwargs: forwarded to `tf.keras.layers.Layer` (e.g. `name`).
    """

    def __init__(self, filters=3, **kwargs):
        super(SimpleResidualBlock, self).__init__(**kwargs)
        self.filters = filters
        self.conv1 = tf.keras.layers.Conv2D(filters=filters, kernel_size=3, strides=1, padding='same')
        self.relu1 = tf.keras.layers.ReLU()
        self.conv2 = tf.keras.layers.Conv2D(filters=filters, kernel_size=3, strides=1, padding='same')
        self.relu2 = tf.keras.layers.ReLU()

    def call(self, inputs):
        """Apply both convolutions and add the unmodified input to the result."""
        x = self.conv1(inputs)
        x = self.relu1(x)
        x = self.conv2(x)
        return self.relu2(x) + inputs  # Adding the input tensor to the output

    def get_config(self):
        """Return the layer configuration, including `filters`."""
        config = super(SimpleResidualBlock, self).get_config()
        config.update({"filters": self.filters})
        return config

# Base class for image classification models
class ImageClassificationBase(tf.keras.Model):
    """Keras model base with explicit per-batch training and evaluation steps.

    Subclasses must implement `call`. The steps rely on `self.compiled_loss`,
    `self.compiled_metrics` and `self.optimizer`, so the model is expected to
    have been compiled before training or evaluation.
    """

    def train_step(self, data):
        """Run one optimisation step on a batch.

        Args:
            data: pair `(images, labels)`.

        Returns:
            Dict mapping each metric name in `self.metrics` to its current result.
        """
        batch_images, batch_labels = data
        with tf.GradientTape() as tape:
            outputs = self(batch_images, training=True)
            batch_loss = self.compiled_loss(batch_labels, outputs)
        weights = self.trainable_variables
        grads = tape.gradient(batch_loss, weights)
        self.optimizer.apply_gradients(zip(grads, weights))
        self.compiled_metrics.update_state(batch_labels, outputs)
        results = {}
        for metric in self.metrics:
            results[metric.name] = metric.result()
        return results

    def test_step(self, data):
        """Evaluate one batch without updating the weights.

        Args:
            data: pair `(images, labels)`.

        Returns:
            Dict mapping each metric name in `self.metrics` to its current result.
        """
        batch_images, batch_labels = data
        outputs = self(batch_images, training=False)
        # The returned loss value is not used here.
        # NOTE(review): the call is kept because it presumably updates the
        # tracked loss metric - confirm for the Keras version in use.
        self.compiled_loss(batch_labels, outputs)
        self.compiled_metrics.update_state(batch_labels, outputs)
        results = {}
        for metric in self.metrics:
            results[metric.name] = metric.result()
        return results

    def validation_step(self, data):
        """Alias that forwards to `test_step`."""
        return self.test_step(data)

    def call(self, inputs, training=False):
        """Forward pass; always raises here because subclasses must provide it."""
        raise NotImplementedError("`call` method must be implemented in subclass.")

# Convolution block with BatchNormalization
def ConvBlock(out_channels, pool=False):
    """Build a Conv2D -> BatchNormalization -> ReLU stack.

    Args:
        out_channels: number of filters of the 3x3 'same'-padded convolution.
        pool: when true, a 2x2 max pooling layer is appended.

    Returns:
        A `tf.keras.Sequential` model containing the layers in that order.
    """
    convolution = tf.keras.layers.Conv2D(out_channels, kernel_size=3, padding='same')
    normalization = tf.keras.layers.BatchNormalization()
    activation = tf.keras.layers.ReLU()
    stack = [convolution, normalization, activation]
    if pool:
        stack += [tf.keras.layers.MaxPool2D(pool_size=(2, 2))]
    return tf.keras.Sequential(stack)

# ResNet9 architecture
class ResNet9(ImageClassificationBase):
    """Small residual CNN: four conv blocks, two residual stages, linear head.

    The classifier ends in a `Dense` layer without activation, so the model
    outputs raw scores (logits), one per class.

    Args:
        in_channels: number of channels of the input images. Not used by the
            layers built here; kept so existing callers keep working.
        num_diseases: number of output classes (units of the final Dense layer).
    """

    def __init__(self, in_channels, num_diseases):
        super(ResNet9, self).__init__()
        self.conv1 = ConvBlock(64)
        self.conv2 = ConvBlock(128, pool=True)
        self.res1 = tf.keras.Sequential([ConvBlock(128) for _ in range(2)])

        self.conv3 = ConvBlock(256, pool=True)
        self.conv4 = ConvBlock(512, pool=True)
        self.res2 = tf.keras.Sequential([ConvBlock(512) for _ in range(2)])

        self.classifier = tf.keras.Sequential([
            tf.keras.layers.GlobalAveragePooling2D(),
            tf.keras.layers.Dense(num_diseases)
        ])

    def call(self, inputs, training=False):
        """Compute class logits for a batch of images.

        Args:
            inputs: batch of images.
            training: forwarded to every sub-block so that the
                BatchNormalization layers inside them run in the requested
                mode. The signature matches `ImageClassificationBase.call`.

        Returns:
            Tensor of logits with one column per class.
        """
        x = self.conv1(inputs, training=training)
        x = self.conv2(x, training=training)
        # First residual stage: two conv blocks plus identity shortcut
        x_res1 = self.res1(x, training=training) + x
        x = self.conv3(x_res1, training=training)
        x = self.conv4(x, training=training)
        # Second residual stage
        x_res2 = self.res2(x, training=training) + x
        return self.classifier(x_res2, training=training)

In [None]:
# Build the classifier with one output per class of the dataset
label_info = info.features['label']
num_classes = label_info.num_classes
model = ResNet9(num_diseases=num_classes, in_channels=3)

In [None]:
# Evaluate function
def evaluate(model, val_dataset, batch_size):
    """Compute mean loss and accuracy of `model` over a validation dataset.

    Args:
        model: callable as `model(images, training=False)` returning logits.
        val_dataset: `tf.data.Dataset` of `(images, labels)` pairs, either
            unbatched or already batched.
        batch_size: batch size applied when `val_dataset` is not batched yet.

    Returns:
        Dict with keys "val_loss" and "val_accuracy" holding the metric results.
    """
    # Only batch when needed: batching an already batched dataset would add a
    # second batch dimension. A rank-4 image spec is treated as already
    # batched (assumes single images are rank 3, e.g. height x width x channels).
    image_spec = val_dataset.element_spec[0]
    if image_spec.shape.rank != 4:
        val_dataset = val_dataset.batch(batch_size)

    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    val_loss = tf.keras.metrics.Mean()
    val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()

    for images, labels in val_dataset:
        predictions = model(images, training=False)
        loss = loss_fn(labels, predictions)
        # `loss` is the mean over this batch; weight it by the batch size so a
        # smaller final batch does not count as much as a full one.
        examples_in_batch = tf.cast(tf.shape(labels)[0], tf.float32)
        val_loss.update_state(loss, sample_weight=examples_in_batch)
        val_accuracy.update_state(labels, predictions)

    return {"val_loss": val_loss.result(), "val_accuracy": val_accuracy.result()}

In [None]:
# Fit function with EarlyStopping
def fit_one_cycle(epochs, max_lr, model, train_dataset, val_dataset, batch_size, weight_decay=0):
    """Compile and train `model` with SGD, LR decay and early stopping.

    Despite its name, the learning rate follows a staircase exponential decay
    (x0.95 every two epochs), starting at `max_lr`.

    Args:
        epochs: maximum number of epochs to train.
        max_lr: initial learning rate.
        model: Keras model to compile and fit.
        train_dataset: batched training dataset; it must support `len()` so
            the number of steps per epoch is known.
        val_dataset: batched validation dataset.
        batch_size: unused (the datasets are expected to be batched already);
            kept so existing callers keep working.
        weight_decay: weight decay passed to the optimizer when non-zero.
            NOTE(review): this needs an SGD implementation that accepts
            `weight_decay` - confirm for the installed TensorFlow/Keras version.

    Returns:
        Tuple `(history, val_loss, val_accuracy)`.
    """
    # The schedule is evaluated per optimizer step, so it is handed to the
    # optimizer directly. Indexing it by epoch number through a
    # LearningRateScheduler callback would never reach `decay_steps`.
    steps_per_epoch = len(train_dataset)
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=max_lr,
        decay_steps=steps_per_epoch * 2,  # decay once every two epochs
        decay_rate=0.95,
        staircase=True)

    optimizer_kwargs = dict(learning_rate=lr_schedule, momentum=0.9, nesterov=True)
    if weight_decay:
        optimizer_kwargs["weight_decay"] = weight_decay
    optimizer = tf.keras.optimizers.SGD(**optimizer_kwargs)

    # EarlyStopping callback
    early_stopping_callback = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=3,
        restore_best_weights=True
    )

    model.compile(optimizer=optimizer,
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    history = model.fit(train_dataset,
                        epochs=epochs,
                        validation_data=val_dataset,
                        callbacks=[early_stopping_callback],
                        verbose=2)

    val_loss, val_accuracy = model.evaluate(val_dataset, verbose=0)
    print(f'Validation Loss: {val_loss}, Validation Accuracy: {val_accuracy}')

    return history, val_loss, val_accuracy

In [None]:
# Training configuration
epochs = 10
max_lr = 0.01
weight_decay = 1e-4

# Train the model; keep the history and the final validation metrics
history, val_loss, val_accuracy = fit_one_cycle(
    epochs=epochs,
    max_lr=max_lr,
    model=model,
    train_dataset=train_dataset,
    val_dataset=val_dataset,
    batch_size=batch_size,
    weight_decay=weight_decay,
)


Epoch 1/10


In [None]:
# Plot training history: one figure per metric, training vs. validation curve.
# Each entry: (history key, validation history key, curve labels, y-axis label)
history_plots = [
    ('loss', 'val_loss', 'Training Loss', 'Validation Loss', 'Loss'),
    ('accuracy', 'val_accuracy', 'Training Accuracy', 'Validation Accuracy', 'Accuracy'),
]

for train_key, val_key, train_label, val_label, y_label in history_plots:
    fig, ax = plt.subplots()
    ax.plot(history.history[train_key], label=train_label)
    ax.plot(history.history[val_key], label=val_label)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.legend()
    plt.show()