In [None]:
import tensorflow as tf
# Count the GPUs TensorFlow can see before printing the summary line.
num_gpus = len(tf.config.list_physical_devices('GPU'))
print("Num GPUs Available: ", num_gpus)

In [None]:
# Ask TensorFlow to allocate GPU memory on demand instead of all at once.
gpus = tf.config.list_physical_devices('GPU')
if len(gpus) > 0:
  try:
    # The growth flag has to be the same on every GPU, so set it on each one.
    for device in gpus:
      tf.config.experimental.set_memory_growth(device, True)
    logical_gpus = tf.config.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as growth_error:
    # Memory growth can only be configured before the GPUs are initialized.
    print(growth_error)

In [None]:
import psutil

def get_ram_usage():
    """Return the amount of system RAM in use, in GiB (bytes / 1024**3)."""
    used_bytes = psutil.virtual_memory().used
    return used_bytes / (1024 ** 3)

ram_usage_gb = get_ram_usage()
print(f"RAM Usage: {ram_usage_gb:.2f} GB")


In [None]:
import psutil

# Take one snapshot of the system memory counters.
memory = psutil.virtual_memory()

# Total and used RAM, both in bytes.
total_memory, used_memory = memory.total, memory.used

# psutil's "available" counter (bytes) is what this report labels as free.
free_memory = memory.available

# Share of RAM in use, as a percentage.
memory_percentage = memory.percent

# Byte counts are converted to GiB (1024**3 bytes) for display.
print(f"Total Memory: {total_memory / (1024 ** 3)} GB")
print(f"Used Memory: {used_memory / (1024 ** 3)} GB")
print(f"Free Memory: {free_memory / (1024 ** 3)} GB")
print(f"Memory Usage: {memory_percentage}%")

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
from psutil import virtual_memory
# Total RAM in decimal gigabytes (1e9 bytes).
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

# Anything below 20 GB is reported as a standard (not high-RAM) runtime.
is_high_ram = not (ram_gb < 20)
print('You are using a high-RAM runtime!' if is_high_ram else 'Not using a high-RAM runtime')

In [None]:
import os
# Remember the working directory the notebook was started from and show it.
HOME = os.getcwd()
print(HOME)

In [None]:
# Import only the helper that is needed. Binding the whole `IPython.display`
# module to the name `display` collides with the later
# `from IPython.display import display`, after which re-running this cell
# would fail because `display` is then a function, not the module.
from IPython.display import clear_output
clear_output()

import ultralytics
# Run the Ultralytics environment check.
ultralytics.checks()

In [10]:
from ultralytics import YOLO

from IPython.display import display, Image

In [None]:
from ultralytics import YOLO
# Path to the YOLOv8 pre-trained model weights.
model = YOLO(f'...')
# Run detection on the images folder; annotated results are saved (save=True)
# and inference is pinned to device 0.
results = model.predict(
    source='...',
    iou=0.7,
    conf=0.5,
    save=True,
    device=0,
)



In [13]:
# One NumPy array of predicted class ids per processed image.
cls = [result.boxes.cls.cpu().numpy() for result in results]



In [None]:
# One NumPy array of predicted boxes (xywh layout) per processed image.
box = [result.boxes.xywh.cpu().numpy() for result in results]

# Show the boxes of the first image as a sanity check.
box[0]

In [15]:
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import numpy as np

In [None]:
import numpy as np
# Rasterise each image's detections into a (class, row, column) stack of
# binary maps: channel c is set to 1 inside every box predicted with class c.
NUM_CLASSES, MAP_HEIGHT, MAP_WIDTH = 32, 640, 640
# NOTE(review): the boxes are assumed to live on a 640x640 pixel grid; confirm
# the source images have that size (otherwise size the map per image).

