<a href="https://colab.research.google.com/github/armelyara/drgreen/blob/claude/drgreen-v2-01TfLAqRxjEF2BkLLt72vJrL/drgreen_v8_finetuning_tta.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Dr Green V8 - Fine-Tuning + Test-Time Augmentation

**V7 achieved 70.82% - targeting 85%+ with:**

1. **Two-phase training:**
   - Phase 1: Frozen base (like V7)
   - Phase 2: Fine-tune the upper layers of MobileNetV2 (every layer from index 100 onward, set by `fine_tune_at`)

2. **Test-Time Augmentation (TTA):**
   - Multiple augmented predictions per image
   - Average predictions for more robust results
   - Expected improvement: 3-5%

3. **Stratified split** (from V7)
4. **Focal Loss** (from V6)

### Target: >85% accuracy

## 1. Setup & Imports

In [None]:
!pip install -q gdown

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import json
from datetime import datetime
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
import zipfile
import os
import gdown

# Report the runtime environment before any heavy work starts
gpu_devices = tf.config.list_physical_devices('GPU')
print(f"TensorFlow version: {tf.__version__}")
print(f"GPU available: {len(gpu_devices) > 0}")

In [None]:
# Download and unpack the dataset.
# The archive is fetched only when it is not already on disk, so re-running
# the notebook does not hit Google Drive again.
file_id = '1zI5KfTtuV0BlBQnNDNq4tBJuEkxLZZBD'
url = f'https://drive.google.com/uc?id={file_id}'
output = '/content/drgreen.zip'

if os.path.exists(output):
    print("Dataset archive already present, skipping download.")
else:
    print("Downloading dataset...")
    downloaded = gdown.download(url, output, quiet=False)
    # NOTE(review): some gdown versions return None on failure instead of
    # raising; fail loudly here rather than with a confusing zipfile error below.
    if downloaded is None:
        raise RuntimeError(f"Failed to download dataset from {url}")

with zipfile.ZipFile(output, 'r') as zip_ref:
    zip_ref.extractall('/content')
print("Dataset ready!")

## 2. Configuration

In [None]:
# Central configuration: every tunable value used by the later cells lives here.
CONFIG = {
    'data_dir': 'rename',
    'model_save_dir': 'models',
    'img_height': 224,
    'img_width': 224,
    'batch_size': 16,

    # Phase 1: Frozen training
    'phase1_epochs': 30,
    'phase1_lr': 0.001,

    # Phase 2: Fine-tuning
    'phase2_epochs': 30,
    'phase2_lr': 0.00005,  # Much lower LR for fine-tuning
    'fine_tune_at': 100,  # Index into base_model.layers; layers from here onwards are unfrozen

    'validation_split': 0.2,
    'seed': 42,
    'num_classes': 4,

    # Model
    'dropout_rate': 0.5,
    'dense_units': 128,  # Larger head for fine-tuning
    'l2_reg': 0.01,
    'label_smoothing': 0.1,

    # Focal Loss
    'focal_gamma': 2.0,
    'focal_alpha': 0.25,

    # TTA
    'tta_augmentations': 10,  # Number of augmented predictions per image

    # Callbacks
    'early_stopping_patience': 10,
    'reduce_lr_patience': 4,
}

# Seed Python's `random`, NumPy and TensorFlow in one call so that weight
# initialisation, shuffling and random augmentation are more repeatable
# between runs (previously 'seed' was only used for the train/val split).
tf.keras.utils.set_random_seed(CONFIG['seed'])

# parents=True so a nested save directory can be configured without failing.
Path(CONFIG['model_save_dir']).mkdir(parents=True, exist_ok=True)

print("="*60)
print("DR GREEN V8 - FINE-TUNING + TTA")
print("="*60)
print(f"\nPhase 1: {CONFIG['phase1_epochs']} epochs frozen, LR={CONFIG['phase1_lr']}")
print(f"Phase 2: {CONFIG['phase2_epochs']} epochs fine-tuning, LR={CONFIG['phase2_lr']}")
print(f"Fine-tune from layer: {CONFIG['fine_tune_at']}")
print(f"TTA augmentations: {CONFIG['tta_augmentations']}")

## 3. Focal Loss

