<a href="https://colab.research.google.com/github/hanugra/computer-vision-with-python/blob/master/segmentasi_ped.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


In [None]:
import datetime
import glob
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from sklearn.metrics import precision_recall_curve, auc, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
#tf.config.optimizer.set_jit(False)

from google.colab import drive
drive.mount('/content/drive')

#link_data = '/content/drive/MyDrive/S3/segmentasi ped/v2/'
#link_model = '/content/drive/MyDrive/S3/segmentasi ped/v3_filter/model_20240820 02:57.keras'

# Root of the dataset on the mounted Drive; every run writes its artifacts into a
# timestamped sub-folder of `hasil/` ("hasil" is Indonesian for "results").
link_data = '/content/drive/MyDrive/S3/segmentasi ped/v4_target hm binary/'
folder_path = link_data + 'hasil/' + datetime.datetime.now().strftime("%Y%m%d_%H:%M") + '/'

# Report whether the run folder had to be created or was already present.
if os.path.exists(folder_path):
    print(f"Folder '{folder_path}' already exists.")
else:
    os.makedirs(folder_path)
    print(f"Folder '{folder_path}' created.")


Mounted at /content/drive
Folder '/content/drive/MyDrive/S3/segmentasi ped/v4_target hm binary/hasil/20240824_08:53/' created.


In [None]:
def load_dt(link):
    """Load the ``data`` entry of an ``.npz`` archive as a NumPy array.

    Args:
        link: Path (or file-like object) of the ``.npz`` archive to read.

    Returns:
        numpy.ndarray: The ``data`` entry, rebuilt from a Python-list round
        trip. Because of that round trip the dtype is re-inferred by NumPy
        from the Python values instead of being copied from the archive.

    Raises:
        KeyError: If the archive has no ``data`` entry.
    """
    # NOTE(security): allow_pickle=True lets the archive unpickle arbitrary
    # objects, which can execute code; only load archives from trusted sources.
    # The context manager closes the underlying file handle, which the
    # NpzFile would otherwise keep open until garbage collection.
    with np.load(link, allow_pickle=True) as archive:
        dt_ = archive['data'].tolist()
    return np.array(dt_)

def iou_metric(y_true, y_pred):
    """Mean per-sample intersection-over-union of two mask batches (NumPy).

    Sums are taken over axes 1, 2 and 3, so both inputs need at least four
    dimensions; the per-sample ratios are then averaged over axis 0.

    Args:
        y_true: Reference masks.
        y_pred: Predicted masks (the caller is expected to binarize them).

    Returns:
        The IoU averaged over the first axis.
    """
    sample_axes = (1, 2, 3)
    eps = 1e-7  # keeps the ratio finite when a sample's union is empty

    overlap = np.sum(y_true * y_pred, axis=sample_axes)
    combined = np.sum(y_true, axis=sample_axes) + np.sum(y_pred, axis=sample_axes) - overlap
    per_sample = (overlap + eps) / (combined + eps)
    return np.mean(per_sample, axis=0)

def dice_coefficient(y_true, y_pred):
    """Mean per-sample Dice coefficient of two mask batches (NumPy).

    Sums are taken over axes 1, 2 and 3, so both inputs need at least four
    dimensions; the per-sample scores are then averaged over axis 0.

    Args:
        y_true: Reference masks.
        y_pred: Predicted masks (the caller is expected to binarize them).

    Returns:
        The Dice coefficient averaged over the first axis.
    """
    sample_axes = (1, 2, 3)
    eps = 1e-7  # keeps the ratio finite when both masks of a sample are empty

    overlap = np.sum(y_true * y_pred, axis=sample_axes)
    numerator = 2. * overlap + eps
    denominator = np.sum(y_true, axis=sample_axes) + np.sum(y_pred, axis=sample_axes) + eps
    return np.mean(numerator / denominator, axis=0)

def calculate_ap(y_true, y_pred):
    """Area under the precision-recall curve of the flattened inputs.

    Args:
        y_true: Reference labels; flattened before scoring.
        y_pred: Prediction scores; flattened before scoring.

    Returns:
        The area under the (recall, precision) curve as computed by
        ``sklearn.metrics.auc``.
    """
    labels = y_true.flatten()
    scores = y_pred.flatten()
    precision, recall, _thresholds = precision_recall_curve(labels, scores)
    return auc(recall, precision)