bin_masks=[]
for boxes1, classes1 in zip(box, cls):
   binary_map = np.zeros((NUM_CLASSES, MAP_HEIGHT, MAP_WIDTH), dtype=np.uint8)
   for (x, y, w, h), class1 in zip(boxes1.tolist(), classes1.tolist()):
       # Ultralytics `xywh` boxes are (centre-x, centre-y, width, height), so
       # the corners lie half a width/height on either side of the centre.
       # Clamp them to the map: a negative slice start would wrap around.
       x1 = max(0, round(x - w / 2))
       y1 = max(0, round(y - h / 2))
       x2 = min(MAP_WIDTH, round(x + w / 2))
       y2 = min(MAP_HEIGHT, round(y + h / 2))
       binary_map[int(class1), y1:y2, x1:x2] = 1

   bin_masks.append(binary_map)

# The raw detections are no longer needed; free them.
del results, cls, box
bin_masks[0]



In [17]:
def remove_nan(bb):
    """Return ``bb`` with every NaN replaced by 0.

    Delegates to ``np.nan_to_num``, which also maps infinities to large
    finite values (NumPy's default behaviour).
    """
    return np.nan_to_num(bb, nan=0)


In [18]:
# Replace NaNs in every binary map.
bin_masks = [remove_nan(binary_map) for binary_map in bin_masks]

In [None]:
import os
#path to images of the dataset
images_folder = '...'
#path to labels of the dataset
masks_folder = '...'

# Sorted listings keep the pairing order deterministic.
image_files = sorted(os.listdir(images_folder))
mask_files = sorted(os.listdir(masks_folder))

image_mask_pairs = []

for image_file in image_files:
    image_path = os.path.join(images_folder, image_file)
    # Swap only the trailing '.jpg' extension for '.tiff'. A plain
    # str.replace would also rewrite a '.jpg' found elsewhere in the name.
    stem, extension = os.path.splitext(image_file)
    mask_file = stem + '.tiff' if extension == '.jpg' else image_file
    mask_path = os.path.join(masks_folder, mask_file)

    # Images without a matching mask file are skipped.
    if os.path.isfile(mask_path):
        image_mask_pairs.append((image_path, mask_path))


for image_path, mask_path in image_mask_pairs:
    print("Image:", image_path)
    print("Mask:", mask_path)
    print()
del image_files, mask_files

In [20]:
from skimage import exposure

def apply_clahe(image, clip_limit=0.02):
    """Apply scikit-image's adaptive histogram equalisation to ``image``.

    The input is divided by 255 before equalisation and the result is
    multiplied by 255 and cast back to the input's dtype.

    Args:
        image: array-like image with an ``astype``/``dtype`` interface.
        clip_limit: forwarded to ``exposure.equalize_adapthist``.

    Returns:
        The equalised image, in the dtype of ``image``.
    """
    source_dtype = image.dtype
    scaled = image.astype(float) / 255.0
    equalized = exposure.equalize_adapthist(scaled, clip_limit=clip_limit)
    return (equalized * 255).astype(source_dtype)

In [None]:
from skimage import exposure
from PIL import Image
import tifffile as tiff

# Contrast-equalised images and their masks, in `image_mask_pairs` order.
image_data = []

mask_data = []

for img_file, mask_file in image_mask_pairs:
    # The context manager closes the image file as soon as its pixels are read,
    # so file handles do not pile up over the whole dataset.
    with Image.open(img_file) as img:
        img_data = np.array(img,dtype=np.float16)
    mask= tiff.imread(mask_file)
    img_data=apply_clahe(img_data)
    msk_data =np.array(mask,dtype=np.float16)
    image_data.append(img_data)
    mask_data.append(msk_data)


In [22]:
# One (image, mask, box-map) triple per sample. zip stops at the shortest list.
# NOTE(review): assumes the three lists are in the same image order - confirm.
dataset = [sample for sample in zip(image_data, mask_data, bin_masks)]

In [23]:
# Number of (image, mask, box-map) samples that were assembled.
len(dataset)

425

In [24]:
import cv2

