In [None]:
# data_prep.py
import os, re
import pandas as pd
from PIL import Image

DATASET_FOLDERS = {
    "mustard": "/kaggle/input/900-mustard-leaf-dataset",
    "cauliflower": "/kaggle/input/cauliflower-dataset",
    "maize1": "/kaggle/input/corn-or-maize-leaf-disease-dataset",
    "maize2": "/kaggle/input/maize-disease-dataset",
    "eggplant": "/kaggle/input/eggplant-disease-recognition-dataset",
    "plantvillage": "/kaggle/input/plantdisease",
    "sugarcane": "/kaggle/input/sugarcane-leaf-disease-dataset",
    "wheat": "/kaggle/input/wheat-leaf-dataset",
    "rice": "/kaggle/input/rice-leaf-diseases-detection",
}

def normalize_name(name):
    name = name.replace(" ", "_").replace("__", "_")
    name = re.sub(r"[()]", "", name)
    return name.strip("_")

def add_images_from_folder(root_folder, crop_name=None, skip_augmented=True):
    rows=[]
    for root, dirs, files in os.walk(root_folder):
        if skip_augmented and any("aug" in d.lower() for d in root.split(os.sep)):
            continue
        for f in files:
            if f.lower().endswith((".jpg",".jpeg",".png")):
                try:
                    img = Image.open(os.path.join(root,f)).convert('RGB')
                except:
                    continue
                parent = os.path.basename(root)
                if crop_name:
                    crop = crop_name
                    disease = normalize_name(parent)
                else:
                    if "___" in parent:
                        parts = parent.split("___")
                        crop = normalize_name(parts[0])
                        disease = normalize_name("_".join(parts[1:]))
                    else:
                        crop = normalize_name(parent)
                        disease = "unknown"
                rows.append({"path":os.path.join(root,f),"crop":crop,"disease":disease})
    return rows

# Collect all images
all_rows=[]

# Mustard
for subset in ["TRAIN","TEST"]:
    mustard_path = os.path.join(DATASET_FOLDERS["mustard"], subset)
    if os.path.exists(mustard_path):
        all_rows += add_images_from_folder(mustard_path, crop_name="Mustard")

# Cauliflower
all_rows += add_images_from_folder(DATASET_FOLDERS["cauliflower"], crop_name="Cauliflower")

# Maize
all_rows += add_images_from_folder(DATASET_FOLDERS["maize1"], crop_name="Maize")
all_rows += add_images_from_folder(DATASET_FOLDERS["maize2"], crop_name="Maize")

# Eggplant
eggplant_root = os.path.join(DATASET_FOLDERS["eggplant"], "Eggplant Disease Recognition Dataset")
for folder in ["Original Images","Original Images (Version 02)"]:
    folder_path = os.path.join(eggplant_root, folder)
    if os.path.exists(folder_path):
        all_rows += add_images_from_folder(folder_path, crop_name="Eggplant")

# PlantVillage
pv_root = os.path.join(DATASET_FOLDERS["plantvillage"], "PlantVillage")
for folder in os.listdir(pv_root):
    folder_path = os.path.join(pv_root, folder)
    if os.path.isdir(folder_path):
        if folder.lower().startswith("tomato"):
            all_rows += add_images_from_folder(folder_path, crop_name="Tomato")
        elif folder.lower().startswith("potato"):
            all_rows += add_images_from_folder(folder_path, crop_name="Potato")
        elif folder.lower().startswith("pepper"):
            all_rows += add_images_from_folder(folder_path, crop_name="Pepper")

# Sugarcane
all_rows += add_images_from_folder(DATASET_FOLDERS["sugarcane"], crop_name="Sugarcane")

# Wheat
all_rows += add_images_from_folder(DATASET_FOLDERS["wheat"], crop_name="Wheat")

# Rice
rice_root = os.path.join(DATASET_FOLDERS["rice"], "Rice_Leaf_Diease")
for root, dirs, files in os.walk(rice_root):
    if any("aug" in d.lower() for d in root.split(os.sep)):
        continue
    for f in files:
        if f.lower().endswith((".jpg",".jpeg",".png")):
            all_rows.append({"path":os.path.join(root,f),"crop":"Paddy","disease":normalize_name(os.path.basename(root))})

# Build DataFrame & labels
df = pd.DataFrame(all_rows).drop_duplicates().reset_index(drop=True)
df["disease_label"] = df["crop"] + "___" + df["disease"]
df["label"] = pd.factorize(df["disease_label"])[0]

