## Library

In [None]:
from sklearn.experimental import enable_halving_search_cv

In [None]:
import os
import cv2
import numpy as np
from scipy.stats import kurtosis, skew
from skimage.feature import hog, local_binary_pattern
from mahotas.features import haralick
import pickle
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, HalvingGridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
import keras_tuner as kt
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

## Dataset and Parameters

In [None]:
DATA_PATH = 'RetinalOCT_Dataset/RetinalOCT_Dataset'
classes = ['AMD','CNV','CSR','DME','DR','DRUSEN','MH','NORMAL']

HOG_ORIENTATIONS = 9
HOG_PIXELS_PER_CELL = (32, 32)
HOG_CELLS_PER_BLOCK = (2, 2)

LBP_RADIUS = 1
LBP_POINTS = 8 * LBP_RADIUS

GABOR_THETAS = np.arange(0, np.pi, np.pi/4)

USE_PCA = True
PCA_VARIANCE = 0.9

## Preprocessing

In [None]:
def load_image(split):
    images = []
    labels = []
    split_path = os.path.join(DATA_PATH, split)
    
    for i, folder in enumerate(os.listdir(split_path)):
        folder_path = os.path.join(split_path, folder)
        files = sorted(os.listdir(folder_path))
        for file in files:
            file_path = os.path.join(folder_path, file)
            img = cv2.imread(file_path)
            img = cv2.resize(img, (224, 224))
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  
            images.append(img)
            labels.append(i)
    
    return np.array(images), np.array(labels)

In [None]:
def preprocess_oct(image):
    # 1. Denoising: bilateral filter lebih halus untuk struktur retina
    # denoised = cv2.bilateralFilter(image, 9, 75, 75)

    # 1. Denoising: Fast Non-Local Mean Denoising, lebih bagus untuk OCT-Scan karena memiliki struktur yang repetitif
    denoised = cv2.fastNlMeansDenoising(image, None, h=10, templateWindowSize=7, searchWindowSize=21)
    
    # 2. Kontras lokal pakai CLAHE
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    enhanced = clahe.apply(denoised)
    
    # 3. Normalisasi z-score (lebih stabil dari min-max)
    norm = (enhanced - np.mean(enhanced)) / (np.std(enhanced) + 1e-8)
    
    return enhanced, norm

## Feature Extraction

In [None]:
# function for each feature extraction method
def extract_intensity_features(image):
    return [
        np.mean(image),
        np.std(image),
        skew(image.ravel()),
        kurtosis(image.ravel()),
        np.percentile(image, 10),
        np.percentile(image, 50),
        np.percentile(image, 90)
    ]

def extract_hog_features(image):
    return hog(image,
               orientations=HOG_ORIENTATIONS,
               pixels_per_cell=HOG_PIXELS_PER_CELL,
               cells_per_block=HOG_CELLS_PER_BLOCK,
               block_norm='L2-Hys',
               visualize=False,
               feature_vector=True).tolist()

def extract_lbp_features(image):
    lbp = local_binary_pattern(image, LBP_POINTS, LBP_RADIUS, method='uniform')
    hist, _ = np.histogram(lbp.ravel(), bins=LBP_POINTS+2, range=(0, LBP_POINTS+2))
    hist = hist.astype(float) / (hist.sum() + 1e-8)
    return hist.tolist()

def extract_haralick_features(image):
    haralick_features = haralick(image).mean(axis=0)
    return haralick_features.tolist()

def extract_gabor_features(image):
    features = []
    for theta in GABOR_THETAS:
        kernel = cv2.getGaborKernel((15, 15), 4.0, theta, 10.0, 0.5, 0)
        fimg = cv2.filter2D(image, cv2.CV_32F, kernel)
        features.extend([np.mean(fimg), np.std(fimg)])
    return features

def extract_fourier_features(image):
    fft = np.fft.fft2(image)
    fft_shift = np.fft.fftshift(fft)
    magnitude = np.abs(fft_shift)
    return [np.mean(magnitude), np.std(magnitude), np.percentile(magnitude, 75), np.percentile(magnitude, 90)]

In [None]:
def extract_features(img):
    enhanced, norm = preprocess_oct(img)
    enhanced_uint8 = enhanced.astype(np.uint8)
    features = []

    features.extend(extract_hog_features(enhanced_uint8))
    features.extend(extract_lbp_features(enhanced_uint8))
    features.extend(extract_haralick_features(enhanced_uint8))
    features.extend(extract_intensity_features(norm))
    features.extend(extract_gabor_features(enhanced_uint8))
    features.extend(extract_fourier_features(enhanced_uint8))
    
    return np.array(features)

