### Imports

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.image import imread
import joblib
import random

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Activation
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model

# Apply seaborn's "whitegrid" theme to subsequent plots.
sns.set_style("whitegrid")

# Report the TensorFlow version this notebook is running against.
print("TensorFlow version:", tf.__version__)

### Directories

In [None]:
# Record and show the directory the notebook was started from.
cwd = os.getcwd()
print("Current working directory:", cwd)

In [None]:
# Switch to a hard-coded absolute directory; the relative 'inputs/...' and
# 'outputs/...' paths used further down are resolved against it.
# NOTE(review): this path is environment-specific — confirm it exists wherever
# the notebook is run.
os.chdir('/workspaces/ML_Project_Image_Recognition')
print("New working directory set.")

In [None]:
# Capture the working directory after the change above; the bare expression
# echoes it as the cell output.
work_dir = os.getcwd()
work_dir

### Outputs

In [None]:
# Dataset root and its three split sub-folders.
data_dir = 'inputs/cracks_dataset_new'
train_path, val_path, test_path = (
    os.path.join(data_dir, split_name)
    for split_name in ('train', 'val', 'test')
)

# Versioned folder that receives this notebook's artefacts; created on demand.
version = 'v1'
output_path = os.path.join('outputs', version)
os.makedirs(output_path, exist_ok=True)
print(f"Output folder: {output_path}")

In [None]:
# Load the image shape previously pickled into the output folder
# (presumably written by an earlier notebook — verify).
image_shape = joblib.load(f'{output_path}/image_shape.pkl')
print("Image shape loaded:", image_shape)

# Load the pickled class mapping; its keys are used as the class labels.
class_indices = joblib.load(f'{output_path}/class_indices.pkl')
labels = list(class_indices.keys())
print("Class labels:", labels)

### Augmentation 
 - Only the "Cracked" images are augmented. This adds image diversity during training, because the disparity between the number of cracked and non-cracked images is quite large (as noted in notebook 2).

In [None]:
# Augmenting generator: applies random rotations, shifts, shear, zoom and
# flips, and rescales pixel values by 1/255.
augmented_gen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='nearest',
    rescale=1./255
)

# Plain generator: rescales pixel values by 1/255 and applies no random transforms.
non_augmented_gen = ImageDataGenerator(rescale=1./255)

def create_combined_generator(structure_name, image_shape=(256, 256), batch_size=32):
    """Build an endless training generator mixing augmented and plain images.

    "Cracked" images for ``structure_name`` are read through ``augmented_gen``
    and "Non-cracked" images through ``non_augmented_gen``. On every step one
    batch from each is concatenated and shuffled.

    Args:
        structure_name: Sub-folder of ``train_path`` that contains the
            "Cracked" and "Non-cracked" class folders (e.g. "Walls").
        image_shape: Target image size; only the first two entries are used.
            NOTE(review): this default is independent of the module-level
            ``image_shape`` — confirm callers pass the size the model expects.
        batch_size: Batch size of each per-class flow, so a combined batch
            holds up to ``2 * batch_size`` images.

    Returns:
        A tuple ``(generator, total_images)``: ``generator`` yields shuffled
        ``(X, y)`` batches forever and ``total_images`` is the number of files
        found across both classes.
    """
    structure_dir = os.path.join(train_path, structure_name)

    # Each flow below is restricted to a single class, so Keras labels every
    # image in it with index 0. The real labels are therefore assigned here,
    # using the alphanumeric ordering that flow_from_directory applies when it
    # infers classes from folder names (as the validation/test generators do):
    # "Cracked" -> 0, "Non-cracked" -> 1.
    # NOTE(review): confirm this matches the mapping stored in class_indices.pkl.
    class_names = ("Cracked", "Non-cracked")
    label_of = {name: float(idx) for idx, name in enumerate(sorted(class_names))}

    def _single_class_flow(datagen, class_name):
        # One directory iterator limited to a single class folder.
        return datagen.flow_from_directory(
            directory=structure_dir,
            classes=[class_name],
            target_size=image_shape[:2],
            class_mode='binary',
            batch_size=batch_size,
            shuffle=True
        )

    cracked_generator = _single_class_flow(augmented_gen, "Cracked")
    non_cracked_generator = _single_class_flow(non_augmented_gen, "Non-cracked")

    def combined_gen():
        while True:
            # next() works on both older and newer Keras iterators, whereas
            # the .next() method is not available on all versions.
            cracked_imgs, cracked_labels = next(cracked_generator)
            non_imgs, non_labels = next(non_cracked_generator)

            # Overwrite the per-flow labels (all 0) with the true class index.
            cracked_labels = np.full_like(cracked_labels, label_of["Cracked"])
            non_labels = np.full_like(non_labels, label_of["Non-cracked"])

            X = np.concatenate((cracked_imgs, non_imgs), axis=0)
            y = np.concatenate((cracked_labels, non_labels), axis=0)

            # Shuffle so the two classes are interleaved within the batch.
            indices = np.arange(len(X))
            np.random.shuffle(indices)
            yield X[indices], y[indices]

    total_images = len(cracked_generator.filenames) + len(non_cracked_generator.filenames)
    return combined_gen(), total_images

