In [None]:
"""
Ensemble Bagging SVM with Optuna Hyperparameter Tuning
Feature Extraction: HSV Histogram ONLY
Dataset: 54 Indonesian Coffee Bean Varieties
Platform: Kaggle
"""
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')
import cv2
from PIL import Image
from sklearn.svm import SVC
from sklearn.ensemble import BaggingClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import (
    accuracy_score, f1_score, confusion_matrix, 
    classification_report, top_k_accuracy_score
)
from sklearn.model_selection import cross_val_score
import optuna
from optuna.samplers import TPESampler
import joblib
from tqdm.auto import tqdm
import time
import pickle
# Seed NumPy's global random number generator so NumPy-driven randomness repeats between runs
np.random.seed(42)

In [None]:
#  CONFIGURATION 
class Config:
    """Static settings shared by every stage of the pipeline."""

    # ---- Dataset locations: one sub-folder per split under DATA_DIR ----
    DATA_DIR = "/kaggle/input/coffee-bean-dataset"
    TRAIN_DIR = os.path.join(DATA_DIR, "train")
    VAL_DIR = os.path.join(DATA_DIR, "valid")
    TEST_DIR = os.path.join(DATA_DIR, "test")

    # ---- Model identity and input geometry ----
    MODEL_NAME = "Bagging_SVM_HSV_Only"
    NUM_CLASSES = 54
    IMG_SIZE = 224  # images are resized to IMG_SIZE x IMG_SIZE before feature extraction

    # ---- Feature extraction: histogram bin counts, ordered (H, S, V) ----
    HSV_BINS = (8, 8, 8)

    # ---- Optuna search budget ----
    N_TRIALS = 50              # maximum number of trials
    OPTUNA_TIMEOUT = 60 * 60   # wall-clock limit in seconds (1 hour)

    # ---- Output artefacts, all written below SAVE_DIR ----
    SAVE_DIR = "./output_svm_hsv"
    MODEL_PATH = os.path.join(SAVE_DIR, "best_bagging_svm_hsv_model.pkl")
    SCALER_PATH = os.path.join(SAVE_DIR, "scaler.pkl")
    LABEL_ENCODER_PATH = os.path.join(SAVE_DIR, "label_encoder.pkl")
# Make sure the output folder exists before anything is saved into it
# (exist_ok=True: no error when the folder is already there)
os.makedirs(Config.SAVE_DIR, exist_ok=True)

