## CNN for Image Classification

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used in later cells
# live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import pandas as pd
import numpy as np
import seaborn as sns
from PIL import Image
import matplotlib.pyplot as plt
from matplotlib.image import imread
from tqdm import tqdm
import cv2
%matplotlib inline

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, Activation, Flatten, Dense, Conv2D, MaxPooling2D, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import plot_model, to_categorical
from tensorflow.keras import backend

import torch
from torch.utils.data import Dataset, random_split, DataLoader

from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split

In [None]:
# Locations on the mounted Drive: the project folder and the extracted dataset.
base_path = '/content/drive/MyDrive/Colab_Session/CNN/Brake_disk/'
dataset_dir = base_path + 'Casting_Dataset/'

# Drop any previous local extraction so unzip starts from a clean state.
if os.path.exists('/content/Casting_Dataset'):
    !rm -rf '/content/Casting_Dataset'

# Extract the archive into the current working directory
# (presumably /content on Colab, given the paths checked above and below).
!unzip '/content/drive/MyDrive/Colab_Session/CNN/Brake_disk/Casting_Dataset.zip'

# Replace the copy on Drive with the freshly extracted dataset.
if os.path.exists(dataset_dir):
    !rm -rf '/content/drive/MyDrive/Colab_Session/CNN/Brake_disk/Casting_Dataset/'

!cp -r '/content/Casting_Dataset' '/content/drive/MyDrive/Colab_Session/CNN/Brake_disk/Casting_Dataset/'

In [None]:
# Training and test image folders inside the dataset directory.
train_path = f"{dataset_dir}Train/"
test_path = f"{dataset_dir}Test/"

In [None]:
# Keras augmentation generator: rescales pixels by 1/255, applies a small
# random zoom and brightness change, and reserves 20% for validation.
image_gen = ImageDataGenerator(
    rescale=1 / 255,
    zoom_range=0.1,
    brightness_range=[0.9, 1.0],
    validation_split=0.2,
)

In [None]:
def load_image_dataset(root_dir, target_size=(100, 100), grayscale=True):
    """Load an image-folder dataset into NumPy arrays.

    ``root_dir`` must contain one sub-directory per class. Class indices are
    assigned in sorted order of the sub-directory names, counting directories
    only, so stray files in ``root_dir`` (e.g. ``.DS_Store``) never shift the
    labels.

    Args:
        root_dir: Directory whose sub-directories each hold one class.
        target_size: ``(width, height)`` handed to ``PIL.Image.resize``.
        grayscale: Convert images to single-channel ("L") when true,
            otherwise to 3-channel RGB.

    Returns:
        A tuple ``(X, y, class_map)`` where ``X`` has shape
        ``(n, height, width, channels)``, ``y`` holds the integer class index
        of each image, and ``class_map`` maps class directory name to index.
        Pixel values are returned unscaled, exactly as decoded.
    """
    image_exts = (".jpg", ".jpeg", ".png", ".bmp")
    mode = "L" if grayscale else "RGB"

    # Enumerate directories only; enumerating every entry would let a stray
    # file consume an index and produce labels such as {class_a: 1, class_b: 2}.
    class_names = sorted(
        name for name in os.listdir(root_dir)
        if os.path.isdir(os.path.join(root_dir, name))
    )
    class_map = {name: idx for idx, name in enumerate(class_names)}

    X = []
    y = []
    for class_name, idx in class_map.items():
        class_path = os.path.join(root_dir, class_name)
        # Sorted so the sample order does not depend on the file system.
        for fname in sorted(os.listdir(class_path)):
            if not fname.lower().endswith(image_exts):
                continue
            # The context manager closes the underlying file handle promptly.
            with Image.open(os.path.join(class_path, fname)) as img:
                X.append(np.array(img.convert(mode).resize(target_size)))
            y.append(idx)

    if not X:
        # Keep a well-formed 4-D array even when no image was found.
        width, height = target_size
        channels = 1 if grayscale else 3
        empty_images = np.empty((0, height, width, channels), dtype=np.uint8)
        return empty_images, np.empty((0,), dtype=np.int64), class_map

    X = np.array(X)
    if grayscale:
        # Grayscale arrays are (n, h, w); add the trailing channel axis.
        X = X[..., np.newaxis]
    y = np.array(y)

    return X, y, class_map