def calculate_recall(y_true, y_pred):
    """Binary recall over all elements of the flattened inputs.

    Args:
        y_true: Reference labels; flattened before scoring.
        y_pred: Binarized predictions; flattened before scoring.

    Returns:
        The value of ``sklearn.metrics.recall_score`` with ``average='binary'``.
    """
    labels = y_true.flatten()
    predictions = y_pred.flatten()
    return recall_score(labels, predictions, average='binary')


def calculate_lra(y_true, y_pred, iou_threshold=0.5):
    """Check whether the pooled IoU of two masks reaches a threshold.

    The intersection and union are counted over every element of the inputs
    at once, so the result is a single boolean for the whole input.
    NOTE(review): the caller reports this as "Localization Recall Accuracy";
    a per-sample hit rate may have been intended - confirm.

    Args:
        y_true: Reference mask(s); non-zero elements count as foreground.
        y_pred: Predicted mask(s); non-zero elements count as foreground.
        iou_threshold: Minimum IoU for the result to be true.

    Returns:
        numpy.bool_: True when the pooled IoU is at least ``iou_threshold``.
    """
    overlap = np.sum(np.logical_and(y_true, y_pred))
    combined = np.sum(np.logical_or(y_true, y_pred))

    if combined == 0:
        # Both masks are empty: the 0/0 division would yield NaN (and a
        # RuntimeWarning). Treat it as perfect agreement, matching the epsilon
        # convention used by iou_metric.
        iou = np.float64(1.0)
    else:
        iou = overlap / combined
    return iou >= iou_threshold

def iou_metrics(y_true, y_pred):
    """TensorFlow IoU metric: predictions are binarized at 0.5 before scoring.

    Sums are taken over axes 1, 2 and 3 and the per-sample ratios are averaged
    over axis 0.

    Args:
        y_true: Reference masks; cast to float32.
        y_pred: Model outputs; values above 0.5 become 1.0, the rest 0.0.

    Returns:
        The mean IoU over the first axis.
    """
    sample_axes = (1, 2, 3)
    smooth = 1e-7  # avoids division by zero for samples with an empty union

    truth = tf.cast(y_true, tf.float32)
    binary_pred = tf.cast(y_pred > 0.5, tf.float32)

    overlap = tf.reduce_sum(truth * binary_pred, axis=sample_axes)
    truth_area = tf.reduce_sum(truth, axis=sample_axes)
    pred_area = tf.reduce_sum(binary_pred, axis=sample_axes)
    union = truth_area + pred_area - overlap

    return tf.reduce_mean((overlap + smooth) / (union + smooth), axis=0)

def dice_coefficients(y_true, y_pred):
    """TensorFlow Dice metric: predictions are binarized at 0.5 before scoring.

    Sums are taken over axes 1, 2 and 3 and the per-sample scores are averaged
    over axis 0.

    Args:
        y_true: Reference masks; cast to float32.
        y_pred: Model outputs; values above 0.5 become 1.0, the rest 0.0.

    Returns:
        The mean Dice coefficient over the first axis.
    """
    sample_axes = (1, 2, 3)
    smooth = 1e-7  # avoids division by zero when both masks are empty

    truth = tf.cast(y_true, tf.float32)
    binary_pred = tf.cast(y_pred > 0.5, tf.float32)

    overlap = tf.reduce_sum(truth * binary_pred, axis=sample_axes)
    numerator = 2. * overlap + smooth
    denominator = tf.reduce_sum(truth, axis=sample_axes) + tf.reduce_sum(binary_pred, axis=sample_axes) + smooth

    return tf.reduce_mean(numerator / denominator, axis=0)

