In [4]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import cv2
from pathlib import Path
from itertools import combinations
from tqdm import tqdm
import os
import gdown
from sklearn.cluster import DBSCAN
import gc
from tensorflow.keras import layers, models
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Conv2DTranspose, concatenate, Dropout, BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split

In [5]:

import gdown
import zipfile

# Google Drive *share* link (not a direct link); gdown extracts the file id
# from it because fuzzy=True is passed to gdown.download below.
url = "https://drive.google.com/file/d/1xGBYnRgJkYVBnnQpTphWkmEyU77etqPd/view?usp=sharing"
output = "/content/GTSDB.zip"

# Adjust the paths to where the gt.txt file and images are extracted in Colab
gt_path = '/content/GTSDB/FullIJCNN2013/gt.txt'
images_path = Path('/content/GTSDB/FullIJCNN2013/FullIJCNN2013')
output_path = Path('/content/ProcessedImages')
augmented_path = Path('/content/AugmentedImages')

# The archive is large (the recorded run shows 1.72G), so only download and
# extract when the data is not already on disk. This keeps re-running the cell cheap.
if not Path(gt_path).exists():
    if not Path(output).exists():
        downloaded = gdown.download(url=url, output=output, fuzzy=True)
        # NOTE(review): gdown is expected to return None when it gives up
        # (newer releases may raise instead) - either way, stop here.
        if downloaded is None:
            raise RuntimeError(f"Download failed for {url}")

    # Unzip the downloaded file
    with zipfile.ZipFile(output, 'r') as zip_ref:
        zip_ref.extractall('/content/')

output_path.mkdir(parents=True, exist_ok=True)
augmented_path.mkdir(parents=True, exist_ok=True)

# Load the ground truth file: one ';'-separated row per annotated bounding box
columns = ['Filename', 'LeftCol', 'TopRow', 'RightCol', 'BottomRow', 'ClassID']
gt_data = pd.read_csv(gt_path, sep=';', header=None, names=columns)

# Display the first few rows of the dataframe
print(gt_data.head())

Downloading...
From (original): https://drive.google.com/uc?id=1xGBYnRgJkYVBnnQpTphWkmEyU77etqPd
From (redirected): https://drive.google.com/uc?id=1xGBYnRgJkYVBnnQpTphWkmEyU77etqPd&confirm=t&uuid=918555f4-c020-4ae1-86b4-1f0961334d01
To: /content/GTSDB.zip
100%|██████████| 1.72G/1.72G [00:21<00:00, 79.9MB/s]


    Filename  LeftCol  TopRow  RightCol  BottomRow  ClassID
0  00000.ppm      774     411       815        446       11
1  00001.ppm      983     388      1024        432       40
2  00001.ppm      386     494       442        552       38
3  00001.ppm      973     335      1031        390       13
4  00002.ppm      892     476      1006        592       39


In [6]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import cv2
from pathlib import Path
from tqdm import tqdm
import os
import tensorflow as tf
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Conv2DTranspose, concatenate, Dropout, BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split

# Enhanced data augmentation. The same arguments are used for images and masks
# so that a shared set of transform parameters is valid for both.
data_gen_args = dict(rotation_range=30,
                     width_shift_range=0.2,
                     height_shift_range=0.2,
                     shear_range=0.2,
                     zoom_range=0.2,
                     horizontal_flip=True,
                     vertical_flip=True,
                     fill_mode='nearest')

image_datagen = ImageDataGenerator(**data_gen_args)
mask_datagen = ImageDataGenerator(**data_gen_args)

# Load the ground truth file
gt_path = '/content/GTSDB/FullIJCNN2013/gt.txt'
images_path = Path('/content/GTSDB/FullIJCNN2013/FullIJCNN2013')
columns = ['Filename', 'LeftCol', 'TopRow', 'RightCol', 'BottomRow', 'ClassID']
gt_data = pd.read_csv(gt_path, sep=';', header=None, names=columns)

