<a href="https://colab.research.google.com/github/deconasser/UdemyCourse/blob/main/Unet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install tensorflow
!pip install segmentation_models

Collecting segmentation_models
  Downloading segmentation_models-1.0.1-py3-none-any.whl (33 kB)
Collecting keras-applications<=1.0.8,>=1.0.7 (from segmentation_models)
  Downloading Keras_Applications-1.0.8-py3-none-any.whl (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.7/50.7 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting image-classifiers==1.0.0 (from segmentation_models)
  Downloading image_classifiers-1.0.0-py3-none-any.whl (19 kB)
Collecting efficientnet==1.0.0 (from segmentation_models)
  Downloading efficientnet-1.0.0-py3-none-any.whl (17 kB)
Installing collected packages: keras-applications, image-classifiers, efficientnet, segmentation_models
Successfully installed efficientnet-1.0.0 image-classifiers-1.0.0 keras-applications-1.0.8 segmentation_models-1.0.1


In [3]:
!pip install albumentations



In [1]:
import os
os.environ["SM_FRAMEWORK"] = "tf.keras"

from tensorflow import keras
import segmentation_models as sm
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import albumentations as A
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard
from albumentations import Compose, HorizontalFlip, RandomRotate90

import numpy as np
import cv2
from google.colab.patches import cv2_imshow
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from albumentations import (
    Compose,
    RandomRotate90,
    Flip,
    Transpose,
    ElasticTransform,
    GridDistortion,
    OpticalDistortion,
    RandomBrightnessContrast,
    HorizontalFlip,
    VerticalFlip,
    RandomGamma,
    RGBShift,
)

Segmentation Models: using `tf.keras` framework.


In [2]:
def preprocess_gt(gt_image):
    gt_image = np.array(gt_image, dtype=np.uint8)
    num_classes = 3
    gt_one_hot = np.zeros((gt_image.shape[0], gt_image.shape[1], num_classes), dtype=np.float32)

    # Lớp 0: Màu đen (background)
    gt_one_hot[:, :, 0] = (gt_image == 0).all(axis=-1).astype(np.float32)
    # Lớp 1: Màu xanh lá cây (lành tính)
    gt_one_hot[:, :, 1] = (gt_image[:, :, 1] == 255).astype(np.float32) # channel 1 (G) == 255
    # Lớp 2: Màu đỏ (ác tính)
    gt_one_hot[:, :, 2] = (gt_image[:, :, 0] == 255).astype(np.float32) # channel 0 (R) == 255

    return gt_one_hot

def load_data(image_path, mask_path, img_size=(256, 256)):
    images = []
    masks = []

    image_files = sorted(os.listdir(image_path))
    mask_files = sorted(os.listdir(mask_path))

    for img_file, mask_file in zip(image_files, mask_files):
        img = load_img(os.path.join(image_path, img_file), target_size=img_size)
        img = img_to_array(img) / 255.0

        mask = load_img(os.path.join(mask_path, mask_file), target_size=img_size)
        mask = img_to_array(mask)

        mask = preprocess_gt(mask)

        images.append(img)
        masks.append(mask)

    images = np.array(images, dtype=np.float32)
    masks = np.array(masks, dtype=np.float32)

    return images, masks

# Đường dẫn tới dữ liệu
train_images_path = '/content/drive/MyDrive/Unet/data/train'
train_masks_path = '/content/drive/MyDrive/Unet/data/train_gt'

In [3]:
train_images, train_masks = load_data(train_images_path, train_masks_path)

In [4]:
test = load_img("/content/drive/MyDrive/Unet/data/train_gt/0081835cf877e004e8bfb905b78a9139.jpeg")
test = img_to_array(test)
test = preprocess_gt(test)
print(test[0][0])

[1. 0. 0.]


In [5]:
def augment_data(images, masks):
  aug = Compose([
      HorizontalFlip(p=0.5),
      RandomRotate90(p=0.5),
      A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=45, p=0.5),
      A.RandomBrightnessContrast(p=0.5)
  ])
  augmented_images = []
  augmented_masks = []
  for img, mask in zip(images, masks):
      augmented = aug(image=img, mask=mask)
      augmented_images.append(augmented['image'])
      augmented_masks.append(augmented['mask'])
  return np.array(augmented_images), np.array(augmented_masks)

In [6]:
augmented_images, augmented_masks = augment_data(train_images, train_masks)

In [7]:
import tensorflow as tf
from tensorflow.keras import layers, models

def weighted_dice_loss(y_true, y_pred):
  smooth = 1.0
  class_weights = tf.constant([0.5, 2.0, 2.0], dtype=tf.float32)

  # Apply class weights
  y_true_weighted = y_true * class_weights
  y_pred_weighted = y_pred * class_weights

  y_true_f = tf.keras.backend.flatten(y_true_weighted)
  y_pred_f = tf.keras.backend.flatten(y_pred_weighted)
  intersection = tf.keras.backend.sum(y_true_f * y_pred_f)

  return 1 - (2. * intersection + smooth) / (tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth)