In [None]:
def get_tf_dataset(X, y, batch_size=32, shuffle=True):
    """Build a batched, prefetching ``tf.data`` pipeline over ``(X, y)``.

    When ``shuffle`` is true the samples are shuffled with a buffer that
    covers the whole dataset before batching.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((X, y))
    if shuffle:
        pipeline = pipeline.shuffle(buffer_size=len(X))
    batched = pipeline.batch(batch_size)
    return batched.prefetch(tf.data.AUTOTUNE)

class NumpyDataset(Dataset):
    """PyTorch dataset over in-memory NumPy arrays.

    Images arrive channels-last ``(n, h, w, c)`` and are stored as float
    tensors in channels-first ``(n, c, h, w)`` order; labels are stored as
    long tensors. An optional ``transform`` is applied to each image on
    access.
    """

    def __init__(self, X, y, transform=None):
        images = torch.from_numpy(X).float()
        self.X = images.permute(0, 3, 1, 2)  # NHWC -> NCHW
        self.y = torch.from_numpy(y).long()
        self.transform = transform

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        sample = self.X[idx]
        target = self.y[idx]
        if not self.transform:
            return sample, target
        return self.transform(sample), target

In [None]:
X_train, y_train, class_map = load_image_dataset(train_path, target_size=(300, 300), grayscale=True)
X_test, y_test, _ = load_image_dataset(test_path, target_size=(300, 300), grayscale=True)

# Scale pixels to [0, 1]. visualize_predictions feeds the model images divided
# by 255, so training and evaluation data must use the same range; otherwise
# the model is trained on 0-255 inputs and queried with 0-1 inputs.
# In-place division avoids holding an extra full-size temporary in memory.
X_train = X_train.astype(np.float32)
X_train /= 255.0
X_test = X_test.astype(np.float32)
X_test /= 255.0

# One-hot encode the integer class indices (two classes).
y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)

print("X shape:", X_train.shape)
print("y shape:", y_train.shape)
print("Classes:", class_map)

In [None]:
# TODO: Implement a CNN model
'''
train_data: (n_train, 300, 300, 1)
train_labels: (n_train, 2)
Classes: {'Train_Abnormal': 0, 'Train_Normal': 1}

Use the AI chatbot to help you build a CNN model for this industrial dataset.
You can import whatever you need. You do not have to place the model on a device yourself; train_model selects the GPU or CPU automatically.
This time the model can be large, up to the limits allowed by Colab.
'''
'''Your code here'''

In [None]:
def _fit_keras_model(model, X_train, y_train, epochs, batch_size,
                     optimizer, loss, metrics, lr):
    """Compile and fit a Keras model on a stratified 80/20 split; return the history dict."""
    device = "GPU" if tf.config.list_physical_devices('GPU') else "CPU"
    print(f"[INFO] Detected TensorFlow model. Using {device}.")

    if optimizer is None:
        optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

    X_tr, X_val, y_tr, y_val = train_test_split(
        X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
    )

    train_dataset = get_tf_dataset(X_tr, y_tr, batch_size=batch_size, shuffle=True)
    val_dataset = get_tf_dataset(X_val, y_val, batch_size=batch_size, shuffle=False)

    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)

    # The datasets are already batched by get_tf_dataset, and Keras documents
    # that `batch_size` must not be specified for dataset inputs.
    hist = model.fit(train_dataset, validation_data=val_dataset, epochs=epochs)

    return {
        'train_loss': hist.history.get('loss', []),
        'val_loss': hist.history.get('val_loss', []),
        'train_acc': hist.history.get('accuracy', []),
        'val_acc': hist.history.get('val_accuracy', []),
    }


def _run_torch_epoch(model, loader, criterion, device, optimizer=None, desc=None):
    """Run one pass over `loader` (training if `optimizer` is given); return (mean loss, accuracy)."""
    training = optimizer is not None
    model.train(training)

    loss_sum, correct, seen = 0.0, 0, 0
    batches = tqdm(loader, desc=desc) if desc is not None else loader

    with torch.set_grad_enabled(training):
        for x_batch, y_batch in batches:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            # Labels normally arrive one-hot encoded, but the criterion is
            # given class indices; plain index labels are accepted as well.
            targets = y_batch.argmax(dim=1) if y_batch.dim() > 1 else y_batch
            targets = targets.long()

            if training:
                optimizer.zero_grad()
            outputs = model(x_batch)
            loss = criterion(outputs, targets)
            if training:
                loss.backward()
                optimizer.step()

            n_samples = targets.size(0)
            loss_sum += loss.item() * n_samples
            correct += (outputs.argmax(dim=1) == targets).sum().item()
            seen += n_samples

    if seen == 0:
        # Empty split (e.g. fewer than five samples): nothing to average.
        return float('nan'), float('nan')

    # The per-batch loss was weighted by batch size above, so normalise by the
    # number of samples, not by the number of batches.
    return loss_sum / seen, correct / seen


def _fit_torch_model(model, X_train, y_train, epochs, batch_size,
                     criterion, optimizer, lr):
    """Train a PyTorch model on a seeded 80/20 random split; return the history dict."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"[INFO] Detected PyTorch model. Using {device}.")
    model.to(device)

    full_dataset = NumpyDataset(X_train, y_train)
    val_size = int(len(full_dataset) * 0.2)
    train_size = len(full_dataset) - val_size
    # Seeded so the split is reproducible, like random_state=42 in the Keras path.
    train_dataset, val_dataset = random_split(
        full_dataset, [train_size, val_size],
        generator=torch.Generator().manual_seed(42),
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    if optimizer is None:
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    if criterion is None:
        criterion = torch.nn.CrossEntropyLoss()

    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}

    for epoch in range(epochs):
        train_loss, train_acc = _run_torch_epoch(
            model, train_loader, criterion, device,
            optimizer=optimizer, desc=f"[Train] Epoch {epoch+1}/{epochs}",
        )
        val_loss, val_acc = _run_torch_epoch(model, val_loader, criterion, device)

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f"[Train] Epoch {epoch+1}: Loss={train_loss:.4f}, "
              f"Acc={train_acc:.4f} | "
              f"[Val] Loss={val_loss:.4f}, Acc={val_acc:.4f}")

    return history


