<a href="https://colab.research.google.com/github/ashpakshaikh26732/Unet-FCN/blob/main/Unet_FCN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**all packages**

In [99]:
import tensorflow as tf
import os

In [2]:
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
  tf.config.experimental_connect_to_cluster(tpu)
  tf.tpu.experimental.initialize_tpu_system(tpu)
  strategy = tf.distribute.experimental.TPUStrategy(tpu)
  print(f'no of tpus : {strategy.num_replicas_in_sync}')
except ValueError :
  print('tpu failed to initilized')

tpu failed to initilized


**mixed precision training**

In [5]:
from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy('mixed_float16')

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**copy datasets from drive**

In [4]:
!cp /content/drive/MyDrive/Cityscapes/gtFine_trainvaltest.zip /content/
!cp /content/drive/MyDrive/Cityscapes/leftImg8bit_trainvaltest.zip /content

**extract trainig the data**

In [6]:
os.system("unzip -q /content/gtFine_trainvaltest.zip -d /content/ ")
os.system("unzip -q /content/leftImg8bit_trainvaltest.zip -d /content/")

256

In [7]:
print("Train Images:", len(os.listdir("/content/leftImg8bit/train/aachen")))  # Adjust path as needed
print("Train Labels:", len(os.listdir("/content/gtFine/train/aachen")))

Train Images: 174
Train Labels: 696


In [8]:
print("test images : ", len(os.listdir('/content/leftImg8bit/test')))

test images :  6


**Global Valrible**

In [9]:
img_height = 224
img_width = 224
batch_size = 16
# final_batch_size = batch_size * strategy.num_replicas_in_sync

**loading images and labels from directry**

In [10]:
def load_img():
    train_dir = '/content/leftImg8bit/train'
    train_image_cities = os.listdir(train_dir)
    train_images = []

    for city in train_image_cities:
        train_city = os.path.join(train_dir, city)
        images = [os.path.join(train_city, img) for img in os.listdir(train_city) if img.endswith('.png')]
        train_images.extend(images)

    return train_images

def load_labels():
    train_dir = '/content/gtFine/train'
    train_labels_cities = os.listdir(train_dir)
    train_labels = []
    for city in train_labels_cities:
        train_city_label = os.path.join(train_dir, city)
        labels = [f for f in os.listdir(train_city_label) if f.endswith('_gtFine_labelIds.png')]
        for label in labels:
            label_path = os.path.join(train_city_label, label)
            train_labels.append(label_path)
    return train_labels

**reading images from path**

In [22]:
def read_img(img_path):
    img = tf.io.read_file(img_path)
    img = tf.io.decode_png(img, channels=3)
    img = tf.image.resize(img, (img_height, img_width))
    img = img / 255.0
    return img