In [None]:
def extract_features_batch(images, split_name=""):
    features = []
    total = len(images)

    print(f"\nExtracting features from {split_name} set")
    for i, img in enumerate(images, 1):
        feature = extract_features(img)
        features.append(feature)

        if i % (total//10) == 0 or i == total:
            percentage = int((i/total) * 100)
            print(f"==> Progress: {percentage}% done ({i}/{total} images)\n")
    
    print(f"Features extraction completed for {split_name} set!\n")
    return np.vstack(features)

## Load, Preprocess, Extract Features, Scaling and PCA

In [None]:
train_img_raw, Y_train = load_image('train')
val_img_raw, Y_val = load_image('val')
test_img_raw, Y_test = load_image('test')
print(f"Train: {len(train_img_raw)} Images")
print(f"Validation: {len(val_img_raw)} Images")
print(f"Test: {len(test_img_raw)} Images")

In [None]:
X_train = extract_features_batch(train_img_raw, "Train")
X_val = extract_features_batch(val_img_raw, "Validation")
X_test = extract_features_batch(test_img_raw, "Test")
print(f"Raw feature count: {X_train.shape[1]} features per image")

In [None]:
# Scaler
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

In [None]:
# Use PCA or not
if USE_PCA:
    pca = PCA(n_components=PCA_VARIANCE, random_state=42)
    X_train_scaled = pca.fit_transform(X_train_scaled)
    X_val_scaled = pca.transform(X_val_scaled)
    X_test_scaled = pca.transform(X_test_scaled)
    print(f"Reduce features from {X_train.shape[1]} to {X_train_scaled.shape[1]} features")
    print(f"Variance explained: {pca.explained_variance_ratio_.sum():.4f}")
else:
    print("Skipping PCA")
    pca = None

## Saving Features, Scaler and PCA

In [None]:
# Save raw and preprocessed features, scaler and PCA

with open('features_raw.pkl', 'wb') as f:
    pickle.dump({
        'X_train': X_train,
        'X_val': X_val,
        'X_test': X_test,
        'Y_train': Y_train,
        'Y_val': Y_val,
        'Y_test': Y_test
    }, f)
print("Raw features saved!")

with open('features_scaled.pkl', 'wb') as f:
    pickle.dump({
        'X_train_scaled': X_train_scaled,
        'X_val_scaled': X_val_scaled,
        'X_test_scaled': X_test_scaled,
        'Y_train': Y_train,
        'Y_val': Y_val,
        'Y_test': Y_test
    }, f)
print("Scaled features saved!")

with open('scaler.pkl', 'wb') as f:
    pickle.dump(scaler, f)
print("Scaler saved!")

if USE_PCA:
    with open('pca.pkl', 'wb') as f:
        pickle.dump(pca, f)
    print("PCA saved!")

## Hyperparameter Tuning

In [None]:
# Comment out the "Load, Preprocess, Extract Features, Scaling and PCA" part 
# and load this file if you dont want to extract the feature again

with open('features_scaled.pkl', 'rb') as f:
    data = pickle.load(f)

# Unpack the dictionary
X_train_scaled = data['X_train_scaled']
X_val_scaled   = data['X_val_scaled']
X_test_scaled  = data['X_test_scaled']

Y_train = data['Y_train']
Y_val   = data['Y_val']
Y_test  = data['Y_test']

print("Loaded successfully!")

## SVM

In [None]:
# SVM

svm = SVC(class_weight='balanced', random_state=42)

param_grid_svm = {
    'C': [0.1, 1, 10, 100],
    'gamma': ['scale', 0.001, 0.01, 0.1, 1],
    'kernel': ['rbf', 'poly']
}

grid_svm = GridSearchCV(
    estimator=svm,
    param_grid=param_grid_svm,
    cv=5,               
    n_jobs=1,          
    verbose=2,
    scoring='accuracy'
)


grid_svm.fit(X_train_scaled, Y_train)
print("Best Parameter:", grid_svm.best_params_)
print("Best CV Score:", grid_svm.best_estimator_)
best_svm = grid_svm.best_estimator_

# Saving Model
with open('best_svm_model.pkl', 'wb') as f:
    pickle.dump(best_svm, f)
print("Best SVM saved!")

In [None]:
# Prediction and Metrics
y_val_pred_svm = best_svm.predict(X_val_scaled)
y_test_pred_svm = best_svm.predict(X_test_scaled)

print("\nValidation Accuracy:", accuracy_score(Y_val, y_val_pred_svm))
print("Test Accuracy:", accuracy_score(Y_test, y_test_pred_svm))

print("\nClassification Report (Validation):")
print(classification_report(Y_val, y_val_pred_svm, target_names=classes))

print("\nClassification Report (Test):")
print(classification_report(Y_test, y_test_pred_svm, target_names=classes))

print("\nConfusion Matrix (Validation):")
print(confusion_matrix(Y_val, y_val_pred_svm))

print("\nConfusion Matrix (Test):")
print(confusion_matrix(Y_test, y_test_pred_svm))

## Random Forest

In [None]:
# No PCA for RF
with open('features_raw.pkl', 'rb') as f:
    data = pickle.load(f)
    X_train_raw_rf = data['X_train']
    X_val_raw_rf = data['X_val']
    X_test_raw_rf = data['X_test']
    
# Scaled
scaler_test = StandardScaler()
X_train_scaled_rf = scaler_test.fit_transform(X_train_raw_rf)
X_val_scaled_rf = scaler_test.transform(X_val_raw_rf)
X_test_scaled_rf = scaler_test.transform(X_test_raw_rf)

In [None]:
# Random Forest
rf = RandomForestClassifier(
    class_weight='balanced',
    random_state=42,
)

param_grid_rf = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 30, 50, 70],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
    'max_features': ['sqrt', 'log2'],
}

