## Check python requirements
 - Python version between 3.8 and 3.11

## Import libraries / packages

In [None]:
import kagglehub
import os
import shutil
import random
import matplotlib.pyplot as plt
from datetime import datetime
import tensorflow as tf
from tensorflow.keras.layers import Rescaling, RandomFlip, RandomRotation
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model

# Import custom file with different models
Model structures are kept separately for easier testing

In [3]:
import model_definition

## Download dataset of images
The dataset is downloaded from Kaggle into the local cache. The path to the dataset is saved in a variable.

In [None]:
# Download the latest version of the dataset and keep the returned location
path = kagglehub.dataset_download("abdallahalidev/plantvillage-dataset")

print("Path to dataset files:", path)
# Point at the colour images; os.path.join uses the separator of the current OS
# (hard-coded backslashes only work on Windows)
path = os.path.join(path, "plantvillage dataset", "color")

## Split dataset into train, validation and test data
Images are divided into 3 splits. The split is done physically in the file structure: a folder is created in the working directory, and the images are copied into it and separated into 3 subfolders.

This step is skipped if the folder already exists, i.e. the split has already been done before.

In [None]:
# Paths
original_dataset_dir = path  # Path to the original dataset
output_base_dir = 'split_dataset'  # Output directory for train, val, test
# The split is built in this folder and renamed to output_base_dir only once
# every image has been copied, so an interrupted run never looks finished.
staging_dir = output_base_dir + '_partial'

# Split ratios (the test split receives the images left over after the train
# and validation shares have been taken, so test_ratio is informational)
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1


def _copy_images(image_list, src_dir, dest_dir):
    """Copy each file named in image_list from src_dir into dest_dir."""
    for image in image_list:
        shutil.copy(os.path.join(src_dir, image), os.path.join(dest_dir, image))


# Skip if folder already exists
if not os.path.isdir(output_base_dir):

    # Discard the leftovers of an interrupted earlier run
    if os.path.isdir(staging_dir):
        shutil.rmtree(staging_dir)

    # Create train, val, test directories
    splits = ['train', 'validation', 'test']
    for split in splits:
        os.makedirs(os.path.join(staging_dir, split), exist_ok=True)

    # Split images, one class folder at a time
    for class_name in os.listdir(original_dataset_dir):
        class_path = os.path.join(original_dataset_dir, class_name)
        if not os.path.isdir(class_path):
            continue

        # Get all image files in random order
        images = [f for f in os.listdir(class_path) if os.path.isfile(os.path.join(class_path, f))]
        random.shuffle(images)

        # Calculate split sizes
        total_images = len(images)
        train_size = int(total_images * train_ratio)
        val_size = int(total_images * val_ratio)

        # Assign images to splits
        images_by_split = {
            'train': images[:train_size],
            'validation': images[train_size:train_size + val_size],
            'test': images[train_size + val_size:],
        }

        # Create the class directory in each split folder and copy its images
        for split in splits:
            dest_dir = os.path.join(staging_dir, split, class_name)
            os.makedirs(dest_dir, exist_ok=True)
            _copy_images(images_by_split[split], class_path, dest_dir)

    # Publish the finished split under its final name
    os.rename(staging_dir, output_base_dir)
    print("Dataset successfully split!")

else:
    print(f"Folder {output_base_dir} already exists")


## Load images into code
Images are loaded from the split folders as batched `tf.data.Dataset` objects and prefetched for performance.

In [None]:
# Locations of the three splits on disk
train_dir = 'split_dataset/train'
validation_dir = 'split_dataset/validation'
test_dir = 'split_dataset/test'

AUTOTUNE = tf.data.AUTOTUNE


def _load_split(directory):
    """Load one split folder as a dataset of 224x224 images in batches of 32, with prefetching."""
    dataset = tf.keras.utils.image_dataset_from_directory(
        directory,
        image_size=(224, 224),  # Resize all images to this size
        batch_size=32,          # Number of images per batch
    )
    # Prefetch for performance
    return dataset.prefetch(buffer_size=AUTOTUNE)


# Load datasets
train_dataset = _load_split(train_dir)
validation_dataset = _load_split(validation_dir)
test_dataset = _load_split(test_dir)


