## Import Libraries

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

from skimage.feature import local_binary_pattern
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import StandardScaler

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

import optuna
from sklearn.model_selection import cross_val_score
import joblib
import json
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

## Configuration & Parameters

In [None]:
# Dataset locations and output directories.
TRAIN_PATH = "data/NEU-DET/train/images"
TEST_PATH = "data/NEU-DET/validation/images"
CHECKPOINT_DIR = "checkpoints"
MODEL_DIR = "models"

# LBP configurations to compare; each entry is expanded as keyword
# arguments to extract_lbp.
LBP_PARAMS = {
    "param1": dict(radius=1, n_points=8, method="default"),
    "param2": dict(radius=1, n_points=8, method="uniform"),
}

# SIFT bag-of-words configurations; each entry is expanded as keyword
# arguments to SiftBowExtractor.
SIFT_PARAMS = {
    "param1": dict(vocab_size=100),
    "param2": dict(vocab_size=200),
}

# Class labels are the sub-directory names of the training set, sorted.
class_names = sorted(
    entry
    for entry in os.listdir(TRAIN_PATH)
    if os.path.isdir(os.path.join(TRAIN_PATH, entry))
)

# Make sure both output directories exist before anything is written to them.
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

## Data Loading Functions

In [None]:
def load_data(dataset_path):
    """Index an image dataset laid out as one sub-directory per class.

    Args:
        dataset_path: Directory whose immediate sub-directories are the
            class labels, each holding that class's image files.

    Returns:
        pandas.DataFrame with one row per readable image and the columns
        ``filepath``, ``label`` and ``shape`` (the ``.shape`` of the array
        returned by ``cv2.imread``). Rows are ordered by label, then by
        file name. The frame is empty, but keeps these columns, when no
        image is found.
    """
    image_extensions = ('.jpg', '.png', '.bmp')
    records = []

    labels = sorted(
        name
        for name in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, name))
    )

    for label in labels:
        class_path = os.path.join(dataset_path, label)

        # os.listdir returns entries in arbitrary order; sort so the row
        # order (and everything derived from it) is reproducible.
        for file_name in sorted(os.listdir(class_path)):
            # Compare case-insensitively so e.g. ".JPG" files are not dropped.
            if not file_name.lower().endswith(image_extensions):
                continue

            file_path = os.path.join(class_path, file_name)
            img = cv2.imread(file_path)
            if img is None:
                # cv2.imread signals an unreadable file by returning None
                # instead of raising; skip it rather than crash on .shape.
                print(f"Skipping unreadable image: {file_path}")
                continue

            records.append((file_path, label, img.shape))

    return pd.DataFrame(records, columns=['filepath', 'label', 'shape'])

In [None]:
# Index the training split and summarise it.
df_train = load_data(TRAIN_PATH)
df_train.info()
# The value of a non-final bare expression is discarded, so print the
# per-class image counts explicitly.
print(df_train['label'].value_counts().sort_index())
df_train.head()

In [None]:
# Index the held-out split and summarise it.
df_test = load_data(TEST_PATH)
df_test.info()
# The value of a non-final bare expression is discarded, so print the
# per-class image counts explicitly.
print(df_test['label'].value_counts().sort_index())
df_test.head()

## Image Preprocessing

In [None]:
# Shared CLAHE operator applied by preprocess_image.
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))


def preprocess_image(image):
    """Load an image file as a 200x200, CLAHE-equalised grayscale array.

    Args:
        image: Path of the image file to read (passed to ``cv2.imread``).

    Returns:
        The grayscale image after resizing to 200x200 and applying CLAHE.

    Raises:
        ValueError: If ``cv2.imread`` cannot read ``image``.
    """
    bgr = cv2.imread(image)
    if bgr is None:
        # cv2.imread reports failure by returning None; fail here with the
        # offending path instead of an opaque error inside cvtColor.
        raise ValueError(f"Could not read image: {image!r}")

    gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
    gray = cv2.resize(gray, (200, 200), interpolation=cv2.INTER_AREA)
    return clahe.apply(gray)

## LBP Feature Extraction

