In [None]:
import cv2
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import Sequence
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers
from tensorflow.keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda

In [2]:
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda


def multi_unet_model(n_classes=2, IMG_HEIGHT=256, IMG_WIDTH=256, IMG_CHANNELS=3):
#Build the model
    inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
    s = inputs

    #Contraction path
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(s)
    c1 = Dropout(0.1)(c1)  # Original 0.1
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)
    
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
    c2 = Dropout(0.1)(c2)  # Original 0.1
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)
     
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
    c3 = Dropout(0.1)(c3)
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)
     
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
    c4 = Dropout(0.1)(c4)
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
    p4 = MaxPooling2D(pool_size=(2, 2))(c4)
     
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
    c5 = Dropout(0.3)(c5)
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)
    
    #Expansive path 
    u6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c4])
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
    c6 = Dropout(0.1)(c6)
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)
     
    u7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c3])
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
    c7 = Dropout(0.2)(c7)
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)
     
    u8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c7)
    u8 = concatenate([u8, c2])
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
    c8 = Dropout(0.1)(c8)  # Original 0.1
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)
     
    u9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(c8)
    u9 = concatenate([u9, c1], axis=3)
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
    c9 = Dropout(0.1)(c9)  # Original 0.1
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)
     
    outputs = Conv2D(n_classes, (1, 1), activation='softmax')(c9)
     
    model = Model(inputs=[inputs], outputs=[outputs])
    
    #NOTE: Compile the model in the main program to make it easy to test with various loss functions
    model.compile(optimizer='adam', loss=['categorical_crossentropy'], metrics=['accuracy'])
    
    model.summary()
    
    return model

In [3]:
model = multi_unet_model()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 256, 256, 16  448         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 dropout (Dropout)              (None, 256, 256, 16  0           ['conv2d[0][0]']                 
                                )                                                             

In [None]:
# To convert all the image form PNG to JPG
# you compile this cell if and only if the images in your dataset is not in JPG but in PNG
# This cell will convert. Otherwise pass
import os
from PIL import Image

def convert_images_to_jpg(directory):
    for filename in os.listdir(directory):
        if filename.endswith(".png") or filename.endswith(".jpeg") or filename.endswith(".gif"):
            img_path = os.path.join(directory, filename)
            img = Image.open(img_path)
            new_filename = os.path.splitext(filename)[0] + ".jpg"
            new_img_path = os.path.join(directory, new_filename)
            img.convert("RGB").save(new_img_path, "JPEG")
            os.remove(img_path)
            print(f"Converted {filename} to JPG")

# Specify the directory containing the images
directory = "C:/Users/DELL E7470/Desktop/Code/Fish_Dataset/Red Sea Bream/Red Sea Bream GT"

# Call the function to convert images to JPG
convert_images_to_jpg(directory)


In [4]:
num_classes=2
H=256
W=256

def read_image(x):
    x = cv2.imread(x, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (W, H))
    x = x/255.0
    x = x.astype(np.float32)
    return x

#def read_image(x):
#    x = cv2.imread(x, cv2.IMREAD_COLOR)
#    if x is not None:
#        x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB color format
#        x = cv2.resize(x, (W, H))
#        x = x / 255.0
#        x = x.astype(np.float32)
#    return x


def read_mask(x):
    x = cv2.imread(x, cv2.IMREAD_GRAYSCALE)
    x = cv2.resize(x, (W, H))
    x = x.astype(np.int32)
    return x


def tf_dataset(x,y, batch=4):
    dataset = tf.data.Dataset.from_tensor_slices((x,y)) # Dataset object from Tensorflow
    dataset = dataset.shuffle(buffer_size=100) 
    dataset = dataset.map(preprocess) # Applying preprocessing to every batch in the Dataset object
    dataset = dataset.batch(batch) # Determine batch-size
    dataset = dataset.repeat()
    dataset = dataset.prefetch(2) # Optimization
    return dataset
        

def preprocess(x,y):
    def f(x,y):
        x = x.decode()
        y = y.decode()
        image = read_image(x)
        mask = read_mask(y)
        return image, mask
    
    image, mask = tf.numpy_function(f,[x,y],[tf.float32, tf.int32])
    mask = tf.one_hot(mask, num_classes, dtype=tf.int32)
    image.set_shape([H, W, 3])    # In the Images, number of channels = 3. 
    mask.set_shape([H, W, num_classes])    # In the Masks, number of channels = number of classes. 
    return image, mask


In [5]:
#root_dir = '../input/semantic-drone-dataset/dataset/semantic_drone_dataset'
img_path = "C:/Users/DELL E7470/Desktop/Code/Fish_Dataset/Red Sea Bream/Red Sea Bream"
mask_path ="C:/Users/DELL E7470/Desktop/Code/Fish_Dataset/Red Sea Bream/Red Sea Bream GT"

