"""
Coffee Disease Detection using MobileNetV2 and TensorFlow
---------------------------------------------------------
Classifies coffee leaf images into Healthy, Rust, and Phoma.
Implements transfer learning, data augmentation, reproducibility, and deployment readiness.
"""

# In [1]:  (notebook cell marker, kept as a comment so the file parses as Python)
# coffee_disease_detection.py

import os
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix

# -----------------------------
# 1. Reproducibility Settings
# -----------------------------
# Single seed shared by every RNG below, by the dataset loaders and by the
# augmentation layers.
SEED = 42
# NOTE(review): this assignment only updates os.environ for this process and
# its children; presumably PYTHONHASHSEED has to be set before the interpreter
# starts to affect hashing in the current process — confirm.
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)      # Python's built-in `random` module
np.random.seed(SEED)   # NumPy's global RNG
tf.random.set_seed(SEED)  # TensorFlow's global seed
# NOTE(review): set_random_seed appears to re-seed the three RNGs above in one
# call, which would make the individual calls redundant — confirm before removing.
tf.keras.utils.set_random_seed(SEED)
# Ask TensorFlow to use deterministic op implementations.
tf.config.experimental.enable_op_determinism()

# -----------------------------
# 2. Dataset Parameters
# -----------------------------
# (height, width) passed as `image_size` to the dataset loaders; with a
# trailing channel dimension of 3 it is also the model input shape.
IMG_SIZE = (256, 256)
# Batch size used for the train, val and test datasets alike.
BATCH_SIZE = 100
DATA_DIR = 'dataset'  # Root directory containing train/val/test subfolders

# -----------------------------
# 3. Data Loading
# -----------------------------
def get_datasets(data_dir, img_size, batch_size, seed):
    """Load the train, val and test image datasets from ``data_dir``.

    Each split is read from the sub-folder of the same name with
    ``tf.keras.utils.image_dataset_from_directory``, using labels inferred
    from the directory structure and ``label_mode='categorical'``.

    Args:
        data_dir: Root directory holding the ``train``, ``val`` and ``test``
            sub-folders.
        img_size: Value forwarded as ``image_size``.
        batch_size: Value forwarded as ``batch_size``.
        seed: Value forwarded as ``seed`` for every split.

    Returns:
        A ``(train_ds, val_ds, test_ds)`` tuple. Only the training split is
        loaded with ``shuffle=True``.
    """
    def load_split(split_name, shuffle):
        # Everything except the folder and the shuffle flag is shared.
        return tf.keras.utils.image_dataset_from_directory(
            os.path.join(data_dir, split_name),
            labels='inferred',
            label_mode='categorical',
            batch_size=batch_size,
            image_size=img_size,
            shuffle=shuffle,
            seed=seed,
        )

    training = load_split('train', True)
    validation = load_split('val', False)
    testing = load_split('test', False)
    return training, validation, testing

# Load all three splits using the module-level dataset parameters.
train_ds, val_ds, test_ds = get_datasets(DATA_DIR, IMG_SIZE, BATCH_SIZE, SEED)
# Captured here, before `train_ds` is rebound to the mapped/cached pipeline
# further down; used later for the output layer size and the report labels.
class_names = train_ds.class_names

# -----------------------------
# 4. Data Augmentation
# -----------------------------
# Random augmentation stack that build_model() applies directly to the model
# input. The datasets are passed through MobileNetV2's preprocess_input before
# they reach the model, which the Keras documentation describes as scaling
# pixels to [-1, 1], so value-range-aware layers must be told about that range.
data_augmentation = tf.keras.Sequential([
    layers.RandomFlip('horizontal_and_vertical', seed=SEED),
    layers.RandomRotation(0.1, seed=SEED),
    layers.RandomZoom(0.1, seed=SEED),
    # NOTE(review): newer Keras releases reportedly add a `value_range`
    # argument (default (0, 255)) to RandomContrast as well — if running on
    # such a version, confirm whether it also needs (-1, 1).
    layers.RandomContrast(0.1, seed=SEED),
    # RandomBrightness scales its shift by the width of `value_range` and
    # clips the result to it. The default (0, 255) would shift [-1, 1] inputs
    # by up to ~25 and clip away every negative pixel.
    layers.RandomBrightness(0.1, value_range=(-1.0, 1.0), seed=SEED)
], name='data_augmentation')

# -----------------------------
# 5. Preprocessing Pipeline
# -----------------------------
def preprocess(image, label):
    """Run MobileNetV2's ``preprocess_input`` on ``image``; pass ``label`` through.

    Args:
        image: Image tensor (or batch) to preprocess.
        label: Label returned unchanged.

    Returns:
        A ``(preprocessed_image, label)`` tuple.
    """
    return preprocess_input(image), label

# Every split goes through the same three stages: MobileNetV2 preprocessing,
# caching, then prefetching.
# NOTE(review): cache() sits downstream of the shuffling done by the dataset
# loader, so the training order may stay fixed after the first full pass —
# confirm whether per-epoch reshuffling is wanted.
train_ds, val_ds, test_ds = (
    split.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .prefetch(buffer_size=tf.data.AUTOTUNE)
    for split in (train_ds, val_ds, test_ds)
)

# -----------------------------
# 6. Model Building
# -----------------------------
def build_model(input_shape, num_classes, base_trainable=False, fine_tune_at=None):
    """Assemble the MobileNetV2-based classifier.

    The network is: input -> module-level ``data_augmentation`` ->
    ImageNet-initialised MobileNetV2 without its top (called with
    ``training=False``) -> global average pooling -> dropout(0.2) ->
    softmax dense layer.

    Args:
        input_shape: Shape of one input image, e.g. ``(H, W, 3)``.
        num_classes: Number of units in the softmax output layer.
        base_trainable: Value assigned to the backbone's ``trainable`` flag.
        fine_tune_at: If not ``None``, every backbone layer in
            ``layers[:fine_tune_at]`` is frozen after ``base_trainable`` is
            applied (a negative value therefore leaves only the last
            ``abs(fine_tune_at)`` layers unfrozen).

    Returns:
        An uncompiled ``keras.Model``.
    """
    backbone = MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )
    backbone.trainable = base_trainable

    # Freeze the leading slice of the backbone when a cut-off is supplied.
    frozen_layers = [] if fine_tune_at is None else backbone.layers[:fine_tune_at]
    for frozen in frozen_layers:
        frozen.trainable = False

    image_in = layers.Input(shape=input_shape)
    augmented = data_augmentation(image_in)
    # training=False is passed explicitly on the backbone call.
    features = backbone(augmented, training=False)
    pooled = layers.GlobalAveragePooling2D()(features)
    regularised = layers.Dropout(0.2)(pooled)
    probabilities = layers.Dense(num_classes, activation='softmax')(regularised)
    return models.Model(image_in, probabilities)

# Feature-extraction model: the whole MobileNetV2 backbone is frozen.
model = build_model(
    input_shape=(*IMG_SIZE, 3),
    num_classes=len(class_names),
    base_trainable=False,
)

# -----------------------------
# 7. Compile Model
# -----------------------------
# One-hot labels (label_mode='categorical') pair with categorical cross-entropy.
model.compile(
    loss='categorical_crossentropy',
    metrics=['accuracy'],
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
)

# -----------------------------
# 8. Compute Class Weights (Optional)
# -----------------------------
def get_class_weights(dataset):
    """Compute 'balanced' class weights from a dataset of one-hot labels.

    The number of classes is taken from the width of the one-hot labels, so
    the function does not depend on any module-level state.

    Args:
        dataset: Iterable yielding ``(images, labels)`` batches where
            ``labels`` is a 2-D one-hot array/tensor of shape
            ``(batch, num_classes)``.

    Returns:
        Dict mapping every class index ``0..num_classes-1`` to a float
        weight. Classes with at least one sample get scikit-learn's
        ``'balanced'`` weight; classes with no samples get a neutral ``1.0``
        so the dict stays complete.

    Raises:
        ValueError: If ``dataset`` yields no batches.
    """
    label_batches = [np.asarray(y) for _, y in dataset]
    if not label_batches:
        raise ValueError("Cannot compute class weights from an empty dataset.")

    labels = np.concatenate(label_batches, axis=0)
    y_integers = np.argmax(labels, axis=1)
    num_classes = labels.shape[1]

    # Only ask scikit-learn about classes that actually occur; it rejects
    # requested classes that are absent from `y`.
    present = np.unique(y_integers)
    present_weights = compute_class_weight('balanced', classes=present, y=y_integers)

    weights = dict.fromkeys(range(num_classes), 1.0)
    weights.update(zip(present.tolist(), present_weights.tolist()))
    return weights

# Weights are derived from the training split only and are passed to both
# model.fit calls below. Note that this iterates the full training dataset once.
class_weight = get_class_weights(train_ds)

# -----------------------------
# 9. Callbacks
# -----------------------------
# Writes 'best_model.h5', keeping only the best model as measured by val_loss;
# that file is reloaded before fine-tuning and again before evaluation.
checkpoint_cb = callbacks.ModelCheckpoint(
    'best_model.h5', monitor='val_loss', save_best_only=True, verbose=1
)
# Stops training after 10 epochs without val_loss improvement and restores the
# best weights seen.
earlystop_cb = callbacks.EarlyStopping(
    monitor='val_loss', patience=10, restore_best_weights=True, verbose=1
)
# TensorBoard event files go to ./logs.
tensorboard_cb = callbacks.TensorBoard(log_dir='logs')

# The same callback instances are reused by both training phases.
callback_list = [checkpoint_cb, earlystop_cb, tensorboard_cb]

# -----------------------------
# 10. Initial Training (Feature Extraction)
# -----------------------------
# Upper bound on feature-extraction epochs; EarlyStopping in callback_list may
# end the run sooner.
EPOCHS = 30
# `history` is read later to decide where fine-tuning resumes its epoch count.
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callback_list,
    class_weight=class_weight
)

# -----------------------------
# 11. Fine-Tuning
# -----------------------------
# Unfreeze last 20 layers for fine-tuning
# Rebuild the network with the backbone trainable, then re-freeze everything
# except its last 20 layers (fine_tune_at=-20 freezes layers[:-20]).
model = build_model(input_shape=IMG_SIZE + (3,), num_classes=len(class_names), base_trainable=True, fine_tune_at=-20)
# A 10x smaller learning rate than the feature-extraction phase.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
# Load best weights from initial training
model.load_weights('best_model.h5')

FINE_TUNE_EPOCHS = 10
# Resume the epoch counter from the number of epochs that actually ran.
# Early stopping may have ended the first phase before EPOCHS, and
# `EPOCHS + FINE_TUNE_EPOCHS` would then fine-tune for far more than
# FINE_TUNE_EPOCHS epochs. len() also avoids an IndexError when no epoch ran.
initial_epoch = len(history.epoch)
total_epochs = initial_epoch + FINE_TUNE_EPOCHS

history_fine = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=total_epochs,
    initial_epoch=initial_epoch,
    callbacks=callback_list,
    class_weight=class_weight
)

# -----------------------------
# 12. Evaluation
# -----------------------------
# Load best weights
model.load_weights('best_model.h5')

# Evaluate on test set
test_loss, test_acc = model.evaluate(test_ds)
print(f"Test Accuracy: {test_acc:.4f}")

# Classification report and confusion matrix
# The test split is loaded with shuffle=False, so iterating it for the labels
# and again inside predict() yields samples in the same order.
y_true = np.concatenate([y for x, y in test_ds], axis=0)
y_pred = model.predict(test_ds)
y_true_labels = np.argmax(y_true, axis=1)
y_pred_labels = np.argmax(y_pred, axis=1)

# Pass the full label set explicitly. Otherwise a class that is missing from
# both the test labels and the predictions changes the matrix shape and makes
# classification_report reject `target_names` for having the wrong length.
class_indices = np.arange(len(class_names))

print("Confusion Matrix:")
print(confusion_matrix(y_true_labels, y_pred_labels, labels=class_indices))
print("Classification Report:")
print(classification_report(
    y_true_labels, y_pred_labels, labels=class_indices, target_names=class_names
))

# -----------------------------
# 13. Save Model for Deployment
# -----------------------------
# NOTE(review): an extension-less path is presumably written as a SavedModel
# directory by the Keras version this script targets; newer Keras releases may
# require a .keras/.h5 suffix — confirm against the installed version.
model.save('saved_model/coffee_disease_detector')
# Optionally, save as H5
model.save('coffee_disease_detector.h5')

# Convert to TFLite
# The converter reads the SavedModel directory written above, so that save
# must have succeeded first.
converter = tf.lite.TFLiteConverter.from_saved_model('saved_model/coffee_disease_detector')
tflite_model = converter.convert()
# convert() returns the serialized model, written out here in binary mode.
with open('coffee_disease_detector.tflite', 'wb') as f:
    f.write(tflite_model)

# -----------------------------
# 14. Grad-CAM for Interpretability (Example)
# -----------------------------
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    """Compute a Grad-CAM heatmap for the first image in ``img_array``.

    Args:
        img_array: Batched, already-preprocessed model input. Gradients are
            averaged over the whole batch, but only the first sample's
            activations are used, so a batch of one is the intended use.
        model: Keras model that directly contains the layer named
            ``last_conv_layer_name``.
        last_conv_layer_name: Name of the layer whose output is weighted to
            form the heatmap; looked up with ``model.get_layer``.
        pred_index: Class index to explain. Defaults to the top predicted
            class of the first sample.

    Returns:
        NumPy array holding the rectified heatmap scaled so that its maximum
        is 1. If the map has no positive activation the result is all zeros
        instead of NaN.
    """
    # Pass `model.inputs` as-is; it is already a list of input tensors.
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )
    with tf.GradientTape() as tape:
        conv_outputs, predictions = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(predictions[0])
        class_channel = predictions[:, pred_index]

    # Gradient of the class score w.r.t. the feature map, averaged down to one
    # weight per channel.
    grads = tape.gradient(class_channel, conv_outputs)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Channel-weighted sum of the first sample's feature map.
    heatmap = tf.squeeze(conv_outputs[0] @ pooled_grads[..., tf.newaxis])

    # Rectify first, then normalise with a division that yields 0 for 0/0, so
    # a map without positive activation does not turn into NaNs.
    heatmap = tf.maximum(heatmap, 0)
    heatmap = tf.math.divide_no_nan(heatmap, tf.math.reduce_max(heatmap))
    return heatmap.numpy()

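# Usage example:
# NOTE(review): build_model() wraps MobileNetV2 as a nested sub-model, so
# 'Conv_1' is presumably not a direct layer of `model`; confirm that
# model.get_layer('Conv_1') resolves before relying on this snippet.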
# img = tf.keras.utils.load_img('sample.jpg', target_size=IMG_SIZE)
# img_array = tf.keras.utils.img_to_array(img)
# img_array = np.expand_dims(img_array, axis=0)
# img_array = preprocess_input(img_array)
# heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name='Conv_1')

# Notebook output: NameError: name 'train_ds' is not defined