def resize_img(input_image,input_mask,input_bb):
  """Resize image, mask and box map to 512x512 with nearest-neighbour sampling.

  The mask and the box map are first transposed with axes [1, 2, 0] (first
  axis moved to the end) before being resized; the image is resized as is.

  Returns:
    Tuple ``(input_image, input_mask, input_bb)`` of the resized arrays.
  """
  target_size = (512, 512)

  def _nearest(array):
    # Nearest-neighbour resize to the common target size.
    return cv2.resize(array, target_size, interpolation=cv2.INTER_NEAREST)

  input_image = _nearest(input_image)
  input_bb = _nearest(np.transpose(input_bb, axes=[1, 2, 0]))
  input_mask = _nearest(np.transpose(input_mask, axes=[1, 2, 0]))
  return input_image,input_mask,input_bb



In [25]:
def augment(input_image,input_mask,input_bb):
    """Randomly mirror a sample left-to-right.

    A single uniform draw decides the flip (taken when the draw exceeds 0.5),
    and the same decision is applied to the image, the mask and the box map
    so the three stay aligned.

    Returns:
        Tuple ``(input_image, input_mask, input_bb)``.
    """
    flip = np.random.uniform() > 0.5
    if not flip:
        return input_image, input_mask, input_bb
    return tuple(np.fliplr(array) for array in (input_image, input_mask, input_bb))

In [26]:
def normalize(image_data):
    """Min-max scale ``image_data`` to the range [0, 1].

    Args:
        image_data: array-like of numbers.

    Returns:
        ``(image_data - min) / (max - min)``. A constant input (max == min)
        yields all zeros instead of the NaNs a 0/0 division would produce.
    """
    lowest = np.min(image_data)
    value_range = np.max(image_data) - lowest
    if value_range == 0:
        # Constant input: the numerator is all zeros, so any non-zero divisor
        # gives the intended all-zero result without dividing by zero.
        value_range = value_range + 1
    return (image_data - lowest) / value_range

In [27]:
def load_image_train(datapoint):
    """Prepare one sample: resize, normalise the image, then augment.

    Args:
        datapoint: indexable holding (image, mask, box map) at positions 0-2.

    Returns:
        Tuple ``(image, mask, box map)`` as returned by ``augment``.
    """
    image, mask, bb = datapoint[0], datapoint[1], datapoint[2]
    image, mask, bb = resize_img(image, mask, bb)
    # Only the image is rescaled; mask and box map keep their values.
    image = normalize(image)
    image, mask, bb = augment(image, mask, bb)
    return image, mask, bb

In [28]:
# Run the preprocessing pipeline over every sample.
new_dataset = [load_image_train(datapoint) for datapoint in dataset]

In [None]:
import random

# Define the dataset and categories
dataset = new_dataset
# Number of consecutive samples belonging to each category, in dataset order.
categories = [24, 72, 15, 32, 37, 30, 33, 140, 7, 35]
# Share of every category that goes to the test set.
TEST_FRACTION = 0.2

# Initialize the test and train datasets
test_dataset = []
train_dataset = []

# Set the random seed for reproducibility
random.seed(42)
# Start index of the current category. It is deliberately not called `sum`,
# which would shadow the builtin for the rest of the notebook.
offset = 0
# Loop through each category
for i, category_count in enumerate(categories):
    # Shuffle a copy of this category's samples. Shuffling a bare slice
    # expression only reorders a temporary list and leaves the data unshuffled,
    # and the slice has to end at offset + category_count.
    category_items = list(dataset[offset:offset + category_count])
    random.shuffle(category_items)

    # Calculate the number of images for testing (20%)
    num_test_images = int(TEST_FRACTION * category_count)

    # Split the shuffled images into test and train
    test_dataset.extend(category_items[:num_test_images])
    train_dataset.extend(category_items[num_test_images:])

    # Print the category and the number of images in the test dataset
    print(f'Category {i + 1}: {num_test_images} images in the test dataset')
    offset += category_count


In [31]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.colors import ListedColormap

In [None]:
# Pick one training sample to inspect.
obj = train_dataset[10]
image = obj[0]
image.shape
mask, bb = obj[1], obj[2]

In [None]:
# Convert the sample image array to a PIL image and show it without axes.
pil_image = tf.keras.preprocessing.image.array_to_img(image)
plt.imshow(pil_image)
plt.axis('off')