names = list(map(lambda x: x.replace('.jpg', ''), os.listdir(img_path)))

In [6]:
X_trainval, X_test = train_test_split(names, test_size=0.1, random_state=19)
X_train, X_val = train_test_split(X_trainval, test_size=0.2, random_state=19)

print(f"Train Size : {len(X_train)} images")
print(f"Val Size   :  {len(X_val)} images")
print(f"Test Size  :  {len(X_test)} images")

y_train = X_train
y_test = X_test
y_val = X_val

img_train = [os.path.join(img_path, f"{name}.jpg") for name in X_train]
mask_train = [os.path.join(mask_path, f"{name}.png") for name in y_train]
img_val = [os.path.join(img_path, f"{name}.jpg") for name in X_val]
mask_val = [os.path.join(mask_path, f"{name}.png") for name in y_val]
img_test = [os.path.join(img_path, f"{name}.jpg") for name in X_test]
mask_test = [os.path.join(mask_path, f"{name}.png") for name in y_test]

Train Size : 720 images
Val Size   :  180 images
Test Size  :  100 images


In [7]:
batch_size=3

train_dataset = tf_dataset(img_train, mask_train, batch = batch_size)
valid_dataset = tf_dataset(img_val, mask_val, batch = batch_size)

train_steps = len(img_train)//batch_size
valid_steps = len(img_val)//batch_size

In [8]:
train_dataset.take(0)

<TakeDataset element_spec=(TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None, 256, 256, 2), dtype=tf.int32, name=None))>

In [9]:
num_classes=2
H=256
W=256


def test_dataset(x, batch=4):
    dataset = tf.data.Dataset.from_tensor_slices(x)
    dataset = dataset.map(preprocess_test)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(2)
    return dataset
        

def preprocess_test(x):
    def f(x):
        x = x.decode()
        image = read_image(x)
        return image
    
    image = tf.convert_to_tensor(tf.numpy_function(f, [x] , [tf.float32]))
    image = tf.reshape(image, (H, W, 3))    # In the Images, number of channels = 3.  
    return image

In [10]:
es = tf.keras.callbacks.EarlyStopping(min_delta=0.001, patience=10)

In [11]:
history = model.fit(train_dataset,
          steps_per_epoch=train_steps,
          validation_data=valid_dataset,
          validation_steps=valid_steps,
          epochs=1 #just to check it works properly
         )

UnknownError: Graph execution error:

error: OpenCV(4.6.0) C:\b\abs_d8ltn27ay8\croot\opencv-suite_1676452046667\work\modules\imgproc\src\resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'

Traceback (most recent call last):

  File "C:\Users\DELL E7470\anaconda3\lib\site-packages\tensorflow\python\ops\script_ops.py", line 271, in __call__
    ret = func(*args)

  File "C:\Users\DELL E7470\anaconda3\lib\site-packages\tensorflow\python\autograph\impl\api.py", line 642, in wrapper
    return func(*args, **kwargs)

  File "C:\Users\DELLE7~1\AppData\Local\Temp\__autograph_generated_filed2ku4b4p.py", line 19, in f
    mask = ag__.converted_call(ag__.ld(read_mask), (ag__.ld(y),), None, fscope_1)

  File "C:\Users\DELL E7470\anaconda3\lib\site-packages\tensorflow\python\autograph\impl\api.py", line 335, in converted_call
    return _call_unconverted(f, args, kwargs, options, False)

  File "C:\Users\DELL E7470\anaconda3\lib\site-packages\tensorflow\python\autograph\impl\api.py", line 459, in _call_unconverted
    return f(*args)

  File "C:\Users\DELL E7470\AppData\Local\Temp\ipykernel_7068\2037341966.py", line 24, in read_mask
    x = cv2.resize(x, (W, H))

cv2.error: OpenCV(4.6.0) C:\b\abs_d8ltn27ay8\croot\opencv-suite_1676452046667\work\modules\imgproc\src\resize.cpp:4052: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'



	 [[{{node PyFunc}}]]
	 [[IteratorGetNext]] [Op:__inference_train_function_3246]

In [None]:
# Train the model
model.fit(original_images, mask_images, epochs=10, batch_size=32)

In [None]:
# Evaluate the model
evaluation = model.evaluate(original_images, mask_images, batch_size=32)

In [None]:
# Compute additional metrics

predicted_masks = model.predict(original_images)
iou = tf.keras.metrics.MeanIoU(num_classes=2)
iou.update_state(mask_images, predicted_masks)
iou_result = iou.result().numpy()

# Print evaluation metrics
print("Loss: ", evaluation[0])
print("Accuracy: ", evaluation[1])
print("IoU: ", iou_result)

In [None]:
# Save the trained model
model.save('unet_model.h5')