In [None]:
#  FEATURE EXTRACTION 
class FeatureExtractor:
    """Turn coffee bean images into HSV colour-histogram feature vectors.

    Only colour information is used: each image yields one normalised
    histogram per HSV channel, concatenated into a single 1-D vector of
    length ``sum(hsv_bins)``.
    """

    # Lower-case file extensions treated as images when scanning a directory
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, img_size=224, hsv_bins=(8, 8, 8)):
        """Store the resize target and the per-channel bin counts.

        Args:
            img_size: Images are resized to ``img_size x img_size``.
            hsv_bins: Number of histogram bins for (H, S, V).
        """
        self.img_size = img_size
        self.hsv_bins = hsv_bins

    def extract_hsv_histogram(self, image):
        """Return the concatenated, normalised H, S and V histograms.

        Args:
            image: RGB image array (it is converted with ``COLOR_RGB2HSV``).

        Returns:
            1-D array holding the H histogram, then S, then V.
        """
        hsv = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)

        # Value range per channel: hue is binned over 0-180, S and V over 0-256
        channel_ranges = ([0, 180], [0, 256], [0, 256])

        histograms = []
        for channel in range(3):
            hist = cv2.calcHist(
                [hsv], [channel], None,
                [self.hsv_bins[channel]], channel_ranges[channel]
            )
            # Normalise each channel on its own (cv2.normalize default settings)
            histograms.append(cv2.normalize(hist, hist).flatten())

        return np.concatenate(histograms)

    def extract_features(self, image_path):
        """Load one image from disk and return its HSV feature vector.

        Args:
            image_path: Path of the image file.

        Returns:
            1-D feature array produced by ``extract_hsv_histogram``.

        Raises:
            ValueError: If the file cannot be read as an image.
        """
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread reports a missing/corrupt file by returning None
            # instead of raising, so turn it into a readable error here.
            raise ValueError(f"Could not read image: {image_path}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (self.img_size, self.img_size))

        return self.extract_hsv_histogram(image)

    def extract_from_directory(self, directory):
        """Extract features for every image below ``directory``.

        The directory is expected to contain one sub-directory per class.
        Images that fail to load are reported and skipped.

        Args:
            directory: Root folder of one dataset split.

        Returns:
            Tuple ``(features, labels, class_names)``: the stacked feature
            array, the array of class-name labels (one per feature row) and
            the sorted list of class sub-directory names.
        """
        features = []
        labels = []

        # Only sub-directories are classes; stray files at the top level
        # (README, .DS_Store, ...) must not appear in class_names.
        class_names = [
            entry for entry in sorted(os.listdir(directory))
            if os.path.isdir(os.path.join(directory, entry))
        ]

        print(f"\nExtracting features from {directory}...")
        for class_name in tqdm(class_names, desc='Classes'):
            class_dir = os.path.join(directory, class_name)

            # Sort so the sample order does not depend on the filesystem
            image_names = sorted(
                name for name in os.listdir(class_dir)
                if name.lower().endswith(self.IMAGE_EXTENSIONS)
            )

            for img_name in image_names:
                img_path = os.path.join(class_dir, img_name)
                try:
                    feature = self.extract_features(img_path)
                except Exception as e:
                    # Best effort: one bad image must not abort the whole split
                    print(f"Error processing {img_path}: {e}")
                    continue
                features.append(feature)
                labels.append(class_name)

        return np.array(features), np.array(labels), class_names

In [None]:
#  OPTUNA OPTIMIZATION 
class OptunaOptimizer:
    """Search Bagging-SVM hyperparameters with Optuna, scored on a validation split."""

    def __init__(self, X_train, y_train, X_val, y_val):
        """Keep references to the training and validation data."""
        self.X_train, self.y_train = X_train, y_train
        self.X_val, self.y_val = X_val, y_val
        self.best_score = 0.0
        self.best_params = None

    def objective(self, trial):
        """Build, fit and score one candidate ensemble.

        Returns the validation accuracy of a BaggingClassifier of SVCs whose
        hyperparameters are drawn from ``trial``.
        """
        # --- base SVM search space (suggestion order is kept fixed) ---
        svm_kwargs = dict(
            C=trial.suggest_float('C', 0.1, 100.0, log=True),
            kernel=trial.suggest_categorical('kernel', ['rbf', 'poly', 'sigmoid']),
            # every kernel in the search space takes the same gamma choices
            gamma=trial.suggest_categorical('gamma', ['scale', 'auto']),
            probability=True,
            random_state=42,
            cache_size=1000,
        )
        if svm_kwargs['kernel'] == 'poly':
            # degree is only searched for the polynomial kernel
            svm_kwargs['degree'] = trial.suggest_int('degree', 2, 5)

        # --- bagging search space ---
        ensemble_kwargs = dict(
            n_estimators=trial.suggest_int('n_estimators', 5, 50),
            max_samples=trial.suggest_float('max_samples', 0.5, 1.0),
            max_features=trial.suggest_float('max_features', 0.5, 1.0),
        )

        ensemble = BaggingClassifier(
            estimator=SVC(**svm_kwargs),
            random_state=42,
            n_jobs=-1,
            **ensemble_kwargs
        )

        # Fit on the training split, score on the validation split
        ensemble.fit(self.X_train, self.y_train)
        val_predictions = ensemble.predict(self.X_val)
        return accuracy_score(self.y_val, val_predictions)

    def optimize(self, n_trials=50, timeout=3600):
        """Run the study and return ``(best_params, study)``.

        Args:
            n_trials: Maximum number of trials handed to ``study.optimize``.
            timeout: Time limit handed to ``study.optimize``.
        """
        rule = "=" * 60

        print("\n" + rule)
        print("Starting Optuna Hyperparameter Optimization")
        print(rule)

        study = optuna.create_study(
            sampler=TPESampler(seed=42),
            direction='maximize'
        )
        study.optimize(
            self.objective,
            n_trials=n_trials,
            timeout=timeout,
            show_progress_bar=True
        )

        # Remember the winner on the instance as well as returning it
        self.best_params = study.best_params
        self.best_score = study.best_value

        print("\n" + rule)
        print("Optimization Completed!")
        print(rule)
        print(f"Best Validation Accuracy: {self.best_score:.4f}")
        print(f"Best Parameters:")
        for name, chosen in self.best_params.items():
            print(f"  {name}: {chosen}")
        print(rule)

        return self.best_params, study

In [None]:
#  MODEL TRAINING 
def train_best_model(X_train, y_train, best_params):
    """Fit the final Bagging-SVM ensemble using the tuned hyperparameters.

    Args:
        X_train: Training features.
        y_train: Training labels.
        best_params: Mapping with keys 'C', 'kernel', 'gamma', 'n_estimators',
            'max_samples', 'max_features' and, optionally, 'degree'.

    Returns:
        The fitted BaggingClassifier.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("Training Final Model with Best Parameters")
    print(rule)

    # Pull every required setting out up front
    C, kernel, gamma, n_estimators, max_samples, max_features = (
        best_params[key]
        for key in ('C', 'kernel', 'gamma',
                    'n_estimators', 'max_samples', 'max_features')
    )

    svm_kwargs = dict(
        C=C,
        kernel=kernel,
        gamma=gamma,
        probability=True,
        random_state=42,
        cache_size=1000,
    )
    # degree is only forwarded for the polynomial kernel, and only if present
    if 'degree' in best_params and kernel == 'poly':
        svm_kwargs['degree'] = best_params['degree']

    model = BaggingClassifier(
        estimator=SVC(**svm_kwargs),
        n_estimators=n_estimators,
        max_samples=max_samples,
        max_features=max_features,
        random_state=42,
        n_jobs=-1,
        verbose=1
    )

    print("\nTraining model...")
    started = time.time()
    model.fit(X_train, y_train)
    elapsed = time.time() - started

    print(f"Training completed in {elapsed:.2f} seconds")

    return model

In [None]:
#  EVALUATION 
def evaluate_model(model, X_test, y_test, label_encoder, class_names):
    """Score a fitted model on the test split and print a short report.

    Args:
        model: Fitted classifier exposing ``predict`` and ``predict_proba``.
        X_test: Test features.
        y_test: Encoded test labels.
        label_encoder: Fitted encoder; only ``len(classes_)`` is used, to
            enumerate the label ids for the top-5 score.
        class_names: Accepted for interface compatibility; not used here.

    Returns:
        Dict with the predictions, true labels, class probabilities,
        top-1 / top-5 accuracy (in percent) and the macro F1 score.
    """
    rule = '=' * 60

    print("\n" + rule)
    print("Evaluating Model on Test Set")
    print(rule)

    print("Making predictions...")
    y_pred = model.predict(X_test)
    y_proba = model.predict_proba(X_test)

    # Accuracies are reported as percentages
    top1_acc = 100. * accuracy_score(y_test, y_pred)

    label_ids = range(len(label_encoder.classes_))
    top5_acc = 100. * top_k_accuracy_score(y_test, y_proba, k=5, labels=label_ids)

    macro_f1 = f1_score(y_test, y_pred, average='macro')

    print(f"\n{rule}")
    print(f"TEST RESULTS - BAGGING SVM (HSV ONLY):")
    print(f"{rule}")
    print(f"Top-1 Accuracy: {top1_acc:.2f}%")
    print(f"Top-5 Accuracy: {top5_acc:.2f}%")
    print(f"Macro F1-Score: {macro_f1:.4f}")
    print(f"{rule}\n")

    return {
        'predictions': y_pred,
        'labels': y_test,
        'probabilities': y_proba,
        'top1_accuracy': top1_acc,
        'top5_accuracy': top5_acc,
        'macro_f1': macro_f1,
    }

In [None]:
#  VISUALIZATION 
def plot_confusion_matrix(y_test, y_pred, class_names):
    """Draw, save and show the confusion matrix as a heatmap.

    Only labels that occur in ``y_test`` are included. ``class_names`` is
    indexed by label value to obtain the tick labels.

    Returns:
        The confusion matrix returned by ``confusion_matrix``.
    """
    # Restrict the matrix to the classes actually present in the test labels
    present_labels = np.unique(y_test)
    cm = confusion_matrix(y_test, y_pred, labels=present_labels)
    tick_names = [class_names[label] for label in present_labels]

    output_file = os.path.join(Config.SAVE_DIR, 'confusion_matrix_svm_hsv.png')

    plt.figure(figsize=(20, 18))
    sns.heatmap(
        cm,
        annot=False,
        fmt='d',
        cmap='Blues',
        xticklabels=tick_names,
        yticklabels=tick_names,
        cbar_kws={'label': 'Count'},
    )

    # Titles and axis decoration
    plt.title('Confusion Matrix - Bagging SVM (HSV Only)',
              fontsize=14, fontweight='bold')
    plt.xlabel('Predicted Label', fontsize=12)
    plt.ylabel('True Label', fontsize=12)
    plt.xticks(rotation=90, fontsize=8)
    plt.yticks(rotation=0, fontsize=8)

    plt.tight_layout()
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.show()

    return cm
def analyze_per_class_performance(y_test, y_pred, class_names):
    """Report accuracy and support for every class present in the test labels.

    Accuracy and sample counts are computed directly from the label arrays,
    so a test sample that is predicted as a class absent from ``y_test``
    still counts as a miss for its true class (and still counts towards
    ``Samples``).

    Args:
        y_test: Encoded true labels.
        y_pred: Encoded predicted labels, same length as ``y_test``.
        class_names: Sequence indexed by label value, giving display names.

    Returns:
        DataFrame with columns 'Class', 'Accuracy' (percent) and 'Samples',
        sorted by ascending accuracy. The same table is written to
        ``per_class_analysis_svm_hsv.csv`` in ``Config.SAVE_DIR``.
    """
    y_true = np.asarray(y_test)
    y_hat = np.asarray(y_pred)

    # Classes present in the test set and how many samples each one has;
    # every count is >= 1, so the division below cannot hit zero.
    unique_labels, samples = np.unique(y_true, return_counts=True)

    # True labels of the correctly classified samples
    hit_labels = y_true[y_true == y_hat]
    correct = np.array(
        [np.count_nonzero(hit_labels == label) for label in unique_labels],
        dtype=samples.dtype
    )
    per_class_acc = correct / samples

    # Display names for the classes that are actually present
    actual_class_names = [class_names[i] for i in unique_labels]

    class_analysis = pd.DataFrame({
        'Class': actual_class_names,
        'Accuracy': per_class_acc * 100,
        'Samples': samples
    })
    class_analysis = class_analysis.sort_values('Accuracy')

    print("\nPer-Class Performance:")
    print("="*60)
    print(class_analysis.to_string(index=False))
    print("="*60)

    # Table is sorted ascending: best classes are at the tail, worst at the head
    print("\n📊 TOP 5 BEST PERFORMING CLASSES:")
    print(class_analysis.tail(5).to_string(index=False))

    print("\n📉 TOP 5 WORST PERFORMING CLASSES:")
    print(class_analysis.head(5).to_string(index=False))

    # Save to CSV
    class_analysis.to_csv(
        os.path.join(Config.SAVE_DIR, 'per_class_analysis_svm_hsv.csv'),
        index=False
    )

    return class_analysis
def plot_optuna_study(study):
    """Plot the per-trial objective values and the hyperparameter importances.

    The left panel shows every trial value with the best value as a dashed
    line; the right panel shows parameter importances, or a placeholder text
    when they cannot be computed. The figure is saved to
    ``optuna_study_hsv.png`` in ``Config.SAVE_DIR`` and then shown.
    """
    fig, (history_ax, importance_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # ---- left panel: optimisation history ----
    trial_values = [trial.value for trial in study.trials]
    history_ax.plot(trial_values, marker='o', linestyle='-', alpha=0.7)

    best = study.best_value
    history_ax.axhline(y=best, color='r',
                       linestyle='--', label=f'Best: {best:.4f}')
    history_ax.set_xlabel('Trial')
    history_ax.set_ylabel('Validation Accuracy')
    history_ax.set_title('Optuna Optimization History')
    history_ax.legend()
    history_ax.grid(True, alpha=0.3)

    # ---- right panel: parameter importance, best effort ----
    try:
        importance = optuna.importance.get_param_importances(study)
        importance_ax.barh(list(importance.keys()), list(importance.values()))
        importance_ax.set_xlabel('Importance')
        importance_ax.set_title('Hyperparameter Importance')
        importance_ax.grid(True, alpha=0.3)
    except Exception:
        # Importances are optional; fall back to a note inside the panel
        importance_ax.text(0.5, 0.5, 'Parameter importance\nnot available',
                           ha='center', va='center')

    plt.tight_layout()
    target = os.path.join(Config.SAVE_DIR, 'optuna_study_hsv.png')
    plt.savefig(target, dpi=300, bbox_inches='tight')
    plt.show()
#  MAIN EXECUTION 
def main():
    """Run the whole pipeline end to end.

    Steps: extract HSV features for the train/validation/test directories,
    encode labels and scale features, tune the Bagging-SVM with Optuna,
    train the final model on the training split, evaluate it on the test
    split, and write the model, scaler, label encoder, plots and a JSON
    summary into ``Config.SAVE_DIR``.

    Raises:
        ValueError: If one of the dataset splits yields no feature vectors.
    """
    import json  # only used for the final summary file

    print("="*60)
    print("Bagging SVM with Optuna - Coffee Bean Classification")
    print("Feature Extraction: HSV ONLY")
    print("54 Indonesian Coffee Varieties")
    print("="*60)

    # Initialize feature extractor
    feature_extractor = FeatureExtractor(
        img_size=Config.IMG_SIZE,
        hsv_bins=Config.HSV_BINS
    )

    # Extract features from all datasets
    print("\n" + "="*60)
    print("Feature Extraction Phase")
    print("="*60)

    X_train, y_train, _ = feature_extractor.extract_from_directory(Config.TRAIN_DIR)
    X_val, y_val, _ = feature_extractor.extract_from_directory(Config.VAL_DIR)
    X_test, y_test, _ = feature_extractor.extract_from_directory(Config.TEST_DIR)

    # Fail early with a readable message instead of an IndexError further down
    for split_name, split_features, split_dir in (
        ('train', X_train, Config.TRAIN_DIR),
        ('validation', X_val, Config.VAL_DIR),
        ('test', X_test, Config.TEST_DIR),
    ):
        if len(split_features) == 0:
            raise ValueError(
                f"No images could be loaded for the {split_name} split from {split_dir}"
            )

    # Encode labels
    label_encoder = LabelEncoder()
    y_train_encoded = label_encoder.fit_transform(y_train)
    y_val_encoded = label_encoder.transform(y_val)
    y_test_encoded = label_encoder.transform(y_test)

    # Take display names from the encoder so that class_names[i] is always
    # the name of encoded label i, whatever else sits in the data directory.
    class_names = label_encoder.classes_.tolist()
    if len(class_names) != Config.NUM_CLASSES:
        print(f"Warning: expected {Config.NUM_CLASSES} classes "
              f"but found {len(class_names)} in the training data")

    print(f"\nDataset Statistics:")
    print(f"  Train samples: {len(X_train)}")
    print(f"  Validation samples: {len(X_val)}")
    print(f"  Test samples: {len(X_test)}")
    print(f"  Number of classes: {len(class_names)}")
    print(f"  Feature dimension: {X_train.shape[1]}")
    print(f"    - HSV features: {sum(Config.HSV_BINS)}")

    # Feature scaling: statistics come from the training split only
    print("\nScaling features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # Save scaler and label encoder
    joblib.dump(scaler, Config.SCALER_PATH)
    joblib.dump(label_encoder, Config.LABEL_ENCODER_PATH)
    print(f"Scaler saved to {Config.SCALER_PATH}")
    print(f"Label encoder saved to {Config.LABEL_ENCODER_PATH}")

    # Optuna optimization
    optimizer = OptunaOptimizer(
        X_train_scaled, y_train_encoded,
        X_val_scaled, y_val_encoded
    )

    best_params, study = optimizer.optimize(
        n_trials=Config.N_TRIALS,
        timeout=Config.OPTUNA_TIMEOUT
    )

    # Plot Optuna study
    plot_optuna_study(study)

    # Train final model with best parameters
    model = train_best_model(X_train_scaled, y_train_encoded, best_params)

    # Save model
    print(f"\nSaving model to {Config.MODEL_PATH}...")
    joblib.dump(model, Config.MODEL_PATH)
    print("Model saved successfully!")

    # Evaluate on test set
    results = evaluate_model(
        model, X_test_scaled, y_test_encoded,
        label_encoder, class_names
    )

    # Visualizations
    print("\nGenerating visualizations...")
    plot_confusion_matrix(y_test_encoded, results['predictions'], class_names)
    analyze_per_class_performance(y_test_encoded, results['predictions'], class_names)

    # Save final results
    final_results = {
        'model': Config.MODEL_NAME,
        # number of classes actually seen in training, not the configured constant
        'num_classes': len(class_names),
        'feature_dim': X_train.shape[1],
        'hsv_bins': Config.HSV_BINS,
        'best_params': best_params,
        'top1_accuracy': results['top1_accuracy'],
        'top5_accuracy': results['top5_accuracy'],
        'macro_f1': results['macro_f1'],
        'train_samples': len(X_train),
        'val_samples': len(X_val),
        'test_samples': len(X_test)
    }

    with open(os.path.join(Config.SAVE_DIR, 'final_results_svm_hsv.json'), 'w') as f:
        json.dump(final_results, f, indent=4)

    print("\n" + "="*60)
    print("✅ Training and evaluation completed successfully!")
    print(f"Results saved to: {Config.SAVE_DIR}")
    print("="*60)
    
# Start the pipeline only when this code runs as the main module, not when it is imported
if __name__ == "__main__":
    main()
