<a href="https://colab.research.google.com/github/VaishnaviK003/Machine-Learning-for-Classification-of-Grain-Orientation-in-Materials/blob/main/MAE298_Final_new.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import files
uploaded = files.upload()   # zip your InputImages folder and upload




Saving InputImages.zip to InputImages.zip


In [None]:
!unzip -o InputImages.zip -d /content

Archive:  InputImages.zip
   creating: /content/InputImages/
  inflating: /content/InputImages/IMG_1101_180.png  
  inflating: /content/InputImages/IMG_1102_180.png  
  inflating: /content/InputImages/IMG_1103_180.png  
  inflating: /content/InputImages/IMG_1104_90.png  
  inflating: /content/InputImages/IMG_1105_180.png  
  inflating: /content/InputImages/IMG_1106_180.png  
  inflating: /content/InputImages/IMG_1107_180.png  
  inflating: /content/InputImages/IMG_1108_90.png  
  inflating: /content/InputImages/IMG_1109_90.png  
  inflating: /content/InputImages/IMG_1110_90.png  
  inflating: /content/InputImages/IMG_1111_180.png  
  inflating: /content/InputImages/IMG_1112_180.png  
  inflating: /content/InputImages/IMG_1113_90.png  
  inflating: /content/InputImages/IMG_1114_90.png  
  inflating: /content/InputImages/IMG_1115_180.png  
  inflating: /content/InputImages/IMG_1116_90.png  
  inflating: /content/InputImages/IMG_1117_90.png  
  inflating: /content/InputImages/IMG_1118_180

In [None]:
import os
import random
import shutil
import json

import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


# -------------------------------------------------
# CONFIGURATION
# -------------------------------------------------
INPUT_DIR = "/content/InputImages"   # folder with your original brick images
BASE_DIR = "/content/data_split"     # where train/val/test folders & outputs will go

TRAIN_DIR = os.path.join(BASE_DIR, "train")
VAL_DIR   = os.path.join(BASE_DIR, "val")
TEST_DIR  = os.path.join(BASE_DIR, "test")

LABELS_JSON = os.path.join(BASE_DIR, "angle_labels.json")

IMG_SIZE = (224, 224)
BATCH_SIZE = 8
EPOCHS = 10
SEED = 42

# 2 classes: 0 = vertical (0°), 1 = horizontal (≈90°)
NUM_CLASSES = 2
ANGLE_THRESHOLD = 15.0  # degrees from vertical: |angle| <= 15 -> vertical, else horizontal

CLASSIFIED_DIR = os.path.join(BASE_DIR, "classified_test_images")


# -------------------------------------------------
# UTILITY: create / clean directories
# -------------------------------------------------
def prepare_split_dirs():
    if os.path.exists(BASE_DIR):
        shutil.rmtree(BASE_DIR)
    os.makedirs(TRAIN_DIR, exist_ok=True)
    os.makedirs(VAL_DIR, exist_ok=True)
    os.makedirs(TEST_DIR, exist_ok=True)


# -------------------------------------------------
# STEP 1: Split InputImages into train/val/test
# -------------------------------------------------
def split_dataset():
    image_files = [
        f for f in os.listdir(INPUT_DIR)
        if f.lower().endswith((".png", ".jpg", ".jpeg", ".tif", ".bmp"))
    ]
    if len(image_files) == 0:
        raise RuntimeError(f"No images found in {INPUT_DIR}")

    random.seed(SEED)
    random.shuffle(image_files)

    n_total = len(image_files)
    n_train = int(0.8 * n_total)
    n_val   = int(0.1 * n_total)
    n_test  = n_total - n_train - n_val  # remaining

    train_files = image_files[:n_train]
    val_files   = image_files[n_train:n_train + n_val]
    test_files  = image_files[n_train + n_val:]

    for fname in train_files:
        shutil.copy(os.path.join(INPUT_DIR, fname),
                    os.path.join(TRAIN_DIR, fname))

    for fname in val_files:
        shutil.copy(os.path.join(INPUT_DIR, fname),
                    os.path.join(VAL_DIR, fname))

    for fname in test_files:
        shutil.copy(os.path.join(INPUT_DIR, fname),
                    os.path.join(TEST_DIR, fname))

    print(f"Split {n_total} images into:")
    print(f"  Train: {len(train_files)}")
    print(f"  Val  : {len(val_files)}")
    print(f"  Test : {len(test_files)}")

