# Progetto d'esame di Data Analysis in Experimental Physics with Machine Learning

Gruppo composto dagli studenti Luca Attinà, Sharis Feriotto e Matteo Marchisio Caprioglio

Dataset ipotesi: https://www.kaggle.com/datasets/vipoooool/new-plant-diseases-dataset
Questo dataset non è adatto perché la data augmentation è stata applicata anche al validation set; come fallback usiamo il dataset PlantVillage originale: https://www.tensorflow.org/datasets/catalog/plant_village

In [None]:
# libraries and packages import
import os
import shutil
import random
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow import keras

from keras.models import Sequential
from keras.layers import Conv2D, BatchNormalization, Activation, MaxPooling2D, Dropout, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from keras.losses import CategoricalCrossentropy, CategoricalFocalCrossentropy
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras import regularizers
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_curve, roc_auc_score, classification_report
import seaborn as sns
from sklearn.utils import class_weight

In [None]:
# Seed setting for reproducibility: Python's `random`, NumPy and TensorFlow all get the seed 42.
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)
# NOTE(review): set_random_seed is documented as seeding Python, NumPy and TensorFlow
# in a single call, so the three calls above look redundant — confirm before removing them.
tf.keras.utils.set_random_seed(42)


In [None]:
# Colab optional setup.
# The code below is wrapped in a string literal, so it is never executed as written.
# NOTE(review): if it is re-enabled, `sys` has to be imported first (it is not among the
# imports at the top of the file), and the `%cd` line is IPython magic, not plain Python.
'''
IS_COLAB = "google.colab" in sys.modules
print("Running on Colab:", IS_COLAB)
if IS_COLAB:
  from google.colab import drive
  drive.mount('/content/drive/', force_remount=True)
  #Adapt the folder to your specific one where you have downloaded the code
  %cd /content/drive/My Drive/path_to/exam-project
'''

# Data download and preprocess

Check whether you are running this notebook on Colab or locally:
because of some bugs, the datasets are created differently in the two environments.
Set the COLAB flag in the cell below to True when running on Colab, and leave it False for a local run.

In [None]:
# Environment switch read by model_preprocess(): True loads PlantVillage through TFDS,
# False reads the pre-split image folders from the local repository.
COLAB = False  # if True, run on Google Colab, else on local repository


In [None]:
# useful constants
IMG_SIZE = (128, 128)  # spatial size every image is resized to; also the model input size
BATCH_SIZE = 32  # number of samples per batch in all dataset pipelines
N_EPOCHS = 30  # maximum number of epochs passed to model.fit
VERBOSE = True # True for debug prints


IF RUNNING ON COLAB

In [None]:
# Load PlantVillage from TFDS instead of the newer Kaggle dataset (that one applied
# data augmentation to its validation set, which is wrong).
# Works on Colab, and locally only where tfds.load works correctly.

def load_from_tfds():
    """Load PlantVillage from TFDS and relabel each sample with its plant family.

    The TFDS ``train`` split is cut into 80% / 10% / 10% slices used as train,
    validation and test sets. The original class names have the form
    ``<family>___<disease>``; only the part before ``___`` is kept, and labels
    are returned one-hot encoded over the sorted list of families.

    Returns:
        Tuple ``(ds_train, ds_val, ds_test, split_labels, ds_info)`` where the
        datasets yield ``(image, one_hot_family)`` pairs, ``split_labels`` is
        the sorted list of family names and ``ds_info`` is the TFDS metadata.
    """
    subsets, ds_info = tfds.load(
        'plant_village',
        split=['train[:80%]', 'train[80%:90%]', 'train[90%:]'],
        shuffle_files=True,
        as_supervised=True,  # yields (image, label) pairs
        with_info=True,
    )

    # Family of every original class, in the order of the original label ids
    disease_names = ds_info.features['label'].names
    family_of_class = [name.split('___')[0] for name in disease_names]
    split_labels = sorted(set(family_of_class))

    # Lookup table: original label id -> family index
    family_map = tf.constant(
        [split_labels.index(family) for family in family_of_class],
        dtype=tf.int32,
    )
    n_families = len(split_labels)

    def to_ohe(img, lbl):
        """Replace the original label with the one-hot vector of its family."""
        return img, tf.one_hot(tf.gather(family_map, lbl), n_families)

    ds_train, ds_val, ds_test = (subset.map(to_ohe) for subset in subsets)

    print(split_labels)

    return ds_train, ds_val, ds_test, split_labels, ds_info