## Data augmentation and normalization
- Creates random variations for train set to prevent overfitting and make robust model
- Rescales image values from 0-255 to 0-1

In [7]:
# Data augmentation (random horizontal flips and small rotations)
data_augmentation = tf.keras.Sequential([
    RandomFlip("horizontal"),
    RandomRotation(0.1),
])

# A single rescaling layer (0-255 -> 0-1) shared by all three datasets
# instead of a new layer object inside every mapped function
rescale = Rescaling(1./255)

# Training data: augment, then normalize.
# training=True is passed explicitly so the random layers are active no matter
# how the installed Keras version resolves the default training mode.
train_dataset = train_dataset.map(
    lambda x, y: (rescale(data_augmentation(x, training=True)), y),
    num_parallel_calls=tf.data.AUTOTUNE,
).prefetch(tf.data.AUTOTUNE)

# Validation and test data: normalize only
validation_dataset = validation_dataset.map(
    lambda x, y: (rescale(x), y),
    num_parallel_calls=tf.data.AUTOTUNE,
).prefetch(tf.data.AUTOTUNE)
test_dataset = test_dataset.map(
    lambda x, y: (rescale(x), y),
    num_parallel_calls=tf.data.AUTOTUNE,
).prefetch(tf.data.AUTOTUNE)
# NOTE: running this cell twice rescales the data twice - reload the datasets first.


## Model definition

In [10]:
# Get number of output classes (number of folders in data folder).
# Only directories are counted, so stray files (e.g. .DS_Store, Thumbs.db)
# cannot inflate the size of the output layer.
train_root = "split_dataset/train"
num_of_classes = sum(
    os.path.isdir(os.path.join(train_root, entry))
    for entry in os.listdir(train_root)
)

# Select model
model_name, model = model_definition.get_model("ResNet50_v2", num_of_classes)

# Start training new model

In [None]:
# Get timestamp to mark the model
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Make sure the output folders exist before anything is written into them
os.makedirs("checkpoints", exist_ok=True)
os.makedirs("model", exist_ok=True)

# {epoch} and {val_accuracy} are left as placeholders for ModelCheckpoint to fill in
checkpoint_filename = f"checkpoints/model_{model_name}_{timestamp}_epoch_{{epoch:02d}}_acc_{{val_accuracy:.2f}}.keras"

# Define the checkpoint callback
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filename,  # Save path
    save_weights_only=False,  # Set to True to save only weights
    save_best_only=False,     # Set to True to save only the best model
    verbose=1                 # Print a message when saving
)

# batch_size is deliberately not passed: the input is a tf.data.Dataset that
# already yields batches, and Keras documents that batch_size must not be
# specified for dataset inputs.
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=1,
    callbacks=[checkpoint_callback],
)

model.save(f'model/model_{model_name}_{timestamp}_epoch1_frozen_pretrained.keras')

# Load model and continue training

In [None]:
# Reload the model saved after the first training phase
model = load_model(f'model/model_{model_name}_{timestamp}_epoch1_frozen_pretrained.keras')

# Unfreeze the base pretrained model
# NOTE(review): assumes the pretrained base is the first layer of the model
# returned by model_definition.get_model - confirm against that file.
model.layers[0].trainable = True

# Re-compile the Model so the change of `trainable` takes effect
# NOTE(review): 0.001 is a fairly high learning rate for fine-tuning unfrozen
# pretrained weights; consider a smaller value.
model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# batch_size is deliberately not passed: the input is a tf.data.Dataset that
# already yields batches, and Keras documents that batch_size must not be
# specified for dataset inputs.
history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=10,
    callbacks=[checkpoint_callback],
)

model.save(f'model/model_{model_name}_{timestamp}_from_pretrained_finished.keras')

# Plot model training history of improvement

In [None]:
# Draw one figure per metric (accuracy first, then loss), each comparing
# the training curve with the validation curve
for metric, title in (('accuracy', 'Accuracy'), ('loss', 'Loss')):
    plt.plot(history.history[metric], label=f'Train {title}')
    plt.plot(history.history[f'val_{metric}'], label=f'Validation {title}')
    plt.legend()
    plt.show()

## Load and evaluate model