# Save CSV
output_csv = "/kaggle/working/kaggle_dataset_manifest.csv"
df.to_csv(output_csv,index=False)
print(f"Saved {output_csv}")
print("Total images:", len(df))
print("Num unique classes:", df['label'].nunique())


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
import pandas as pd
from sklearn.model_selection import train_test_split
import json

IMG_SIZE = (224,224)
BATCH_SIZE = 32
EPOCHS_BASE = 10
EPOCHS_FINE = 15

df = pd.read_csv("/kaggle/working/kaggle_dataset_manifest.csv")
train_df, temp_df = train_test_split(df, test_size=0.30, stratify=df['label'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.50, stratify=temp_df['label'], random_state=42)

# -----------------------------
# Generators
# -----------------------------
def create_generators(model_name):
    if model_name=="EfficientNetB0":
        preprocess_func = tf.keras.applications.efficientnet.preprocess_input
    elif model_name=="ResNet50":
        preprocess_func = tf.keras.applications.resnet.preprocess_input
    else:
        preprocess_func = tf.keras.applications.mobilenet_v2.preprocess_input

    train_datagen = ImageDataGenerator(
        preprocessing_function=preprocess_func,
        rotation_range=20, width_shift_range=0.1,
        height_shift_range=0.1, brightness_range=[0.8,1.2],
        horizontal_flip=True
    )
    val_datagen = ImageDataGenerator(preprocessing_function=preprocess_func)
    
    train_gen = train_datagen.flow_from_dataframe(
        train_df, x_col='path', y_col='disease_label',
        target_size=IMG_SIZE, class_mode='categorical',
        batch_size=BATCH_SIZE, shuffle=True
    )
    val_gen = val_datagen.flow_from_dataframe(
        val_df, x_col='path', y_col='disease_label',
        target_size=IMG_SIZE, class_mode='categorical',
        batch_size=BATCH_SIZE, shuffle=False
    )
    return train_gen, val_gen

# -----------------------------
# Build model
# -----------------------------
def build_model(model_name, num_classes):
    if model_name=="EfficientNetB0":
        base = tf.keras.applications.EfficientNetB0(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    elif model_name=="ResNet50":
        base = tf.keras.applications.ResNet50(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    elif model_name=="MobileNetV2":
        base = tf.keras.applications.MobileNetV2(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    else:
        raise ValueError("Invalid model name")
    
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Dense(256, activation='relu')(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    model = models.Model(inputs=base.input, outputs=out)
    base.trainable = False
    
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model, base

# -----------------------------
# Train function
# -----------------------------
def train_model(model_name):
    print(f"\nTraining {model_name}")
    train_gen, val_gen = create_generators(model_name)
    num_classes = len(train_gen.class_indices)
    model, base = build_model(model_name, num_classes)
    
    callbacks = [
        EarlyStopping(patience=5, restore_best_weights=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3)
    ]
    
    # Base training
    history_base = model.fit(train_gen, validation_data=val_gen, epochs=EPOCHS_BASE, callbacks=callbacks)
    
    # Fine-tuning last 50 layers
    for layer in base.layers[-50:]:
        layer.trainable = True
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-5),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    history_fine = model.fit(train_gen, validation_data=val_gen, epochs=EPOCHS_FINE, callbacks=callbacks)
    
    # Save model
    model_file = f"{model_name.lower()}_disease_model.h5"
    model.save(model_file)
    print(f"{model_name} saved as {model_file}")
    
    # Save info for Grad-CAM
    info = {
        "model_file": model_file,
        "last_conv_layer": base.layers[-1].name,
        "class_indices": train_gen.class_indices,
        "history_base": history_base.history,
        "history_fine": history_fine.history
    }
    with open(f"{model_name.lower()}_info.json","w") as f:
        json.dump(info, f)
    
    return model_file

# -----------------------------
# Train all models
# -----------------------------
if __name__ == "__main__":
    models_to_train = ["EfficientNetB0","ResNet50","MobileNetV2"]
    for m in models_to_train:
        train_model(m)


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import json

IMG_SIZE=(224,224)
BATCH_SIZE=32

df = pd.read_csv("/kaggle/working/kaggle_dataset_manifest.csv")
_, temp_df = train_test_split(df, test_size=0.30, stratify=df['label'], random_state=42)
_, test_df = train_test_split(temp_df, test_size=0.50, stratify=temp_df['label'], random_state=42)

test_datagen = ImageDataGenerator(rescale=1./255)
test_gen = test_datagen.flow_from_dataframe(
    test_df, x_col='path', y_col='disease_label',
    target_size=IMG_SIZE, batch_size=BATCH_SIZE, class_mode='categorical', shuffle=False
)

models_info_files = ["efficientnetb0_info.json", "resnet50_info.json", "mobilenetv2_info.json"]

for info_file in models_info_files:
    with open(info_file,"r") as f:
        info = json.load(f)

    model = load_model(info["model_file"])
    class_indices = info["class_indices"]
    labels = list(class_indices.keys())

    print(f"\nEvaluating {info_file.split('_')[0].capitalize()}...")
    preds = model.predict(test_gen)
    y_true = test_gen.classes
    y_pred = np.argmax(preds, axis=1)

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12,10))
    sns.heatmap(cm/np.sum(cm), annot=False, cmap='Blues')
    plt.title(f"{info_file.split('_')[0].capitalize()} Confusion Matrix (Normalized)")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

    # Classification report
    print(classification_report(y_true, y_pred, target_names=labels))

    # Training curves
    history_base = info['history_base']
    history_fine = info['history_fine']

    acc = history_base['accuracy'] + history_fine['accuracy']
    val_acc = history_base['val_accuracy'] + history_fine['val_accuracy']
    loss = history_base['loss'] + history_fine['loss']
    val_loss = history_base['val_loss'] + history_fine['val_loss']

    plt.figure(figsize=(10,5))
    plt.plot(acc,label='Train Acc')
    plt.plot(val_acc,label='Val Acc')
    plt.plot(loss,label='Train Loss')
    plt.plot(val_loss,label='Val Loss')
    plt.title(f"{info_file.split('_')[0].capitalize()} Training Curves")
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()


In [None]:
import tensorflow as tf
import numpy as np
import cv2

def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """
    Generates Grad-CAM heatmap for given image and model.
    """
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )

    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        loss = predictions[:, pred_index]

    grads = tape.gradient(loss, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0,1,2))
    conv_outputs = conv_outputs[0]
    heatmap = conv_outputs @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = np.maximum(heatmap, 0) / (np.max(heatmap) + 1e-10)
    return heatmap