# Split by IMAGE, not by annotation row. gt.txt holds one row per bounding box
# and an image can have several rows (e.g. 00001.ppm has three), so a row-level
# split would put the same photograph into both the training and the test set.
image_names = gt_data['Filename'].unique()
train_files, temp_files = train_test_split(image_names, test_size=0.3, random_state=42)
validation_files, test_files = train_test_split(temp_files, test_size=0.5, random_state=42)

train_data = gt_data[gt_data['Filename'].isin(train_files)]
validation_data = gt_data[gt_data['Filename'].isin(validation_files)]
test_data = gt_data[gt_data['Filename'].isin(test_files)]

# Save the data splits
train_data.to_csv('/content/train_data.csv', index=False)
validation_data.to_csv('/content/validation_data.csv', index=False)
test_data.to_csv('/content/test_data.csv', index=False)


In [7]:

# Preprocessing function
def preprocess_image_new(image_path, bounding_boxes, image_size=(512, 512)):
    """Load an image as RGB, resize it, and rasterise its boxes into a mask.

    Args:
        image_path: Path (or string) of the image file to read.
        bounding_boxes: Iterable of (left, top, right, bottom) boxes expressed
            in the pixel coordinates of the original, un-resized image.
        image_size: Target (height, width) of the returned image and mask.

    Returns:
        (image, mask): the resized RGB image and a uint8 mask of shape
        image_size that is 1 inside every (rescaled) box and 0 elsewhere.

    Raises:
        FileNotFoundError: if the image cannot be read.
    """
    image = cv2.imread(str(image_path))
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than deep inside cvtColor.
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_size = image.shape[:2]  # (height, width) before resizing
    # cv2.resize takes (width, height), hence the swapped indices.
    image = cv2.resize(image, (image_size[1], image_size[0]))

    mask = np.zeros((image_size[0], image_size[1]), dtype=np.uint8)
    for bbox in bounding_boxes:
        left, top, right, bottom = bbox
        # Rescale box corners from original pixels to the resized grid.
        left = int(left * image_size[1] / original_size[1])
        top = int(top * image_size[0] / original_size[0])
        right = int(right * image_size[1] / original_size[1])
        bottom = int(bottom * image_size[0] / original_size[0])
        # Thickness -1 fills the rectangle with the value 1.
        cv2.rectangle(mask, (left, top), (right, bottom), 1, -1)

    return image, mask

def data_generator_new(gt_data, images_path, batch_size, image_size=(512, 512), augment=False):
    """Yield endless random batches of (images, masks) for training.

    Args:
        gt_data: DataFrame with 'Filename', 'LeftCol', 'TopRow', 'RightCol'
            and 'BottomRow' columns, one row per annotated box.
        images_path: pathlib.Path of the directory holding the image files.
        batch_size: Number of samples per yielded batch.
        image_size: (height, width) every image and mask is resized to.
        augment: When True, apply one random transform from the module-level
            image_datagen / mask_datagen to each image and its mask.

    Yields:
        (batch_images, batch_masks): images scaled to [0, 1] and float32
        masks with a trailing channel axis.

    Raises:
        ValueError: if gt_data has no rows.
    """
    if len(gt_data) == 0:
        raise ValueError("gt_data is empty; nothing to sample from")

    # An image can carry several annotated boxes. Collect all of them per file
    # so the mask marks every box known for that image in gt_data, instead of
    # labelling the other boxes as background.
    box_columns = ['LeftCol', 'TopRow', 'RightCol', 'BottomRow']
    boxes_by_file = {
        filename: [tuple(box) for box in group[box_columns].to_numpy()]
        for filename, group in gt_data.groupby('Filename')
    }

    while True:
        batch_images = []
        batch_masks = []

        for _ in range(batch_size):
            # Sample an annotation row, then use every box of its image.
            random_index = np.random.randint(0, len(gt_data))
            filename = gt_data.iloc[random_index]['Filename']
            image, mask = preprocess_image_new(images_path / filename, boxes_by_file[filename], image_size)

            if augment:
                # Draw the transform parameters once and apply them to both
                # arrays. The previous random_transform(seed=...) approach
                # reseeds NumPy's global RNG on every call, which makes the
                # next sampled index depend only on the previous seed.
                transform_params = image_datagen.get_random_transform(image.shape)
                image = image_datagen.apply_transform(image, transform_params)
                # apply_transform expects a channel axis; add one, then drop it.
                mask = mask_datagen.apply_transform(mask[..., np.newaxis], transform_params)[..., 0]

            batch_images.append(image)
            batch_masks.append(mask)

        batch_images = np.array(batch_images) / 255.0
        batch_masks = np.array(batch_masks, dtype='float32')[..., np.newaxis]

        yield batch_images, batch_masks