In [None]:
# Pass the loaded image_shape explicitly so training images are resized to the
# same target size as the validation/test generators and the model input,
# rather than silently using create_combined_generator's (256, 256) default.
walls_train_gen, walls_total_images = create_combined_generator("Walls", image_shape=image_shape)
decks_train_gen, decks_total_images = create_combined_generator("Decks", image_shape=image_shape)
pavements_train_gen, pavements_total_images = create_combined_generator("Pavements", image_shape=image_shape)

### Rescaling
 - The Test and Validation sets are only rescaled, not augmented, so that they reflect real-world images when the models are evaluated.

In [None]:
# Shared generator for evaluation data: pixel rescaling only, no augmentation.
rescale_gen = ImageDataGenerator(rescale=1./255)

def create_eval_generator(base_path, structure_name, batch_size=32):
    """Return an unshuffled, rescale-only directory iterator for one structure.

    Args:
        base_path: Root folder of the split (validation or test).
        structure_name: Sub-folder of ``base_path`` to read images from.
        batch_size: Number of images per batch.

    Returns:
        The iterator produced by ``rescale_gen.flow_from_directory`` with
        binary labels, resized to the first two entries of the module-level
        ``image_shape``.
    """
    structure_dir = os.path.join(base_path, structure_name)
    flow_options = dict(
        directory=structure_dir,
        target_size=image_shape[:2],
        class_mode='binary',
        batch_size=batch_size,
        shuffle=False,  # keep file order stable
    )
    return rescale_gen.flow_from_directory(**flow_options)

In [None]:
# Validation iterators, one per structure type.
walls_val_gen, decks_val_gen, pavements_val_gen = (
    create_eval_generator(val_path, structure_name)
    for structure_name in ("Walls", "Decks", "Pavements")
)

# Test iterators, one per structure type.
walls_test_gen, decks_test_gen, pavements_test_gen = (
    create_eval_generator(test_path, structure_name)
    for structure_name in ("Walls", "Decks", "Pavements")
)

### Augmented Sample Montages