In [None]:
# Colour of channel i depends only on its position inside a group of eight
# channels (i % 8): 0-1 blue, 2 cyan, 3-4 green, 5-7 yellow.
group_colors = ['#2828a2', '#2828a2', '#09c5c5', '#1caf1c', '#1caf1c',
                '#eeee25', '#eeee25', '#eeee25']

# Accumulate in uint16 so overlapping channels cannot wrap around while summing.
combined_image = np.zeros(mask.shape[:2] + (3,), dtype=np.uint16)

for i in range(mask.shape[2]):
    hex_color = group_colors[i % len(group_colors)]
    rgb_color = np.array([int(hex_color[k:k + 2], 16) for k in (1, 3, 5)], dtype=np.uint16)
    # Add this channel's colour wherever the channel is set.
    combined_image[mask[:, :, i] > 0] += rgb_color

# Saturate at 255 before the cast. Casting to uint8 inside the loop wrapped
# around where channels overlap and produced arbitrary colours.
combined_image = np.clip(combined_image, 0, 255).astype(np.uint8)

plt.imshow(combined_image)
plt.axis('off')
plt.show()


In [None]:
# Colour of channel i depends only on its position inside a group of eight
# channels (i % 8): 0-1 blue, 2 cyan, 3-4 green, 5-7 yellow.
group_colors = ['#2828a2', '#2828a2', '#09c5c5', '#1caf1c', '#1caf1c',
                '#eeee25', '#eeee25', '#eeee25']

# Accumulate in uint16 so overlapping boxes cannot wrap around while summing.
combined_image = np.zeros(bb.shape[:2] + (3,), dtype=np.uint16)

for i in range(bb.shape[2]):
    hex_color = group_colors[i % len(group_colors)]
    rgb_color = np.array([int(hex_color[k:k + 2], 16) for k in (1, 3, 5)], dtype=np.uint16)
    # Add this channel's colour wherever the box map is set.
    combined_image[bb[:, :, i] > 0] += rgb_color

# Saturate at 255 before the cast. Casting to uint8 inside the loop wrapped
# around where boxes overlap and produced arbitrary colours.
combined_image = np.clip(combined_image, 0, 255).astype(np.uint8)

plt.imshow(combined_image)
plt.axis('off')
plt.show()


In [42]:
def convert_to_array(data):
    """Stack a sample's box map and image along the last (channel) axis.

    Args:
        data: indexable with the image at position 0 and the box map at
            position 2 (position 1 is not used).

    Returns:
        Array whose channels are the box-map channels followed by the image
        channels.
    """
    image, bb = data[0], data[2]
    return np.concatenate((bb, image), axis=2)

In [43]:
# Model inputs for the training samples (box map + image channels).
train_list = [convert_to_array(sample) for sample in train_dataset]

In [44]:
# Test DataSet: model inputs for the held-out samples (box map + image channels).
test_list = [convert_to_array(sample) for sample in test_dataset]

In [48]:
# Transpose the list of (image, mask, box map) triples into three parallel tuples.
image,mask,bb = zip(*train_dataset)

In [49]:
# Test DataSet: transpose the (image, mask, box map) triples into three parallel tuples.
image_test,mask_test,bb_test = zip(*test_dataset)

