In [1]:
pip install tensorflow_docs

Note: you may need to restart the kernel to use updated packages.


In [2]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfdata
import tensorflow_docs as tfdocs
import tensorflow_docs.plots
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import *
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.models import *
from tensorflow.keras.optimizers import RMSprop
import os
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input


AUTOTUNE = tf.data.experimental.AUTOTUNE


def normalize(input_image, input_mask):
    input_image = preprocess_input(input_image)
    input_mask = tf.cast(input_mask, tf.float32) / 255.0  # Normalize mask to the range [0, 1]

    return input_image, input_mask



@tf.function
def load_image(image_file, mask_file, train=True):
    input_image = tf.io.read_file(image_file)
    input_image = tf.image.decode_png(input_image, channels=3)
    input_image = tf.image.resize(input_image, (224, 224))

    input_mask = tf.io.read_file(mask_file)
    input_mask = tf.image.decode_png(input_mask, channels=1)
    input_mask = tf.image.resize(input_mask, (224, 224))

    if train and tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)

    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask


class UNet(object):
    def __init__(self,
                 input_size=(224, 224, 3),
                 num_classes=2,
                 output_channels=1):  # Update output channels based on your mask format
        self.pretrained_model = MobileNetV2(
            input_shape=input_size,
            include_top=False,
            weights='imagenet')

        self.target_layers = [
            'block_1_expand_relu',
            'block_3_expand_relu',
            'block_6_expand_relu',
            'block_13_expand_relu',
            'block_16_project'
        ]

        self.input_size = input_size
        self.num_classes = num_classes
        self.output_channels = output_channels
        

        self.model = self._create_model()
        loss = SparseCategoricalCrossentropy(from_logits=True)
        self.model.compile(optimizer=RMSprop(),
                           loss=loss,
                           metrics=['accuracy'])

    @staticmethod
    def _upsample(filters, size, dropout=False):
        init = tf.random_normal_initializer(0.0, 0.02)

        layers = Sequential()
        layers.add(Conv2DTranspose(filters=filters,
                                   kernel_size=size,
                                   strides=2,
                                   padding='same',
                                   kernel_initializer=init,
                                   use_bias=False))

        layers.add(BatchNormalization())

        if dropout:
            layers.add(Dropout(rate=0.5))

        layers.add(ReLU())

        return layers

    def _create_model(self):
        layers = [self.pretrained_model.get_layer(l).output
                  for l in self.target_layers]
        down_stack = Model(inputs=self.pretrained_model.input,
                           outputs=layers)
        down_stack.trainable = False

        up_stack = []

        for filters in (512, 256, 128, 64):
            up_block = self._upsample(filters, 4)
            up_stack.append(up_block)

        inputs = Input(shape=self.input_size)
        x = inputs

        skip_layers = down_stack(x)
        x = skip_layers[-1]
        skip_layers = reversed(skip_layers[:-1])

        for up, skip_connection in zip(up_stack, skip_layers):
            x = up(x)
            x = Concatenate()([x, skip_connection])

        init = tf.random_normal_initializer(0.0, 0.02)
        output = Conv2DTranspose(
            filters=self.output_channels,
            kernel_size=3,
            strides=2,
            padding='same',
            kernel_initializer=init)(x)

        return Model(inputs, outputs=output)

    @staticmethod
    def _plot_model_history(model_history, metric, ylim=None):
        plt.style.use('seaborn-darkgrid')
        plotter = tfdocs.plots.HistoryPlotter()
        plotter.plot({'Model': model_history}, metric=metric)

        plt.title(f'{metric.upper()}')
        if ylim is None:
            plt.ylim([0, 1])
        else:
            plt.ylim(ylim)

        plt.savefig(f'{metric}.png')
        plt.close()

    def train(self, train_dataset, epochs, steps_per_epoch,
              validation_dataset, validation_steps):
        hist = \
            self.model.fit(train_dataset,
                           epochs=epochs,
                           steps_per_epoch=steps_per_epoch,
                           validation_steps=validation_steps,
                           validation_data=validation_dataset)

        self._plot_model_history(hist, 'loss', [0., 2.0])
        self._plot_model_history(hist, 'accuracy')

    @staticmethod
    def _process_mask(mask):
        mask = (mask.numpy() * 127.5).astype('uint8')
        mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)

        return mask

    def _save_image_and_masks(self, image,
                              ground_truth_mask,
                              prediction_mask,
                              image_id):
        image = (image.numpy() * 255.0).astype('uint8')
        gt_mask = self._process_mask(ground_truth_mask)
        pred_mask = self._process_mask(prediction_mask)

        mosaic = np.hstack([image, gt_mask, pred_mask])
        mosaic = cv2.cvtColor(mosaic, cv2.COLOR_RGB2BGR)

        cv2.imwrite(f'out3_{image_id}.jpg', mosaic)

    @staticmethod
    def _create_mask(prediction_mask):
        prediction_mask = tf.argmax(prediction_mask, axis=-1)
        prediction_mask = prediction_mask[..., tf.newaxis]

        return prediction_mask[0]

    def _save_predictions(self, dataset, sample_size=1):
        for id, (image, mask) in \
                enumerate(dataset.take(sample_size), start=1):
            pred_mask = self.model.predict(image)
            pred_mask = self._create_mask(pred_mask)

            image = image[0]
            ground_truth_mask = mask[0]

            self._save_image_and_masks(image,
                                       ground_truth_mask,
                                       pred_mask,
                                       image_id=id)

    def evaluate(self, test_dataset, sample_size=5):
        result = self.model.evaluate(test_dataset)
        print(f'Accuracy: {result[1] * 100:.2f}%')

        self._save_predictions(test_dataset, sample_size)