def train_model(model, X_train, y_train, epochs=10, batch_size=32,
                criterion=None, optimizer=None, loss='MSE', metrics=['accuracy'], lr=1e-3):
    """Train a Keras or PyTorch model on an 80/20 train/validation split.

    The framework is detected from the model type and the device (GPU when
    available, otherwise CPU) is selected automatically.

    Args:
        model: A ``tf.keras.Model`` or a ``torch.nn.Module``.
        X_train: Image array in channels-last layout ``(n, h, w, c)``.
        y_train: Labels; one-hot encoded in the notebook's workflow.
        epochs: Number of passes over the training split.
        batch_size: Mini-batch size for both splits.
        criterion: PyTorch loss module (PyTorch only); defaults to
            ``CrossEntropyLoss``.
        optimizer: Framework optimizer instance; defaults to Adam with ``lr``.
        loss: Keras loss passed to ``model.compile`` (Keras only).
        metrics: Keras metrics passed to ``model.compile`` (Keras only).
        lr: Learning rate used when no optimizer is supplied.

    Returns:
        ``(model, history)`` where ``history`` is a dict with the per-epoch
        lists ``train_loss``, ``val_loss``, ``train_acc`` and ``val_acc``.

    Raises:
        TypeError: If ``model`` is neither a Keras nor a PyTorch model.
    """
    if isinstance(model, tf.keras.Model):
        history = _fit_keras_model(model, X_train, y_train, epochs, batch_size,
                                   optimizer, loss, metrics, lr)
        return model, history

    elif isinstance(model, torch.nn.Module):
        history = _fit_torch_model(model, X_train, y_train, epochs, batch_size,
                                   criterion, optimizer, lr)
        return model, history

    else:
        raise TypeError("Check the model type!")

In [None]:
# Train for five epochs. `criterion` only applies to PyTorch models, while
# `loss` and `metrics` only apply to Keras models.
model, history = train_model(
    model,
    X_train,
    y_train,
    epochs=5,
    batch_size=64,
    criterion=torch.nn.CrossEntropyLoss(),
    loss='binary_crossentropy',
    metrics=['accuracy'],
    lr=0.001,
)