def iou_metric(y_true, y_pred):
  y_true_f = tf.keras.backend.flatten(y_true)
  y_pred_f = tf.keras.backend.flatten(y_pred)
  intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
  union = tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) - intersection
  return intersection / union

def dice_coefficient(y_true, y_pred):
  smooth = 1.0
  y_true_f = tf.keras.backend.flatten(y_true)
  y_pred_f = tf.keras.backend.flatten(y_pred)
  intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
  return (2. * intersection + smooth) / (tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth)

def unet_model(input_size=(256, 256, 3)):
    inputs = layers.Input(input_size)

    # Encoder
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(inputs)
    conv1 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = layers.Conv2D(256, 3, activation='relu', padding='same')(conv3)
    pool3 = layers.MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = layers.Conv2D(512, 3, activation='relu', padding='same')(pool3)
    conv4 = layers.Conv2D(512, 3, activation='relu', padding='same')(conv4)
    pool4 = layers.MaxPooling2D(pool_size=(2, 2))(conv4)


    # Decoder

    up7 = layers.Conv2D(256, 2, activation='relu', padding='same')(layers.UpSampling2D(size=(2, 2))(conv4))
    merge7 = layers.concatenate([conv3, up7], axis=3)
    conv7 = layers.Conv2D(256, 3, activation='relu', padding='same')(merge7)
    conv7 = layers.Conv2D(256, 3, activation='relu', padding='same')(conv7)

    up8 = layers.Conv2D(128, 2, activation='relu', padding='same')(layers.UpSampling2D(size=(2, 2))(conv7))
    merge8 = layers.concatenate([conv2, up8], axis=3)
    conv8 = layers.Conv2D(128, 3, activation='relu', padding='same')(merge8)
    conv8 = layers.Conv2D(128, 3, activation='relu', padding='same')(conv8)

    up9 = layers.Conv2D(64, 2, activation='relu', padding='same')(layers.UpSampling2D(size=(2, 2))(conv8))
    merge9 = layers.concatenate([conv1, up9], axis=3)
    conv9 = layers.Conv2D(64, 3, activation='relu', padding='same')(merge9)
    conv9 = layers.Conv2D(64, 3, activation='relu', padding='same')(conv9)

    # Final layer with softmax activation
    segmentation_output = layers.Conv2D(3, 1, activation='softmax', name='segmentation')(conv9)

    model = models.Model(inputs=inputs, outputs=segmentation_output)
    model.compile(optimizer=Adam(learning_rate=1e-4), loss=weighted_dice_loss, metrics=[iou_metric, dice_coefficient])

    return model

model = unet_model()


In [8]:
import datetime
# Tạo thư mục lưu trữ kết quả huấn luyện
log_dir = os.path.join("/content/drive/MyDrive/Unet/logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
os.makedirs(log_dir, exist_ok=True)

# Callbacks
checkpoint_callback = ModelCheckpoint(
    filepath='best_model.keras',
    monitor='val_loss',
    save_best_only=True,
    mode='min',
    verbose=1
)

early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=10,
    mode='min',
    verbose=1
)

tensorboard_callback = TensorBoard(
    log_dir=log_dir,
    histogram_freq=1
)

callbacks = [checkpoint_callback, early_stopping_callback, tensorboard_callback]


In [9]:
X_train, X_val, y_train, y_val = train_test_split(augmented_images, augmented_masks, test_size=0.2, random_state=42)

history = model.fit(
    X_train,
    y_train,
    epochs=50,
    batch_size=16,
    validation_data=(X_val, y_val),
    callbacks=callbacks,
    verbose=1
)

Epoch 1/50
Epoch 1: val_loss improved from inf to 0.52417, saving model to best_model.keras
Epoch 2/50
Epoch 2: val_loss did not improve from 0.52417
Epoch 3/50
Epoch 3: val_loss did not improve from 0.52417
Epoch 4/50
Epoch 4: val_loss did not improve from 0.52417
Epoch 5/50
Epoch 5: val_loss did not improve from 0.52417
Epoch 6/50
Epoch 6: val_loss did not improve from 0.52417
Epoch 7/50
Epoch 7: val_loss did not improve from 0.52417
Epoch 8/50
Epoch 8: val_loss did not improve from 0.52417
Epoch 9/50
Epoch 9: val_loss did not improve from 0.52417
Epoch 10/50
Epoch 10: val_loss did not improve from 0.52417
Epoch 11/50
Epoch 11: val_loss did not improve from 0.52417
Epoch 11: early stopping