# -------------------------------------------------
# STEP 1b: DATA AUGMENTATION ON DISK (rotate/scale)
# -------------------------------------------------
def augment_all_images():
    """
    For each original image in train/val/test, create a few rotated/scaled
    versions and save them alongside, so they will also be labeled and used
    as extra samples.
    """
    aug_params = [
        (0,   1.2),
        (0,   0.8),
        (10,  1.0),
        (-10, 1.0),
    ]

    for subset_dir in [TRAIN_DIR, VAL_DIR, TEST_DIR]:
        files = [f for f in os.listdir(subset_dir)
                 if f.lower().endswith((".png", ".jpg", ".jpeg", ".tif", ".bmp"))]

        for fname in files:
            if "_aug" in fname:
                continue
            img_path = os.path.join(subset_dir, fname)
            img = cv2.imread(img_path)
            if img is None:
                continue
            h, w = img.shape[:2]
            base, ext = os.path.splitext(fname)

            for idx, (angle, scale) in enumerate(aug_params):
                M = cv2.getRotationMatrix2D((w / 2, h / 2), angle, scale)
                aug = cv2.warpAffine(img, M, (w, h), borderMode=cv2.BORDER_REFLECT)
                out_name = f"{base}_aug{idx}{ext}"
                out_path = os.path.join(subset_dir, out_name)
                cv2.imwrite(out_path, aug)

    print("Disk augmentation complete (rotate/scale).")