IF RUNNING ON LOCAL REPOSITORY

If you are working on a local repository, first of all you need to clone the dataset into a local folder.
Execute the command "git clone https://github.com/spMohanty/PlantVillage-Dataset" in the terminal while you are in a known path. Make sure to set the base_path variable below to point to that cloned folder.

Only run the git clone command if you haven’t already downloaded the dataset to your PC.

In [None]:
# preprocessing applied to each (image, label) pair
def preprocess(image, label, image_size=(128, 128)):
    """Resize an image and divide its pixel values by 255.

    Args:
        image: image tensor to transform.
        label: label associated with the image; passed through unchanged.
        image_size: target size handed to ``tf.image.resize``.

    Returns:
        Tuple ``(image, label)`` with the image resized, cast to float32 and
        divided by 255 (i.e. in [0, 1] assuming 0-255 input — TODO confirm).
    """
    resized = tf.image.resize(image, image_size)
    scaled = tf.cast(resized, tf.float32) / 255.0
    return scaled, label


In [None]:
# Define the local paths used to generate and read the train/val/test split.
# NOTE: these names exist only when COLAB is False.
if not COLAB:
    from pathlib import Path  # also imported at the top of the file

    base_path = Path(r"D:\progetto-daml") #Change according to the path where PlantVillage-Dataset is cloned.
    base_path = base_path / "PlantVillage-Dataset"

    OUTPUT_ROOT = base_path / "by_family" # 14 families splitting
    DS_DIR = OUTPUT_ROOT / "train"  # NOTE(review): not referenced by any other visible cell


In [None]:
# ONLY RUN THE FIRST TIME, AFTER CLONING THE DATASET.
# The script is kept inside a string literal, so it does nothing until the quotes are removed.
# What it does when enabled: deletes and recreates OUTPUT_ROOT/{train,val,test}, creates one
# sub-folder per plant family, shuffles the images of every disease folder and copies them
# 80% / 10% / remainder into train / val / test under the family folder.
# NOTE(review): images from different disease folders of one family are copied into the same
# directory under their original file name, so equal names would overwrite each other —
# presumably the file names are unique; verify.
# NOTE(review): the final print joins OUTPUT_ROOT with names ending in a newline escape,
# which puts the newline inside the path component; this looks unintended.
'''
SOURCE_DIR = base_path / "raw/color"
TRAIN_FRAC = 0.8 #change fractions
VAL_FRAC = 0.1
TEST_FRAC = 0.1

# Create output folders
for split in ("train","val","test"):
    folder = OUTPUT_ROOT / split
    if folder.exists():
        shutil.rmtree(folder) #remove pre-existing folder for new split
    folder.mkdir(parents=True, exist_ok=True)

# Collect folders name and map into family name folders
disease_folders = sorted([d for d in SOURCE_DIR.iterdir() if d.is_dir()])
families = sorted({d.name.split("___")[0] for d in disease_folders})

for split in ("train","val","test"):
    for fam in families:
        (OUTPUT_ROOT/ split / fam).mkdir(parents=True, exist_ok=True)

# Splits files into train, validation and test for each family
for disease_dir in disease_folders:
    fam = disease_dir.name.split("___")[0]
    images = list(disease_dir.glob("*.*"))  # all image files
    random.shuffle(images)

    n = len(images)
    n_train = int(n * TRAIN_FRAC)
    n_val   = int(n * VAL_FRAC)

    train_imgs = images[:n_train]
    val_imgs   = images[n_train:n_train+n_val]
    test_imgs  = images[n_train+n_val:]

    # Put the datasets into the local folders
    for img in train_imgs:
        shutil.copy(img, OUTPUT_ROOT/"train"/fam/img.name)
    for img in val_imgs:
        shutil.copy(img, OUTPUT_ROOT/"val"/fam/img.name)
    for img in test_imgs:
        shutil.copy(img, OUTPUT_ROOT/"test"/fam/img.name)

print("Datasets paths are:\n",
    OUTPUT_ROOT / "train\n",
    OUTPUT_ROOT / "val\n",
    OUTPUT_ROOT / "test\n")
'''