def read_labels(label_path):
    label = tf.io.read_file(label_path)
    label = tf.io.decode_png(label, channels=1)
    label = tf.image.resize(label, (img_height, img_width), method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    return tf.cast(label, tf.uint8)


**augmentation**

In [23]:
def augment(image, label):

    label = tf.cast(label, tf.float32)

    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_left_right(image)
        label = tf.image.flip_left_right(label)

    image = tf.image.random_brightness(image, max_delta=0.1)

    combined = tf.concat([image, label], axis=-1)
    combined = tf.image.random_crop(combined, size=[224, 224, 6])

    image, label = tf.split(combined, num_or_size_splits=2, axis=-1)


    label = tf.cast(label, tf.uint8)

    return image, label


**creating dataset**

In [24]:
images = load_img()
labels = load_labels()
dataset = tf.data.Dataset.from_tensor_slices((images, labels))
dataset = dataset.map(lambda image_path , label_path :( read_img(image_path) , label_path)).map(lambda image , label_path : (image,read_labels(label_path=label_path)))
dataset.map(augment, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.shuffle(buffer_size=1000).cache().batch(batch_size)
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

In [25]:
# for image , label in dataset.take(1):
#   print(f'image shape : {image.shape} , label shape {label.shape}')
#   print(f'image datatype : {image.dtype}, label dtype {label.dtype}')

**FCN model**

In [73]:
class Block(tf.keras.models.Model):
    def __init__(self, n_conv, filters, kernel_size, activation, pool_size, pool_stride, block_name):
        super().__init__()
        self.conv_layers = [
            tf.keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, padding='same',
                                   activation=activation, name=f"{block_name}_conv{i+1}")
            for i in range(n_conv)
        ]
        self.pool = tf.keras.layers.MaxPool2D(pool_size=pool_size, strides=pool_stride, name=f"{block_name}_pool")

    def call(self, inputs):
        x = inputs
        for conv in self.conv_layers:
            x = conv(x)
        x = self.pool(x)
        return x

In [74]:
class Encoder(tf.keras.models.Model):
  def __init__(self):
    super().__init__()
    self.block1 = Block(n_conv=2 , filters=64 , kernel_size = (3,3) , activation ='relu' , pool_size=(2,2) , pool_stride = (2,2) , block_name = 'block1')
    self.block2 = Block(n_conv=3 , filters = 128 , kernel_size = (3,3) , activation='relu' , pool_size = (2,2) , pool_stride = (2,2) , block_name = 'block2')
    self.block3 = Block(n_conv=3 , filters = 256 , kernel_size = (3,3) , activation='relu' , pool_size = (2,2) , pool_stride = (2,2) , block_name = 'block3')
    self.block4 = Block(n_conv=3 , filters = 512 , kernel_size = (3,3) , activation='relu' , pool_size = (2,2) , pool_stride = (2,2) , block_name = 'block4')
    self.block5 = Block(n_conv=3 , filters = 512 , kernel_size = (3,3) , activation='relu' , pool_size = (2,2) , pool_stride = (2,2) , block_name = 'block5')
  def call(self,inputs):
    p1 = self.block1(inputs)
    p2 = self.block2(p1)
    p3 = self.block3(p2)
    p4 = self.block4(p3)
    p5 = self.block5(p4)
    return p1 , p2 , p3 , p4 , p5

In [75]:
class Decoder(tf.keras.models.Model):
    def __init__(self, nclasses=19):
        super().__init__()


        self.upsample1 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(4, 4), strides=(2, 2), padding='same', use_bias=False)
        self.upsample2 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(4, 4), strides=(2, 2), padding='same', use_bias=False)
        self.upsample3 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(4, 4), strides=(2, 2), padding='same', use_bias=False)


        self.skip1 = tf.keras.layers.Conv2D(nclasses, kernel_size=(1, 1), activation='relu', padding='same')
        self.skip2 = tf.keras.layers.Conv2D(nclasses, kernel_size=(1, 1), activation='relu', padding='same')
        self.skip3 = tf.keras.layers.Conv2D(nclasses, kernel_size=(1, 1), activation='relu', padding='same')


        self.add1 = tf.keras.layers.Add()
        self.add2 = tf.keras.layers.Add()
        self.add3 = tf.keras.layers.Add()


        self.fcn32 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(32, 32), strides=(32, 32), padding='same', name='fcn32')
        self.fcn16 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(16, 16), strides=(16, 16), padding='same', name='fcn16')
        self.fcn8 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(8, 8), strides=(8, 8), padding='same', name='fcn8')
        self.fcn4 = tf.keras.layers.Conv2DTranspose(nclasses, kernel_size=(4, 4), strides=(4, 4), padding='same', name='fcn4')


        self.softmax = tf.keras.layers.Softmax(axis=-1)

    def call(self, inputs):
        p1, p2, p3, p4, p5 = inputs

        fcn32 = self.fcn32(p5)


        o = self.upsample1(p5)
        o = self.add1([o, self.skip1(p4)])
        fcn16 = self.fcn16(o)


        o = self.upsample2(o)
        o = self.add2([o, self.skip2(p3)])
        fcn8 = self.fcn8(o)


        o = self.upsample3(o)
        o = self.add3([o, self.skip3(p2)])
        fcn4 = self.fcn4(o)


        return self.softmax(fcn32), self.softmax(fcn16), self.softmax(fcn8), self.softmax(fcn4)

In [93]:
class FCN(tf.keras.models.Model):
  def __init__(self):
    super().__init__()
    self.Encoder = Encoder()
    self.Decoder = Decoder()
  def call(self, inputs):
    x=self.Encoder(inputs)
    x = self.Decoder(x)
    return x

**UNET**

In [77]:
class ConvBlock(tf.keras.models.Model):
  def __init__(self, n_filters,block_name , kernel_size=3 , strides = (1,1) , activation='relu'):
    super().__init__()
    self.conv = [tf.keras.layers.Conv2D(filters=n_filters , strides=strides , kernel_size=(kernel_size, kernel_size) , activation='relu', name=f"{block_name}_conv{i+1}", padding='same') for i in range (2)]
  def call(self, inputs):
    x = inputs
    for layer in self.conv:
      x = layer(x)
    return x

In [78]:
class EncoderBlock(tf.keras.models.Model):
  def __init__(self, n_filters , pool_size , dropout,block_name):
    super().__init__()
    self.convBlock = ConvBlock(n_filters=n_filters , block_name=block_name, strides=(1,1))
    self.pool = tf.keras.layers.MaxPooling2D(pool_size=pool_size)
    self.dropout = tf.keras.layers.Dropout(rate = dropout)
  def call (self, inputs) :
    f = self.convBlock(inputs)
    p = self.pool(f)
    p = self.dropout(p)
    return f , p

In [79]:
class Encoder (tf.keras.models.Model):
  def __init__(self):
    super().__init__()
    self.encoder1 = EncoderBlock(64 , pool_size=(2,2),dropout=0.3 , block_name = 'block1')
    self.encoder2 = EncoderBlock(128 , pool_size=(2,2),dropout=0.3 , block_name = 'block2')
    self.encoder3 = EncoderBlock(256 , pool_size=(2,2),dropout=0.3 , block_name = 'block3')
    self.encoder4 = EncoderBlock(512, pool_size=(2,2),dropout=0.3 , block_name = 'block4')
  def call(self, inputs):
    f1,p1=self.encoder1(inputs)
    f2,p2 = self.encoder2(p1)
    f3 , p3 = self.encoder3(p2)
    f4,p4 = self.encoder4(p3)
    return p4,(f1,f2,f3,f4)