# Path to your custom dataset
IMAGE_DIR = '/kaggle/input/dataset33/data befor split/images'
MASK_DIR = '/kaggle/input/dataset33/data befor split/masks'

AUTOTUNE = tf.data.experimental.AUTOTUNE

# Load custom dataset and create train and test datasets
image_files = sorted([os.path.join(IMAGE_DIR, file) for file in os.listdir(IMAGE_DIR)])
mask_files = sorted([os.path.join(MASK_DIR, file) for file in os.listdir(MASK_DIR)])

# Split dataset into train and test
split_ratio = 0.8
split_index = int(len(image_files) * split_ratio)

train_image_files = image_files[:split_index]
train_mask_files = mask_files[:split_index]
test_image_files = image_files[split_index:]
test_mask_files = mask_files[split_index:]
BATCH_SIZE=64
# Calculate STEPS_PER_EPOCH
STEPS_PER_EPOCH = len(train_image_files) // BATCH_SIZE
# Calculate VALIDATION_STEPS
VALIDATION_STEPS = len(test_image_files) // BATCH_SIZE


# Create train and test datasets
train_dataset = (tf.data.Dataset.from_tensor_slices((train_image_files, train_mask_files))
                 .map(load_image, num_parallel_calls=AUTOTUNE)
                 .cache()
                 .shuffle(len(train_image_files))
                 .batch(BATCH_SIZE)
                 .repeat()
                 .prefetch(buffer_size=AUTOTUNE))

test_dataset = (tf.data.Dataset.from_tensor_slices((test_image_files, test_mask_files))
                .map(lambda image, mask: load_image(image, mask, train=False), num_parallel_calls=AUTOTUNE)
                .batch(BATCH_SIZE))

# Initialize and train UNet model
unet = UNet()
unet.train(train_dataset,
           epochs=10,
           steps_per_epoch=STEPS_PER_EPOCH,
           validation_dataset=test_dataset,
           validation_steps=VALIDATION_STEPS)

# Evaluate the trained model
unet.evaluate(test_dataset)


2024-03-03 15:53:06.217746: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-03-03 15:53:06.217805: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-03-03 15:53:06.219226: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


Epoch 1/10
[1m 1/42[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m28:58[0m 42s/step - accuracy: 0.8088 - loss: 0.0000e+00

I0000 00:00:1709481234.160608     594 device_compiler.h:186] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m57s[0m 361ms/step - accuracy: 0.8102 - loss: 0.0000e+00 - val_accuracy: 0.8219 - val_loss: 0.0000e+00
Epoch 2/10
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 343ms/step - accuracy: 0.8115 - loss: 0.0000e+00 - val_accuracy: 0.8106 - val_loss: 0.0000e+00
Epoch 3/10
[1m 1/42[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m7s[0m 190ms/step - accuracy: 0.8161 - loss: 0.0000e+00

  self.gen.throw(typ, value, traceback)


[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 286ms/step - accuracy: 0.8108 - loss: 0.0000e+00 - val_accuracy: 0.8171 - val_loss: 0.0000e+00
Epoch 4/10
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 192ms/step - accuracy: 0.8102 - loss: 0.0000e+00 - val_accuracy: 0.8069 - val_loss: 0.0000e+00
Epoch 5/10
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 279ms/step - accuracy: 0.8131 - loss: 0.0000e+00 - val_accuracy: 0.8146 - val_loss: 0.0000e+00
Epoch 6/10
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 192ms/step - accuracy: 0.8111 - loss: 0.0000e+00 - val_accuracy: 0.8051 - val_loss: 0.0000e+00
Epoch 7/10
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 276ms/step - accuracy: 0.8103 - loss: 0.0000e+00 - val_accuracy: 0.8134 - val_loss: 0.0000e+00
Epoch 8/10
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 191ms/step - accuracy: 0.8115 - loss: 0.0000e+00 - val_accuracy: 0.8042 - val_loss:

  plt.style.use('seaborn-darkgrid')
  plt.style.use('seaborn-darkgrid')


[1m11/11[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 337ms/step - accuracy: 0.8117 - loss: 0.0000e+00
Accuracy: 81.22%
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 30ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 41ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 43ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step


In [3]:
# Save model weights
unet.model.save_weights('/kaggle/working/unet_weights.weights.h5')