In [None]:
class FocalLoss(tf.keras.losses.Loss):
    """Focal loss for one-hot targets and probability outputs, with label smoothing.

    For each sample the categorical cross-entropy is scaled by
    ``alpha * (1 - p_t) ** gamma``, where ``p_t`` is the probability mass the
    model assigns to the (smoothed) target distribution. Confidently correct
    samples therefore contribute little to the loss.

    Args:
        gamma: Focusing exponent applied to ``1 - p_t``.
        alpha: Scalar multiplier applied to every sample's loss.
        label_smoothing: Amount of mass moved from the one-hot target to a
            uniform distribution over the classes (0.0 disables smoothing).
        **kwargs: Forwarded to ``tf.keras.losses.Loss`` (e.g. ``name``,
            ``reduction``).
    """

    def __init__(self, gamma=2.0, alpha=0.25, label_smoothing=0.0, **kwargs):
        super().__init__(**kwargs)
        self.gamma = gamma
        self.alpha = alpha
        self.label_smoothing = label_smoothing

    def call(self, y_true, y_pred):
        """Return the focal loss of each sample.

        Args:
            y_true: One-hot targets; the last axis is the class axis.
            y_pred: Predicted class probabilities with the same shape.

        Returns:
            A tensor with the class axis reduced away, i.e. one loss value per
            sample. The reduction over the batch (and any sample/class
            weighting) is left to the base class, so ``class_weight`` in
            ``model.fit`` can weight individual samples instead of only
            rescaling an already averaged scalar.
        """
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)

        # Label smoothing: blend the one-hot target with a uniform distribution.
        num_classes = tf.cast(tf.shape(y_true)[-1], y_pred.dtype)
        y_true = y_true * (1.0 - self.label_smoothing) + (self.label_smoothing / num_classes)

        # Clip so that log() never sees exactly 0.
        eps = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, eps, 1 - eps)

        cross_entropy = -y_true * tf.math.log(y_pred)
        # p_t is measured against the smoothed targets (as in the original code).
        p_t = tf.reduce_sum(y_true * y_pred, axis=-1)
        focal_weight = tf.pow(1 - p_t, self.gamma)
        return self.alpha * focal_weight * tf.reduce_sum(cross_entropy, axis=-1)

    def get_config(self):
        """Return the constructor arguments, including the base-class config."""
        config = super().get_config()
        config.update({
            'gamma': self.gamma,
            'alpha': self.alpha,
            'label_smoothing': self.label_smoothing,
        })
        return config

## 4. Stratified Data Loading

In [None]:
# Collect every image path together with the index of its class directory.
data_dir = Path(CONFIG['data_dir'])
if not data_dir.is_dir():
    raise FileNotFoundError(f"Dataset directory not found: {data_dir.resolve()}")

class_names = sorted(d.name for d in data_dir.iterdir() if d.is_dir())
# Labels are one-hot encoded with depth CONFIG['num_classes'] later on, so a
# mismatch here would silently produce wrong targets.
if len(class_names) != CONFIG['num_classes']:
    raise ValueError(
        f"Found {len(class_names)} class directories {class_names} "
        f"but CONFIG['num_classes'] is {CONFIG['num_classes']}"
    )

# Compared case-insensitively, so mixed-case extensions such as '.Jpg' match.
IMAGE_SUFFIXES = {'.jpg', '.jpeg', '.png'}

all_image_paths = []
all_labels = []

for class_idx, class_name in enumerate(class_names):
    # Sort so the path order (and therefore the seeded train/val split) does
    # not depend on the order in which the filesystem lists the files.
    class_files = sorted(
        p for p in (data_dir / class_name).iterdir()
        if p.is_file() and p.suffix.lower() in IMAGE_SUFFIXES
    )
    all_image_paths.extend(str(p) for p in class_files)
    all_labels.extend([class_idx] * len(class_files))

if not all_image_paths:
    raise ValueError(f"No images found under {data_dir.resolve()}")

all_image_paths = np.array(all_image_paths)
all_labels = np.array(all_labels)

print(f"Total images: {len(all_image_paths)}")
for i, name in enumerate(class_names):
    print(f"  {name}: {(all_labels == i).sum()}")

In [None]:
# Hold out a validation set whose class proportions mirror the full dataset
split = train_test_split(
    all_image_paths,
    all_labels,
    stratify=all_labels,
    test_size=CONFIG['validation_split'],
    random_state=CONFIG['seed'],
)
train_paths, val_paths, train_labels, val_labels = split

print(f"\nTraining: {len(train_paths)} images")
print(f"Validation: {len(val_paths)} images")

