In [1]:
import json
from glob import glob as gglob
from os.path import join as joiner
from os import makedirs
from skimage.transform import resize
from pathlib import Path 
from tqdm import tqdm
from PIL import Image, ImageDraw
import numpy as np


###############################
# misc
###############################

TARGET_LIST = ['Crack', 'ACrack', 'Wetspot', 'Efflorescence', 'Rust', 'Rockpocket', 'Hollowareas', 'Cavity',
               'Spalling', 'Graffiti', 'Weathering', 'Restformwork', 'ExposedRebars', 
               'Bearing', 'EJoint', 'Drainage', 'PEquipment', 'JTape', 'WConccor']

def open_json(file):
    with open(file) as f:
        d = json.load(f)
    return d

def save_dict(dct, file):
    with open(file, 'w', encoding='utf-8') as f:
        json.dump(dct, f, indent=2)



###############################
# labelme2mask
###############################

def labelme2mask(data):
    
    TARGET_LIST = ['Crack', 'ACrack', 'Wetspot', 'Efflorescence', 'Rust', 'Rockpocket', 'Hollowareas', 'Cavity',
               'Spalling', 'Graffiti', 'Weathering', 'Restformwork', 'ExposedRebars', 
               'Bearing', 'EJoint', 'Drainage', 'PEquipment', 'JTape', 'WConccor']

    if isinstance(data, str):
        with open(data, 'r') as f:
            data = json.load(f)
    assert type(data) == dict
        
    target_dict = dict(zip(TARGET_LIST, range(len(TARGET_LIST))))
    height = data["size"]["height"]
    width = data["size"]["width"]
    target_mask = np.zeros((height, width, len(TARGET_LIST)))
    for obj in data["objects"]:
        label = obj["classTitle"]
        if label in TARGET_LIST:
            # Get the target index (channel) for this label
            target_index = target_dict[label]
            
            # Create an empty mask for the current object
            target_img = Image.new('L', (width, height), 0)
            
            # Get the exterior polygon points from the "points" field
            polygon = [(x, y) for x, y in obj["points"]["exterior"]]  # Convert list to tuple
            
            # Draw the polygon on the mask
            ImageDraw.Draw(target_img).polygon(polygon, outline=1, fill=1)
            
            # Add the polygon mask to the appropriate channel in the target mask
            target_mask[:, :, target_index] += np.array(target_img)               
    return target_mask.astype(bool).astype(np.uint8)  

In [2]:
# resize images

def resize_images(source_folder, target_folder, size=(512,512)):
    """Resize all images in `source_folder` to size `size` and store it in `target_folder`"""
    image_files = sorted(gglob(joiner(source_folder, "*.jpg")))
    makedirs(target_folder, exist_ok = True)
    
    for image_filename in tqdm(image_files):
        base_filename = Path(image_filename).name
        target_name = joiner(target_folder, base_filename)
        with Image.open(image_filename) as img:
            img = img.resize(size)
            img = img.save(target_name)
    print(f"Resized images saved in {target_folder}")

In [3]:
# resize annotations

def _resize_annotation_data(data, width_factor, height_factor):
    w_index, h_index = 0, 1
    
    # Iterate through all objects
    for obj in data["objects"]:
        # If the object has an exterior polygon (points)
        if "points" in obj and "exterior" in obj["points"]:
            # Get the length of points in the "exterior" polygon
            point_len = len(obj["points"]["exterior"])
            
            # Loop through each point in the polygon
            for point_idx in range(point_len):
                the_w = obj["points"]["exterior"][point_idx][w_index]
                the_h = obj["points"]["exterior"][point_idx][h_index]

                # Apply the resizing factors
                obj["points"]["exterior"][point_idx][w_index] = the_w / width_factor
                obj["points"]["exterior"][point_idx][h_index] = the_h / height_factor
                
    return data

def resize_annotations(source_folder, target_folder, size=(512,512)):
    """Resize all labelme annotations (all polygone points and imageWiedth and imageHeight) 
        in `source_folder` to size `size` and store it in `target_folder`"""
    annotation_files = sorted(gglob(joiner(source_folder, "*.json")))
    makedirs(target_folder, exist_ok = True)

    for annotation_filename in tqdm(annotation_files):
        data = open_json(annotation_filename)
        filename = Path(annotation_filename).name
        target_filename = joiner(target_folder, filename)
        
        width_factor = data["size"]["width"] / size[0]
        height_factor = data["size"]["height"] / size[1]

        data = _resize_annotation_data(data, width_factor, height_factor)
        data["size"]["width"] = size[0]
        data["size"]["height"] = size[1]
        save_dict(data, target_filename)
    print(f"Resized annotations saved in {target_folder}")