In [None]:
# Restore the saved model from disk and score it on the test dataset
model = load_model("model.keras")
evaluation = model.evaluate(test_dataset)
loss, accuracy = evaluation
print(f"Test Accuracy: {accuracy:.2f}")

## Define function to make labels readable
Extract information from the dataset class labels and transform it for better readability. Prints the result as a list of tuples.

In [None]:
# Extracts information from the label
def parse_label(label):
    """Turn a dataset folder label into readable plant and disease names.

    Labels look like ``"Plant_name___Disease_name"``. Anything after the first
    space is ignored, underscores become spaces and parentheses are removed.

    Args:
        label: Class folder name, e.g. ``"Corn_(maize)___Northern_Leaf_Blight"``.

    Returns:
        A tuple ``(flower_name, is_healthy, disease)``. ``disease`` is
        ``"healthy"`` when the label contains "healthy", and ``"unknown"``
        when the label has no ``___`` separated disease part.
    """
    # Maps "_" to a space and deletes parentheses in a single pass
    readable = str.maketrans("_", " ", "()")

    # Keep only the text before the first space, then split plant from disease
    label = label.split(" ")[0]
    parts = label.split("___")

    # Extract the flower name and make it human-readable
    flower_name = parts[0].translate(readable)

    # Determine if the label indicates a healthy plant
    is_healthy = "healthy" in label

    # Extract the disease name or mark it as healthy
    if is_healthy:
        disease = "healthy"
    elif len(parts) > 1:
        disease = parts[1].translate(readable)
    else:
        # No disease part in the label; report it instead of raising IndexError
        disease = "unknown"

    return flower_name, is_healthy, disease

# Example usage

# print(parse_label("Corn_(maize)___Northern_Leaf_Blight"))
# print(parse_label("Cherry_(including_sour)___Powdery_mildew"))
# print(parse_label("Blueberry___healthy"))
# print(parse_label("Tomato___Spider_mites Two-spotted_spider_mite"))

import os

# Collect the class folders of the test split (plain files are ignored)
base_path = './split_dataset/test'
folder_names = [
    entry
    for entry in os.listdir(base_path)
    if os.path.isdir(os.path.join(base_path, entry))
]

# One parsed tuple per class folder
result = [parse_label(folder) for folder in folder_names]

print(result)

# Split model into parts
GitHub and GitLab free accounts have a 100 MB limit per file. To still store the model in the cloud, the file can be split into multiple smaller files, which are merged again only when necessary.

In [3]:
# Path of the model file that gets split into parts
file_name = "model.keras"

def split_file(file_path, parts):
    """Split a file into ``parts`` consecutive chunks written next to it.

    The chunks are named ``<file_path>.part1`` ... ``<file_path>.partN``. All
    chunks have the same size except the last one, which also receives the
    remainder bytes. The original file is left untouched.

    Args:
        file_path: Path of the file to split.
        parts: Number of chunks to create; must be at least 1.

    Returns:
        List of the chunk paths, in the order they must be merged.

    Raises:
        ValueError: If ``parts`` is smaller than 1.
    """
    if parts < 1:
        raise ValueError(f"parts must be at least 1, got {parts}")

    with open(file_path, 'rb') as f:
        # memoryview lets the slices below be written without copying the data
        content = memoryview(f.read())

    part_size = len(content) // parts
    part_paths = []
    for i in range(parts):
        start = i * part_size
        # The last part runs to the end of the file to pick up the remainder
        end = None if i == parts - 1 else start + part_size
        part_path = f"{file_path}.part{i+1}"
        with open(part_path, 'wb') as part_file:
            part_file.write(content[start:end])
        part_paths.append(part_path)
    return part_paths

# Split the model file into 3 parts (written as <file_name>.part1 ... .part3)
split_file(file_name, parts=3)

def merge_file(output_path, part_paths):
    """Concatenate the files in part_paths, in the given order, into output_path.

    Args:
        output_path: File to create (overwritten if it already exists).
        part_paths: Paths of the parts, in the order they should be joined.
    """
    with open(output_path, 'wb') as merged:
        for path in part_paths:
            with open(path, 'rb') as chunk:
                # Stream the part into the output file
                shutil.copyfileobj(chunk, merged)

# merge_file("model_final_new.keras", ["model_final.keras.part1", "model_final.keras.part2", "model_final.keras.part3"])