In [80]:
class BottleNeck(tf.keras.models.Model):
  def __init__(self):
    super().__init__()
    self.convBlock=ConvBlock(1024, block_name = 'bottleneck' )
  def call(self, inputs):
    x = self.convBlock(inputs)
    return x

In [81]:
class DecoderBlock(tf.keras.models.Model):
  def __init__(self, n_filters ,dropout,block_name , kernel_size= (3,3),strides = (2,2)  ):
    super().__init__()
    self.convTranspose = tf.keras.layers.Conv2DTranspose(n_filters, kernel_size = kernel_size , strides=strides , padding = 'same')
    self.concat = tf.keras.layers.Concatenate()
    self.DropOut = tf.keras.layers.Dropout(dropout)
    self.convBlock = ConvBlock(n_filters=n_filters , block_name =block_name  )
  def call(self, inputs , convOutput):
    u=self.convTranspose(inputs)
    c = self.concat([u,convOutput])
    c= self.DropOut(c)
    c = self.convBlock(c)
    return c

In [82]:
class Decoder (tf.keras.models.Model):
  def __init__(self) :
    super().__init__()
    self.Decoder1 = DecoderBlock(n_filters=512 , dropout=0.3 , block_name='Decoder1')
    self.Decoder2 = DecoderBlock(n_filters=256 , dropout=0.3 , block_name='Decoder2')
    self.Decoder3 = DecoderBlock(n_filters=128 , dropout=0.3 , block_name='Decoder3')
    self.Decoder4 = DecoderBlock(n_filters=64 , dropout=0.3 , block_name='Decoder4')
    self.final_conv = tf.keras.layers.Conv2D(19, kernel_size=(1,1), activation='softmax')
  def call(self , inputs , conv):
    f1, f2 , f3, f4 = conv
    c6 = self.Decoder1(inputs, f4 )
    c7 = self.Decoder2(c6,f3)
    c8 = self.Decoder3(c7,f2)
    c9= self.Decoder4(c8, f1)
    outputs = self.final_conv(c9)
    return outputs


In [83]:
class  Unet (tf.keras.models.Model):
  def __init__(self):
    super().__init__()
    self.Encoder = Encoder()
    self.BottleNeck = BottleNeck()
    self.Decoder = Decoder()
  def call(self , inputs):
    encoder_output,convs = self.Encoder(inputs)
    bottleNeck_output = self.BottleNeck(encoder_output)
    output = self.Decoder(bottleNeck_output,convs)
    return output

***UNET ++***

In [84]:
class Unet_plus_plus_convolutional_block(tf.keras.models.Model):
  def __init__(self, n_filters , block_name , activation = 'relu',kernel_size=(3,3)  ):
    super().__init__()
    self.conv_layers = [tf.keras.layers.Conv2D(filters=n_filters , kernel_size= kernel_size , name=f"{block_name}_conv{i+1}",padding = 'same', activation='relu') for i in range (2)]
  def call(self, inputs) :
    x = inputs
    for layer in self.conv_layers:
      x=layer(x)
    return x

In [85]:
class Encoder_Block_for_unt_plus_plus(tf.keras.models.Model):
  def __init__(self,  n_filters , pool_size , dropout,block_name):
    super().__init__()
    self.conv = Unet_plus_plus_convolutional_block(n_filters=n_filters, block_name=block_name)
    self.pool = tf.keras.layers.MaxPooling2D(pool_size=pool_size)
    self.dropout = tf.keras.layers.Dropout(rate =dropout)
  def call(self,inputs):
    f=self.conv(inputs)
    p = self.pool(f)
    p = self.dropout(p)
    return f ,  p

In [86]:
class Encoder_for_unt_plus_plus(tf.keras.models.Model):
  def __init__(self):
    super().__init__()
    self.encoder1 = Encoder_Block_for_unt_plus_plus(n_filters=64, pool_size=(2,2) , dropout=0.3, block_name = 'encoder1')
    self.encoder2 = Encoder_Block_for_unt_plus_plus(n_filters=128, pool_size=(2,2) , dropout=0.3, block_name = 'encoder2')
    self.encoder3 = Encoder_Block_for_unt_plus_plus(n_filters=256, pool_size=(2,2) , dropout=0.3, block_name = 'encoder3')
    self.encoder4 = Encoder_Block_for_unt_plus_plus(n_filters=512, pool_size=(2,2) , dropout=0.3, block_name = 'encoder4')
  def call(self, inputs):
    f1,p1 = self.encoder1(inputs)
    f2,p2 = self.encoder2(p1)
    f3,p3 = self.encoder3(p2)
    f4,p4 = self.encoder4(p3)
    return (f1,f2,f3,f4),(p1,p2,p3,p4)

