In [1]:
!git clone https://jakabfarkas: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@github.com/Frostforus/DTU_Deep_Learning_Image_Segmentation_Project.git


Cloning into 'DTU_Deep_Learning_Image_Segmentation_Project'...
remote: Enumerating objects: 3406, done.[K
remote: Counting objects: 100% (737/737), done.[K
remote: Compressing objects: 100% (367/367), done.[K
remote: Total 3406 (delta 367), reused 733 (delta 366), pack-reused 2669[K
Receiving objects: 100% (3406/3406), 310.87 MiB | 13.11 MiB/s, done.
Resolving deltas: 100% (1228/1228), done.
Updating files: 100% (3010/3010), done.


In [2]:
import os
import cv2
import matplotlib.pyplot as plt
from PIL import Image
import json
import numpy as np
import imgaug as ia
from imgaug import augmenters as iaa
import re
import colorsys
from scipy.ndimage import zoom

In [3]:
data = [
    ["orange", "hood", 10],
    ["dark green", "front door", 20],
    ["yellow", "rear door", 30],
    ["cyan", "frame", 40],
    ["purple", "rear quarter panel", 50],
    ["light green", "trunk lid", 60],
    ["blue", "fender", 70],
    ["pink", "bumper", 80],
    ["no color", "rest of car", 90],
    ["white", "background", 0]
]

color_to_hue = {
    "orange": 30,
    "dark green": 120,
    "yellow": 60,
    "cyan": 180,
    "purple": 270,
    "light green": 150,
    "blue": 210,
    "pink": 330,
    #TODO: change this to black later
    "no color": 0,
    "white": 1,
}

#, row[1] this can be put into the thing too
class_decode_dict = {row[2]: color_to_hue.get(row[0], row[0]) for row in data}
class_decode_dict

{10: 30,
 20: 120,
 30: 60,
 40: 180,
 50: 270,
 60: 150,
 70: 210,
 80: 330,
 90: 0,
 0: 1}

In [4]:
def filter_files_by_regex(directory, pattern):
    # Use os.listdir() to get a list of all files and directories in the specified directory
    file_list = os.listdir(directory)

    # Use a list comprehension to filter the files based on the regex pattern
    filtered_files = [filename for filename in file_list if re.match(pattern, filename)]

    return filtered_files

In [5]:
def load_numpy_arrays(directory, file_names,hue_mapping=class_decode_dict, max_load = 0):
    # Create an empty list to store the numpy arrays
    x = []
    y = []

    # Iterate through the file names
    for filename in file_names:
        if len(x) == max_load: break

        # Load the numpy array from the file path
        array = np.load(directory + filename)
        # Extract the first 3 channels for x
        x.append(array[..., :3])


        # Extract y
        hue_values = np.vectorize(hue_mapping.get)(array[..., -1])

        # Convert hue values to RGB values
        y_temp = np.empty_like(array[..., :3], dtype=np.uint8)
        for i in range(256):
            for j in range(256):
                if hue_values[i, j] == 1:
                    y_temp[i, j] = (0,0,0)

                elif hue_values[i, j] == 0:
                    y_temp[i, j] = (array[i, j, 0], array[i, j, 1], array[i, j, 2])

                else:
                    hsv_color = (hue_values[i, j] / 360.0, 1, 1)  # Convert hue to HSV format
                    rgb_color = colorsys.hsv_to_rgb(*hsv_color)  # Convert to RGB
                    y_temp[i, j] = (int(rgb_color[0] * 255), int(rgb_color[1] * 255), int(rgb_color[2] * 255))

        y.append(y_temp)




    return x,y

In [6]:
'''
from google.colab import drive
drive.mount('/content/drive')
directory_path = 'drive/My Drive/carseg_data/new_arrays/'
# regex_pattern = r'black_5_doors_[0-9]*\.npy'
regex_pattern = r'photo_[0-9]*\.npy'
black_car_image_file_names = sorted(filter_files_by_regex(directory_path, regex_pattern))
'''
from google.colab import drive
import os
drive.mount('/content/drive')
directory_path = 'drive/My Drive/carseg_data/hyperparam_opt/'
# Get all file names in the directory
all_files = os.listdir(directory_path)

# Filter out directories (if any)
all_files = [file for file in all_files if os.path.isfile(os.path.join(directory_path, file))]

# Sort the file names
all_files = sorted(all_files)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def show_image(image):
    # Extract the image data from the last channel (the alpha channel, for example)
    image_data = image[:, :]

    # Display the image using Matplotlib
    plt.imshow(image_data)  # Use 'gray' colormap for grayscale images
    plt.axis('off')  # Turn off the axis labels and ticks
    plt.show()

In [None]:
data

[['orange', 'hood', 10],
 ['dark green', 'front door', 20],
 ['yellow', 'rear door', 30],
 ['cyan', 'frame', 40],
 ['purple', 'rear quarter panel', 50],
 ['light green', 'trunk lid', 60],
 ['blue', 'fender', 70],
 ['pink', 'bumper', 80],
 ['no color', 'rest of car', 90],
 ['white', 'background', 0]]

In [7]:
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Conv2D, Input, MaxPooling2D, concatenate, Dropout,\
                                    Lambda, Conv2DTranspose, Add
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
import numpy as np
import tensorflow as tf
import os


imshape = (256, 256, 3)
n_classes = len(data)


def preprocess_input(x):
    x /= 255.
    x -= 0.5
    x *= 2.
    return x


def dice(y_true, y_pred, smooth=1.):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
def unet(pretrained=False, base=4):

    if pretrained:
        path = os.path.join('models', model_name+'.model')
        if os.path.exists(path):
            model = load_model(path, custom_objects={'dice': dice})
            model.summary()
            return model
        else:
            print('Failed to load existing model at: {}'.format(path))

    if n_classes == 1:
        loss = 'binary_crossentropy'
        final_act = 'sigmoid'
    elif n_classes > 1:
        loss = 'categorical_crossentropy'
        final_act = 'softmax'

    b = base
    i = Input((imshape[0], imshape[1], imshape[2]))
    s = Lambda(lambda x: preprocess_input(x)) (i)

    c1 = Conv2D(2**b, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (s)
    c1 = Dropout(0.1) (c1)
    c1 = Conv2D(2**b, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c1)
    p1 = MaxPooling2D((2, 2)) (c1)

    c2 = Conv2D(2**(b+1), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p1)
    c2 = Dropout(0.1) (c2)
    c2 = Conv2D(2**(b+1), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c2)
    p2 = MaxPooling2D((2, 2)) (c2)

    c3 = Conv2D(2**(b+2), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p2)
    c3 = Dropout(0.2) (c3)
    c3 = Conv2D(2**(b+2), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c3)
    p3 = MaxPooling2D((2, 2)) (c3)

    c4 = Conv2D(2**(b+3), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p3)
    c4 = Dropout(0.2) (c4)
    c4 = Conv2D(2**(b+3), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c4)
    p4 = MaxPooling2D(pool_size=(2, 2)) (c4)

    c5 = Conv2D(2**(b+4), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p4)
    c5 = Dropout(0.3) (c5)
    c5 = Conv2D(2**(b+4), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c5)

    u6 = Conv2DTranspose(2**(b+3), (2, 2), strides=(2, 2), padding='same') (c5)
    u6 = concatenate([u6, c4])
    c6 = Conv2D(2**(b+3), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u6)
    c6 = Dropout(0.2) (c6)
    c6 = Conv2D(2**(b+3), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c6)

    u7 = Conv2DTranspose(2**(b+2), (2, 2), strides=(2, 2), padding='same') (c6)
    u7 = concatenate([u7, c3])
    c7 = Conv2D(2**(b+2), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u7)
    c7 = Dropout(0.2) (c7)
    c7 = Conv2D(2**(b+2), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c7)

    u8 = Conv2DTranspose(2**(b+1), (2, 2), strides=(2, 2), padding='same') (c7)
    u8 = concatenate([u8, c2])
    c8 = Conv2D(2**(b+1), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u8)
    c8 = Dropout(0.1) (c8)
    c8 = Conv2D(2**(b+1), (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c8)

    u9 = Conv2DTranspose(2**b, (2, 2), strides=(2, 2), padding='same') (c8)
    u9 = concatenate([u9, c1], axis=3)
    c9 = Conv2D(2**b, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u9)
    c9 = Dropout(0.1) (c9)
    c9 = Conv2D(2**b, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c9)

    o = Conv2D(n_classes, (1, 1), activation=final_act) (c9)

    model = Model(inputs=i, outputs=o)
    model.compile(optimizer=Adam(1e-4),
                  loss=loss,
                  metrics=[dice])
   # model.summary()

    return model

In [8]:
import numpy as np
import pickle
import os
from PIL import Image

import imgaug as ia
from imgaug import augmenters as iaa
import cv2
import json
import tensorflow as tf


ia.seed(1)

seq = iaa.Sequential([
    iaa.Fliplr(0.5),
    iaa.Multiply((1.2, 1.5)),
    iaa.Affine(
        #scale={"x": (0.8, 1.2), "y": (0.8, 1.2)},
        rotate=(-90, 90)
    ),
    iaa.Sometimes(0.5,
        iaa.GaussianBlur(sigma=(0, 8))
    )
], random_order=True)


class DataGenerator(tf.keras.utils.Sequence):
    # Generates data for Keras
    def __init__(self, paths, batch_size=32, shuffle=True, augment=False):
        self.paths = paths
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.augment = augment
        self.on_epoch_end()


    def __len__(self):
        # Denotes the number of batches per epoch
        return int(np.floor(len(self.paths) / self.batch_size))


    def __getitem__(self, index):
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        paths = [self.paths[k] for k in indexes]

        X, y = self.__data_generation(paths)

        return X, y


    def on_epoch_end(self):
        # Updates indexes after each epoch
        self.indexes = np.arange(len(self.paths))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)


    #TODO: This is where the data is generated I need to change this to the one I wrote above^
    def __data_generation(self, paths):

        X = np.empty((self.batch_size, imshape[0], imshape[1], imshape[2]), dtype=np.float32)
        Y = np.empty((self.batch_size, imshape[0], imshape[1], n_classes),  dtype=np.float32)

        for i, path in enumerate(paths):

            array = np.load(path)

            im = array[...,:3]

            mask = np.zeros((imshape[0], imshape[1], n_classes))

            classes = np.array([0,10,20,30,40,50,60,70,80,90])

            for j, current_class in enumerate(classes):
                a, b = np.where(array[...,-1]==current_class)
                mask[a,b,np.repeat(j,a.size)] = 1

            X[i,] = im
            Y[i,] = mask

        return X, Y

In [9]:
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.losses import binary_crossentropy, categorical_crossentropy

def binary_crossentropy_loss(y_true, y_pred):
    return binary_crossentropy(y_true, y_pred)

def categorical_cross_entropy_loss(y_true, y_pred):
    return categorical_crossentropy(y_true, y_pred)

def DiceLoss(targets, inputs, smooth=1e-6):
    inputs = K.flatten(inputs)
    targets = K.flatten(targets)
    intersection = K.sum(targets * inputs)
    return 1-(2*intersection + smooth) / (K.sum(targets) + K.sum(inputs) + smooth)

def DiceBCELoss(targets, inputs, smooth=1e-6):

    # Calculate BCE loss
    BCE = binary_crossentropy_loss(targets, inputs)

    # Calculate intersection and dice loss
    dice_loss = DiceLoss(targets, inputs)

    # Combine BCE and dice loss
    Dice_BCE = BCE + dice_loss

    return Dice_BCE

In [None]:
'''
import os
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam, SGD, RMSprop



def sorted_fns(dir):
    return sorted(os.listdir(dir), key=lambda x: int(x.split('.')[0][-4:]))

all_paths = [os.path.join(directory_path, x) for x in sorted_fns(directory_path)]

# Split the data into training and validation sets
train_paths, val_paths = train_test_split(all_paths, test_size=0.18934911, random_state=42)

model = unet(pretrained=False, base=4)

tg = DataGenerator(paths=train_paths, batch_size=32, augment=True)

model.compile(optimizer=Adam(learning_rate=0.001),
                  loss=DiceLoss,
                  metrics=[dice])


# Define EarlyStopping callback
early_stopping = EarlyStopping(monitor='val_dice', patience=10, restore_best_weights=True)

# Define validation generator
val_generator = DataGenerator(paths=val_paths, batch_size=32, augment=False)

'''

In [None]:
'''
model.fit(tg,
          steps_per_epoch=len(tg),
          epochs=100, verbose=1,
          validation_data=val_generator,
          validation_steps=len(val_generator)
          # callbacks=[checkpoint, train_val, tb_mask]
          )

model.fit_generator(generator=tg,
                        steps_per_epoch=len(tg),
                        epochs=500,  # Adjust the number of epochs based on your needs
                        validation_data=val_generator,
                        validation_steps=len(val_generator),
                        verbose=1,
                        callbacks=[early_stopping])
'''

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
 7/75 [=>............................] - ETA: 36s - loss: 0.2250 - dice: 0.9098

KeyboardInterrupt: ignored

In [None]:
savemodelto = "drive/My Drive/carseg_data/"
model.save(savemodelto+'model.keras')

In [None]:
!pip install h5py



In [None]:
from keras.models import model_from_json

# serialize model to JSON
model_json = model.to_json()
with open("model.json", "w") as json_file:
    json_file.write(model_json)

model.save_weights("model.h5")

In [None]:
print("Model saved.")

Model saved.


In [13]:
from sklearn.model_selection import ParameterGrid
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.optimizers import Adam, SGD, RMSprop


# Split the data into training and validation sets
def sorted_fns(dir):
    return sorted(os.listdir(dir), key=lambda x: int(x.split('.')[0][-4:]))

all_paths = [os.path.join(directory_path, x) for x in sorted_fns(directory_path)]

# Split the data into training and validation sets
train_paths, val_paths = train_test_split(all_paths, test_size=0.2, random_state=42)

# Define hyperparameter grid
param_grid = {
    'optimizer': [Adam, RMSprop, SGD],  # Use optimizer classes, not strings (RMSprop)
    'learning_rate': [1e-5, 1e-3, 1e-4], #1e-2,
    'batch_size': [32],
    'loss': [DiceLoss, categorical_cross_entropy_loss, DiceBCELoss] #binary_crossentropy_loss,DiceLoss
}

# Generate all combinations of hyperparameters
param_combinations = list(ParameterGrid(param_grid))

# Perform grid search
best_params = None
best_loss = float('inf')
best_model_number = None
model_counter = 1

for params in param_combinations:

    print(f"\nTraining model with parameters: {params}")

    model = unet(pretrained=False, base=4)

    tg = DataGenerator(paths=train_paths, batch_size=params['batch_size'], augment=True)

    model.compile(optimizer=params['optimizer'](learning_rate=params['learning_rate']),
                  loss=params['loss'],
                  metrics=[dice])

    # Define EarlyStopping callback
    early_stopping = EarlyStopping(monitor='val_loss',min_delta=0.005,mode="min", patience=5, restore_best_weights=True)

    # Define validation generator
    val_generator = DataGenerator(paths=val_paths, batch_size=params['batch_size'], augment=False)

    model.fit_generator(generator=tg,
                        steps_per_epoch=len(tg),
                        epochs=100,  # Adjust the number of epochs based on your needs
                        validation_data=val_generator,
                        validation_steps=len(val_generator),
                        verbose=1)
                        #callbacks=[early_stopping])

    # Evaluate on the validation set
    val_loss = model.evaluate_generator(generator=val_generator, steps=len(val_generator))

    if val_loss[0] < best_loss:
        best_loss = val_loss[0]
        best_params = params
        best_model_number = model_counter

    model_counter += 1

# Print the best hyperparameters and the corresponding model number
print(f"Best Hyperparameters for Model {best_model_number}:", best_params)



Training model with parameters: {'batch_size': 32, 'learning_rate': 1e-05, 'loss': <function DiceLoss at 0x7a27eac69990>, 'optimizer': <class 'keras.src.optimizers.adam.Adam'>}


  model.fit_generator(generator=tg,


Epoch 1/100


KeyboardInterrupt: ignored

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [None]:
%tensorboard --logdir logs/fit

In [None]:
class_to_rgb = {
    1: [255,128,0],
    2: [0,128,0],
    3: [255,255,0],
    4: [0,255,255],
    5: [128,0,255],
    6: [0,255,128],
    7: [0,128,255],
    8: [255,0,128],
    9: [255,255,255],
    0: [0,0,0],
}

In [None]:
def classToRGB(_paddedPrediction,_class_to_rgb):
  for x in range(0,_paddedPrediction.shape[0]):
    for y in range(0,_paddedPrediction.shape[1]):
      _paddedPrediction[x][y] = _class_to_rgb[_paddedPrediction[x][y][0]]

In [None]:
# Predict all images loaded in
for i in range(0,len(numpy_arraysx)):
  # Image to predict
  image=numpy_arraysx[i].reshape(1,256,256,3)
  prediction = np.argmax(model.predict(image,verbose=0).squeeze(),axis=2)

  paddedPrediction = np.pad(prediction[...,np.newaxis], ((0,0),(0,0),(0,2)), mode='constant', constant_values=1)
  classToRGB(paddedPrediction,class_to_rgb)
  # Predictions:
  # Input and expected output:
  show_image(numpy_arraysx[i])
  show_image(numpy_arraysy[i])
  show_image(paddedPrediction)

In [None]:
def show_segmentation_on_car_image(model, new_image_path, class_to_rgb):
    # Load the new car image
    new_image = Image.open(new_image_path)

    # Resize the image to match the input dimensions of your model
    new_image = new_image.resize((256, 256))
    new_image_array = np.array(new_image) / 255.0  # Normalize pixel values
    new_image_array = np.expand_dims(new_image_array, axis=0)  # Add batch dimension

    # Make predictions on the new image
    prediction = np.argmax(model.predict(new_image_array, verbose=0).squeeze(), axis=2)

    # Pad the prediction and map class values to RGB colors
    padded_prediction = np.pad(prediction[..., np.newaxis], ((0, 0), (0, 0), (0, 2)), mode='constant', constant_values=1)
    classToRGB(padded_prediction, class_to_rgb)

    # Visualize the original image and the segmented image
    plt.subplot(1, 2, 1)
    plt.imshow(new_image)
    plt.title('Original Image')

    plt.subplot(1, 2, 2)
    plt.imshow(padded_prediction)
    plt.title('Segmented Image')

    plt.show()

# Example usage
new_image_path = 'drive/My Drive/carseg_data/images/photo/no_segmentation/0168.jpg'
show_segmentation_on_car_image(model, new_image_path, class_to_rgb)