# Inverse-frequency class weights on the training labels, raised to the power 1.2
n_train = len(train_labels)
n_classes = len(class_names)
class_weights = {
    idx: (n_train / (n_classes * (train_labels == idx).sum())) ** 1.2
    for idx in range(n_classes)
}

print("\nClass weights:")
for idx in class_weights:
    print(f"  {class_names[idx]}: {class_weights[idx]:.3f}")

In [None]:
# Create datasets
def load_image(path, label):
    """Read an image file and resize it to the configured input size.

    Args:
        path: Path of the image file (string or string tensor).
        label: Label belonging to the image; passed through unchanged.

    Returns:
        ``(img, label)`` where ``img`` is the 3-channel image resized to
        ``(CONFIG['img_height'], CONFIG['img_width'])``. No pixel scaling is
        applied here.
    """
    img = tf.io.read_file(path)
    # The dataset contains PNG as well as JPEG files, so use the
    # format-detecting decoder instead of the JPEG-specific one.
    # expand_animations=False keeps the result 3-D even for GIF input.
    img = tf.io.decode_image(img, channels=3, expand_animations=False)
    # Make the rank/channel count explicit for tf.image.resize in graph mode.
    img.set_shape([None, None, 3])
    img = tf.image.resize(img, [CONFIG['img_height'], CONFIG['img_width']])
    return img, label

def _paths_to_dataset(paths, labels):
    """Build an unbatched dataset of (resized image, one-hot label) pairs."""
    ds = tf.data.Dataset.from_tensor_slices((paths, labels))
    ds = ds.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
    return ds.map(lambda image, label: (image, tf.one_hot(label, CONFIG['num_classes'])))


train_ds = _paths_to_dataset(train_paths, train_labels)
val_ds = _paths_to_dataset(val_paths, val_labels)

## 5. Data Augmentation

In [None]:
# Random augmentation applied to training images only
train_augmentation = tf.keras.Sequential(
    layers=[
        tf.keras.layers.RandomFlip("horizontal_and_vertical"),
        tf.keras.layers.RandomRotation(0.3),
        tf.keras.layers.RandomZoom(0.2),
        tf.keras.layers.RandomBrightness(0.2),
        tf.keras.layers.RandomContrast(0.2),
        tf.keras.layers.RandomTranslation(0.1, 0.1),
    ],
    name="train_augmentation",
)

# Milder augmentation reserved for test-time augmentation at inference
tta_augmentation = tf.keras.Sequential(
    layers=[
        tf.keras.layers.RandomFlip("horizontal"),
        tf.keras.layers.RandomRotation(0.15),
        tf.keras.layers.RandomZoom(0.1),
        tf.keras.layers.RandomBrightness(0.1),
    ],
    name="tta_augmentation",
)

# MobileNetV2's own preprocessing function, applied as the last mapping step
preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input


def _augment(image, label):
    """Apply the training augmentation to one image; the label is untouched."""
    return train_augmentation(image, training=True), label


def _preprocess(image, label):
    """Apply MobileNetV2 preprocessing to one image; the label is untouched."""
    return preprocess_input(image), label


