In [None]:
import os
from collections import Counter
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models

In [None]:
# --- Experiment configuration ---
DATA_DIR = "dataset"     # root folder handed to the split helper
IMG_SIZE = (256, 256)    # image size handed to the dataset loader
RESIZE_TO = (64, 64)     # spatial size the network is built for
BATCH_SIZE = 32
SEED = 42

# Fraction of the data assigned to each split
TRAIN_SPLIT = 0.7  # 70% for training
VAL_SPLIT = 0.2    # 20% for validation
TEST_SPLIT = 0.1   # 10% for testing

# Human-readable class labels, one per line for easier diffing
CLASS_NAMES = [
    'Bacterial spot',
    'Early blight',
    'Late blight',
    'Leaf Mold',
    'Septoria leaf spot',
    'Target Spot',
    'Mosaic virus',
    'Two-spotted spider mite',
    'Yellow Leaf Curl Virus',
    'Healthy',
]

In [None]:
from dataset_tools import create_stratified_splits

# Build the stratified train / validation / test partitions
print("Creating stratified splits...")
split_result = create_stratified_splits(DATA_DIR, TRAIN_SPLIT, VAL_SPLIT, TEST_SPLIT, SEED)
(X_train, y_train), (X_val, y_val), (X_test, y_test), class_names = split_result

num_classes = len(class_names)
print(f"Found {num_classes} classes: {class_names}")

# Report how many samples each split received and its share of the total
total_samples = len(X_train) + len(X_val) + len(X_test)
print(f"\nDataset distribution:")
for split_label, split_items in (("Train", X_train), ("Validation", X_val), ("Test", X_test)):
    split_size = len(split_items)
    print(f"{split_label} samples: {split_size} ({split_size/total_samples*100:.1f}%)")

In [None]:
# Check class distribution in each split.
# Labels are coerced to NumPy arrays once, so `labels == i` is always an
# element-wise comparison (a plain Python list compared to an int is just
# False, which would silently report zero samples for every class).
train_labels = np.asarray(y_train)
val_labels = np.asarray(y_val)
test_labels = np.asarray(y_test)

print(f"\nClass distribution per split:")
for i, class_name in enumerate(class_names):
    train_count = int(np.sum(train_labels == i))
    val_count = int(np.sum(val_labels == i))
    test_count = int(np.sum(test_labels == i))
    total_count = train_count + val_count + test_count
    # A class with no samples at all would otherwise divide by zero below;
    # with this guard it is reported as 0 (0.0%) in every split.
    denom = total_count if total_count else 1

    print(f"{class_name}:")
    print(f"  Train: {train_count} ({train_count/denom*100:.1f}%)")
    print(f"  Val:   {val_count} ({val_count/denom*100:.1f}%)")
    print(f"  Test:  {test_count} ({test_count/denom*100:.1f}%)")

In [None]:
from dataset_tools import create_tf_dataset_from_paths
# Create TensorFlow datasets.
# Every call passes the source image size first; only the training split is shuffled.

print("\nCreating TensorFlow datasets...")
train_ds = create_tf_dataset_from_paths(IMG_SIZE, X_train, y_train, BATCH_SIZE, seed=SEED, shuffle=True)
val_ds = create_tf_dataset_from_paths(IMG_SIZE, X_val, y_val, BATCH_SIZE, seed=SEED, shuffle=False)
# Fix: IMG_SIZE was missing from this call, which shifted every positional
# argument by one (paths passed as the image size, labels as the paths, ...).
test_ds = create_tf_dataset_from_paths(IMG_SIZE, X_test, y_test, BATCH_SIZE, seed=SEED, shuffle=False)

# Preprocessing applied to every (image, label) element
def resize_and_normalize(image, label):
    """Resize ``image`` to ``RESIZE_TO`` and divide it by 255.0; ``label`` is returned unchanged."""
    resized = tf.image.resize(image, RESIZE_TO)
    return resized / 255.0, label

# Let tf.data choose the parallelism / prefetch buffer sizes
AUTOTUNE = tf.data.AUTOTUNE

# Train and validation: preprocess, then cache the preprocessed elements and prefetch.
# NOTE(review): if the loader shuffles before this point, cache() may replay the
# same order every epoch - confirm against create_tf_dataset_from_paths.
train_ds = (
    train_ds
    .map(resize_and_normalize, num_parallel_calls=AUTOTUNE)
    .cache()
    .prefetch(AUTOTUNE)
)
val_ds = (
    val_ds
    .map(resize_and_normalize, num_parallel_calls=AUTOTUNE)
    .cache()
    .prefetch(AUTOTUNE)
)

# Test: preprocessing only (no cache / prefetch)
test_ds = test_ds.map(resize_and_normalize, num_parallel_calls=AUTOTUNE)

In [None]:
from keras.layers import  MaxPooling2D, GlobalAveragePooling2D, Dense, DepthwiseConv2D, Dropout, Conv2D, LeakyReLU, BatchNormalization
from keras.optimizers import Adam, SGD, RMSprop