In [87]:
class BottleNeck_for_unet_plus_plus(tf.keras.models.Model):
  def __init__(self ):
    super().__init__()
    self.bottle_neck = Unet_plus_plus_convolutional_block(n_filters=1024 , block_name = 'bottle_neck'  )
  def call(self, inputs):
    x = self.bottle_neck(inputs)
    return x

In [88]:
class Decoder_Block_for_unet_plus_plus(tf.keras.models.Model):
  def __init__(self,n_filters  , dropout,block_name):
    super().__init__()
    self.conv = Unet_plus_plus_convolutional_block(n_filters=n_filters , block_name = block_name)
    self.dropout = tf.keras.layers.Dropout(rate = dropout)
    self.convTranspose = tf.keras.layers.Conv2DTranspose(n_filters, kernel_size = (3,3) , strides=(2,2) , padding = 'same')
    self.concat = tf.keras.layers.Concatenate()

  def call(self, conv_output_conc, inputs):




      if inputs.shape[1] <= 1 or inputs.shape[2] <= 1:

          target_size = (max(conv_output_conc.shape[1], 1), max(conv_output_conc.shape[2], 1))
          u = tf.image.resize(inputs, target_size)
      else:
          u = self.convTranspose(inputs)


      if u.shape[1:3] != conv_output_conc.shape[1:3]:

          if u.shape[1] * u.shape[2] < conv_output_conc.shape[1] * conv_output_conc.shape[2]:
              u = tf.image.resize(u, (conv_output_conc.shape[1], conv_output_conc.shape[2]))
          else:
              conv_output_conc = tf.image.resize(conv_output_conc, (u.shape[1], u.shape[2]))

      c = self.concat([u, conv_output_conc])
      c = self.dropout(c)
      c = self.conv(c)
      return c

In [89]:
class Decoder_for_unet_plus_plus(tf.keras.models.Model):
  def __init__(self,num_classes = 19):
    super().__init__()
    self.decoder11 = Decoder_Block_for_unet_plus_plus(n_filters=64  , dropout = 0.3, block_name = 'skip_conv_block_11')
    self.decoder12 = Decoder_Block_for_unet_plus_plus(n_filters=64  , dropout = 0.3, block_name = 'skip_conv_block_12')
    self.decoder13 = Decoder_Block_for_unet_plus_plus(n_filters=64  , dropout = 0.3, block_name = 'skip_conv_block_13')
    self.decoder14 = Decoder_Block_for_unet_plus_plus(n_filters=64  , dropout = 0.3, block_name = 'skip_conv_block_14')
    self.decoder21 = Decoder_Block_for_unet_plus_plus(n_filters=128  , dropout = 0.3, block_name = 'skip_conv_block_21')
    self.decoder22 = Decoder_Block_for_unet_plus_plus(n_filters=128  , dropout = 0.3, block_name = 'skip_conv_block_22')
    self.decoder23 = Decoder_Block_for_unet_plus_plus(n_filters=128  , dropout = 0.3, block_name = 'skip_conv_block_23')
    self.decoder31 = Decoder_Block_for_unet_plus_plus(n_filters=256  , dropout = 0.3, block_name = 'skip_conv_block_31')
    self.decoder32 = Decoder_Block_for_unet_plus_plus(n_filters=256  , dropout = 0.3, block_name = 'skip_conv_block_32')
    self.decoder41 = Decoder_Block_for_unet_plus_plus(n_filters=512  , dropout = 0.3, block_name = 'skip_conv_block_41')

    self.concat12=tf.keras.layers.Concatenate()
    self.concat13=tf.keras.layers.Concatenate()
    self.concat14=tf.keras.layers.Concatenate()

    self.concat22=tf.keras.layers.Concatenate()
    self.concat23=tf.keras.layers.Concatenate()

    self.concat32=tf.keras.layers.Concatenate()
    self.bottle_neck = BottleNeck_for_unet_plus_plus()
    self.output_layer14 = tf.keras.layers.Conv2D(
        filters=num_classes,
        kernel_size=(1,1),
        activation='sigmoid' if num_classes == 1 else 'softmax',
        padding='same',
        name='output_layer'
        )
    self.output_layer13 = tf.keras.layers.Conv2D(
        filters=num_classes,
        kernel_size=(1,1),
        activation='sigmoid' if num_classes == 1 else 'softmax',
        padding='same',
        name='output_layer'
        )
    self.output_layer12 = tf.keras.layers.Conv2D(
        filters=num_classes,
        kernel_size=(1,1),
        activation='sigmoid' if num_classes == 1 else 'softmax',
        padding='same',
        name='output_layer'
        )
    self.output_layer11 = tf.keras.layers.Conv2D(
        filters=num_classes,
        kernel_size=(1,1),
        activation='sigmoid' if num_classes == 1 else 'softmax',
        padding='same',
        name='output_layer'
        )

  def call(self, convs_cont , pool_cont):
    f1,f2,f3,f4 = convs_cont
    p1,p2,p3,p4 = pool_cont
    for i, feat in enumerate([f1, f2, f3, f4, p1, p2, p3, p4]):
        if feat.shape[1] < 1 or feat.shape[2] < 1:
            print(f"Warning: Feature map {i} has invalid dimensions: {feat.shape}")
    p5 = self.bottle_neck(p4)
    decoder41 = self.decoder41(f4,p5)
    decoder31 = self.decoder31(f3,p4)
    conc32 = self.concat32([f3 , decoder31])
    decoder32 = self.decoder32(conc32 , decoder41)
    decoder21 = self.decoder21(f2,p3)
    conc22 = self.concat22([f2,decoder21])
    decoder22 = self.decoder22(conc22 , decoder31)
    conc23 = self.concat23([f2,decoder21, decoder22])
    decoder23 = self.decoder23(conc23 , decoder32)
    decoder11 = self.decoder11(f1,p2)
    conc12 = self.concat12([f1,decoder11])
    decoder12 = self.decoder12(conc12 , decoder21)
    conc13 = self.concat13([f1,decoder11,decoder12])
    decoder13 = self.decoder13(conc13 , decoder22)
    conc14 = self.concat14([f1 , decoder11  , decoder12 , decoder13])
    decoder14 = self.decoder14(conc14 , decoder23)
    output14 = self.output_layer14(decoder14)
    output13 = self.output_layer13(decoder13)
    output12 = self.output_layer12(decoder12)
    output11 = self.output_layer11(decoder11)
    return output11,output12,output13,output14