In [None]:
def plot_training_history(model=None, history_dict=None):
    """Plot per-epoch training/validation loss and accuracy curves.

    Args:
        model: A ``tf.keras.Model`` or ``torch.nn.Module``.
        history_dict: Optional dict of per-epoch lists. When supplied it is
            plotted for either framework; Keras-style keys (``loss``,
            ``accuracy``, ``val_accuracy``) are renamed to the ``train_*`` /
            ``val_*`` names. When omitted, a Keras model falls back to its
            own ``model.history.history``.

    Raises:
        TypeError: If ``model`` is neither a Keras nor a PyTorch model.
        ValueError: If no history is available or it has no known columns.
    """
    is_keras = isinstance(model, tf.keras.Model)
    is_torch = isinstance(model, torch.nn.Module)
    if not (is_keras or is_torch):
        raise TypeError("Model must be either a tf.keras.Model or torch.nn.Module")

    if history_dict is not None:
        # An explicit history wins, so Keras models without a live `.history`
        # (for instance after reloading from disk) can still be plotted.
        records = history_dict
    elif is_keras:
        records = getattr(getattr(model, "history", None), "history", None)
        if records is None:
            raise ValueError("TensorFlow model does not have .history.history")
    else:
        raise ValueError("For PyTorch, provide `history_dict` containing loss/accuracy lists.")

    history_df = pd.DataFrame(records)
    history_df.index += 1  # number epochs from 1 on the x-axis
    # No-op for columns that already use the train_*/val_* names.
    history_df = history_df.rename(columns={
        'loss': 'train_loss',
        'accuracy': 'train_acc',
        'val_loss': 'val_loss',
        'val_accuracy': 'val_acc'
    })

    valid_cols = [c for c in ['train_loss', 'val_loss', 'train_acc', 'val_acc'] if c in history_df.columns]
    if not valid_cols:
        raise ValueError("No valid columns found in training history.")

    plt.figure(figsize=(8, 4))
    sns.lineplot(data=history_df[valid_cols])
    plt.title("TRAINING EVALUATION", fontweight="bold", fontsize=15)
    plt.xlabel("Epochs")
    plt.ylabel("Metrics")
    plt.legend(labels=valid_cols)
    plt.tight_layout()
    plt.show()


In [None]:
# Plot the loss/accuracy curves recorded during training.
plot_training_history(model=model, history_dict=history)

In [None]:
def evaluate_with_confusion_matrix(model, X_test, y_test,
                                   class_names=("abnormal", "normal"), batch_size=32):
    """Predict on the test set and plot/save the confusion matrix.

    Args:
        model: A ``tf.keras.Model`` or ``torch.nn.Module``.
        X_test: Test images; a 4-D array is treated as channels-last and is
            transposed to channels-first for PyTorch models.
        y_test: One-hot encoded labels, shape ``(n, n_classes)``.
        class_names: Tick labels, ordered by class index.
        batch_size: Inference batch size for PyTorch models.

    Raises:
        TypeError: If ``model`` is neither a Keras nor a PyTorch model.

    The figure is shown and also written to ``Confusion Matrix.png``.
    """
    if isinstance(model, tf.keras.Model):
        print("[INFO] Detected TensorFlow model.")
        preds_proba = model.predict(X_test, verbose=0)
        y_pred = np.argmax(preds_proba, axis=1)

    elif isinstance(model, torch.nn.Module):
        print("[INFO] Detected PyTorch model.")
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        model.eval()

        if X_test.ndim == 4:
            X_test = np.transpose(X_test, (0, 3, 1, 2))  # NHWC -> NCHW

        # Slice the array directly into mini-batches: this needs no
        # TensorDataset (which this notebook never imports) and avoids both
        # one-sample-at-a-time inference and a full float32 copy of the set.
        batch_preds = []
        with torch.no_grad():
            for start in range(0, len(X_test), batch_size):
                chunk = np.ascontiguousarray(X_test[start:start + batch_size])
                x = torch.tensor(chunk, dtype=torch.float32).to(device)
                batch_preds.append(model(x).argmax(dim=1).cpu().numpy())

        y_pred = np.concatenate(batch_preds) if batch_preds else np.array([], dtype=int)

    else:
        raise TypeError("Model must be a tf.keras.Model or torch.nn.Module")

    y_true = np.argmax(y_test, axis=1)

    # Fix the label set so the matrix keeps one row/column per class even when
    # a class is missing from both the predictions and the ground truth.
    n_classes = len(class_names)
    cm = confusion_matrix(y_true, y_pred, labels=list(range(n_classes)))

    plt.figure(figsize=(6, 6))
    sns.heatmap(cm, annot=True, fmt='g', cmap='viridis', square=True, cbar=False, vmin=0)
    plt.xticks(ticks=np.arange(n_classes) + 0.5, labels=list(class_names))
    plt.yticks(ticks=np.arange(n_classes) + 0.5, labels=list(class_names))
    plt.xlabel("PREDICT")
    plt.ylabel("ACTUAL")
    plt.title("CONFUSION MATRIX")
    plt.tight_layout()
    plt.savefig("Confusion Matrix.png")
    plt.show()

In [None]:
# Evaluate on the held-out test set and draw the confusion matrix.
evaluate_with_confusion_matrix(model=model, X_test=X_test, y_test=y_test)