In [None]:
# read the pre-split folders into train, validation and test datasets
def prepare_local_dataset():
    """Build the three datasets from the folders under ``OUTPUT_ROOT``.

    Every split is read from ``OUTPUT_ROOT/<split>`` with images resized to
    ``IMG_SIZE``, batched by ``BATCH_SIZE`` and labelled with one-hot
    ('categorical') vectors. Only the training split is shuffled. No pixel
    rescaling is applied in this function.

    Returns:
        Tuple ``(ds_train, ds_val, ds_test, split_labels)`` where
        ``split_labels`` is the sorted list of sub-folder names found in the
        training directory.
    """
    train_dir = OUTPUT_ROOT / "train"
    split_labels = sorted(entry.name for entry in train_dir.iterdir() if entry.is_dir())  # folder names

    def load_split(split_name, shuffle):
        """Read one split folder as a batched, one-hot labelled dataset."""
        return tf.keras.utils.image_dataset_from_directory(
            str(OUTPUT_ROOT / split_name),
            image_size=IMG_SIZE,
            batch_size=BATCH_SIZE,
            label_mode='categorical',
            shuffle=shuffle,
        )

    # Created in the same order as before: train, then validation, then test
    ds_train_ohe = load_split("train", shuffle=True)
    ds_val_ohe = load_split("val", shuffle=False)
    ds_test_ohe = load_split("test", shuffle=False)

    return ds_train_ohe, ds_val_ohe, ds_test_ohe, split_labels


# Training