# Training pipeline: augment -> preprocess -> shuffle -> batch -> prefetch
train_ds = (
    train_ds
    .map(_augment, num_parallel_calls=tf.data.AUTOTUNE)
    .map(_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(1000)
    .batch(CONFIG['batch_size'])
    .prefetch(tf.data.AUTOTUNE)
)

# Validation pipeline: preprocessing only, original order preserved
val_ds = (
    val_ds
    .map(_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(CONFIG['batch_size'])
    .prefetch(tf.data.AUTOTUNE)
)

print("Data pipelines ready")

## 6. Build Model

In [None]:
def build_model():
    """Assemble the classifier: frozen ImageNet MobileNetV2 plus a small dense head.

    Returns:
        A ``(model, base_model)`` tuple. ``model`` is the full Keras model named
        'DrGreen_V8'; ``base_model`` is the MobileNetV2 backbone (average-pooled,
        no top), returned so its layers can be unfrozen later.
    """
    image_input = tf.keras.Input(
        shape=(CONFIG['img_height'], CONFIG['img_width'], 3)
    )

    backbone = tf.keras.applications.MobileNetV2(
        input_tensor=image_input,
        weights='imagenet',
        include_top=False,
        pooling='avg',
    )
    # The backbone starts out frozen
    backbone.trainable = False

    drop_rate = CONFIG['dropout_rate']
    # Classification head, listed in the order the layers are applied
    head_layers = [
        tf.keras.layers.Dropout(drop_rate),
        tf.keras.layers.Dense(
            CONFIG['dense_units'],
            activation='relu',
            kernel_regularizer=tf.keras.regularizers.l2(CONFIG['l2_reg']),
        ),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Dropout(drop_rate * 0.5),
        tf.keras.layers.Dense(
            CONFIG['num_classes'],
            activation='softmax',
            kernel_regularizer=tf.keras.regularizers.l2(CONFIG['l2_reg']),
        ),
    ]

    features = backbone.output
    for head_layer in head_layers:
        features = head_layer(features)

    classifier = tf.keras.Model(image_input, features, name='DrGreen_V8')
    return classifier, backbone

# Instantiate the network and report its size
model, base_model = build_model()

for summary_line in (
    f"\nModel built: {model.count_params():,} parameters",
    f"MobileNetV2 has {len(base_model.layers)} layers",
    f"Will fine-tune from layer {CONFIG['fine_tune_at']}",
):
    print(summary_line)

## 7. Phase 1: Frozen Training

In [None]:
banner = "="*60
print("\n" + banner)
print("PHASE 1: FROZEN BASE TRAINING")
print(banner)

# Loss and metrics for the frozen-backbone phase
phase1_loss_fn = FocalLoss(
    gamma=CONFIG['focal_gamma'],
    alpha=CONFIG['focal_alpha'],
    label_smoothing=CONFIG['label_smoothing'],
)
phase1_metrics = [
    tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
    tf.keras.metrics.TopKCategoricalAccuracy(k=2, name='top2_accuracy'),
]

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=CONFIG['phase1_lr']),
    loss=phase1_loss_fn,
    metrics=phase1_metrics,
)

# All three callbacks watch validation accuracy, where higher is better
watch_val_accuracy = {'monitor': 'val_accuracy', 'mode': 'max'}

callbacks_phase1 = [
    tf.keras.callbacks.EarlyStopping(
        patience=CONFIG['early_stopping_patience'],
        restore_best_weights=True,
        **watch_val_accuracy,
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        factor=0.5,
        patience=CONFIG['reduce_lr_patience'],
        min_lr=1e-6,
        **watch_val_accuracy,
    ),
    tf.keras.callbacks.ModelCheckpoint(
        filepath=f"{CONFIG['model_save_dir']}/best_model_v8_phase1.keras",
        save_best_only=True,
        **watch_val_accuracy,
    ),
]

history_phase1 = model.fit(
    train_ds,
    epochs=CONFIG['phase1_epochs'],
    validation_data=val_ds,
    class_weight=class_weights,
    callbacks=callbacks_phase1,
    verbose=1,
)

phase1_best_acc = max(history_phase1.history['val_accuracy'])
print(f"\nPhase 1 complete! Best val accuracy: {phase1_best_acc*100:.2f}%")

## 8. Phase 2: Fine-Tuning

In [None]:
print("\n" + "="*60)
print("PHASE 2: FINE-TUNING")
print("="*60)

# Unfreeze layers from fine_tune_at onwards; everything before stays frozen
base_model.trainable = True
for layer in base_model.layers[:CONFIG['fine_tune_at']]:
    layer.trainable = False

# Keep the backbone's BatchNormalization layers frozen. A non-trainable
# BatchNormalization layer runs in inference mode, so its pretrained moving
# mean/variance are not overwritten by statistics of the small training
# batches while the surrounding convolution weights are being fine-tuned.
n_frozen_bn = 0
for layer in base_model.layers[CONFIG['fine_tune_at']:]:
    if isinstance(layer, tf.keras.layers.BatchNormalization):
        layer.trainable = False
        n_frozen_bn += 1

trainable_params = sum(int(tf.size(v)) for v in model.trainable_variables)
n_unfrozen = len(base_model.layers) - CONFIG['fine_tune_at'] - n_frozen_bn
print(f"\nTrainable parameters after unfreezing: {trainable_params:,}")
print(f"Fine-tuning {n_unfrozen} layers ({n_frozen_bn} BatchNormalization layers kept frozen)")