activation = 'relu'

# Small CNN: three depthwise (3x3) + pointwise (1x1) convolution stages, each
# followed by max pooling, then global average pooling and a dense head.
feature_layers = [
    # Stage 1 (declares the input shape: RESIZE_TO with 3 channels)
    DepthwiseConv2D(kernel_size=3, input_shape=RESIZE_TO + (3,), padding='same'),
    Conv2D(16, 1, activation=activation),
    MaxPooling2D(),

    # Stage 2
    DepthwiseConv2D(kernel_size=3, padding='same'),
    Conv2D(32, 1, activation=activation),
    MaxPooling2D(),

    # Stage 3 (the only stage with batch normalization)
    DepthwiseConv2D(kernel_size=3, padding='same'),
    BatchNormalization(),
    Conv2D(64, 1, activation=activation),
    MaxPooling2D(),
]

# Classifier head: one softmax output per class
head_layers = [
    GlobalAveragePooling2D(),
    Dense(32, activation=activation),
    Dense(num_classes, activation='softmax'),
]

model = models.Sequential(feature_layers + head_layers)

## Train

In [None]:
# RMSprop with its default hyper-parameters
optimizer = RMSprop()

# The sparse loss variant is used (swap in 'binary_crossentropy' for 2 classes)
compile_kwargs = dict(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
model.compile(**compile_kwargs)

model.summary()

In [None]:
from datetime import datetime

# Experiment identity: the name plus a timestamp gives each run its own TensorBoard folder
experiment_name = "BN_no_Data_no_RMS"
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = f"logs/fit/{experiment_name}_{timestamp}"

# Callbacks: TensorBoard logging, early stopping that restores the best weights,
# and halving of the learning rate on a plateau
tensorboard_cb = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
early_stopping_cb = tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)
reduce_lr_cb = tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=3, min_lr=1e-7)
callbacks = [tensorboard_cb, early_stopping_cb, reduce_lr_cb]

# Train for up to 200 epochs; early stopping may end the run sooner
history = model.fit(train_ds, validation_data=val_ds, epochs=200, callbacks=callbacks, verbose=1)

In [None]:
from evaluation_tools import plot_training_history

# Plot the curves recorded by model.fit, passing the experiment name along
# (presumably used for titles / file names - confirm in evaluation_tools).
plot_training_history(history, experiment_name)

In [None]:
# Evaluate on test set
# evaluate() yields two values here (loss, accuracy) because compile()
# registered exactly one metric, 'accuracy'.
test_loss, test_acc = model.evaluate(test_ds)
print(f'Test accuracy: {test_acc}')

In [None]:
from evaluation_tools import show_sample_predictions

# Show predictions of the float model on test samples, labelled with the
# class list returned by the split helper.
show_sample_predictions(test_ds,model,class_names=class_names)

In [None]:
from evaluation_tools import comprehensive_evaluation

# Use the class list returned by the split helper (the same one that defines
# num_classes and is used for the sample predictions) instead of the hard-coded
# CLASS_NAMES constant: if the two lists are ordered differently, every
# per-class row in the report would carry the wrong label.
results = comprehensive_evaluation(model, test_ds, class_names, experiment_name, save_plots=True)

## Save and Quantize

In [None]:
import os

# Save the trained Keras model as HDF5 and report its on-disk size.
model_name = f"models\\{experiment_name}.h5"

# Make sure the target folder exists before saving.
# dirname() is empty on platforms where "\\" is not a path separator, in which
# case the file lands in the working directory and nothing needs creating.
model_dir = os.path.dirname(model_name)
if model_dir:
    os.makedirs(model_dir, exist_ok=True)

model.save(model_name)

size = os.path.getsize(model_name) / 1024  # bytes -> KB (matches the unit printed below)
print(f"Model size: {size:.2f} KB")

In [None]:
import tensorflow as tf

# Post-training quantization: restrict the converter to int8 builtin ops and
# expose uint8 tensors at the model boundary.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

int8_builtin_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.target_spec.supported_ops = int8_builtin_ops

# uint8 on both ends (tf.int8 is the alternative, depending on ESP32 support)
io_dtype = tf.uint8
converter.inference_input_type = io_dtype
converter.inference_output_type = io_dtype

def representative_dataset():
    """Yield single-image calibration inputs taken from the first 100 elements of ``train_ds``."""
    for image_batch, _ in train_ds.take(100):
        for img in image_batch:
            # Each sample is cast to float32 and given a leading axis of size 1,
            # then wrapped in a one-element list as the converter expects.
            single_image = tf.expand_dims(tf.cast(img, tf.float32), axis=0)
            yield [single_image]

# Calibrate with the generator defined in this cell
converter.representative_dataset = representative_dataset

# Run the conversion; the result is the serialized model as bytes
quantized_model = converter.convert()

# Write the quantized model to disk
quant_model_path = f"models\\{experiment_name}_quant.tflite"
with open(quant_model_path, "wb") as f:
    f.write(quantized_model)