In [4]:
# Filtering and Normalizing images

import numpy as np
import cv2
from tqdm import tqdm 
import os
from glob import glob

def filtering_normalizing(input_folder, output_folder):
    for image_path in tqdm(glob(os.path.join(input_folder, "*.jpg"))):
        # Load image
        img = cv2.imread(image_path)

        # Step 1: Noise filtering using Gaussian Blur
        denoised_img = cv2.GaussianBlur(img, (5, 5), 0)

        # Step 2: Normalize the image to range [0, 1] (min-max normalization)
        normalized_img = denoised_img / 255.0

        # Step 3: Save the processed image
        # Optional: Convert back to uint8 if needed
        output_image_path = os.path.join(output_folder, os.path.basename(image_path))
        normalized_uint8_img = np.uint8(normalized_img * 255)  # Convert back to [0, 255] range for saving
        cv2.imwrite(output_image_path, normalized_uint8_img)

    print(f"Images Processed and saved in {output_folder}")

In [5]:
import os
from glob import glob

# Paths
source_image_folder1 = "train/img"  # Original training images folder
source_image_folder2 = "test/img"  # Original testing images folder
resize_image_folder1 = "target folder/train/img"   # Resized training images output folder
resize_image_folder2 = "target folder/test/img"   # Resized testing images output folder
source_annotation_folder1 = "train/ann"  # Original train annotations (LabelMe JSONs) folder
source_annotation_folder2 = "test/ann"  # Original test annotations (LabelMe JSONs) folder
resize_annotation_folder1 = "target folder/train/ann"   # Resized train annotations output folder
resize_annotation_folder2 = "target folder/test/ann"   # Resized test annotations output folder
filtered_image_folder1 = "filtered/train" # Filtered and Normalized train images folder
filtered_image_folder2 = "filtered/test" # Filtered and Normalized test images folder
mask_output_folder1 = "mask/train"  # Folder where train masks will be saved
mask_output_folder2 = "mask/test"  # Folder where test masks will be saved

# Step 1: Resize images
resize_images(source_folder=source_image_folder1, target_folder=resize_image_folder1, size=(512, 512))
resize_images(source_folder=source_image_folder2, target_folder=resize_image_folder2, size=(512, 512))

# Step 2: Resize annotations
resize_annotations(source_folder=source_annotation_folder1, target_folder=resize_annotation_folder1, size=(512, 512))
resize_annotations(source_folder=source_annotation_folder2, target_folder=resize_annotation_folder2, size=(512, 512))

# Step 3: Filtering and Normalizing images 
os.makedirs(filtered_image_folder1, exist_ok=True)
os.makedirs(filtered_image_folder2, exist_ok=True)
filtering_normalizing(resize_image_folder1, filtered_image_folder1)
filtering_normalizing(resize_image_folder2, filtered_image_folder2)

# Step 3: Convert resized annotations to multi-channel masks
os.makedirs(mask_output_folder1, exist_ok=True)  # Ensure the mask output folder exists
os.makedirs(mask_output_folder2, exist_ok=True)

annotation_files1 = glob(os.path.join(resize_annotation_folder1, "*.json"))
annotation_files2 = glob(os.path.join(resize_annotation_folder2, "*.json"))

for annotation_file in tqdm(annotation_files1):
    # Convert each resized annotation to a multi-channel mask
    annotation_data = open_json(annotation_file)  # Load annotation
    mask = labelme2mask(annotation_data)  # Convert to mask

    # Save the mask as a NumPy array (.npy) or any other format like PNG
    mask_filename = os.path.splitext(os.path.basename(annotation_file))[0] + "_mask.npy"
    mask_output_path = os.path.join(mask_output_folder1, mask_filename)
    
    np.save(mask_output_path, mask)  # Save the mask
print(f"Saved mask for annotation_files in {mask_output_folder1}")
    
for annotation_file in tqdm(annotation_files2):
    # Convert each resized annotation to a multi-channel mask
    annotation_data = open_json(annotation_file)  # Load annotation
    mask = labelme2mask(annotation_data)  # Convert to mask

    # Save the mask as a NumPy array (.npy) or any other format like PNG
    mask_filename = os.path.splitext(os.path.basename(annotation_file))[0] + "_mask.npy"
    mask_output_path = os.path.join(mask_output_folder2, mask_filename)
    
    np.save(mask_output_path, mask)  # Save the mask