def validation_data_generator_new(gt_data, images_path, batch_size, image_size=(512, 512)):
    """Yield endless, deterministic batches of (images, masks) for evaluation.

    Rows are visited in order and the cursor wraps around at the end, so
    ceil(len(gt_data) / batch_size) consecutive batches cover every row and
    repeated evaluations see the same samples. Random sampling with
    replacement would make val_loss noisy for EarlyStopping / ModelCheckpoint.

    Args:
        gt_data: DataFrame with 'Filename', 'LeftCol', 'TopRow', 'RightCol'
            and 'BottomRow' columns, one row per annotated box.
        images_path: pathlib.Path of the directory holding the image files.
        batch_size: Number of samples per yielded batch.
        image_size: (height, width) every image and mask is resized to.

    Yields:
        (batch_images, batch_masks): images scaled to [0, 1] and float32
        masks with a trailing channel axis. No augmentation is applied.

    Raises:
        ValueError: if gt_data has no rows.
    """
    filenames = gt_data['Filename'].tolist()
    if not filenames:
        raise ValueError("gt_data is empty; nothing to evaluate on")

    # Use every box known for an image in gt_data, not only the current row's,
    # so the ground-truth mask is not missing other annotated boxes.
    box_columns = ['LeftCol', 'TopRow', 'RightCol', 'BottomRow']
    boxes_by_file = {
        filename: [tuple(box) for box in group[box_columns].to_numpy()]
        for filename, group in gt_data.groupby('Filename')
    }

    cursor = 0
    while True:
        batch_images = []
        batch_masks = []

        for _ in range(batch_size):
            filename = filenames[cursor]
            cursor = (cursor + 1) % len(filenames)  # wrap so the stream never ends
            image, mask = preprocess_image_new(images_path / filename, boxes_by_file[filename], image_size)

            batch_images.append(image)
            batch_masks.append(mask)

        batch_images = np.array(batch_images) / 255.0
        batch_masks = np.array(batch_masks, dtype='float32')[..., np.newaxis]

        yield batch_images, batch_masks