# Recompile with lower learning rate (required for the trainable changes to take effect)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=CONFIG['phase2_lr']),
    loss=FocalLoss(
        gamma=CONFIG['focal_gamma'],
        alpha=CONFIG['focal_alpha'],
        label_smoothing=CONFIG['label_smoothing']
    ),
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
        tf.keras.metrics.TopKCategoricalAccuracy(k=2, name='top2_accuracy')
    ]
)

callbacks_phase2 = [
    tf.keras.callbacks.EarlyStopping(
        monitor='val_accuracy',
        patience=CONFIG['early_stopping_patience'],
        restore_best_weights=True,
        mode='max'
    ),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_accuracy',
        factor=0.5,
        patience=CONFIG['reduce_lr_patience'],
        min_lr=1e-7,
        mode='max'
    ),
    # This checkpoint is the file the evaluation cells load afterwards
    tf.keras.callbacks.ModelCheckpoint(
        filepath=f"{CONFIG['model_save_dir']}/best_model_v8.keras",
        monitor='val_accuracy',
        save_best_only=True,
        mode='max'
    )
]

history_phase2 = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=CONFIG['phase2_epochs'],
    callbacks=callbacks_phase2,
    class_weight=class_weights,
    verbose=1
)

phase2_best_acc = max(history_phase2.history['val_accuracy'])
print(f"\nPhase 2 complete! Best val accuracy: {phase2_best_acc*100:.2f}%")

## 9. Training Visualization

In [None]:
# Total number of epochs actually run across both phases
total_epochs = len(history_phase1.history['accuracy']) + len(history_phase2.history['accuracy'])

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Per-phase curves for accuracy ...
p1_acc, p1_val_acc = history_phase1.history['accuracy'], history_phase1.history['val_accuracy']
p2_acc, p2_val_acc = history_phase2.history['accuracy'], history_phase2.history['val_accuracy']

# ... and for loss
p1_loss, p1_val_loss = history_phase1.history['loss'], history_phase1.history['val_loss']
p2_loss, p2_val_loss = history_phase2.history['loss'], history_phase2.history['val_loss']

# One panel per metric; phase 2 is drawn on the epochs following phase 1
panels = [
    (axes[0], p1_acc, p1_val_acc, p2_acc, p2_val_acc, 'Accuracy', 'Training Accuracy'),
    (axes[1], p1_loss, p1_val_loss, p2_loss, p2_val_loss, 'Loss', 'Training Loss'),
]
for ax, p1_train, p1_val, p2_train, p2_val, y_label, panel_title in panels:
    n_p1_train, n_p1_val = len(p1_train), len(p1_val)
    ax.plot(range(1, n_p1_train + 1), p1_train, 'b-', label='Train P1')
    ax.plot(range(1, n_p1_val + 1), p1_val, 'b--', label='Val P1')
    ax.plot(range(n_p1_train + 1, n_p1_train + len(p2_train) + 1), p2_train, 'r-', label='Train P2')
    ax.plot(range(n_p1_val + 1, n_p1_val + len(p2_val) + 1), p2_val, 'r--', label='Val P2')
    # Vertical marker at the last phase-1 epoch
    ax.axvline(x=n_p1_train, color='g', linestyle=':', label='Fine-tune start')
    ax.set(xlabel='Epoch', ylabel=y_label, title=panel_title)
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\nPhase 1 best: {phase1_best_acc*100:.2f}%")
print(f"Phase 2 best: {phase2_best_acc*100:.2f}%")

## 10. Test-Time Augmentation (TTA)

In [None]:
# Reload the checkpoint written during phase 2; FocalLoss is a custom class,
# so it has to be handed to the loader explicitly.
best_model_path = f"{CONFIG['model_save_dir']}/best_model_v8.keras"
best_model = tf.keras.models.load_model(
    best_model_path,
    custom_objects={'FocalLoss': FocalLoss},
)
print("Best model loaded")