In [90]:
class Unet_plus_plus(tf.keras.models.Model):
    def __init__(self, min_input_size=16):
        super().__init__()
        self.min_input_size = min_input_size
        self.encoder = Encoder_for_unt_plus_plus()
        self.decoder = Decoder_for_unet_plus_plus()

    def call(self, inputs):

        input_shape = inputs.shape
        if input_shape[1] < self.min_input_size or input_shape[2] < self.min_input_size:
            raise ValueError(f"Input image dimensions must be at least {self.min_input_size}x{self.min_input_size}")

        y, z = self.encoder(inputs)
        output11, output12, output13, output14 = self.decoder(y, z)
        return output11, output12, output13, output14

In [91]:
model122 = Unet_plus_plus()

In [45]:
model122.summary()

**testing code for model**

In [46]:
import tensorflow as tf
import numpy as np

input_shape = (1, 256,256, 3)
random_input = tf.random.normal(input_shape)

# Initialize model


# Run forward pass
outputs = model122(random_input)

# Print output shapes
output_names = ["fcn32", "fcn16", "fcn8", "fcn4", "fcn2"]
for name, out in zip(output_names, outputs):
    print(f"{name} output shape: {out.shape}")


fcn32 output shape: (1, 256, 256, 19)
fcn16 output shape: (1, 256, 256, 19)
fcn8 output shape: (1, 256, 256, 19)
fcn4 output shape: (1, 256, 256, 19)


In [47]:
model122.summary()


In [48]:
# model122.save('model.keras')

In [49]:
# from google.colab import files

In [50]:
# files.download('model.h5')

In [51]:
# files.download('model.keras')

**Initilizating instaces of the models**

In [94]:
Fcn =FCN()
Unet = Unet()
Unet_plus_plus = Unet_plus_plus()

**defining loss**

In [None]:
import tensorflow as tf

with strategy.scope():
    class DiceLoss(tf.keras.losses.Loss):
        def __init__(self, smooth=1e-6):
            super().__init__()
            self.smooth = smooth

        def call(self, y_true, y_pred):

            y_true_f = tf.keras.backend.flatten(tf.one_hot(tf.cast(y_true , tf.int32), depth=tf.shape(y_pred)[-1]))
            y_pred_f = tf.keras.backend.flatten((y_pred))

            intersection = tf.reduce_sum(y_true_f * y_pred_f)
            denominator = tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f)

            dice_loss = 1 - (2.0 * intersection + self.smooth) / (denominator + self.smooth)
            return dice_loss

with strategy.scope():
    class SemanticSegmentationLoss(tf.keras.losses.Loss):
        def __init__(self , alpha = 1.0 , beta = 1.0):
            super().__init__()
            self.dice_loss = DiceLoss()
            self.alpha = alpha
            self.beta = beta

        def call(self, y_true, y_pred):

            ce = tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
            dice_loss = self.dice_loss(y_true, y_pred)
            return self.alpha * ce +self.beta * dice_loss