In [None]:
def save_augmented_montage(generator, structure_name, output_path, n_images=9):
    """Save a grid of sample images drawn from ``generator`` as a PNG file.

    One batch is pulled from ``generator`` and its first ``n_images`` images
    are drawn on a three-column grid, each titled "Augmented". The figure is
    written to ``augmented_samples_<structure_name>.png`` inside
    ``output_path`` and then closed.

    Args:
        generator: Iterator whose ``next()`` returns an ``(images, labels)``
            pair; only the images are used.
        structure_name: Name inserted into the output file name.
        output_path: Folder the PNG is written to.
        n_images: Maximum number of images to show; must be at least 1.

    Raises:
        ValueError: If ``n_images`` is smaller than 1.
    """
    from math import ceil

    if n_images < 1:
        raise ValueError("n_images must be at least 1")

    imgs, _ = next(generator)
    imgs = imgs[:n_images]

    n_cols = 3
    n_rows = ceil(n_images / n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(10, 10))
    axes = axes.flatten()

    for ax, img in zip(axes, imgs):
        ax.imshow(img)
        ax.set_title("Augmented")

    # Hide the frame of every cell, including those left empty when the batch
    # holds fewer than n_images images.
    for ax in axes:
        ax.axis('off')

    fig.tight_layout()
    filename = os.path.join(output_path, f"augmented_samples_{structure_name}.png")
    fig.savefig(filename, dpi=150, bbox_inches='tight')
    # Close this specific figure rather than whichever one is current.
    plt.close(fig)
    print(f"Saved: {filename}")

In [None]:
# Save one montage of augmented "Cracked" training samples per structure type.
for structure in ("Walls", "Decks", "Pavements"):
    sample_flow = augmented_gen.flow_from_directory(
        directory=os.path.join(train_path, structure),
        classes=["Cracked"],
        target_size=image_shape[:2],
        class_mode='binary',
        batch_size=9,
        shuffle=True
    )
    save_augmented_montage(sample_flow, structure, output_path)

In [None]:
from IPython.display import Image, display

# Montage PNGs expected in the output folder, in display order.
montage_files = [
    os.path.join(output_path, png_name)
    for png_name in (
        "augmented_samples_Walls.png",
        "augmented_samples_Decks.png",
        "augmented_samples_Pavements.png",
    )
]

# Show each montage inline, reporting any that is missing.
for file in montage_files:
    if not os.path.exists(file):
        print(f"File not found: {file}")
        continue
    print(f"Displaying: {os.path.basename(file)}")
    display(Image(filename=file))

### Creating the Models

In [None]:
def create_tf_model_1(input_shape):
    """Build and compile the binary-classification CNN.

    The network stacks three Conv2D + MaxPooling2D stages (32, 64 and 128
    filters), then a 128-unit dense layer with 50% dropout and a single
    sigmoid output. It is compiled with the Adam optimizer, binary
    cross-entropy loss and the accuracy metric.

    Args:
        input_shape: Shape passed to the first convolution layer.

    Returns:
        The compiled ``Sequential`` model.
    """
    layer_stack = [
        # Convolutional feature extractor.
        Conv2D(32, (3, 3), activation='relu', input_shape=input_shape),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Conv2D(128, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        # Classifier head.
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid'),
    ]
    model = Sequential(layer_stack)

    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return model

In [None]:
# Build the Walls model with the loaded image shape.
tf_model_1 = create_tf_model_1(input_shape=image_shape)

# Stop once val_loss has not improved for 3 epochs and restore the best weights.
early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

history_tf_model_1 = tf_model_1.fit(
    walls_train_gen,
    # NOTE(review): the divisor 16 does not match the training generator, which
    # yields up to 2 * 32 images per step; confirm the intended epoch length.
    steps_per_epoch=walls_total_images // 16,
    validation_data=walls_val_gen,
    # One full pass over the validation set. The previous `samples // 16`
    # assumed a batch size of 16, but the validation iterator uses 32, so it
    # requested roughly twice as many batches as the set contains.
    validation_steps=len(walls_val_gen),
    epochs=25,
    callbacks=[early_stop],
    verbose=1
)

In [None]:
# Persist the trained Walls model and its per-epoch training history.
model_file = os.path.join(output_path, "tf_model_1_walls.h5")
history_file = os.path.join(output_path, "history_tf_model_1_walls.pkl")
tf_model_1.save(model_file)
joblib.dump(history_tf_model_1.history, history_file)