In [None]:
def extract_lbp(image, radius=1, n_points=8, method='default'):
    """Compute a normalised LBP histogram for one image.

    Args:
        image: Image path, forwarded to ``preprocess_image``.
        radius: Radius passed to ``local_binary_pattern``.
        n_points: Number of neighbour points passed to
            ``local_binary_pattern``.
        method: LBP variant passed to ``local_binary_pattern``.

    Returns:
        1-D float array holding the histogram of LBP codes, divided by its
        sum. Its length is ``n_points + 3`` for ``method='uniform'`` and
        ``2 ** n_points`` otherwise.
    """
    codes = local_binary_pattern(
        preprocess_image(image), n_points, radius, method=method
    )

    # NOTE(review): uniform LBP presumably yields only n_points + 2 distinct
    # codes, which would leave the last bin always empty - confirm before
    # changing, since that alters the feature length.
    n_bins = n_points + 3 if method == 'uniform' else 2 ** n_points

    counts, _ = np.histogram(codes.ravel(), bins=n_bins, range=(0, n_bins))
    counts = counts.astype("float")
    # The small epsilon guards against division by zero.
    return counts / (counts.sum() + 1e-6)

## SIFT BoW Feature Extraction

In [None]:
class SiftBowExtractor:
    """SIFT Bag-of-Words feature extractor.

    ``fit`` clusters the SIFT descriptors of a set of images into
    ``vocab_size`` visual words; ``transform`` then describes each image by
    the normalised histogram of the visual words its descriptors map to.
    """

    def __init__(self, vocab_size=100):
        """Create an unfitted extractor with ``vocab_size`` visual words."""
        self.vocab_size = vocab_size
        self.kmeans = MiniBatchKMeans(n_clusters=self.vocab_size,
                                      batch_size=200,
                                      random_state=42,
                                      n_init=10)
        # Cluster centres of the fitted vocabulary; None until fit() runs.
        self.vocabulary = None

    def _get_sift_descriptors(self, image):
        """Return the SIFT descriptors of the image at path ``image``.

        The result may be None; callers check for that before using it.
        """
        # NOTE(review): the detector is created per call; storing it on the
        # instance would likely break pickling of the extractor - confirm
        # before hoisting it.
        sift = cv2.SIFT_create()
        gray = preprocess_image(image)
        _, descriptors = sift.detectAndCompute(gray, None)
        return descriptors

    def fit(self, image_paths):
        """Build the visual vocabulary from training images.

        Args:
            image_paths: Iterable of image file paths.

        Returns:
            ``self``, so that calls can be chained.

        Raises:
            ValueError: If no image yields any SIFT descriptor.
        """
        all_descriptors = []

        for img_path in tqdm(image_paths, desc="Building SIFT vocabulary"):
            descriptors = self._get_sift_descriptors(img_path)
            if descriptors is not None and len(descriptors) > 0:
                all_descriptors.append(descriptors)

        if not all_descriptors:
            # np.vstack([]) would raise an opaque "need at least one array"
            # ValueError; raise the same type with an actionable message.
            raise ValueError(
                "No SIFT descriptors were found in the given images; "
                "cannot build a vocabulary."
            )

        self.kmeans.fit(np.vstack(all_descriptors))
        self.vocabulary = self.kmeans.cluster_centers_
        return self

    def transform(self, image_paths):
        """Transform images to Bag-of-Words histograms.

        Args:
            image_paths: Iterable of image file paths.

        Returns:
            Float array of shape ``(n_images, vocab_size)``. Each row is the
            visual-word histogram of one image divided by its sum; images
            without descriptors get an all-zero row.
        """
        final_features = []

        for img_path in tqdm(image_paths, desc="Extracting SIFT features"):
            descriptors = self._get_sift_descriptors(img_path)
            hist = np.zeros(self.vocab_size, dtype=float)

            # Guard against both "no descriptors" signals (None or empty):
            # predict() rejects an empty sample array.
            if descriptors is not None and len(descriptors) > 0:
                visual_words = self.kmeans.predict(descriptors)
                hist, _ = np.histogram(visual_words, bins=np.arange(self.vocab_size + 1))
                hist = hist.astype(float)
                hist /= (hist.sum() + 1e-6)

            final_features.append(hist)

        # reshape keeps the result 2-D, (0, vocab_size), for an empty input.
        return np.array(final_features, dtype=float).reshape(-1, self.vocab_size)

## Prepare Train/Test Data

In [None]:
# Image paths are the model inputs; class-folder names are the targets.
X_train_paths, y_train = df_train['filepath'].tolist(), df_train['label'].tolist()
X_test_paths, y_test = df_test['filepath'].tolist(), df_test['label'].tolist()

## Extract LBP Features

In [None]:
# Per-configuration LBP features, keyed by the names used in LBP_PARAMS.
lbp_train, lbp_test = {}, {}