In [None]:
# cnn layers
def simple_cnn(input_shape, num_classes, l2_coef=1e-3):
    """Assemble the small two-convolution classifier.

    Architecture: Conv(16) -> BatchNorm -> ReLU -> Dropout(0.4), then
    Conv(32) -> BatchNorm -> ReLU -> MaxPool(2x2) -> Dropout(0.4), then
    Flatten -> Dense(num_classes, softmax). Every kernel carries an L2 penalty.

    Args:
        input_shape: shape of one input sample, passed to the first Conv2D.
        num_classes: number of units of the softmax output layer.
        l2_coef: coefficient of the L2 kernel regularizer used in all layers.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    def l2_penalty():
        # A fresh regularizer object per layer, as in a plain layer list
        return regularizers.l2(l2_coef)

    first_block = [
        Conv2D(16, (3, 3), padding='same', kernel_regularizer=l2_penalty(), input_shape=input_shape),
        BatchNormalization(),
        Activation('relu'),
        Dropout(0.4),
    ]
    second_block = [
        Conv2D(32, (3, 3), padding='same', kernel_regularizer=l2_penalty()),
        BatchNormalization(),
        Activation('relu'),
        MaxPooling2D((2, 2)),
        Dropout(0.4),
    ]
    classifier_head = [
        Flatten(),
        Dense(num_classes, activation='softmax', kernel_regularizer=l2_penalty()),
    ]
    return Sequential(first_block + second_block + classifier_head)


In [None]:
# build the batched datasets and the objects derived from them
def model_preprocess(COLAB):
    """Create train/validation/test datasets for the selected environment.

    With ``COLAB`` true the data comes from ``load_from_tfds``: the training
    set is shuffled (buffer of 5000), and all three sets are passed through
    ``preprocess``, batched by ``BATCH_SIZE`` and prefetched. Otherwise the
    already batched datasets of ``prepare_local_dataset`` are used as they are.

    NOTE(review): the TFDS branch divides pixel values by 255 through
    ``preprocess`` while the local branch applies no such step here — confirm
    that both paths are meant to feed the model different value ranges.

    Args:
        COLAB: truthy to use the TFDS pipeline, falsy for the local folders.

    Returns:
        Tuple ``(ds_train, ds_val, ds_test, split_labels, num_classes,
        checkpoint)`` where ``checkpoint`` is the file name used to save the
        best model.
    """
    if not COLAB:
        ds_train, ds_val, ds_test, split_labels = prepare_local_dataset()
    else:
        ds_train, ds_val, ds_test, split_labels, _ = load_from_tfds()

        def finalize(dataset):
            """Preprocess, batch and prefetch one dataset."""
            mapped = dataset.map(lambda img, lbl: preprocess(img, lbl, IMG_SIZE))
            return mapped.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

        # Only the training set is shuffled; no data augmentation is applied
        ds_train = finalize(ds_train.shuffle(buffer_size=5000))
        ds_val = finalize(ds_val)
        ds_test = finalize(ds_test)

    checkpoint = "best_model_14_families_exam.h5"
    return ds_train, ds_val, ds_test, split_labels, len(split_labels), checkpoint



In [None]:
# model construction and compilation
def model_train(num_classes):
    """Create the CNN, compile it and print its summary.

    Despite the name, no training happens here: the function only builds
    ``simple_cnn`` for ``IMG_SIZE`` RGB inputs and compiles it with Adam
    (learning rate 0.001), categorical cross-entropy and the accuracy metric.

    Args:
        num_classes: number of output classes of the network.

    Returns:
        The compiled Keras model.
    """
    height, width = IMG_SIZE
    model = simple_cnn(input_shape=(height, width, 3), num_classes=num_classes)

    # Alternative loss that was tried: CategoricalFocalCrossentropy(alpha = 0.25, gamma = 2)
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss=CategoricalCrossentropy(),
        metrics=['accuracy'],
    )

    # Build with an unspecified (None) batch dimension, then show the layer table
    model.build(input_shape=(None, height, width, 3))
    model.summary()
    return model


In [None]:
# Build the datasets, label list, class count and checkpoint file name
# for the environment selected by the COLAB flag.
train_set, val_set, test_set, split_labels, num_classes, checkpoint_file = model_preprocess(COLAB)


In [None]:
# debug prints, one per line: labels, number of labels, checkpoint file name
if VERBOSE:
    print(split_labels, len(split_labels), checkpoint_file, sep="\n")

In [None]:
# debug view: cross-check images against their family name and one-hot label
def display_samples_with_labels_ohe(dataset, num_samples=5):
    """Show the first samples of a batched dataset together with their labels.

    The dataset is unbatched and the first ``num_samples`` images are drawn in
    a single column, each titled with the family name, the family index and
    the full one-hot vector. Family names are looked up in the module-level
    ``split_labels`` list.

    Args:
        dataset: batched dataset yielding ``(image, one_hot_label)`` pairs.
        num_samples: number of samples to display.
    """
    plt.figure(figsize=(6, 3 * num_samples))
    for i, (img, label_ohe) in enumerate(dataset.unbatch().take(num_samples)):
        img_np = img.numpy()
        # Images coming through preprocess() are floats divided by 255; casting them
        # straight to uint8 would truncate every pixel to 0 and show a black picture,
        # so such images are scaled back to 0-255 first.
        if np.issubdtype(img_np.dtype, np.floating) and img_np.max() <= 1.0:
            img_np = img_np * 255.0
        img_np = np.clip(img_np, 0, 255).astype("uint8")

        ohe_vec = label_ohe.numpy()                     # one-hot vector
        fam_idx = int(tf.argmax(label_ohe).numpy())     # family index
        fam_name = split_labels[fam_idx]

        plt.subplot(num_samples, 1, i + 1)
        plt.imshow(img_np)
        plt.axis("off")
        plt.title(f"Family: {fam_name} (idx={fam_idx})\nOHE: {ohe_vec.tolist()}")
    plt.tight_layout()
    plt.show()

# Debug only: show the first five training samples with their labels
if VERBOSE:
    display_samples_with_labels_ohe(train_set, num_samples=5)


In [None]:
# count the training images of every class (debug only)
from collections import Counter

if VERBOSE:
    counts = Counter()

    # One full pass over the training set; each one-hot batch is reduced to class indices
    for _, batch_labels in train_set:
        counts.update(np.argmax(batch_labels.numpy(), axis=1))

    print(f"Number of classes: {len(split_labels)}")
    print("Number of images per class:")
    for idx, family in enumerate(split_labels):
        print(f"{family}: {counts[idx]} images")


In [None]:
# Bar chart of the number of training images per family (needs `counts` from the previous cell)
if VERBOSE:
    counts_list = [counts[class_idx] for class_idx, _ in enumerate(split_labels)]

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.bar(split_labels, counts_list, color='skyblue')
    ax.set_xlabel('Family')
    ax.set_ylabel('Number of Images')
    ax.set_title('Classes Distribution')
    plt.xticks(rotation=90)  # family names are long: draw them vertically
    ax.grid(axis='y')
    fig.tight_layout()
    plt.show()


In [None]:
# Print the dataset element format and the shapes/dtypes of one training batch
if VERBOSE:
    print("Element spec:", train_set.element_spec)

    # take(1) yields a single (images, labels) batch
    for x, y in train_set.take(1):
        print("x shape:", x.shape, "  dtype:", x.dtype)
        print("y shape:", y.shape, "  dtype:", y.dtype)


In [None]:
# Build and compile the CNN for the number of classes found in the data
model = model_train(num_classes)


In [None]:
# Label conversion from one-hot to class index.
# Each batch is reduced with a single vectorized argmax instead of a Python loop
# over individual samples.
all_labels = np.concatenate(
    [np.argmax(labels_ohe.numpy(), axis=1) for _, labels_ohe in train_set]
)
classes = np.unique(all_labels)

# weights calculation (total examples number / (classes number * examples in i-th class))
weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=classes,
    y=all_labels
)

# Plain Python ints/floats as keys/values, so the mapping can be handed to
# model.fit(class_weight=...) without depending on NumPy scalar types
class_w = {int(cls): float(weight) for cls, weight in zip(classes, weights)}

# print family name and corresponding weight
if VERBOSE:
    for idx, w in class_w.items():
        name = split_labels[idx]
        print(f"{name:15s}: {w:.4f}")

In [None]:
# Model training: runs for at most N_EPOCHS epochs and saves the best model (lowest val_loss)
history = model.fit(
    train_set,
    validation_data=val_set,
    epochs=N_EPOCHS,
    #class_weight=class_w, #optional: training with weighted classes
    callbacks=[
        # Stop after 3 epochs without val_loss improvement and restore the best weights
        EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True, verbose=1),
        # Write the model to checkpoint_file only when val_loss improves
        ModelCheckpoint(checkpoint_file, monitor='val_loss', save_best_only=True, verbose=1),
        # Halve the learning rate after 2 epochs without val_loss improvement
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
    ]
)


# CNN training history


In [None]:
# plot training history: loss on the left panel, accuracy on the right one
plt.figure(figsize=(12, 6))
for position, (metric, pretty_name) in enumerate((('loss', 'Loss'), ('accuracy', 'Accuracy')), start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[metric], label=f'Training {pretty_name}')
    plt.plot(history.history[f'val_{metric}'], label=f'Validation {pretty_name}')
    plt.title(f'Model {pretty_name}')
    plt.xlabel('Epochs')
    plt.ylabel(pretty_name)
    plt.legend()
plt.tight_layout()
plt.show()


# Evaluation code

The following code is kept here as a backup in case there are problems running model_evaluation.ipynb.

In [None]:
# Generate Predictions on Test Set
from tensorflow.keras import Sequential

# NOTE(review): this file name differs from `checkpoint_file`
# ("best_model_14_families_exam.h5") written by the training cell — set it to the
# model that should actually be evaluated.
model = keras.models.load_model('best_model_14_families_focal.h5', compile=False) # insert file name

y_true = [] # ground-truth class indices
y_pred = [] # predicted class indices
y_score = [] # per-batch arrays of predicted class probabilities
for images, labels in test_set:
    # predict_on_batch runs one forward pass on the given batch; calling
    # model.predict() once per batch inside a Python loop sets up a new
    # prediction run (with its own progress output) for every batch.
    preds = model.predict_on_batch(images)
    y_true.extend(np.argmax(labels.numpy(), axis=1))
    y_pred.extend(np.argmax(preds, axis=1))
    y_score.append(preds)
# Stack the per-batch outputs into one (n_samples, n_classes) array
y_score = np.concatenate(y_score)


In [None]:
import h5py

# Sanity check: the saved model file can be opened (read-only) as an HDF5 file
with h5py.File('best_model_14_families_focal.h5', 'r') as f:
    pass

In [None]:
# Evaluation metrics on the test set; precision, recall and F1 are support-weighted averages
accuracy = accuracy_score(y_true, y_pred)
precision, recall, f1 = (
    scorer(y_true, y_pred, average='weighted')
    for scorer in (precision_score, recall_score, f1_score)
)

for metric_name, metric_value in (
    ("Accuracy", accuracy),
    ("Precision", precision),
    ("Recall", recall),
    ("F1-score", f1),
):
    print(f"{metric_name}: {metric_value:.6f}")


In [None]:
# Compute the confusion matrix, normalized over the true labels (each row sums to 1).
# `labels` pins one row/column per entry of split_labels, so the matrix always matches
# the tick labels even if a class never appears in the test data.
cm = confusion_matrix(
    y_true,
    y_pred,
    labels=np.arange(len(split_labels)),
    normalize='true',
)

# Plot confusion matrix
plt.figure(figsize=(14, 12))
# fmt is only used when annot=True; '.2f' suits the normalized (float) values
sns.heatmap(cm, annot=False, fmt='.2f', cmap='viridis', xticklabels=split_labels, yticklabels=split_labels)
plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix (Test Set)')
plt.xticks(rotation=90)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()


In [None]:
# Plot one-vs-rest ROC curves for each class
n_classes = y_score.shape[1]
fpr = dict()
tpr = dict()
roc_auc = dict()

# One-hot encode the ground truth once, instead of rebuilding it for every class
y_true_ohe = np.eye(n_classes)[y_true]

for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(y_true_ohe[:, i], y_score[:, i])
    roc_auc[i] = roc_auc_score(y_true_ohe[:, i], y_score[:, i])

plt.figure(figsize=(12, 12))
# Draw the curves from highest to lowest AUC so the legend is sorted
auc_and_idx = sorted([(roc_auc[i], i) for i in range(n_classes)], reverse=True)
for auc, i in auc_and_idx:
    plt.plot(fpr[i], tpr[i], label=f'{split_labels[i]} (AUC = {auc:.4f})')
plt.plot([0, 1], [0, 1], 'r--', lw=2, label='Random Classifier (AUC = 0.5)')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - One vs Rest (Test Set)')
# Anchor the legend's upper-left corner just outside the right edge of the axes
plt.legend(fontsize='small', bbox_to_anchor=(1.05, 1), loc='upper left')

plt.show()


In [None]:
# Display the classification report.
# `labels` lists every class index explicitly so that target_names always lines up with
# the reported rows, even when a class is absent from both y_true and y_pred.
report = classification_report(
    y_true,
    y_pred,
    labels=np.arange(len(split_labels)),
    target_names=split_labels,
)
print(report)