In [None]:
def _to_class_probabilities(pred):
    """Convert raw Keras outputs of shape (n, k) into per-class probabilities."""
    pred = np.asarray(pred, dtype=np.float32)

    if pred.shape[-1] == 1:
        # Single-unit head: presumably the score of class 1 ("Normal") —
        # confirm against the model. Logits are squashed with a sigmoid first.
        if np.any(pred < 0) or np.any(pred > 1):
            pred = 1.0 / (1.0 + np.exp(-pred))
        return np.concatenate([1.0 - pred, pred], axis=1)

    # Outputs that already form a distribution (softmax head) are kept as-is;
    # a second softmax would flatten e.g. [0.99, 0.01] to about [0.73, 0.27].
    if np.all(pred >= 0) and np.allclose(pred.sum(axis=1), 1.0, atol=1e-3):
        return pred

    # Otherwise treat the outputs as logits (numerically stable softmax).
    shifted = np.exp(pred - pred.max(axis=1, keepdims=True))
    return shifted / shifted.sum(axis=1, keepdims=True)


def visualize_predictions(model, test_cases, test_path, image_shape=(300, 300, 1)):
    """Show test images side by side with the model's predicted class.

    Args:
        model: A ``tf.keras.Model`` or ``torch.nn.Module``.
        test_cases: Image paths relative to ``test_path``. The actual label
            is read from the file name (second ``_``-separated field, e.g.
            ``ok`` in ``cast_ok_0_10.jpeg``).
        test_path: Root directory of the test images.
        image_shape: ``(height, width, channels)`` expected by the model.

    Raises:
        TypeError: If ``model`` is neither a Keras nor a PyTorch model.
        FileNotFoundError: If an image cannot be read.

    The figure is shown and also written to ``correct_class_img.png``.
    """
    is_tf = isinstance(model, tf.keras.Model)
    is_torch = isinstance(model, torch.nn.Module)
    if not (is_tf or is_torch):
        raise TypeError("Unsupported model type.")

    if is_torch:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        model.eval()

    # cv2.resize expects (width, height) whereas image_shape is (height, width, ...).
    height, width = image_shape[:2]
    dsize = (width, height)

    plt.figure(figsize=(20, 8))

    for i, rel_path in enumerate(test_cases):
        full_path = os.path.join(test_path, rel_path)
        img_gray = cv2.imread(full_path, cv2.IMREAD_GRAYSCALE)
        if img_gray is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {full_path}")
        img_gray = cv2.resize(img_gray, dsize)
        img_rescaled = img_gray.astype(np.float32) / 255.0

        if is_tf:
            x_input = img_rescaled.reshape(1, *image_shape)
            probs = _to_class_probabilities(model.predict(x_input, verbose=0))
        else:
            # (h, w) -> (1, 1, h, w): batch and channel axes for a channels-first model.
            x_input = torch.tensor(img_rescaled).unsqueeze(0).unsqueeze(0).float()
            x_input = x_input.to(device)

            with torch.no_grad():
                raw = model(x_input)
                probs = torch.softmax(raw, dim=1).cpu().numpy()

        img_rgb = cv2.imread(full_path)
        img_rgb = cv2.resize(img_rgb, dsize)
        img_rgb = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2RGB)

        # Parse the label from the file name only, so underscores in the
        # directory part of the path cannot shift the field index.
        file_name = os.path.basename(rel_path)
        name_fields = file_name.split("_")
        label = name_fields[1] if len(name_fields) > 1 else file_name

        pred_class = np.argmax(probs, axis=1)[0]
        prob_percent = probs[0, pred_class] * 100

        if pred_class == 0:
            predicted_label = "Abnormal"
        else:
            predicted_label = "Normal"

        plt.subplot(1, len(test_cases), i + 1)
        plt.imshow(img_rgb, cmap='gray')
        plt.title(f"{file_name}\nActual Label: {label}", weight='bold', size=14)
        plt.xlabel(f"Predicted: {predicted_label} ({prob_percent:.2f}%)", color='g', size=16)
        plt.xticks([])
        plt.yticks([])

    plt.tight_layout()
    plt.savefig("correct_class_img.png", dpi=500, bbox_inches='tight')
    plt.show()

In [None]:
# One normal and one defective sample, given relative to the test folder.
test_cases = [
    'Test_Normal/cast_ok_0_10.jpeg',
    'Test_Abnormal/cast_def_0_26.jpeg',
]
visualize_predictions(
    model,
    test_cases,
    test_path,
    image_shape=(300, 300, 1),
)