def evaluate_model(model_name, history,waktu):
    """Plot and save the training curves stored in a fit history.

    Two figures with two panels each are drawn, saved into the module-level
    ``folder_path`` and shown: loss/accuracy (``acc_performa.png``) and
    IoU/Dice (``iou_performa.png``).

    Args:
        model_name: Label used in the printed message and in the panel titles.
        history: Object whose ``history`` mapping holds the per-epoch series
            ``loss``, ``accuracy``, ``iou_metrics``, ``dice_coefficients`` and
            their ``val_``-prefixed counterparts.
        waktu: Not used by this function; kept so existing calls keep working.

    Raises:
        KeyError: If one of the expected series is missing from the history.
    """
    curves = history.history
    print(f"History keys for {model_name}: {curves.keys()}")

    # (output file, ((history key, display label) for the left and right panel))
    figure_specs = (
        ('acc_performa.png', (('loss', 'Loss'), ('accuracy', 'Accuracy'))),
        ('iou_performa.png', (('iou_metrics', 'IoU'), ('dice_coefficients', 'Dice'))),
    )

    for file_name, panels in figure_specs:
        plt.figure(figsize=(12, 6))
        for position, (key, label) in enumerate(panels, start=1):
            # Training and validation series share one panel per metric.
            plt.subplot(1, 2, position)
            plt.plot(curves[key], label=f'Training {label}')
            plt.plot(curves['val_' + key], label=f'Validation {label}')
            plt.xlabel('Epochs')
            plt.ylabel(label)
            plt.title(f'{model_name} - {label}')
            plt.legend()

        # Save before show so the file is written while the figure is current.
        plt.savefig(folder_path + file_name, dpi=300)
        plt.show()



In [None]:
def unet_model(input_size=(256, 256, 9)):
    """Build a two-level U-Net with a two-channel sigmoid output.

    The encoder has two conv-pair stages (64 and 128 filters), each followed
    by 2x2 max pooling, then a 256-filter bottleneck. The decoder upsamples
    twice and concatenates the matching encoder features before each conv pair.

    Args:
        input_size: Shape of one input sample passed to ``layers.Input``.

    Returns:
        An uncompiled Keras ``Model``.
    """
    def conv_pair(tensor, filters):
        # Two stacked 3x3 same-padded ReLU convolutions.
        for _ in range(2):
            tensor = layers.Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        return tensor

    def upsample_and_merge(tensor, skip):
        # Double the spatial size, then append the encoder features (skip connection).
        upsampled = layers.UpSampling2D(size=(2, 2))(tensor)
        return layers.concatenate([upsampled, skip])

    inputs = layers.Input(input_size)

    # Encoder
    enc1 = conv_pair(inputs, 64)
    enc2 = conv_pair(layers.MaxPooling2D(pool_size=(2, 2))(enc1), 128)

    # Bottleneck
    bottleneck = conv_pair(layers.MaxPooling2D(pool_size=(2, 2))(enc2), 256)

    # Decoder
    dec2 = conv_pair(upsample_and_merge(bottleneck, enc2), 128)
    dec1 = conv_pair(upsample_and_merge(dec2, enc1), 64)

    # Output layer: 2 channels (per the original author: left and right segmentation)
    outputs = layers.Conv2D(2, 1, activation='sigmoid')(dec1)

    return models.Model(inputs=[inputs], outputs=[outputs])

In [None]:
#link_data = '/kaggle/input/ped-segmentasi/v2/'

#id_vid = 2
#input_image = load_dt(link_data + str(id_vid) + '_input.npz')
#print('input loaded')
#target_mask = load_dt(link_data + str(id_vid) + '_target.npz')
#print('target loaded')

#link_image = sorted(glob.glob(link_data + '*_input.npz'))
#link_target = sorted(glob.glob(link_data + '*_target.npz'))

# Load the numbered input/target archives 1..33 from `link_data` and collect
# their samples (iterating each loaded array along its first axis) into two
# parallel lists.
input_image = []
target_mask = []
for id_f in range(1,34):
  temp_input_image = load_dt(link_data + str(id_f) + '_input.npz')
  print('input loaded', id_f)
  temp_target_mask = load_dt(link_data + str(id_f) + '_target.npz')
  print('target loaded', id_f)
  input_image.extend(temp_input_image)
  target_mask.extend(temp_target_mask)
#  if len(input_image) < 3000:
#    input_image.extend(temp_input_image)
#    target_mask.extend(temp_target_mask)
#  else:
#    break

# Stack the collected samples back into single arrays.
input_image = np.array(input_image)
target_mask = np.array(target_mask)