def overlay_heatmap(img, heatmap, alpha=0.4):
    """
    Superimposes heatmap on original image.
    """
    heatmap = cv2.resize(heatmap, (img.shape[1], img.shape[0]))
    heatmap = np.uint8(255*heatmap)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    superimposed = cv2.addWeighted(img, 1-alpha, heatmap, alpha, 0)
    return superimposed


In [None]:
# train_models_dryrun.py
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import json
import os

# -----------------------------
# Config
# -----------------------------
IMG_SIZE = (224,224)
BATCH_SIZE = 16           # smaller for dry-run
EPOCHS_BASE = 5           # short dry-run
EPOCHS_FINE = 5
DRY_RUN_FRACTION = 0.15   # 15% of dataset for quick testing
MODELS_TO_TRAIN = ["EfficientNetB0", "ResNet50", "MobileNetV2"]

# -----------------------------
# Load Dataset & Subset
# -----------------------------
df = pd.read_csv("/kaggle/working/kaggle_dataset_manifest.csv")
df_sample, _ = train_test_split(df, test_size=1-DRY_RUN_FRACTION, stratify=df['label'], random_state=42)

train_df, temp_df = train_test_split(df_sample, test_size=0.30, stratify=df_sample['label'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=0.50, stratify=temp_df['label'], random_state=42)

print(f"Dry-run dataset size: {len(df_sample)}, Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}")

# -----------------------------
# Generators
# -----------------------------
def create_generators(model_name):
    if model_name=="EfficientNetB0":
        preprocess_func = tf.keras.applications.efficientnet.preprocess_input
    elif model_name=="ResNet50":
        preprocess_func = tf.keras.applications.resnet.preprocess_input
    else:
        preprocess_func = tf.keras.applications.mobilenet_v2.preprocess_input

    train_datagen = ImageDataGenerator(
        preprocessing_function=preprocess_func,
        rotation_range=20, width_shift_range=0.1, height_shift_range=0.1,
        brightness_range=[0.8,1.2], horizontal_flip=True
    )
    val_datagen = ImageDataGenerator(preprocessing_function=preprocess_func)

    train_gen = train_datagen.flow_from_dataframe(
        train_df, x_col='path', y_col='disease_label',
        target_size=IMG_SIZE, class_mode='categorical',
        batch_size=BATCH_SIZE, shuffle=True
    )
    val_gen = val_datagen.flow_from_dataframe(
        val_df, x_col='path', y_col='disease_label',
        target_size=IMG_SIZE, class_mode='categorical',
        batch_size=BATCH_SIZE, shuffle=False
    )
    return train_gen, val_gen

# -----------------------------
# Build Model
# -----------------------------
def build_model(model_name, num_classes):
    if model_name=="EfficientNetB0":
        base = tf.keras.applications.EfficientNetB0(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    elif model_name=="ResNet50":
        base = tf.keras.applications.ResNet50(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    elif model_name=="MobileNetV2":
        base = tf.keras.applications.MobileNetV2(include_top=False, weights='imagenet', input_shape=(*IMG_SIZE,3))
    else:
        raise ValueError("Invalid model name")
    
    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Dense(256, activation='relu')(x)
    out = layers.Dense(num_classes, activation='softmax')(x)
    model = models.Model(inputs=base.input, outputs=out)
    base.trainable = False
    
    model.compile(optimizer=tf.keras.optimizers.Adam(1e-3),
                  loss='categorical_crossentropy', metrics=['accuracy'])
    return model, base

# -----------------------------
# Plot Training Curves
# -----------------------------
def plot_training(history_base, history_fine, model_name):
    acc = history_base['accuracy'] + history_fine['accuracy']
    val_acc = history_base['val_accuracy'] + history_fine['val_accuracy']
    loss = history_base['loss'] + history_fine['loss']
    val_loss = history_base['val_loss'] + history_fine['val_loss']

    plt.figure(figsize=(12,5))
    plt.subplot(1,2,1)
    plt.plot(acc,label='Train Acc')
    plt.plot(val_acc,label='Val Acc')
    plt.title(f"{model_name} Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()

    plt.subplot(1,2,2)
    plt.plot(loss,label='Train Loss')
    plt.plot(val_loss,label='Val Loss')
    plt.title(f"{model_name} Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

# -----------------------------
# Confusion Matrix
# -----------------------------
def evaluate_model(model, test_df, class_indices, model_name):
    test_datagen = ImageDataGenerator(rescale=1./255)
    test_gen = test_datagen.flow_from_dataframe(
        test_df, x_col='path', y_col='disease_label',
        target_size=IMG_SIZE, batch_size=BATCH_SIZE, class_mode='categorical', shuffle=False
    )
    preds = model.predict(test_gen)
    y_true = test_gen.classes
    y_pred = np.argmax(preds, axis=1)
    labels = list(class_indices.keys())

    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(12,10))
    sns.heatmap(cm/np.sum(cm), annot=False, cmap='Blues')
    plt.title(f"{model_name} Confusion Matrix (Normalized)")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

    print(classification_report(y_true, y_pred, target_names=labels))

# -----------------------------
# Train All Models (Dry Run)
# -----------------------------
def train_all_models():
    trained_models_info = {}
    for model_name in MODELS_TO_TRAIN:
        print(f"\n===== Training {model_name} (Dry Run) =====")
        train_gen, val_gen = create_generators(model_name)
        num_classes = len(train_gen.class_indices)
        model, base = build_model(model_name, num_classes)

        callbacks = [
            EarlyStopping(patience=3, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2)
        ]

        # Base training
        history_base = model.fit(train_gen, validation_data=val_gen, epochs=EPOCHS_BASE, callbacks=callbacks)

        # Fine-tuning last 50 layers
        for layer in base.layers[-50:]:
            layer.trainable = True
        model.compile(optimizer=tf.keras.optimizers.Adam(1e-5),
                      loss='categorical_crossentropy', metrics=['accuracy'])
        history_fine = model.fit(train_gen, validation_data=val_gen, epochs=EPOCHS_FINE, callbacks=callbacks)

        # Save model & info
        model_file = f"{model_name.lower()}_dryrun_model.h5"
        model.save(model_file)
        info = {
            "model_file": model_file,
            "last_conv_layer": base.layers[-1].name,
            "class_indices": train_gen.class_indices,
            "history_base": history_base.history,
            "history_fine": history_fine.history
        }
        with open(f"{model_name.lower()}_dryrun_info.json","w") as f:
            json.dump(info, f)

        # Plot curves & evaluate
        plot_training(history_base, history_fine, model_name)
        evaluate_model(model, test_df, train_gen.class_indices, model_name)

        trained_models_info[model_name] = info

    return trained_models_info

# -----------------------------
# Run Dry-Run Training
# -----------------------------
if __name__ == "__main__":
    trained_models_info = train_all_models()
    print("Dry-run complete for all models.")