In [53]:
drop_rate = 0.12
from tensorflow.keras.metrics import  Precision, Recall
from tensorflow import keras
def get_model(img_size, num_classes):
    """Build a U-Net-style encoder/decoder Keras model.

    The input has 35 channels. Channels 32 and up are fed to the encoder;
    the first 32 channels are sliced off into ``inputs1`` but are not
    connected to any layer.

    Args:
        img_size: tuple with the spatial input size; ``(35,)`` is appended
            to form the input shape.
        num_classes: number of output channels of the final 1x1 softmax
            convolution.

    Returns:
        An uncompiled ``keras.Model``. Dropout layers use the module-level
        ``drop_rate``.
    """
    layers = keras.layers

    def _activate(tensor, dropout=True):
        # ReLU -> batch norm -> (optional) spatial dropout.
        tensor = layers.Activation("relu")(tensor)
        tensor = layers.BatchNormalization()(tensor)
        if dropout:
            tensor = layers.SpatialDropout2D(drop_rate)(tensor)
        return tensor

    def _conv(tensor, filters, dropout=True):
        # 3x3 same-padded convolution followed by the activation stack.
        return _activate(layers.Conv2D(filters, 3, padding="same")(tensor), dropout)

    inputs = keras.Input(shape=img_size + (35,))

    # ... (Encoder) ...
    inputs0 = inputs[:, :, :, 32:]
    inputs1 = inputs[:, :, :, :32]  # sliced but never used below

    # Entry block
    x = _conv(_conv(inputs0, 64), 64)
    skip_connections = [x]  # feature maps kept for the decoder

    for filters in [128, 256, 512, 1024]:
        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)
        x = _conv(_conv(x, filters), filters)
        skip_connections.append(x)

    # The deepest feature map is the decoder input itself, not a skip.
    skip_connections.pop()

    decoder_widths = [512, 256, 128, 64]
    for filters in decoder_widths:
        is_last = filters == decoder_widths[-1]
        x = _activate(layers.Conv2DTranspose(filters, 3, strides=2, padding="same")(x))
        x = layers.concatenate([x, skip_connections.pop()])
        x = _conv(x, filters)
        # The very last convolution block carries no dropout.
        x = _conv(x, filters, dropout=not is_last)

    outputs = layers.Conv2D(num_classes, 1, activation="softmax", padding="same")(x)
    return keras.Model(inputs, outputs)


In [54]:
import tensorflow as tf

def dice_loss_with_l2_regularization(target, predicted, epsilon=1e-7, l2_weight=0.1):
    """Dice loss plus a weighted squared-error term.

    Sums are taken over axes 1 and 2 of the tensors.
    NOTE(review): presumably these are the spatial axes of (batch, height,
    width, channel) tensors - confirm against the data pipeline.

    Args:
        target: ground-truth tensor.
        predicted: model output tensor of the same shape.
        epsilon: smoothing constant that keeps the ratio defined for empty maps.
        l2_weight: weight of the squared-error term.

    Returns:
        Scalar ``(1 - mean Dice coefficient) + l2_weight * mean squared error sum``.
    """
    intersection = tf.reduce_sum(predicted * target, axis=[1, 2])
    denominator = (tf.reduce_sum(tf.square(predicted), axis=[1, 2])
                   + tf.reduce_sum(tf.square(target), axis=[1, 2]))
    dice = (2 * intersection + epsilon) / (denominator + epsilon)
    # The Dice coefficient grows with overlap, so the loss is its complement.
    # Returning the coefficient itself would reward the optimizer for
    # reducing the overlap.
    mean_dice_loss = 1.0 - tf.reduce_mean(dice)

    l2_norm = tf.reduce_sum(tf.square(predicted - target), axis=[1, 2])
    l2_regularization = l2_weight * tf.reduce_mean(l2_norm)

    total_loss = mean_dice_loss + l2_regularization
    return total_loss


In [55]:
import tensorflow as tf

def dice_coef(target, predicted, epsilon=1e-7):
    """Mean Dice coefficient of the thresholded prediction.

    Predictions below 0.51 become 0 and all others 1 before the coefficient
    is computed; sums run over axes 1 and 2.

    Returns:
        Scalar tensor with the mean Dice coefficient.
    """
    hard = tf.where(predicted<0.51,0.00,1.00)
    overlap = tf.reduce_sum(hard * target, axis=[1, 2])
    total = tf.reduce_sum(tf.square(hard), axis=[1, 2]) + tf.reduce_sum(tf.square(target), axis=[1, 2])
    dice = (2 * overlap + epsilon) / (total + epsilon)
    return tf.reduce_mean(dice)

In [None]:
import random

# Define the dataset and categories
# Number of consecutive training samples belonging to each category.
categories = [20, 58, 12, 26, 30, 24, 27, 112, 6, 28]
# Share of every category that goes to the validation set.
VALID_FRACTION = 0.1


# Initialize the validation and train datasets
train_x = []
valid_x = []
train_y = []
valid_y = []