# -------------------------------------------------
# GRAIN DETECTION FUNCTION
# -------------------------------------------------
def detect_grains_and_angles(
    img_path,
    min_area=120,
    max_area=50000,
    kernel_size=3
):
    img = cv2.imread(img_path)
    if img is None:
        raise ValueError(f"Could not read image at {img_path}")

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # 1. Strong preprocessing
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    gray = clahe.apply(gray)
    gray = cv2.bilateralFilter(gray, 9, 50, 50)
    show_and_save("1_grayscale", gray)

    sharpen_kernel = np.array([[0, -1, 0],
                               [-1, 5, -1],
                               [0, -1, 0]])
    sharp = cv2.filter2D(gray, -1, sharpen_kernel)
    show_and_save("2_preprocessed", sharp)

    # 2. Thresholding
    adaptive = cv2.adaptiveThreshold(
        sharp, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV, 25, 3
    )
    _, otsu = cv2.threshold(sharp, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    thresh = cv2.bitwise_or(adaptive, otsu)
    show_and_save("3_threshold", thresh)

    # 3. Morphology
    k = np.ones((kernel_size, kernel_size), np.uint8)
    bin_clean = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, k, iterations=2)
    bin_clean = cv2.morphologyEx(bin_clean, cv2.MORPH_CLOSE, k, iterations=2)
    bin_clean = cv2.morphologyEx(bin_clean, cv2.MORPH_OPEN, k, iterations=1)
    show_and_save("4_morphology", bin_clean)

    # 4. Distance transform & watershed
    dist = cv2.distanceTransform(bin_clean, cv2.DIST_L2, 5)
    _, sure_fg = cv2.threshold(dist, 0.35 * dist.max(), 255, 0)
    sure_fg = np.uint8(sure_fg)
    show_and_save("5_sure_foreground", sure_fg)

    unknown = cv2.subtract(bin_clean, sure_fg)

    num_labels, markers = cv2.connectedComponents(sure_fg)
    markers = markers + 1
    markers[unknown == 255] = 0

    markers = cv2.watershed(cv2.cvtColor(img, cv2.COLOR_BGR2RGB), markers)
    show_and_save("6_watershed", markers.astype(np.uint8) * 10)

    grain_data = []
    output = img.copy()
    show_and_save("7_final_output", output)

    # 5. Extract contours from watershed regions
    for label in range(2, num_labels + 1):
        mask = np.uint8(markers == label) * 255
        cnts, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if not cnts:
            continue

        cnt = cnts[0]
        area = cv2.contourArea(cnt)

        if area < min_area or area > max_area:
            continue
        if len(cnt) < 20:
            continue

        (x, y), (MA, ma), angle = cv2.fitEllipse(cnt)

        if angle > 90:
            angle = angle - 180

        grain_data.append({
            "center": (x, y),
            "angle": float(angle)
        })

        cv2.ellipse(output, ((x, y), (MA, ma), angle), (0, 255, 0), 2)
        cv2.putText(output, f"{angle:.1f}°", (int(x), int(y)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    return output, grain_data


# -------------------------------------------------
# STEP 2: dominant grain angle per brick
# -------------------------------------------------
def dominant_angle_from_grains(grain_data, bin_width=5):
    if not grain_data:
        return 0.0

    angles = np.array([g["angle"] for g in grain_data])

    angle_min, angle_max = -90, 90
    bins = np.arange(angle_min, angle_max + bin_width, bin_width)
    hist, bin_edges = np.histogram(angles, bins=bins)

    max_bin_idx = np.argmax(hist)
    bin_center = 0.5 * (bin_edges[max_bin_idx] + bin_edges[max_bin_idx + 1])
    return float(bin_center)


def label_all_images():
    """
    Running grain detection on EVERY image in train/val/test (original + aug),
    compute dominant angle, and save to LABELS_JSON.
    """
    all_labels = {}

    for subset_dir in [TRAIN_DIR, VAL_DIR, TEST_DIR]:
        for fname in os.listdir(subset_dir):
            if not fname.lower().endswith((".png", ".jpg", ".jpeg", ".tif", ".bmp")):
                continue
            img_path = os.path.join(subset_dir, fname)
            print(f"Processing {img_path} ...")
            _, grain_data = detect_grains_and_angles(img_path)
            dom_angle = dominant_angle_from_grains(grain_data)

            rel_path = os.path.join(os.path.basename(subset_dir), fname)
            all_labels[rel_path] = dom_angle

    with open(LABELS_JSON, "w") as f:
        json.dump(all_labels, f, indent=2)

    print(f"Saved angle labels to {LABELS_JSON}")


# -------------------------------------------------
# ANGLE -> CLASS (0 vertical, 1 horizontal)
# -------------------------------------------------
def angle_to_class(angle):
    if abs(angle) <= ANGLE_THRESHOLD:
        return 0
    else:
        return 1


def build_datasets():
    with open(LABELS_JSON, "r") as f:
        labels_dict = json.load(f)

    def list_paths_and_labels(subdir_name):
        img_paths = []
        img_labels = []
        folder = os.path.join(BASE_DIR, subdir_name)
        for fname in os.listdir(folder):
            if not fname.lower().endswith((".png", ".jpg", ".jpeg", ".tif", ".bmp")):
                continue
            rel_path = os.path.join(subdir_name, fname)
            if rel_path not in labels_dict:
                continue
            img_paths.append(os.path.join(folder, fname))
            img_labels.append(labels_dict[rel_path])   # raw angle
        return img_paths, img_labels

    train_paths, train_labels = list_paths_and_labels("train")
    val_paths,   val_labels   = list_paths_and_labels("val")
    test_paths,  test_labels  = list_paths_and_labels("test")

    train_labels = [angle_to_class(a) for a in train_labels]
    val_labels   = [angle_to_class(a) for a in val_labels]
    test_labels  = [angle_to_class(a) for a in test_labels]

    print(f"Train samples: {len(train_paths)}")
    print(f"Val   samples: {len(val_paths)}")
    print(f"Test  samples: {len(test_paths)}")

    def preprocess(path, label):
        img_bytes = tf.io.read_file(path)
        img = tf.io.decode_image(img_bytes, channels=3, expand_animations=False)
        img.set_shape((None, None, 3))
        img = tf.image.convert_image_dtype(img, tf.float32)
        img = tf.image.resize(img, IMG_SIZE)
        img = preprocess_input(img * 255.0)
        return img, tf.cast(label, tf.int32)

    def make_dataset(paths, labels, shuffle=False):
        ds = tf.data.Dataset.from_tensor_slices((paths, labels))
        if shuffle:
            ds = ds.shuffle(buffer_size=len(paths), seed=SEED)
        ds = ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
        ds = ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
        return ds

    train_ds = make_dataset(train_paths, train_labels, shuffle=True)
    val_ds   = make_dataset(val_paths,   val_labels,   shuffle=False)
    test_ds  = make_dataset(test_paths,  test_labels,  shuffle=False)

    return train_ds, val_ds, test_ds, train_paths, val_paths, test_paths


# -------------------------------------------------
# STEP 4: Build and train ResNet50 (2-class classification)
# -------------------------------------------------
def build_resnet_model():
    data_augmentation = tf.keras.Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.1),
        layers.RandomContrast(0.1),
    ])

    base_model = ResNet50(
        weights="imagenet",
        include_top=False,
        input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3)
    )
    base_model.trainable = False

    inputs = layers.Input(shape=(IMG_SIZE[0], IMG_SIZE[1], 3))
    x = data_augmentation(inputs)
    x = base_model(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dropout(0.3)(x)
    outputs = layers.Dense(NUM_CLASSES, activation="softmax")(x)

    model = models.Model(inputs, outputs)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(1e-4),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )
    return model