for param_name, params in LBP_PARAMS.items():
    # One histogram per image, collected into a single array per split.
    X_train_lbp = np.array([
        extract_lbp(path, **params)
        for path in tqdm(X_train_paths, desc=f"LBP {param_name} train")
    ])
    X_test_lbp = np.array([
        extract_lbp(path, **params)
        for path in tqdm(X_test_paths, desc=f"LBP {param_name} test")
    ])

    lbp_train[param_name] = dict(features=X_train_lbp, params=params)
    lbp_test[param_name] = dict(features=X_test_lbp, params=params)

## Extract SIFT Features

In [None]:
# Per-configuration SIFT bag-of-words features, keyed as in SIFT_PARAMS.
sift_train, sift_test = {}, {}

for param_name, params in SIFT_PARAMS.items():
    extractor = SiftBowExtractor(**params)

    print(f"\nProcessing SIFT {param_name}...")
    # The vocabulary is learned from the training images only and then
    # applied to both splits.
    extractor.fit(X_train_paths)

    X_train_sift = extractor.transform(X_train_paths)
    X_test_sift = extractor.transform(X_test_paths)

    sift_train[param_name] = dict(
        features=X_train_sift, params=params, extractor=extractor
    )
    sift_test[param_name] = dict(features=X_test_sift, params=params)

    # Persist the fitted extractor next to the models.
    extractor_path = f"{MODEL_DIR}/sift_extractor_rf_{param_name}.pkl"
    joblib.dump(extractor, extractor_path)

## Feature Scaling

In [None]:
def scale_features(train_dict, test_dict):
    """Standardise every feature set, fitting each scaler on its train split.

    Args:
        train_dict: Mapping of parameter-set name to a dict whose
            ``'features'`` entry holds the training features.
        test_dict: Same layout for the test features; it is indexed with
            every key of ``train_dict``.

    Returns:
        dict mapping each parameter-set name to
        ``{'train': scaled train features, 'test': scaled test features,
        'scaler': the fitted StandardScaler}``.
    """
    def _scale_one(train_features, test_features):
        # Fit on the training features only, then reuse the same statistics
        # for the test features.
        scaler = StandardScaler()
        train_scaled = scaler.fit_transform(train_features)
        return {
            'train': train_scaled,
            'test': scaler.transform(test_features),
            'scaler': scaler,
        }

    return {
        name: _scale_one(entry['features'], test_dict[name]['features'])
        for name, entry in train_dict.items()
    }

# Standardise the LBP and SIFT feature sets; each scaler is fitted on the
# training features and applied to the matching test features.
lbp_scaled = scale_features(lbp_train, lbp_test)
sift_scaled = scale_features(sift_train, sift_test)

## Optuna Optimization Setup

In [None]:
def objective_rf(trial, X_train, y_train):
    """Score one Optuna trial of Random Forest hyper-parameters.

    Args:
        trial: Optuna trial used to sample the hyper-parameters.
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        Mean accuracy over a 3-fold cross-validation on the training data.
    """
    # Sampled in a fixed order: the dict literal is evaluated top to bottom.
    hyperparams = {
        'n_estimators': trial.suggest_int('n_estimators', 50, 300),
        'max_depth': trial.suggest_int('max_depth', 5, 30),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 10),
        'min_samples_leaf': trial.suggest_int('min_samples_leaf', 1, 5),
        'max_features': trial.suggest_categorical('max_features', ['sqrt', 'log2']),
    }

    forest = RandomForestClassifier(random_state=42, n_jobs=-1, **hyperparams)
    fold_scores = cross_val_score(
        forest, X_train, y_train, cv=3, scoring='accuracy', n_jobs=-1
    )
    return fold_scores.mean()

def build_feature_sets():
    """Collect every scaled feature set under a ``<FAMILY>_<param>`` name.

    Reads the module-level LBP/SIFT parameter grids, scaled features and
    raw feature dictionaries.

    Returns:
        dict mapping ``"LBP_<param>"`` / ``"SIFT_<param>"`` to a tuple
        ``(scaled train features, scaled test features, feature params)``.
        LBP entries come first, then SIFT entries.
    """
    families = (
        ("LBP", LBP_PARAMS, lbp_scaled, lbp_train),
        ("SIFT", SIFT_PARAMS, sift_scaled, sift_train),
    )

    feature_sets = {}
    for prefix, param_grid, scaled, raw in families:
        for param_name in param_grid:
            feature_sets[f"{prefix}_{param_name}"] = (
                scaled[param_name]['train'],
                scaled[param_name]['test'],
                raw[param_name]['params'],
            )

    return feature_sets

# name -> (scaled train features, scaled test features, feature params)
feature_sets = build_feature_sets()

## Train & Save Models