# Set the random seed for reproducibility
random.seed(42)
# Start index of the current category. It is deliberately not called `sum`,
# which would shadow the builtin for the rest of the notebook.
offset = 0
# Loop through each category
for i, category_count in enumerate(categories):
    # Shuffle inputs and masks together so each input keeps its own mask.
    # Shuffling a bare slice expression only reorders a temporary list and
    # leaves the data unshuffled.
    category_pairs = list(zip(train_list[offset:offset + category_count],
                              mask[offset:offset + category_count]))
    random.shuffle(category_pairs)

    # Calculate the number of images for validation (10%)
    num_test_images = int(VALID_FRACTION * category_count)

    # Split the shuffled pairs into validation and train
    for sample_x, sample_y in category_pairs[:num_test_images]:
        valid_x.append(sample_x)
        valid_y.append(sample_y)
    for sample_x, sample_y in category_pairs[num_test_images:]:
        train_x.append(sample_x)
        train_y.append(sample_y)

    # Print the category and the number of images in the validation dataset
    print(f'Category {i + 1}: {num_test_images} images in the valid dataset')
    offset += category_count


In [61]:
 # Build the tf.data pipelines on the CPU device.
 # Note: `train_dataset` is rebound here from the Python list of samples to a tf.data.Dataset.
 with tf.device('/CPU:0'):
    train_dataset=tf.data.Dataset.from_tensor_slices((train_x, train_y))
    valid_dataset=tf.data.Dataset.from_tensor_slices((valid_x, valid_y))

In [62]:
def cast_to_tensor(data, label):
    """Return ``(data, label)`` with the label cast to float32; data is untouched."""
    return data, tf.cast(label, tf.float32)

# Apply the label cast to both pipelines, on the CPU device.
with tf.device('/CPU:0'):
    train_dataset = train_dataset.map(cast_to_tensor)
    valid_dataset = valid_dataset.map(cast_to_tensor)

In [63]:
 # Group the samples into batches of 2 for training and validation.
 train_dataset1= train_dataset.batch(2)
 valid_dataset1= valid_dataset.batch(2)

In [66]:
from sklearn.model_selection import train_test_split

# NOTE(review): the tiny test_size looks like a way to shuffle the test samples
# only; the `nil_*` remainder is never used. scikit-learn rounds the share up,
# so one sample ends up in `nil_*` - confirm this is acceptable.
# random_state makes the shuffle reproducible across runs.
test_x, nil_x, test_y, nil_y= train_test_split(test_list, mask_test, test_size=0.00001, random_state=42)

In [67]:
# Test DataSet: build the tf.data pipeline on the CPU device.
# Note: `test_dataset` is rebound here from the Python list of samples to a tf.data.Dataset.
with tf.device('/CPU:0'):
    test_dataset=tf.data.Dataset.from_tensor_slices((test_x, test_y))

In [68]:
# Test DataSet
# `cast_to_tensor` is already defined where the train/validation pipelines are
# cast; it is reused here rather than defined a second time.
with tf.device('/CPU:0'):
    test_dataset= test_dataset.map(cast_to_tensor)

In [69]:
# Test DataSet: group the samples into batches of 2.
test_dataset1= test_dataset.batch(2)

In [77]:
# Build model
import tensorflow as tf
from tensorflow.keras.metrics import  Precision, Recall
from tensorflow.keras.callbacks import Callback