# 70 % train; the remaining 30 % is halved into validation and test (15 % each).
# NOTE(review): the split is random per sample. If each numbered file is one
# video, neighbouring frames can land in different splits - confirm this is intended.
#X_train, X_test, y_train, y_test = train_test_split(input_image, target_mask, test_size=0.2, random_state=42)
X_train, X_temp, y_train, y_temp = train_test_split(input_image, target_mask, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print('data splitted')
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape, X_val.shape, y_val.shape)
# Timestamp used in the checkpoint file name ("waktu" is Indonesian for "time").
waktu = datetime.datetime.now().strftime("%Y%m%d_%H:%M")

# Instantiate and compile the model
model = unet_model()
#model.load_weights(link_data + 'model_20240822 07:55.keras')
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', iou_metrics, dice_coefficients])
model.summary()

# All three callbacks watch val_loss: halve the learning rate after 2 stagnant
# epochs (down to 1e-6), checkpoint the best model under `link_data`, and stop
# after 5 stagnant epochs while restoring the best weights.
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=2, min_lr=1e-6)
model_checkpoint = ModelCheckpoint(link_data + 'model_'+str(waktu) + '.keras', monitor='val_loss', save_best_only=True, verbose=1)
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)

history = model.fit(X_train, y_train,
                    epochs=8,
                    #initial_epoch=5,
                    validation_data= (X_val, y_val),
                    batch_size=64,
                    callbacks=[lr_scheduler,early_stopping,model_checkpoint]
        )
# Persist the final model and its weights as HDF5 files in the run folder.
# NOTE(review): newer Keras releases may require weight files to end in
# `.weights.h5` - confirm against the installed version.
model.save(folder_path+'model_unet.h5')
model.save_weights(folder_path+'weight_unet.h5')