In [None]:
def train_and_save_model(feat_name, X_train, X_test, y_train, y_test, feat_params, n_trials=100):
    """Tune a Random Forest with Optuna, evaluate it, and save the artifacts.

    Args:
        feat_name: Feature-set name of the form ``"LBP_<param>"`` or
            ``"SIFT_<param>"``. It selects which scaler is saved and is
            embedded in every output file name.
        X_train: Training feature matrix.
        X_test: Test feature matrix.
        y_train: Training labels.
        y_test: Test labels.
        feat_params: Feature-extraction parameters, recorded in the metadata.
        n_trials: Number of Optuna trials to run.

    Returns:
        dict with the keys ``'study'``, ``'model'``, ``'cv_accuracy'``,
        ``'test_accuracy'``, ``'best_params'`` and ``'feature_params'``.

    Raises:
        KeyError: If no scaler is registered for ``feat_name``.

    Side effects:
        Writes the study pickle to ``CHECKPOINT_DIR`` and the model pickle,
        scaler pickle and JSON metadata to ``MODEL_DIR``.
    """
    # Resolve the scaler before the search so an unknown feature-set name
    # fails immediately instead of after all trials have run.
    if feat_name.startswith("LBP"):
        prefix, scaled_sets = "LBP_", lbp_scaled
    else:
        prefix, scaled_sets = "SIFT_", sift_scaled
    # Strip only the leading prefix; str.replace would also remove later
    # occurrences inside the parameter name.
    param_name = feat_name[len(prefix):] if feat_name.startswith(prefix) else feat_name
    scaler = scaled_sets[param_name]['scaler']

    print(f"\n🔄 {feat_name} | Shape: {X_train.shape}")

    study = optuna.create_study(
        direction='maximize',
        study_name=f'RF_{feat_name}',
        sampler=optuna.samplers.TPESampler(seed=42)
    )

    study.optimize(
        lambda trial: objective_rf(trial, X_train, y_train),
        n_trials=n_trials,
        show_progress_bar=True
    )

    # Refit on the full training data with the best hyper-parameters found.
    best_model = RandomForestClassifier(**study.best_params, random_state=42, n_jobs=-1)
    best_model.fit(X_train, y_train)

    y_pred = best_model.predict(X_test)
    test_accuracy = accuracy_score(y_test, y_pred)

    print(f"✓ {feat_name}: CV={study.best_value:.4f} | Test={test_accuracy:.4f}")

    joblib.dump(study, f"{CHECKPOINT_DIR}/study_rf_{feat_name}.pkl")
    joblib.dump(best_model, f"{MODEL_DIR}/best_rf_{feat_name}.pkl")
    joblib.dump(scaler, f"{MODEL_DIR}/scaler_rf_{feat_name}.pkl")

    metadata = {
        'feature_set': feat_name,
        'feature_params': feat_params,
        'model_type': 'RandomForest',
        'best_params': study.best_params,
        'cv_accuracy': float(study.best_value),
        'test_accuracy': float(test_accuracy),
        'n_trials': len(study.trials),
        # JSON has no tuple type; store the shape as a list explicitly.
        'train_shape': list(X_train.shape),
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }

    with open(f"{MODEL_DIR}/metadata_rf_{feat_name}.json", 'w', encoding='utf-8') as f:
        json.dump(metadata, f, indent=2)

    return {
        'study': study,
        'model': best_model,
        'cv_accuracy': study.best_value,
        'test_accuracy': test_accuracy,
        'best_params': study.best_params,
        'feature_params': feat_params
    }

# Tune, evaluate and save one Random Forest per feature set.
optuna_results = {}
for feat_name, (X_train, X_test, feat_params) in feature_sets.items():
    optuna_results[feat_name] = train_and_save_model(
        feat_name=feat_name,
        X_train=X_train,
        X_test=X_test,
        y_train=y_train,
        y_test=y_test,
        feat_params=feat_params,
    )

## Classification Reports - All Models

In [None]:
# Print a per-class report for the tuned model of every feature set.
for feat_name, result in optuna_results.items():
    model = result['model']

    # feature_sets maps each name to (train, test, params), so take the test
    # features from it instead of re-deriving the key by parsing the name.
    X_test_feat = feature_sets[feat_name][1]
    y_pred = model.predict(X_test_feat)

    print(f"Feature Set: {feat_name}")
    print(f"Feature Params: {result['feature_params']}")
    print(f"Best Model Params: {result['best_params']}")
    print(f"CV Accuracy: {result['cv_accuracy']:.4f} | Test Accuracy: {result['test_accuracy']:.4f}")
    # Passing labels pins each report row to its class name even when a
    # class is missing from the test split or the predictions.
    print(classification_report(y_test, y_pred, labels=class_names, target_names=class_names))