def plot_history(history_dict):
    # Loss
    plt.figure(figsize=(8, 4))
    plt.plot(history_dict["loss"], label="Train Loss")
    plt.plot(history_dict["val_loss"], label="Val Loss")
    plt.title("Loss vs Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(BASE_DIR, "loss_curve.png"))
    plt.close()

    # Accuracy
    plt.figure(figsize=(8, 4))
    plt.plot(history_dict["accuracy"], label="Train Accuracy")
    plt.plot(history_dict["val_accuracy"], label="Val Accuracy")
    plt.title("Accuracy vs Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(BASE_DIR, "accuracy_curve.png"))
    plt.close()


def save_classified_images(test_paths, y_true, y_pred):
    """
    Save test images with overlayed true/pred labels into CLASSIFIED_DIR.
    """
    os.makedirs(CLASSIFIED_DIR, exist_ok=True)
    class_name = {0: "vertical_0deg", 1: "horizontal_90deg"}

    for path, t, p in zip(test_paths, y_true, y_pred):
        img = cv2.imread(path)
        if img is None:
            continue
        h, w = img.shape[:2]

        text = f"True: {class_name[t]}, Pred: {class_name[p]}"
        color = (0, 255, 0) if t == p else (0, 0, 255)

        # Draw a filled rectangle for text background
        cv2.rectangle(img, (10, 10), (w - 10, 70), (0, 0, 0), -1)
        cv2.putText(img, text, (20, 55),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)

        sub_dir = os.path.join(CLASSIFIED_DIR, f"class_{p}")
        os.makedirs(sub_dir, exist_ok=True)
        out_path = os.path.join(sub_dir, os.path.basename(path))
        cv2.imwrite(out_path, img)

    print(f"Saved classified test images to {CLASSIFIED_DIR}")


def train_and_evaluate():
    train_ds, val_ds, test_ds, train_paths, val_paths, test_paths = build_datasets()
    model = build_resnet_model()
    model.summary()

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=EPOCHS
    )

    hist_path = os.path.join(BASE_DIR, "training_history.json")
    with open(hist_path, "w") as f:
        json.dump(history.history, f, indent=2)

    plot_history(history.history)

    print("\nEvaluating on test set:")
    test_loss, test_acc = model.evaluate(test_ds)
    print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

    model_path = os.path.join(BASE_DIR, "resnet50_brick_orientation_model.keras")
    model.save(model_path)
    print("Saved model to", model_path)

    # Predict on test set
    y_true = []
    y_pred = []
    for batch_imgs, batch_labels in test_ds:
        preds = model.predict(batch_imgs)
        y_true.extend(batch_labels.numpy().tolist())
        y_pred.extend(np.argmax(preds, axis=1).tolist())

    y_true = np.array(y_true, dtype=int)
    y_pred = np.array(y_pred, dtype=int)

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    fig, ax = plt.subplots(figsize=(5, 5))
    disp = ConfusionMatrixDisplay(
        confusion_matrix=cm,
        display_labels=["vertical", "horizontal"]
    )
    disp.plot(cmap="Blues", colorbar=False, ax=ax)
    ax.set_title("Confusion Matrix (vertical vs horizontal)")
    plt.tight_layout()
    plt.savefig(os.path.join(BASE_DIR, "confusion_matrix_orientation.png"))
    plt.close()

    save_classified_images(test_paths, y_true, y_pred)


# -------------------------------------------------
# MAIN PIPELINE
# -------------------------------------------------
if __name__ == "__main__":
    prepare_split_dirs()
    split_dataset()
    augment_all_images()   # rotate/scale, then label everything
    label_all_images()
    train_and_evaluate()


Split 66 images into:
  Train: 52
  Val  : 6
  Test : 8
Disk augmentation complete (rotate/scale).
Processing /content/data_split/train/IMG_1112_180_aug0.png ...
Processing /content/data_split/train/IMG_1110_90_aug1.png ...
Processing /content/data_split/train/IMG_1101_180_aug1.png ...
Processing /content/data_split/train/IMG_1134_90.png ...
Processing /content/data_split/train/IMG_1117_90_aug1.png ...
Processing /content/data_split/train/IMG_1131_180_aug1.png ...
Processing /content/data_split/train/IMG_1149_180_aug0.png ...
Processing /content/data_split/train/IMG_1143_90_aug0.png ...
Processing /content/data_split/train/IMG_1158_90_aug0.png ...
Processing /content/data_split/train/IMG_1150_180_aug0.png ...
Processing /content/data_split/train/IMG_1155_90.png ...
Processing /content/data_split/train/IMG_1124_180_aug0.png ...
Processing /content/data_split/train/IMG_1125_90_aug3.png ...
Processing /content/data_split/train/IMG_1114_90_aug1.png ...
Processing /content/data_split/train/

Epoch 1/10
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m116s[0m 3s/step - accuracy: 0.8563 - loss: 0.4142 - val_accuracy: 1.0000 - val_loss: 0.1724
Epoch 2/10
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m128s[0m 3s/step - accuracy: 0.8584 - loss: 0.3578 - val_accuracy: 1.0000 - val_loss: 0.1696
Epoch 3/10
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m102s[0m 3s/step - accuracy: 0.9196 - loss: 0.2305 - val_accuracy: 1.0000 - val_loss: 0.1665
Epoch 4/10
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m128s[0m 3s/step - accuracy: 0.9546 - loss: 0.1661 - val_accuracy: 0.9667 - val_loss: 0.1677
Epoch 5/10
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m140s[0m 3s/step - accuracy: 0.9181 - loss: 0.2621 - val_accuracy: 1.0000 - val_loss: 0.1506
Epoch 6/10
[1m33/33[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m143s[0m 3s/step - accuracy: 0.9542 - loss: 0.1771 - val_accuracy: 1.0000 - val_loss: 0.1190
Epoch 7/10
[1m33/33[0m [32m━━━━



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 8s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 3s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step
Saved classified test images to /content/data_split/classified_test_images


In [None]:
from google.colab import drive
drive.mount('/content/drive')