print(f"Saved mask for annotation_file in {mask_output_folder2}")

100%|██████████| 6935/6935 [08:23<00:00, 13.76it/s]


Resized images saved in target folder/train/img


100%|██████████| 2010/2010 [02:29<00:00, 13.41it/s]


Resized images saved in target folder/test/img


100%|██████████| 6935/6935 [01:35<00:00, 72.57it/s] 


Resized annotations saved in target folder/train/ann


100%|██████████| 2010/2010 [00:19<00:00, 105.49it/s]


Resized annotations saved in target folder/test/ann


100%|██████████| 6935/6935 [02:46<00:00, 41.54it/s]


Images Processed and saved in filtered/train


100%|██████████| 2010/2010 [00:47<00:00, 41.91it/s]


Images Processed and saved in filtered/test


100%|██████████| 6935/6935 [05:58<00:00, 19.35it/s]


Saved mask for annotation_files in mask/train


100%|██████████| 2010/2010 [01:35<00:00, 20.95it/s]

Saved mask for annotation_file in mask/test





In [5]:
from tensorflow.keras.utils import to_categorical
import cv2

def preprocessed_data_generator(image_folder, mask_folder, batch_size, num_classes=19):
    """Optimized generator to load preprocessed images and masks directly."""
    
    image_files = sorted(glob(os.path.join(image_folder, "*.jpg")))
    mask_files = sorted(glob(os.path.join(mask_folder, "*.npy")))

    # Precompute total steps to avoid recalculating each epoch
    num_samples = len(image_files)
    if num_samples == 0:
        raise ValueError("No image files found in the folder")

    print(f"Found {num_samples} samples.")

    while True:
        indices = np.random.permutation(num_samples)  # Shuffle data for each epoch
        for i in range(0, num_samples, batch_size):
            batch_images = []
            batch_masks = []
            
            batch_indices = indices[i:i + batch_size]
            for idx in batch_indices:
                # Load the preprocessed image (check for existence)
                image_path = image_files[idx]
                mask_path = mask_files[idx]

                try:
                    image = cv2.imread(image_path)
                    mask = np.load(mask_path, allow_pickle=True)
                except Exception as e:
                    print(f"Error loading files: {image_path} or {mask_path} - {e}")
                    continue

                # Convert mask to one-hot encoding
                mask = to_categorical(mask, num_classes=num_classes)

                # Append to batch
                batch_images.append(image)
                batch_masks.append(mask)

            # Ensure batch is not empty
            if len(batch_images) > 0 and len(batch_masks) > 0:
                # Debug: print batch details to track progress
                print(f"Yielding batch {i//batch_size + 1} with {len(batch_images)} images.")
                yield np.array(batch_images), np.array(batch_masks)

            else:
                print(f"Empty batch encountered at batch {i//batch_size + 1}, skipping.")

In [8]:
import os
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Input

# Define the model architecture
def create_damage_detection_model(input_shape=(512, 512, 3)):
    inputs = Input(shape=input_shape)

    conv1 = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(64, (3, 3), activation='relu', padding='same')(pool1)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(128, (3, 3), activation='relu', padding='same')(pool2)

    up1 = UpSampling2D(size=(2, 2))(conv3)
    conv4 = Conv2D(64, (3, 3), activation='relu', padding='same')(up1)

    up2 = UpSampling2D(size=(2, 2))(conv4)
    conv5 = Conv2D(32, (3, 3), activation='relu', padding='same')(up2)

    output = Conv2D(19, (1, 1), activation='softmax')(conv5)

    model = Model(inputs=inputs, outputs=output)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    return model

In [3]:
model = create_damage_detection_model()
model.summary()

In [9]:
import numpy as np
from glob import glob
import os

# Define paths to preprocessed images and masks
train_image_folder = "filtered/train"
train_mask_folder = "mask/train"

# Create the training data generator
train_generator = preprocessed_data_generator(train_image_folder, train_mask_folder, 32)

# Train the model using the generator
steps_per_epoch = len(glob(os.path.join(train_image_folder, "*.npy"))) // 32
model.fit(train_generator, steps_per_epoch=steps_per_epoch, epochs=15)

Found 6935 samples.


MemoryError: Unable to allocate 722. MiB for an array with shape (4980736, 19) and data type float64