input loaded 1
target loaded 1
input loaded 2
target loaded 2
input loaded 3
target loaded 3
input loaded 4
target loaded 4
input loaded 5
target loaded 5
input loaded 6
target loaded 6
input loaded 7
target loaded 7
input loaded 8
target loaded 8
input loaded 9
target loaded 9
input loaded 10
target loaded 10
input loaded 11
target loaded 11
input loaded 12
target loaded 12
input loaded 13
target loaded 13
input loaded 14
target loaded 14
input loaded 15
target loaded 15
input loaded 16
target loaded 16
input loaded 17
target loaded 17
input loaded 18
target loaded 18
input loaded 19
target loaded 19
input loaded 20
target loaded 20
input loaded 21
target loaded 21
input loaded 22
target loaded 22
input loaded 23
target loaded 23
input loaded 24
target loaded 24
input loaded 25
target loaded 25
input loaded 26
target loaded 26
input loaded 27
target loaded 27
input loaded 28
target loaded 28
input loaded 29
target loaded 29
input loaded 30
target loaded 30
input loaded 31
target loade

  saving_api.save_model(


In [None]:

# Plot and save the training curves recorded by model.fit.
evaluate_model("Unet Segmentation", history, waktu)

# Time one full prediction pass over the held-out test set.
# NOTE(review): this single measurement may include one-time start-up cost of
# the first predict call, so the reported FPS is likely a lower bound.
start_time = time.time()
y_pred = model.predict(X_test)
end_time = time.time()

total_time = end_time - start_time
num_frames = X_test.shape[0]
time_per_frame = total_time / num_frames
fps = 1 / time_per_frame

# Binarize the sigmoid outputs for the overlap-based metrics.
y_pred_thresholded = (y_pred > 0.5).astype(np.float32)

iou_score = iou_metric(y_test, y_pred_thresholded)
dice_score = dice_coefficient(y_test, y_pred_thresholded)
# Average precision needs the raw scores: with thresholded predictions the
# precision-recall curve collapses to a single operating point.
# NOTE(review): scoring every pixel with distinct scores needs more memory
# than the thresholded variant; subsample if this becomes a problem.
ap = calculate_ap(y_test, y_pred)
recall = calculate_recall(y_test, y_pred_thresholded)
lra = calculate_lra(y_test, y_pred_thresholded)

print(f'IoU Score: {iou_score}')
print(f'Dice Coefficient: {dice_score}')
print(f'Average Precision (AP): {ap}')
print(f'Recall: {recall}')
print(f'Localization Recall Accuracy (LRA) : {lra}')
print(f"FPS: {fps:.2f}")

# Keep the raw predictions and the matching test data with the run artifacts.
np.savez_compressed(folder_path +'y_pred.npz', data=y_pred)
np.savez_compressed(folder_path +'y_test.npz', data=y_test)
np.savez_compressed(folder_path +'X_test.npz', data=X_test)

IoU Score: 0.9499472930850135
Dice Coefficient: 0.9742287301469023
Average Precision (AP): 0.9759489072444717
Recall: 0.9844867415913383
Localization Recall Accuracy (LRA) : True
FPS: 17.17


In [None]:
# Also store the trained model as a `.keras` file in the run folder
# (the training cell saves an HDF5 copy).
model.save(folder_path+'model_unet.keras')
def check_pred():
  """Re-score a stored prediction run and time a reloaded model.

  Loads the saved predictions and targets of the hard-coded run
  ``20240823 04:25`` from ``link_data``, prints the same metrics as the
  evaluation cell, then reloads that run's model and measures its prediction
  speed on ``17_input.npz``.

  NOTE(review): the prediction file name has no separator between the
  timestamp and ``y_pred`` while the target file name does - confirm both
  paths exist as written.
  """
  y_pred = load_dt(link_data + 'hasil/20240823 04:25y_pred_unet.npz')
  y_test = load_dt(link_data + 'hasil/20240823 04:25_y_test_unet.npz')
  X_test = load_dt(link_data + '17_input.npz')
  y_pred_thresholded = (y_pred > 0.5).astype(np.float32)

  iou_score = iou_metric(y_test, y_pred_thresholded)
  dice_score = dice_coefficient(y_test, y_pred_thresholded)
  # Average precision needs the raw scores: with thresholded predictions the
  # precision-recall curve collapses to a single operating point.
  ap = calculate_ap(y_test, y_pred)
  recall = calculate_recall(y_test, y_pred_thresholded)
  lra = calculate_lra(y_test, y_pred_thresholded)

  # Prints the architecture of a freshly built U-Net; this instance is
  # replaced by the loaded model right below.
  model = unet_model()
  model.summary()

  # The custom metric functions must be supplied to deserialize the model.
  model = tf.keras.models.load_model(link_data + 'hasil/model_20240823 04:25.keras', custom_objects={
      'dice_coefficients': dice_coefficients,
      'iou_metrics': iou_metrics
  })

  # Time one full prediction pass; the predictions themselves are not scored.
  start_time = time.time()
  output = model.predict(X_test)
  end_time = time.time()

  # The original divided an undefined `total_time`, raising NameError.
  total_time = end_time - start_time
  num_frames = X_test.shape[0]
  time_per_frame = total_time / num_frames
  fps = 1 / time_per_frame
  print(f"FPS: {fps:.2f}")

  print(f'IoU Score: {iou_score}')
  print(f'Dice Coefficient: {dice_score}')
  print(f'Average Precision (AP): {ap}')
  print(f'Recall: {recall}')
  print(f'Localization Recall Accuracy (LRA) : {lra}')



In [None]:
import matplotlib.pyplot as plt

def plot_dt(dt1, dt2):
  """Show two 2-D arrays as separate 6x6 inch viridis images.

  Args:
    dt1: Array drawn in the first figure.
    dt2: Array drawn in the second figure.
  """
  for channel in (dt1, dt2):
    plt.figure(figsize=(6, 6))
    plt.imshow(channel, cmap='viridis')

idx = 177
def plot_frame(idx):
  """Visualize one test sample: input, ground truth and predictions.

  Reads the notebook globals ``X_test``, ``y_test``, ``y_pred`` and
  ``y_pred_thresholded``. The first three input channels are shown as an
  image, then channels 0 and 1 of each mask stack are drawn with ``plot_dt``.

  Args:
    idx: Index of the sample along the first axis of the test arrays.
  """
  print('RGB image')
  plt.figure(figsize=(6, 6))
  plt.imshow(X_test[idx, :, :, 0:3])

  print('Ground truth')
  truth = y_test[idx]
  plot_dt(truth[:, :, 0], truth[:, :, 1])

  print('Prediction')
  scores = y_pred[idx]
  plot_dt(scores[:, :, 0], scores[:, :, 1])

  print('Treshold prediction')
  binary = y_pred_thresholded[idx]
  plot_dt(binary[:, :, 0], binary[:, :, 1])