quant_size_kb = len(quantized_model) / 1024
print("TFLite model size:", quant_size_kb, "KB")

In [None]:
from quantization_tools import evaluate_quantized_model

# Load the quantized TFLite model from disk.
interpreter = tf.lite.Interpreter(model_path=f"models\\{experiment_name}_quant.tflite")
# Tensors have to be allocated once before the interpreter can be invoked.
# (This call used to appear twice back-to-back; the second one was redundant.)
interpreter.allocate_tensors()

def convert_to_uint8(image, label):
    """Map a [0, 1]-normalized float image back to uint8 pixel values.

    The input is expected to be scaled to [0, 1]; images normalized to
    [-1, 1] would need a different mapping and are NOT handled here.

    Args:
        image: float image tensor with values in [0, 1].
        label: passed through unchanged.

    Returns:
        Tuple ``(uint8_image, label)``.
    """
    scaled = image * 255.0
    # Round to nearest instead of truncating: float error can turn an original
    # pixel value p into p - epsilon, which a bare cast would floor to p - 1.
    scaled = tf.round(scaled)
    # Keep values inside the uint8 range so the cast is well defined.
    scaled = tf.clip_by_value(scaled, 0.0, 255.0)
    return tf.cast(scaled, tf.uint8), label

# Apply to your test dataset
# The converted model was configured with uint8 input, so the float test
# images are converted before being fed to the interpreter.
test_ds_uint8 = test_ds.map(convert_to_uint8)

# Evaluate
# The result is formatted with :.4f below, so it is expected to be a number
# (presumably accuracy as a fraction - confirm in quantization_tools).
quantized_accuracy = evaluate_quantized_model(interpreter, test_ds_uint8)
print(f"Quantized model accuracy: {quantized_accuracy:.4f}")

## Quantization-Aware Training (QAT)

In [None]:
import tensorflow_model_optimization as tfmot

# Wrap the trained float model for quantization-aware training (QAT).
quantize_model = tfmot.quantization.keras.quantize_model
q_aware_model = quantize_model(model)

# Compile with a FRESH optimizer and a much lower learning rate.
# The optimizer instance used for the float training run must not be reused:
# it is already bound to that model's variables and carries whatever learning
# rate ReduceLROnPlateau left behind, so the "very low LR" intent was not met.
qat_optimizer = RMSprop(learning_rate=1e-5)
q_aware_model.compile(
    optimizer=qat_optimizer,
    loss='sparse_categorical_crossentropy',  # same loss as the float model
    metrics=['accuracy']
)

# Fine-tune for just a few epochs
qat_history = q_aware_model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=5,
    verbose=1
)

# Check if accuracy is maintained.
# Fix: report the QAT run's own history; the previous code printed `history`,
# i.e. the best validation accuracy of the float training run.
print(f"QAT model accuracy: {max(qat_history.history['val_accuracy'])}")

In [None]:
# Evaluate on test set
# Note: this overwrites test_loss / test_acc from the float-model evaluation,
# so after this cell they describe the QAT model.
test_loss, test_acc = q_aware_model.evaluate(test_ds)
print(f'Test accuracy: {test_acc}')

In [None]:
import tensorflow as tf

# Convert the QAT model with the same settings as the post-training run:
# int8 builtin ops only, uint8 tensors at the model boundary.
converter = tf.lite.TFLiteConverter.from_keras_model(q_aware_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

int8_builtin_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.target_spec.supported_ops = int8_builtin_ops

# uint8 on both ends (tf.int8 is the alternative, depending on ESP32 support)
io_dtype = tf.uint8
converter.inference_input_type = io_dtype
converter.inference_output_type = io_dtype

def representative_dataset():
    """Yield single-image calibration inputs taken from the first 100 elements of ``train_ds``."""
    for image_batch, _ in train_ds.take(100):
        for img in image_batch:
            # Each sample is cast to float32 and given a leading axis of size 1,
            # then wrapped in a one-element list as the converter expects.
            single_image = tf.expand_dims(tf.cast(img, tf.float32), axis=0)
            yield [single_image]

# Calibrate with the generator defined in this cell
converter.representative_dataset = representative_dataset

# Run the conversion; the result is the serialized model as bytes
quantized_model = converter.convert()

# Write the QAT-quantized model to disk
qat_model_path = f"models\\{experiment_name}__qat.tflite"
with open(qat_model_path, "wb") as f:
    f.write(quantized_model)

# Compare on-disk sizes of the saved Keras model and the quantized model
import os
original_size = os.path.getsize(f'models\\{experiment_name}.h5')
quantized_size = os.path.getsize(qat_model_path)
size_reduction_pct = (1 - quantized_size/original_size) * 100
print(f"Original model: {original_size / 1024:.1f} KB")
print(f"Quantized model: {quantized_size / 1024:.1f} KB")
print(f"Size reduction: {size_reduction_pct:.1f}%")