def predict_with_tta(model, image_paths, labels, n_augmentations=10):
    """Test-Time Augmentation: average predictions over several views of each image.

    For every image the original plus ``n_augmentations - 1`` randomly
    augmented copies (drawn from ``tta_augmentation``) are pushed through the
    model as a single batch, and the resulting class probabilities are averaged.

    Args:
        model: Keras model returning class probabilities.
        image_paths: Iterable of image file paths.
        labels: Unused; kept so existing callers keep working.
        n_augmentations: Total number of views per image, including the
            original. Values below 2 evaluate only the original image.

    Returns:
        Array of shape ``(len(image_paths), num_classes)`` with the averaged
        probabilities (an empty ``(0, num_classes)`` array for no input).
    """
    all_predictions = []

    for img_path in image_paths:
        # Reuse the training/validation loader so decoding and resizing stay
        # identical to what the model saw during training.
        img, _ = load_image(img_path, 0)

        # Original image first, then the randomly augmented copies
        views = [img]
        views.extend(
            tta_augmentation(img, training=True)
            for _ in range(n_augmentations - 1)
        )

        # One forward pass for all views instead of one predict() call per
        # view: predict() has a large fixed per-call overhead.
        batch = preprocess_input(tf.stack(views, axis=0))
        view_probs = model.predict_on_batch(batch)

        all_predictions.append(np.mean(view_probs, axis=0))

    if not all_predictions:
        # Keep the result 2-D so argmax(axis=1) in callers still works
        return np.empty((0, model.output_shape[-1]), dtype=np.float32)
    return np.array(all_predictions)

n_tta_views = CONFIG['tta_augmentations']
print(f"\nRunning TTA with {n_tta_views} augmentations per image...")
print("This may take a few minutes...")

# Averaged class probabilities for every validation image
tta_predictions = predict_with_tta(
    best_model, val_paths, val_labels, n_augmentations=n_tta_views
)

# Hard predictions versus ground truth
y_true = val_labels
y_pred_tta = tta_predictions.argmax(axis=1)

tta_accuracy = (y_pred_tta == y_true).mean()
print(f"\nTTA Accuracy: {tta_accuracy*100:.2f}%")

In [None]:
# Compare with standard (single-view) inference
print("\nComparing TTA vs Standard inference...")

# Same loading/preprocessing as validation, unshuffled so the prediction
# order matches y_true
val_ds_eval = tf.data.Dataset.from_tensor_slices((val_paths, val_labels))
val_ds_eval = val_ds_eval.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
val_ds_eval = val_ds_eval.map(lambda x, y: (preprocess_input(x), y), num_parallel_calls=tf.data.AUTOTUNE)
val_ds_eval = val_ds_eval.batch(CONFIG['batch_size'])

# A single predict() over the whole dataset instead of one call per batch in
# a Python loop; Keras iterates the batches itself and ignores the labels.
standard_probs = best_model.predict(val_ds_eval, verbose=0)
y_pred_standard = np.argmax(standard_probs, axis=1)

standard_accuracy = np.mean(y_pred_standard == y_true)

print("\n" + "="*60)
print("TTA IMPROVEMENT")
print("="*60)
print(f"Standard accuracy: {standard_accuracy*100:.2f}%")
print(f"TTA accuracy:      {tta_accuracy*100:.2f}%")
print(f"Improvement:       {(tta_accuracy - standard_accuracy)*100:+.2f}%")

## 11. Detailed Evaluation with TTA

In [None]:
# Prediction distribution: how often each class is predicted under TTA
print("\n" + "="*60)
print("PREDICTION DISTRIBUTION (TTA)")
print("="*60)

pred_counts = {name: 0 for name in class_names}
for p in y_pred_tta:
    pred_counts[class_names[p]] += 1

n_predictions = len(y_pred_tta)
collapse = False
for name, count in pred_counts.items():
    # Guard against an empty prediction array
    pct = count / n_predictions * 100 if n_predictions else 0.0
    # A single class taking more than half of all predictions is treated as collapse
    if pct > 50:
        collapse = True
    print(f"  {name}: {count} ({pct:.1f}%)")

if not collapse:
    print("\nPredictions are balanced!")
else:
    # Previously the collapsed case produced no message at all
    print("\nWARNING: more than 50% of predictions fall into a single class.")

In [None]:
# Confusion matrix with TTA.
# Passing the label ids explicitly keeps one row/column per class, so the
# tick labels and target_names stay aligned even if a class is absent from
# both y_true and y_pred_tta.
label_ids = list(range(len(class_names)))
cm = confusion_matrix(y_true, y_pred_tta, labels=label_ids)

plt.figure(figsize=(10, 8))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    square=True
)
plt.title(f'Confusion Matrix - V8 with TTA ({tta_accuracy*100:.1f}%)', fontsize=14)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.tight_layout()
plt.show()

# Classification report
print("\n" + "="*60)
print("CLASSIFICATION REPORT (TTA)")
print("="*60)
print(classification_report(
    y_true,
    y_pred_tta,
    labels=label_ids,
    target_names=class_names,
    digits=4,
    zero_division=0,  # report 0 instead of warning for never-predicted classes
))