grid_rf = HalvingGridSearchCV( # use HalvingGridSearchCV for faster searching
    estimator=rf,
    param_grid=param_grid_rf,
    cv=5,               
    n_jobs=1,          
    verbose=2,
    scoring='accuracy'
)

grid_rf.fit(X_train_scaled_rf, Y_train)
print("Best Parameter:", grid_rf.best_params_)
print("Best CV Score:", grid_rf.best_estimator_)
best_rf = grid_rf.best_estimator_

# Saving Model
with open('best_rf_model.pkl', 'wb') as f:
    pickle.dump(best_rf, f)
print("Best RF saved!")

In [None]:
# Prediction and Metrics
y_val_pred_rf = best_rf.predict(X_val_scaled_rf)
y_test_pred_rf = best_rf.predict(X_test_scaled_rf)

print("\nValidation Accuracy:", accuracy_score(Y_val, y_val_pred_rf))
print("Test Accuracy:", accuracy_score(Y_test, y_test_pred_rf))

print("\nClassification Report (Validation):")
print(classification_report(Y_val, y_val_pred_rf, target_names=classes))

print("\nClassification Report (Test):")
print(classification_report(Y_test, y_test_pred_rf, target_names=classes))

print("\nConfusion Matrix (Validation):")
print(confusion_matrix(Y_val, y_val_pred_rf))

print("\nConfusion Matrix (Test):")
print(confusion_matrix(Y_test, y_test_pred_rf))

## MLP

In [None]:
# Define Model Function for MLP

def build_model(hp):
    model = Sequential()

    # Hidden layer 1
    model.add(Dense(
        units=hp.Int('units1', min_value=64, max_value=512, step=64),
        activation='relu',
        input_shape=(X_train_scaled.shape[1],)
    ))
    model.add(BatchNormalization())
    model.add(Dropout(hp.Float('dropout1', 0.2, 0.5, step=0.1)))

    # Hidden layer 2
    model.add(Dense(
        units=hp.Int('units2', min_value=32, max_value=256, step=32),
        activation='relu'
    ))
    model.add(BatchNormalization())
    model.add(Dropout(hp.Float('dropout2', 0.2, 0.5, step=0.1)))

    # Hidden layer 3
    model.add(Dense(
        units=hp.Int('units3', min_value=16, max_value=128, step=16),
        activation='relu'
    ))
    model.add(BatchNormalization())
    model.add(Dropout(hp.Float('dropout3', 0.2, 0.5, step=0.1)))

    # Output layer (jumlah neuron = jumlah kelas)
    model.add(Dense(len(classes), activation='softmax'))

    # Gunakan categorical_crossentropy karena label sudah one-hot
    model.compile(
        optimizer=Adam(learning_rate=hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
# Initialize Tuner
tuner = kt.RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=20,
    executions_per_trial=1,
    directory='mlp_tuning',
    project_name='mlp_randomsearch'
)

tuner.search(
    X_train_scaled, Y_train,
    validation_data=(X_val_scaled, Y_val),
    epochs=30,
    batch_size=32,
    verbose=2
)

In [None]:
# Check for Best Hyperparameter from the search
best_hp = tuner.get_best_hyperparameters(1)[0]

print("Best Hyperparameters:")
for param, value in best_hp.values.items():
    print(f"{param}: {value}")

In [None]:
# Make the model with the best parameters

best_mlp = build_model(best_hp)
best_mlp.summary()

# Callback for early stopping, and modelcheckpoint to save the model
callbacks = [
    EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
    ModelCheckpoint('best_mlp_model.keras', monitor='val_accuracy', save_best_only=True, verbose=1)
]

# Train back the model with the best parameter
history = best_mlp.fit(
    X_train_scaled, Y_train,
    validation_data=(X_val_scaled, Y_val),
    epochs=30, #100
    batch_size=32,
    callbacks=callbacks,
    verbose=1
)

In [None]:
# Check for accuracy
val_loss, val_acc = best_mlp.evaluate(X_val_scaled, Y_val, verbose=2)
test_loss, test_acc = best_mlp.evaluate(X_test_scaled, Y_test, verbose=2)

print("Validation Accuracy:", val_acc)
print("Test Accuracy:", test_acc)

In [None]:
# Prediction and Metrics
y_val_pred_mlp = np.argmax(best_mlp.predict(X_val_scaled), axis=1)
y_test_pred_mlp = np.argmax(best_mlp.predict(X_test_scaled), axis=1)

print("\nClassification Report (Validation):")
print(classification_report(Y_val, y_val_pred_mlp, target_names=classes))

print("\nClassification Report (Test):")
print(classification_report(Y_test, y_test_pred_mlp, target_names=classes))

print("\nConfusion Matrix (Validation):")
print(confusion_matrix(Y_val, y_val_pred_mlp))

print("\nConfusion Matrix (Test):")
print(confusion_matrix(Y_test, y_test_pred_mlp))