In [None]:
# Mount Google Drive (Colab only) so the files under /content/drive/MyDrive are readable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import to_categorical
import os

In [None]:
# Load the metadata table; later cells read its 'clean_path' and 'label' columns.
df = pd.read_csv('/content/drive/MyDrive/mini_proj_data/processed_data.csv')
df

In [None]:
# List the columns available in the metadata table.
df.columns

# 1. With Processed Images 

In [None]:
X = df['clean_path']   # image file paths, one per row; each is read from disk by load_and_preprocess_image

In [None]:
import cv2
import numpy as np

def load_and_preprocess_image(path, target_size=(224,224)):
    """Read an image from disk and return it as a scaled RGB float32 array.

    Args:
        path: Filesystem path of the image to load.
        target_size: Size passed to ``cv2.resize`` as ``(width, height)``.

    Returns:
        The resized image in RGB channel order, divided by 255 and cast to
        ``float32``.

    Raises:
        OSError: if OpenCV cannot read ``path`` (missing or unreadable file).
    """
    img = cv2.imread(path)
    if img is None:
        # cv2.imread reports a missing/unreadable file by returning None instead of
        # raising; fail here with the offending path rather than inside cvtColor.
        raise OSError(f"Could not read image: {path!r}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR; convert to RGB
    img = cv2.resize(img, target_size)
    img = img / 255.0                           # normalize to [0,1]
    return img.astype('float32')


# Load every image listed in X into one array (one entry per path, in the same order).
X_images=np.array([load_and_preprocess_image(p) for p in X])

# --- Standardization step (per-channel) ---
# Reducing over axes (0,1,2) leaves one mean/std per entry of the last axis (the colour channels).
# NOTE(review): these statistics are computed over all images, before the train/val/test
# split performed later in the notebook — confirm this is intended.
mean = np.mean(X_images, axis=(0,1,2), keepdims=True)
std  = np.std(X_images, axis=(0,1,2), keepdims=True)

# The 1e-7 term guards against division by zero for a constant channel.
X_images = (X_images - mean) / (std + 1e-7)


In [None]:
# Inspect the standardised image array.
X_images

In [None]:
from tensorflow.keras import Model, regularizers
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, Flatten, Dropout, Input
import tensorflow as tf

# ---------------- CNN1 ----------------
def build_cnn1(input_shape):
    """Build the CNN1 feature extractor.

    Four stages of 3x3 'same'-padded ReLU convolution (32, 64, 128, 256 filters,
    each with an L2 kernel penalty of 1e-4) followed by 2x2 max pooling, then
    flatten and 50% dropout. The returned model outputs the flattened feature
    vector; it has no classification head.

    Args:
        input_shape: Shape of one input sample, passed to ``Input``.

    Returns:
        A Keras ``Model`` named "CNN1".
    """
    inputs = Input(shape=input_shape)

    features = inputs
    for n_filters in (32, 64, 128, 256):
        features = Conv2D(
            n_filters,
            (3, 3),
            activation='relu',
            padding="same",
            kernel_regularizer=regularizers.l2(1e-4),
        )(features)
        features = MaxPooling2D((2, 2))(features)

    flattened = Flatten()(features)
    outputs = Dropout(0.5)(flattened)   # only active when the model is run in training mode
    return Model(inputs, outputs, name="CNN1")

# ---------------- CNN2 ----------------
def build_cnn2(input_shape):
    """Build the CNN2 feature extractor.

    Four 'same'-padded ReLU convolution stages with shrinking kernels
    (256 @ 7x7, 128 @ 5x5, 96 @ 3x3, 96 @ 3x3; L2 kernel penalty 1e-4), each
    followed by 2x2 average pooling, then flatten and 50% dropout. The returned
    model outputs the flattened feature vector; it has no classification head.

    Args:
        input_shape: Shape of one input sample, passed to ``Input``.

    Returns:
        A Keras ``Model`` named "CNN2".
    """
    # (filters, kernel size) for each convolution stage, in order.
    stages = ((256, (7, 7)), (128, (5, 5)), (96, (3, 3)), (96, (3, 3)))

    inputs = Input(shape=input_shape)

    features = inputs
    for n_filters, kernel in stages:
        features = Conv2D(
            n_filters,
            kernel,
            activation='relu',
            padding="same",
            kernel_regularizer=regularizers.l2(1e-4),
        )(features)
        features = AveragePooling2D((2, 2))(features)

    flattened = Flatten()(features)
    outputs = Dropout(0.5)(flattened)
    return Model(inputs, outputs, name="CNN2")

# ---------------- CNN3 ----------------
def build_cnn3(input_shape):
    """Build the CNN3 feature extractor.

    Four 'same'-padded ReLU convolution stages with growing kernels
    (32 @ 3x3, 96 @ 5x5, 128 @ 5x5, 256 @ 7x7; L2 kernel penalty 1e-4), each
    followed by 2x2 max pooling, then flatten and 50% dropout. The returned
    model outputs the flattened feature vector; it has no classification head.

    Args:
        input_shape: Shape of one input sample, passed to ``Input``.

    Returns:
        A Keras ``Model`` named "CNN3".
    """
    inputs = Input(shape=input_shape)

    x = Conv2D(32, (3,3), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(inputs)
    x = MaxPooling2D((2,2))(x)

    x = Conv2D(96, (5,5), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(x)
    x = MaxPooling2D((2,2))(x)

    x = Conv2D(128, (5, 5), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(x)
    x = MaxPooling2D((2,2))(x)

    x = Conv2D(256, (7, 7), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(x)
    x = MaxPooling2D((2,2))(x)

    x = Flatten()(x)
    x = Dropout(0.5)(x)   # only active when the model is run in training mode
    # The model name matches the builder so summaries and saved artefacts are
    # distinguishable from the model produced by build_cnn1.
    return Model(inputs, x, name="CNN3")


# ---------------- CNN4 ----------------
def build_cnn4(input_shape):
    """Build the CNN4 feature extractor.

    Four 'same'-padded ReLU convolution stages (32 @ 3x3, 32 @ 3x3, 64 @ 5x5,
    128 @ 5x5; L2 kernel penalty 1e-4), each followed by average pooling with
    windows of 3x3, 5x5, 3x3 and 3x3 respectively, then flatten and 50% dropout.
    The returned model outputs the flattened feature vector; it has no
    classification head.

    Args:
        input_shape: Shape of one input sample, passed to ``Input``.

    Returns:
        A Keras ``Model`` named "CNN4".
    """
    inputs = Input(shape=input_shape)

    x = Conv2D(32, (3,3), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(inputs)
    x = AveragePooling2D((3,3))(x)

    x = Conv2D(32, (3,3), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(x)
    x = AveragePooling2D((5,5))(x)

    x = Conv2D(64, (5,5), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(x)
    x = AveragePooling2D((3,3))(x)

    x = Conv2D(128, (5,5), activation='relu', padding="same",
               kernel_regularizer=regularizers.l2(1e-4))(x)
    x = AveragePooling2D((3,3))(x)

    x = Flatten()(x)
    x = Dropout(0.5)(x)
    # The model name matches the builder so summaries and saved artefacts are
    # distinguishable from the model produced by build_cnn2.
    return Model(inputs, x, name="CNN4")


In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# Augmentation settings: random rotation, shifts, shear, zoom and horizontal flips.
# NOTE(review): `datagen` is not used by any active code in the visible cells; its only
# references are commented out.
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
    fill_mode='nearest'
)
# datagen.fit(X_train_images)


In [None]:
# Instantiate the four feature extractors on 224x224 three-channel inputs
# (the size produced by load_and_preprocess_image) and print their architectures.
input_shape = (224, 224, 3)
CNN1 = build_cnn1(input_shape)
CNN2 = build_cnn2(input_shape)
CNN3 = build_cnn3(input_shape)
CNN4 = build_cnn4(input_shape)
print("---- CNN1 Summary ----")
CNN1.summary()

print("\n---- CNN2 Summary ----")
CNN2.summary()

print("\n---- CNN3 Summary ----")
CNN3.summary()

print("\n---- CNN4 Summary ----")
CNN4.summary()
# NOTE(review): feature extraction from augmented data is currently disabled:
# train_generator = datagen.flow(X_train_images, y_train, batch_size=32, shuffle=False)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping
# Stop training once validation loss has gone 5 epochs without improving,
# then restore the weights from the best epoch.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

In [None]:
# Run every image through each extractor to obtain one flattened feature row per image.
# NOTE(review): no compile/fit step for CNN1–CNN4 appears before this cell, so these
# features appear to come from freshly initialised weights — confirm this is intended.
features_cnn1 = CNN1.predict(X_images, verbose=1)
features_cnn2 = CNN2.predict(X_images, verbose=1)
features_cnn3 = CNN3.predict(X_images, verbose=1)
features_cnn4 = CNN4.predict(X_images, verbose=1)

In [None]:
from sklearn.decomposition import PCA

# Dual-1: concatenate the CNN1 and CNN2 features and reduce them to 50 principal components.
# random_state is pinned (same seed as the FactorAnalysis cell) so the projection is
# reproducible if scikit-learn selects its randomized SVD solver for this input.
pca = PCA(n_components=50, random_state=42)
all_features = np.concatenate([features_cnn1, features_cnn2], axis=1)
features_pca = pca.fit_transform(all_features)

# features_pca holds one 50-dimensional row per image, in the same row order as X_images.
# NOTE(review): PCA is fitted on all samples, including those later placed in the
# validation and test splits — confirm this is acceptable.


In [None]:
from sklearn.decomposition import FactorAnalysis

# Dual-2: concatenate features from CNN3 and CNN4 (column-wise, one row per image)
all_features_dual2 = np.concatenate([features_cnn3, features_cnn4], axis=1)

# Apply Factor Analysis to reduce dimensionality to 50 factors (seeded for reproducibility)
fa = FactorAnalysis(n_components=50, random_state=42)
features_fa = fa.fit_transform(all_features_dual2)

# features_fa now contains the reduced features for Dual-2.
# NOTE(review): the factor model is fitted on all samples, before the train/val/test split.

In [None]:
# Step 1: Merge features from both Dual CNNs
# features_pca: reduced features from Dual-1 (PCA)
# features_fa: reduced features from Dual-2 (FA)
merged_features = np.concatenate([features_pca, features_fa], axis=1)

# Step 2: Remove duplicate features (columns)
# Transposing turns columns into rows so DataFrame.duplicated can flag repeated columns.
import pandas as pd
merged_df = pd.DataFrame(merged_features)
merged_df = merged_df.loc[:, ~merged_df.T.duplicated()]

# Labels, row-aligned with merged_df (both follow the row order of df).
# Defined here explicitly so the stratified split below does not depend on a
# variable left over from another cell.
y = df['label'].values

# Step 3: Train (70%) + temp (30% = val + test), stratified by label
X_train, X_temp, y_train, y_temp = train_test_split(
    merged_df.values, y, test_size=0.3, random_state=42, stratify=y
)

# Step 4: Split temp into validation (2/3 of temp, 20% overall) and test (1/3 of temp, 10% overall)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=1/3, random_state=42, stratify=y_temp
)

print("Train:", X_train.shape, y_train.shape)
print("Val:  ", X_val.shape, y_val.shape)
print("Test: ", X_test.shape, y_test.shape)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
# ---------------- Classifier ----------------
# Fully connected head on the merged features: 256 -> 128 -> 64 ReLU units with 20% dropout
# after each hidden layer, ending in a 7-way softmax.
# NOTE(review): newer Keras releases warn when `input_shape` is passed to a layer inside
# Sequential and recommend an explicit Input layer instead — confirm against the installed version.
model = Sequential([
    Dense(256, activation='relu', input_shape=(X_train.shape[1],)),
    Dropout(0.2),
    Dense(128, activation='relu'),
    Dropout(0.2),
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(7, activation='softmax')
])

# The sparse loss takes integer class ids, so the labels are not one-hot encoded.
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.summary()
# The early-stopping callback (`early_stop`) is defined in an earlier cell and passed to model.fit.



In [None]:
# Train the classifier head; `history` keeps the per-epoch training and validation metrics.
history = model.fit(
    X_train, y_train,   # labels are passed as-is (not one-hot) to match the sparse loss
    validation_data=(X_val, y_val),
    epochs=80,
    batch_size=128,
    callbacks=[early_stop], 
       # early_stop monitors val_loss (patience=5), so training may end before 80 epochs
    verbose=1
)

In [None]:
from sklearn.metrics import classification_report  
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt 
from sklearn.metrics import roc_auc_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
def metrics():
    """Evaluate the global ``model`` on the global ``X_test`` / ``y_test`` split.

    Prints the test loss and accuracy, a per-class classification report and the
    macro-averaged one-vs-rest ROC-AUC, and plots the confusion matrix.

    The number of classes is taken from the width of the model's probability
    output, and class ids are assumed to be ``0 .. n_classes - 1``.
    """
    # Evaluate model on test data
    test_loss, test_acc = model.evaluate(X_test, y_test, verbose=1)
    print(f"✅ Test Accuracy: {test_acc:.4f}")
    print(f"✅ Test Loss: {test_loss:.4f}")

    y_pred_probs = model.predict(X_test)             # probabilities (N, n_classes)
    y_pred = np.argmax(y_pred_probs, axis=1)
    class_ids = np.arange(y_pred_probs.shape[1])
    tick_labels = [f"Class {i}" for i in class_ids]

    print("\n📊 Classification Report:")
    print(classification_report(y_test, y_pred, digits=4))

    # Pin the label set so the matrix is always n_classes x n_classes; otherwise a class
    # absent from both y_test and y_pred shrinks the matrix and the tick labels no longer fit.
    cm = confusion_matrix(y_test, y_pred, labels=class_ids)

    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                xticklabels=tick_labels,
                yticklabels=tick_labels)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.show()

    # Binarize test labels (one-hot for ROC)
    y_test_bin = label_binarize(y_test, classes=class_ids)

    # ROC-AUC (macro average)
    roc_auc = roc_auc_score(y_test_bin, y_pred_probs, multi_class="ovr")
    print(f" ROC-AUC (macro): {roc_auc:.4f}")
    


In [None]:
def save_model(model, model_name="dual_cnn_classifier",
               model_dir="/content/drive/MyDrive/mini_proj_data/saved_models/"):
    """Save ``model`` as ``<model_dir>/<model_name>.h5``, creating the directory if needed.

    Args:
        model: Object exposing a ``save(path)`` method (e.g. a Keras model).
        model_name: File name without extension; ``.h5`` is appended.
        model_dir: Target directory. Defaults to the project's Drive folder.
    """
    # exist_ok=True makes directory creation idempotent and avoids the
    # check-then-create race of a separate os.path.exists test.
    os.makedirs(model_dir, exist_ok=True)
    model_path = os.path.join(model_dir, model_name + ".h5")
    model.save(model_path)
    print(f"Model saved to {model_path}")



# 2. Unprocessed Images

# 3. Unprocessed Images with the Train-Test Split Done Outside the Architecture