# Per-class accuracy (fraction of each true class that was predicted correctly)
print("\nPer-class accuracy:")
for i, class_name in enumerate(class_names):
    class_mask = y_true == i
    if class_mask.sum() > 0:
        class_acc = (y_pred_tta[class_mask] == i).mean()
        status = "GOOD" if class_acc >= 0.75 else "OK" if class_acc >= 0.60 else "LOW"
        print(f"  [{status}] {class_name}: {class_acc*100:.2f}% ({class_mask.sum()} samples)")

## 12. Save Model

In [None]:
# Every artifact of this run shares one timestamped name
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
model_name = f"drgreen_v8_finetuned_tta_{timestamp}"
artifact_prefix = f"{CONFIG['model_save_dir']}/{model_name}"
keras_path = f"{artifact_prefix}.keras"
tflite_path = f"{artifact_prefix}.tflite"
metadata_path = f"{artifact_prefix}_metadata.json"

print(f"Saving model: {model_name}")

# Native Keras format
best_model.save(keras_path)

# TFLite export with the default optimizations enabled
converter = tf.lite.TFLiteConverter.from_keras_model(best_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
with open(tflite_path, 'wb') as tflite_file:
    tflite_file.write(tflite_model)

# JSON sidecar describing the run
performance = {
    'phase1_best_accuracy': float(phase1_best_acc),
    'phase2_best_accuracy': float(phase2_best_acc),
    'standard_accuracy': float(standard_accuracy),
    'tta_accuracy': float(tta_accuracy),
    'tta_augmentations': CONFIG['tta_augmentations'],
}
training_config = {
    key: CONFIG[key]
    for key in ('fine_tune_at', 'phase1_epochs', 'phase2_epochs')
}
metadata = {
    'model_name': model_name,
    'version': '8.0-finetuned-tta',
    'created_at': timestamp,
    'class_names': class_names,
    'performance': performance,
    'config': training_config,
}
with open(metadata_path, 'w') as metadata_file:
    json.dump(metadata, metadata_file, indent=2)

# Report the on-disk size of both model files in MB
keras_size, tflite_size = (
    os.path.getsize(model_file) / (1024*1024)
    for model_file in (keras_path, tflite_path)
)
print(f"  Keras: {keras_size:.2f} MB")
print(f"  TFLite: {tflite_size:.2f} MB")

## 13. Final Summary

In [None]:
banner = "="*60
print("\n" + banner)
print("DR GREEN V8 - TRAINING COMPLETE")
print(banner)

# Best validation accuracy reached in each training phase
print(f"\nTwo-Phase Training:")
print(f"  Phase 1 (frozen): {phase1_best_acc*100:.2f}%")
print(f"  Phase 2 (fine-tuned): {phase2_best_acc*100:.2f}%")

# Single-view versus TTA accuracy on the validation split
tta_gain = (tta_accuracy - standard_accuracy)*100
print(f"\nTest-Time Augmentation:")
print(f"  Standard: {standard_accuracy*100:.2f}%")
print(f"  With TTA: {tta_accuracy*100:.2f}%")
print(f"  Improvement: {tta_gain:+.2f}%")

# Share of TTA predictions that went to each class
n_tta_predictions = len(y_pred_tta)
print(f"\nPrediction Distribution:")
for name in pred_counts:
    print(f"  {name}: {pred_counts[name]/n_tta_predictions*100:.1f}%")

print(f"\nModel saved: {model_name}")

# Verdict against the 85% target
verdict = (
    "\nTARGET ACHIEVED! Ready for deployment." if tta_accuracy >= 0.85
    else "\nGood performance! Consider more data for 85%+." if tta_accuracy >= 0.80
    else "\nMore optimization needed."
)
print(verdict)

## 14. Download

In [None]:
# Offer the saved artifacts as browser downloads when running in Colab.
# The import is guarded so that "Run All" also completes outside Colab,
# where the google.colab package does not exist.
try:
    from google.colab import files
except ImportError:
    files = None

if files is not None:
    for artifact_path in (keras_path, tflite_path, metadata_path):
        files.download(artifact_path)
    print("Downloads initiated!")
else:
    print(f"Not running in Colab; artifacts are saved under '{CONFIG['model_save_dir']}'.")