def conv_block_new(input_tensor, num_filters):
    """Apply two consecutive 3x3 Conv2D -> BatchNormalization -> ReLU stages.

    Both convolutions use 'same' padding, num_filters output channels and an
    L2 kernel regulariser of 0.0001. Returns the output tensor of the second
    activation.
    """
    x = input_tensor
    for _ in range(2):
        x = layers.Conv2D(
            num_filters,
            (3, 3),
            padding='same',
            kernel_regularizer=tf.keras.regularizers.l2(0.0001),
        )(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation('relu')(x)
    return x

def encoder_block_new(input_tensor, num_filters):
    """Run a conv block, then halve the spatial size with 2x2 max pooling.

    Returns:
        (pooled, features): the pooled tensor for the next stage and the
        pre-pooling tensor, kept for use as a skip connection.
    """
    features = conv_block_new(input_tensor, num_filters)
    pooled = layers.MaxPooling2D((2, 2), strides=(2, 2))(features)
    return pooled, features

def decoder_block_new(input_tensor, concat_tensor, num_filters):
    """Upsample by 2 with a transposed conv, merge a skip tensor, then refine.

    The skip tensor (concat_tensor) is placed first in the channel-wise
    concatenation, followed by the upsampled tensor; the result goes through
    conv_block_new.
    """
    upsampled = layers.Conv2DTranspose(
        num_filters, (2, 2), strides=(2, 2), padding='same'
    )(input_tensor)
    merged = layers.concatenate([concat_tensor, upsampled], axis=-1)
    return conv_block_new(merged, num_filters)

# Define the IoU metric function with the smooth variable
def iou_metric(y_true, y_pred, smooth=1):
    """Soft intersection-over-union computed over all elements of the batch.

    Both tensors are flattened; the intersection is the sum of their
    element-wise product and the union is sum(y_true) + sum(y_pred) minus
    that intersection. `smooth` is added to numerator and denominator so the
    ratio stays defined when both are zero.
    """
    truth = tf.reshape(y_true, [-1])
    pred = tf.reshape(y_pred, [-1])
    overlap = tf.reduce_sum(truth * pred)
    total = tf.reduce_sum(truth) + tf.reduce_sum(pred) - overlap
    return (overlap + smooth) / (total + smooth)

def dice_coef(y_true, y_pred, smooth=1):
    """Soft Dice coefficient computed over all elements of the batch.

    Returns (2 * intersection + smooth) / (sum(y_true) + sum(y_pred) + smooth),
    where the intersection is the sum of the element-wise product of the
    flattened tensors.
    """
    truth = tf.reshape(y_true, [-1])
    pred = tf.reshape(y_pred, [-1])
    overlap = tf.reduce_sum(truth * pred)
    denominator = tf.reduce_sum(truth) + tf.reduce_sum(pred) + smooth
    return (2. * overlap + smooth) / denominator

def weighted_binary_crossentropy(y_true, y_pred):
    """Mean element-wise binary cross-entropy with a per-element weight.

    The weight is y_true * 2.0 + 1.0, i.e. 3.0 where y_true is 1 and 1.0
    where y_true is 0.
    """
    per_element_bce = tf.keras.backend.binary_crossentropy(y_true, y_pred)
    element_weights = (y_true * 2.0) + 1.0
    return tf.keras.backend.mean(per_element_bce * element_weights)

def combined_loss(y_true, y_pred):
    """Training loss: weighted binary cross-entropy plus Dice loss."""
    bce_term = weighted_binary_crossentropy(y_true, y_pred)
    dice_term = dice_coef_loss(y_true, y_pred)
    return bce_term + dice_term

def dice_coef_loss(y_true, y_pred):
    """Dice loss: one minus the Dice coefficient of the two tensors."""
    overlap_score = dice_coef(y_true, y_pred)
    return 1 - overlap_score

def build_resunet(input_shape):
    """Build and compile a U-Net style decoder on an ImageNet ResNet50 encoder.

    Args:
        input_shape: Shape of one input image, e.g. (512, 512, 3).

    Returns:
        A compiled Keras Model with a single-channel sigmoid output. It uses
        Adam (learning rate 1e-4), combined_loss, and the dice_coef and
        iou_metric metrics. The model summary is printed as a side effect.
    """
    inputs = Input(shape=input_shape)

    # Pre-trained ResNet50 without its classification head acts as the encoder.
    # NOTE(review): ImageNet ResNet50 weights are normally paired with the
    # matching preprocess_input; this notebook feeds images scaled by 1/255 -
    # confirm that is intended.
    resnet50 = ResNet50(weights='imagenet', include_top=False, input_tensor=inputs)

    # (skip layer name, decoder filters), deepest first. The shapes are the
    # ones noted for a 512x512x3 input.
    decoder_stages = [
        ('conv4_block6_out', 1024),  # 32x32x1024
        ('conv3_block4_out', 512),   # 64x64x512
        ('conv2_block3_out', 256),   # 128x128x256
        ('conv1_relu', 64),          # 256x256x64
    ]

    # Bridge: deepest encoder feature map (16x16x2048)
    x = resnet50.get_layer('conv5_block3_out').output

    # Each stage: upsample x2, concatenate the encoder skip, refine.
    for skip_name, filters in decoder_stages:
        skip = resnet50.get_layer(skip_name).output
        x = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(x)
        x = concatenate([x, skip])
        x = conv_block_new(x, filters)

    # Final upsampling has no skip connection.
    x = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(x)
    x = conv_block_new(x, 32)

    outputs = Conv2D(1, (1, 1), activation='sigmoid')(x)

    model = Model(inputs=[inputs], outputs=[outputs])
    model.compile(
        optimizer=Adam(learning_rate=1e-4),
        loss=combined_loss,
        metrics=[dice_coef, iou_metric],
    )

    model.summary()
    return model

# Instantiate the model
input_shape_new = (512, 512, 3)
unet_model_new = build_resunet(input_shape_new)

# Halve the learning rate when the monitored loss stops improving for 3 epochs.
# NOTE(review): this watches the training 'loss' while the callbacks below
# watch 'val_loss' - confirm the mismatch is intentional.
reduce_lr = ReduceLROnPlateau(monitor='loss', factor=0.5, patience=3, min_lr=0.00001)

# Early stopping (restoring the best weights) and best-only checkpointing,
# both driven by the validation loss.
callbacks = [
    reduce_lr,
    EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
    ModelCheckpoint('/content/best_model.h5', monitor='val_loss', save_best_only=True)
]

# Define the data generators
batch_size_new = 16
# Ceiling division: `len // batch + 1` schedules one batch too many whenever
# the number of rows is an exact multiple of the batch size.
steps_per_epoch_new = (len(train_data) + batch_size_new - 1) // batch_size_new
validation_steps_new = (len(validation_data) + batch_size_new - 1) // batch_size_new

train_generator_new = data_generator_new(train_data, images_path, batch_size_new, augment=True)
validation_generator_new = validation_data_generator_new(validation_data, images_path, batch_size_new)


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 512, 512, 3)]        0         []                            
                                                                                                  
 conv1_pad (ZeroPadding2D)   (None, 518, 518, 3)          0         ['input_1[0][0]']             
                                                                                                  
 conv1_conv (Conv2D)         (None, 256, 256, 64)         9472      ['conv1_pad[0][0]']           
                                                                                                  
 conv1_bn (BatchNormalizati  (None, 256, 256, 64) 

In [8]:

# Fit for up to 20 epochs on the training generator, validating each epoch;
# the `callbacks` list is passed to fit as defined earlier in the notebook.
history_new = unet_model_new.fit(
    train_generator_new,
    steps_per_epoch=steps_per_epoch_new,
    epochs=20,
    validation_data=validation_generator_new,
    validation_steps=validation_steps_new,
    callbacks=callbacks,
)


Epoch 1/20

  saving_api.save_model(


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [12]:

# Save the model held in unet_model_new to the Colab VM's local disk (HDF5 file)
unet_model_new.save('/content/enhanced_unet_model.h5')


In [13]:

# Evaluate the model on the held-out test split
# Ceiling division: `len // batch + 1` would add a surplus batch whenever the
# number of test rows is an exact multiple of the batch size.
test_steps_new = (len(test_data) + batch_size_new - 1) // batch_size_new
test_generator_new = validation_data_generator_new(test_data, images_path, batch_size_new)
# evaluate returns the loss followed by the compiled metrics, in order.
loss_new, dice_coef_new, iou_new = unet_model_new.evaluate(test_generator_new, steps=test_steps_new)
print(f"Loss: {loss_new}")
print(f"Dice Coefficient: {dice_coef_new}")
print(f"IOU: {iou_new}")

# Function to plot training and validation metrics
def plot_metrics(history):
    """Plot training and validation curves for loss, IoU and Dice coefficient.

    Args:
        history: Object with a `.history` dict mapping metric names to
            per-epoch value lists (as returned by `model.fit`). It must hold
            'loss', 'iou_metric' and 'dice_coef'; the matching 'val_*'
            series are drawn only when present.
    """
    # (history key, panel title, train legend label, validation legend label)
    panels = [
        ('loss', 'Loss', 'Train Loss', 'Validation Loss'),
        ('iou_metric', 'IoU', 'Train IoU', 'Validation IoU'),
        ('dice_coef', 'Dice Coefficient', 'Train Dice Coefficient', 'Validation Dice Coefficient'),
    ]

    fig, axes = plt.subplots(1, len(panels), figsize=(20, 6))
    for ax, (key, title, train_label, val_label) in zip(axes, panels):
        ax.plot(history.history[key], label=train_label)
        # Training without validation data leaves no 'val_*' entries.
        val_key = f'val_{key}'
        if val_key in history.history:
            ax.plot(history.history[val_key], label=val_label)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(title)
        ax.set_title(title)
        ax.legend()

    plt.show()

# Plot the loss, IoU and Dice curves recorded in history_new by the training run
plot_metrics(history_new)

# Function to visualize predictions
def visualize_predictions_new(generator, model, num_images=5):
    """Show image / ground-truth mask / predicted mask for `num_images` samples.

    Args:
        generator: Iterator yielding (images, masks) batches.
        model: Object whose `predict(images)` returns one mask per image.
        num_images: Total number of samples to display. Batches are pulled
            from `generator` until that many have been shown. Treating this
            count as a number of batches would draw num_images * batch_size
            figures.
    """
    remaining = num_images
    while remaining > 0:
        images, masks = next(generator)
        # Trim the batch so no more than the requested number is predicted/drawn.
        images, masks = images[:remaining], masks[:remaining]
        if len(images) == 0:
            break  # an empty batch would otherwise loop forever
        predictions = model.predict(images)

        for image, mask, prediction in zip(images, masks, predictions):
            plt.figure(figsize=(12, 4))

            plt.subplot(1, 3, 1)
            plt.imshow(image)
            plt.title('Original Image')

            plt.subplot(1, 3, 2)
            plt.imshow(mask.squeeze(), cmap='gray')
            plt.title('Ground Truth')

            plt.subplot(1, 3, 3)
            plt.imshow(prediction.squeeze(), cmap='gray')
            plt.title('Predicted Mask')

            plt.show()

        remaining -= len(images)

# Visualize predictions for samples drawn from the test generator (default num_images)
visualize_predictions_new(test_generator_new, unet_model_new)

# Freezing and saving the model
from tensorflow.keras.models import load_model

# Function to load and freeze the model
def load_and_freeze_model(model_path):
    """Load a saved model and mark it as non-trainable.

    The custom metrics and loss used at compile time are passed as
    `custom_objects` so the saved model can be deserialised.

    Args:
        model_path: Path of the saved model file.

    Returns:
        The loaded model with `trainable` set to False.
    """
    custom_objects = {
        'dice_coef': dice_coef,
        'iou_metric': iou_metric,
        'combined_loss': combined_loss,
    }
    frozen = load_model(model_path, custom_objects=custom_objects)
    frozen.trainable = False
    return frozen


Output hidden; open in https://colab.research.google.com to view.

In [14]:
# Persist the trained model and a frozen copy of the best checkpoint to Google Drive.
# No cell shown in this notebook mounts Drive, so verify the mount first:
# otherwise the files may only land on the ephemeral Colab VM disk.
drive_root = Path('/content/drive/MyDrive')
if not drive_root.is_dir():
    raise RuntimeError(
        "Google Drive does not appear to be mounted at /content/drive; "
        "mount it before running this cell."
    )

# Make sure the target folder exists inside Drive.
models_dir = drive_root / 'models'
models_dir.mkdir(parents=True, exist_ok=True)

# Save the model
unet_model_new.save(str(models_dir / 'enhanced_unet_model.h5'))

# Load and freeze the best model (checkpoint written by ModelCheckpoint)
frozen_model = load_and_freeze_model('/content/best_model.h5')

# Save the frozen model
frozen_model.save(str(models_dir / 'frozen_enhanced_unet_modelxl.h5'))