class ReduceLearningRate(Callback):
    """Multiply the optimizer learning rate by ``factor`` when ``monitor`` stalls.

    After ``patience`` consecutive epochs without a new best value of the
    monitored metric, the learning rate is reduced, but never below ``min_lr``.

    Args:
        monitor: key of the metric to read from the epoch logs.
        factor: multiplier applied to the learning rate on each reduction.
        patience: number of non-improving epochs to wait before reducing.
        min_lr: lower bound for the learning rate.
        verbose: a value above 0 prints a message on every reduction.
    """

    def __init__(self, monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1):
        super(ReduceLearningRate, self).__init__()
        self.monitor = monitor
        self.factor = factor
        self.patience = patience
        self.min_lr = min_lr
        self.verbose = verbose
        self.wait = 0
        self.best_loss = float('inf')
        # Last learning rate seen or set by this callback, as a float.
        self.reduce_lr = self.min_lr

    def _current_lr(self):
        # Read the optimizer's learning rate as a plain Python float.
        return float(self.model.optimizer.learning_rate.numpy())

    def on_epoch_end(self, epoch, logs=None):
        """Track the monitored metric and reduce the learning rate on a plateau.

        Raises:
            ValueError: if the monitored metric is missing from ``logs``.
        """
        # Keras may pass no logs at all; treat that as an empty dict.
        logs = logs or {}
        current_loss = logs.get(self.monitor)

        if current_loss is None:
            raise ValueError(f"Monitoring metric '{self.monitor}' is not available.")

        if current_loss < self.best_loss:
            self.best_loss = current_loss
            self.wait = 0
            self.reduce_lr = self._current_lr()
            return

        self.wait += 1
        if self.wait < self.patience:
            return

        # Patience exhausted: start a new waiting period and try to reduce.
        self.wait = 0
        current_lr = self._current_lr()
        if current_lr > self.min_lr:
            # Clamp so the reduced rate never falls below the configured floor.
            self.reduce_lr = max(current_lr * self.factor, self.min_lr)
            self.model.optimizer.learning_rate.assign(self.reduce_lr)
            if self.verbose > 0:
                print(f"\nEpoch {epoch + 1}: Reducing learning rate to {self.reduce_lr}")

# Callback that halves the learning rate after 5 stalled epochs of val_loss.
lr_callback = ReduceLearningRate(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)

# Note: `model` now refers to the Keras network, replacing the YOLO detector.
model = get_model(img_size=(512, 512), num_classes=32)

model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss=dice_loss_with_l2_regularization,
    metrics=[Precision(), Recall(), dice_coef],
)



In [None]:
# Train for 60 epochs, validating at the end of each epoch; the callback
# adjusts the learning rate when the validation loss stalls.
history = model.fit(train_dataset1, epochs=60, validation_data=valid_dataset1, callbacks=[lr_callback])


In [None]:
def extract_image(image,label):
    """Keep only the input of an (input, label) pair."""
    return image

# Named `first_batch` rather than `object`, so the builtin `object` type is
# not shadowed for the rest of the notebook.
first_batch = test_dataset1.take(1)
image = first_batch.map(extract_image)
predictions = model.predict(image)
print("img")
print(predictions)

In [None]:
# Binarise the first prediction of the batch: values below 0.5 become 0, the rest 1.
output = np.where(predictions[0, :, :, :] < 0.5, 0, 1)

In [None]:
# Display the binarised prediction of channel index 2 in grayscale.
channel = output[:,:,2]
plt.imshow(channel, cmap="gray")
plt.show()

In [None]:
# Colour of channel i depends only on its position inside a group of eight
# channels (i % 8): 0-1 blue, 2 cyan, 3-4 green, 5-7 yellow.
group_colors = ['#2828a2', '#2828a2', '#09c5c5', '#1caf1c', '#1caf1c',
                '#eeee25', '#eeee25', '#eeee25']

# Accumulate in uint16 so overlapping channels cannot wrap around while summing.
combined_image = np.zeros(output.shape[:2] + (3,), dtype=np.uint16)

for i in range(output.shape[2]):
    hex_color = group_colors[i % len(group_colors)]
    rgb_color = np.array([int(hex_color[k:k + 2], 16) for k in (1, 3, 5)], dtype=np.uint16)
    # Add this channel's colour wherever the prediction is set.
    combined_image[output[:, :, i] > 0] += rgb_color

# Saturate at 255 before the cast. Casting to uint8 inside the loop wrapped
# around where channels overlap and produced arbitrary colours.
combined_image = np.clip(combined_image, 0, 255).astype(np.uint8)

plt.imshow(combined_image)
plt.axis('off')
plt.show()

In [87]:
# Save the trained model to disk (model.save stores the whole model, not only its weights).
model.save('...')