In [None]:
with strategy.scope():
  class DeepSupervisionLoss(tf.keras.losses.Loss):
    def __init__(self, weights=None, smooth=1e-6):
      super().__init__()
      self.smooth = smooth
      self.weights = weights if weights else [1.0]
      self.ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    def dice_loss(self, y_true, y_pred):
      y_true_f = tf.keras.backend.flatten(tf.one_hot(tf.cast(y_true, tf.int32), depth=tf.shape(y_pred)[-1]))
      y_pred_f = tf.keras.backend.flatten((y_pred))
      intersection = tf.reduce_sum(y_true_f * y_pred_f)
      denominator = tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f)
      return 1 - (2.0 * intersection + self.smooth) / (denominator + self.smooth)

    def call(self, y_true, y_pred):
      total_loss = 0.0
      num_outputs = len(y_pred)
      for i in range(num_outputs):
        ce = self.ce_loss(y_true, y_pred[i])
        dice = self.dice_loss(y_true, y_pred[i])
        total_loss += self.weights[i] * (ce + dice)
      return total_loss


NameError: name 'strategy' is not defined

In [None]:
with strategy.scope():
    optimizer = mixed_precision.LossScaleOptimizer(tf.keras.optimizers.Adam(learning_rate=1e-5), dynamic=True)

**IOU metric**

In [11]:
class IOUMetric(tf.keras.metrics.Metric):
    def __init__(self, n_classes=19, smooth=1e-6, name="iou", **kwargs):
        super().__init__(name=name, **kwargs)
        self.n_classes = n_classes
        self.smooth = smooth
        self.iou_sum = self.add_weight(name="iou_sum", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")
    @tf.function
    def update_state(self, y_true, y_pred, sample_weight=None):

        y_true = tf.one_hot(tf.cast(y_true, tf.int32), depth=self.n_classes)
        # y_pred = tf.nn.softmax(y_pred, axis=-1)


        intersection = tf.reduce_sum(y_true * y_pred, axis=[0, 1, 2])
        union = tf.reduce_sum(y_true, axis=[0, 1, 2]) + tf.reduce_sum(y_pred, axis=[0, 1, 2]) - intersection


        iou_per_class = (intersection + self.smooth) / (union + self.smooth)


        valid_classes = tf.cast(union > 0, tf.float32)
        num_valid_classes = tf.reduce_sum(valid_classes)


        mean_iou = tf.reduce_sum(iou_per_class * valid_classes) / tf.maximum(num_valid_classes, 1.0)


        self.iou_sum.assign_add(mean_iou)
        self.count.assign_add(1)
    @tf.function
    def result(self):
        return self.iou_sum / tf.maximum(self.count, 1.0)

    def reset_state(self):
        self.iou_sum.assign(0)
        self.count.assign(0)

    def get_config(self):
        config = super().get_config()
        config.update({"n_classes": self.n_classes, "smooth": self.smooth})
        return config


**per class IOU metrics**

In [12]:
class PerClassIOUMetric(tf.keras.metrics.Metric):
    def __init__(self, n_classes=19, smooth=1e-6, name="iou_per_class", **kwargs):
        super().__init__(name=name, **kwargs)
        self.n_classes = n_classes
        self.smooth = smooth
        self.sum_union = self.add_weight(name="sum_union", shape=(n_classes,), initializer="zeros")
        self.sum_intersection = self.add_weight(name="sum_intersection", shape=(n_classes,), initializer="zeros")
    @tf.function
    def update_state(self, y_true, y_pred, sample_weight=None):

        y_true = tf.one_hot(tf.cast(y_true, dtype=tf.int32), depth=self.n_classes)


        intersection = tf.reduce_sum(y_true * y_pred, axis=[0, 1, 2])
        union = tf.reduce_sum(y_true, axis=[0, 1, 2]) + tf.reduce_sum(y_pred, axis=[0, 1, 2]) - intersection


        self.sum_union.assign_add(union)
        self.sum_intersection.assign_add(intersection)
    @tf.function
    def result(self):
        return (self.sum_intersection + self.smooth) / (self.sum_union + self.smooth)

    def reset_state(self):
        self.sum_intersection.assign(tf.zeros_like(self.sum_intersection))
        self.sum_union.assign(tf.zeros_like(self.sum_union))


**Dice Coeficient Metrics**

In [13]:
class Dice_Coefficent_matrics(tf.keras.metrics.Metric):
  def __init__(self, n_classes = 19 , smooth = 1e-6 , name = 'dice coefficent' , **kwargs):
    super().__init__(name = name , **kwargs)
    self.n_classes = n_classes
    self.smooth = smooth
    self.sum_dice_coefficent =self.add_weight(name="sum_dice_coeffient", initializer="zeros")
    self.count =self.add_weight(name="count", initializer="zeros")
  @tf.function
  def update_state(self, y_true , y_pred , sample_weight = None):
    y_true = tf.one_hot(tf.cast(y_true , dtype= tf.int32) , depth=self.n_classes)

    intersection = tf.reduce_sum (y_true * y_pred , axis = [0,1,2])
    total_area = tf.reduce_sum(y_pred, axis=[0,1,2])  + tf.reduce_sum(y_true , axis=[0,1,2])

    dice_score = 2 * (intersection+ self.smooth)/(total_area +self.smooth)
    valid_classes = tf.cast(total_area > 0, tf.float32)
    num_valid_classes = tf.reduce_sum(valid_classes)

    mean_dice = tf.reduce_sum(dice_score * valid_classes) / tf.maximum(num_valid_classes, 1.0)

    self.sum_dice_coefficent.assign_add(dice_score)
    self.count.assign_add(1)
  @tf.function
  def result(self ):
    return self.sum_dice_coefficent/tf.maximum(self.count, 1.0)
  def reset_state(self):
    self.sum_dice_coefficent.assign(0)
    self.count.assign(0)

**per class_dice_coefficent metrics**

In [14]:
class PerClassDiceCoefficient(tf.keras.metrics.Metric):
    def __init__(self, n_classes=19, smooth=1e-6, name="dice_per_class", **kwargs):
        super().__init__(name=name, **kwargs)
        self.n_classes = n_classes
        self.smooth = smooth
        self.sum_intersection = self.add_weight(name="sum_intersection", shape=(n_classes,), initializer="zeros")
        self.sum_union = self.add_weight(name="sum_union", shape=(n_classes,), initializer="zeros")
    @tf.function
    def update_state(self, y_true, y_pred, sample_weight=None):

        y_true = tf.one_hot(tf.cast(y_true, dtype=tf.int32), depth=self.n_classes)

        intersection = tf.reduce_sum(y_true * y_pred, axis=[0, 1, 2])
        sum_areas = tf.reduce_sum(y_true, axis=[0, 1, 2]) + tf.reduce_sum(y_pred, axis=[0, 1, 2])

        self.sum_intersection.assign_add(intersection)
        self.sum_union.assign_add(sum_areas)
    @tf.function
    def result(self):
        return (2.0 * self.sum_intersection + self.smooth) / (self.sum_union + self.smooth)

    def reset_state(self):
        self.sum_intersection.assign(tf.zeros_like(self.sum_intersection))
        self.sum_union.assign(tf.zeros_like(self.sum_union))


**Pixel Accurcy Metrics**

In [15]:
class Pixel_accurcy_metrics(tf.keras.metrics.Metric):
  def __init__(self, n_classes = 19  , name = 'pixel_accurcy_metrics' , **kwargs):
    super().__init__(name = name , **kwargs)
    self.n_classes = n_classes
    self.sum_pixel_wise_accurcy = self.add_weight(name="sum_pixel_wise_accurcy", initializer="zeros")
    self.count = self.add_weight(name="count", initializer="zeros")
  @tf.function
  def update_state(self, y_true , y_pred  , sample_weight  = None) :
    y_true = tf.one_hot(tf.cast(y_true , dtype=tf.int32) , depth = self.n_classes)
    y_pred = tf.one_hot(tf.argmax(y_pred , axis=-1) , depth = self.n_classes)
    correct_pixel = tf.reduce_sum(y_true * y_pred)
    total_pixel = tf.reduce_prod(tf.shape(y_true)[:-1])
    pixel_accuracy = (correct_pixel)/tf.maximum(total_pixel,1.0)
    self.sum_pixel_wise_accurcy.assign_add(pixel_accuracy)
    self.count.assign_add(1)
  @tf.function
  def result(self):
    return self.sum_pixel_wise_accurcy/tf.maximum(self.count, 1.0)
  def reset_state(self):
    self.sum_pixel_wise_accurcy.assign(0)
    self.count.assign(0)

**optimizer**

In [18]:
with strategy.scope():
  optimizer_fcn = tf.optimizers.AdamW(learning_rate=1e-4 , weight_decay=1e-5)
  optimizer_unet = tf.optimizers.AdamW(learning_rate=1e-4 , weight_decay=1e-5)
  optimizer_unet_plus_plus = tf.optimizers.AdamW(learning_rate=1e-4 , weight_decay=1e-5)

NameError: name 'strategy' is not defined

In [95]:
optimizer_fcn = tf.optimizers.AdamW(learning_rate=1e-4 , weight_decay=1e-5)
optimizer_unet = tf.optimizers.AdamW(learning_rate=1e-4 , weight_decay=1e-5)
optimizer_unet_plus_plus = tf.optimizers.AdamW(learning_rate=1e-4 , weight_decay=1e-5)

**checkPoint dirs**

In [96]:
checkpoint_dir_fcn = '/content/drive/MyDrive/CityScapes_model_checkpoints/fcn'
checkpoint_dir_unet = '/content/drive/MyDrive/CityScapes_model_checkpoints/unet'
checkpoint_dir_unet_plus_plus = '/content/drive/MyDrive/CityScapes_model_checkpoints/unet_plus_plus'

**custom callbacks**

In [97]:
class Custom_checkpoint_Callabck(tf.keras.callbacks.Callback):
  def __init__(self, checkpoint_dir_fcn , checkpoint_dir_unet , checkpoint_dir_unet_plus_plus , fcn , unet , unet_plus_plus , optimizer_fcn ,optimizer_unet ,optimizer_unet_plus_plus  ) :
    super().__init__()
    self.checkpoint_dir_fcn = checkpoint_dir_fcn
    self.checkpoint_dir_unet = checkpoint_dir_unet
    self.checkpoint_dir_unet_plus_plus = checkpoint_dir_unet_plus_plus
    self.checkpoint_fcn = tf.train.Checkpoint(
        model = fcn ,
        optimizer = optimizer_fcn,
        epoch = tf.Variable(0 , dtype = tf.int64)
    )
    self.checkpoint_manager_for_fcn = tf.train.CheckpointManager(
        self.checkpoint_fcn ,
        self.checkpoint_dir_fcn ,
        max_to_keep=3
    )
    self.checkpoint_unet = tf.train.Checkpoint(
        model = unet ,
        optimizer = optimizer_unet,
        epoch = tf.Variable(0 , dtype = tf.int64)
    )
    self.checkpoint_manager_for_unet = tf.train.CheckpointManager(
        self.checkpoint_unet ,
        self.checkpoint_dir_unet ,
        max_to_keep=3
    )
    self.checkpoint_unet_plus_plus = tf.train.Checkpoint(
        model = unet_plus_plus ,
        optimizer = optimizer_unet_plus_plus,
        epoch = tf.Variable(0 , dtype = tf.int64)
    )
    self.checkpoint_manager_for_unet_plus_plus = tf.train.CheckpointManager(
        self.checkpoint_unet_plus_plus ,
        self.checkpoint_dir_unet_plus_plus ,
        max_to_keep=3
    )
  def on_epoch_end(self , epoch, logs = None):
    self.checkpoint_fcn.epoch.assign(epoch+1)
    self.checkpoint_manager_for_fcn.save()
    print(f"\nCheckpoint saved for epoch {epoch + 1} at {self.checkpoint_manager_for_fcn.latest_checkpoint} for fcn model")
    self.checkpoint_unet.epoch.assign(epoch+1)
    self.checkpoint_manager_for_unet.save()
    print(f"\nCheckpoint saved for epoch {epoch + 1} at {self.checkpoint_manager_for_unet.latest_checkpoint} for unet model")
    self.checkpoint_unet_plus_plus.epoch.assign(epoch+1)
    self.checkpoint_manager_for_unet_plus_plus.save()
    print(f"\nCheckpoint saved for epoch {epoch + 1} at {self.checkpoint_manager_for_unet_plus_plus.latest_checkpoint} for unet_plus_plus model")
  def load_latest_model(self):
    if self.checkpoint_manager_for_fcn.latest_checkpoint and self.checkpoint_manager_for_unet.latest_checkpoint and self.checkpoint_manager_for_unet_plus_plus.latest_checkpoint :

      self.checkpoint_fcn.restore(self.checkpoint_manager_for_fcn.latest_checkpoint)
      self.checkpoint_unet.restore(self.checkpoint_manager_for_unet.latest_checkpoint)
      self.checkpoint_unet_plus_plus.restore(self.checkpoint_manager_for_unet_plus_plus.latest_checkpoint)
      start_epoch =self.checkpoint_fcn.epoch.numpy()
      print(f"Restored from checkpoint: {self.checkpoint_manager_for_fcn.latest_checkpoint}, "
                  f"resuming from epoch {start_epoch}")
      print(f"Restored from checkpoint: {self.checkpoint_manager_for_unet.latest_checkpoint}, "
                  f"resuming from epoch {start_epoch}")
      print(f"Restored from checkpoint: {self.checkpoint_manager_for_unet_plus_plus.latest_checkpoint}, "
                  f"resuming from epoch {start_epoch}")
      return start_epoch
    else :
      print('no checkpoint found starting from epoch 0')
      return 0


In [98]:
Custom_checkpoint_Callabck = Custom_checkpoint_Callabck(checkpoint_dir_fcn=checkpoint_dir_fcn , checkpoint_dir_unet=checkpoint_dir_unet , checkpoint_dir_unet_plus_plus=checkpoint_dir_unet_plus_plus , fcn=Fcn , unet=Unet , unet_plus_plus=Unet_plus_plus ,optimizer_fcn=optimizer_fcn,optimizer_unet=optimizer_unet,optimizer_unet_plus_plus=optimizer_unet_plus_plus)

In [100]:
class custom_early_stoping_callback(tf.keras.callbacks.Callback):
  def __init__(self, patiance = 15 , min_delta=1e-6):
    self.patiance = patiance
    self.min_delta =min_delta
    self.best_loss = float('inf')
    self.wait = 0
    self.stoped_epoch = 0
  def early_stoping(self, epoch , val_loss):
    if val_loss < self.best_loss - self.min_delta :
      self.best_loss = val_loss
      self.wait = 0
    else :
      self.wait +=1
      if (self.wait >= self.patiance):
        self.stoped_epoch = epoch
        print(f'early stoped at epoch = {self.stoped_epoch}')
        return True
    return False

In [None]:
class custom_learning_rate_sceduler(tf.keras.callbacks.Callback):
