<a href="https://colab.research.google.com/github/Abhi-gola/Fruit-classifier/blob/main/oxford_image_segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!curl -O https://www.robots.ox.ac.uk/~vgg/data/pets/data/images.tar.gz
!curl -O https://www.robots.ox.ac.uk/~vgg/data/pets/data/annotations.tar.gz
!tar -xf images.tar.gz
!tar -xf annotations.tar.gz

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  755M  100  755M    0     0  27.8M      0  0:00:27  0:00:27 --:--:-- 29.1M
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 18.2M  100 18.2M    0     0  10.2M      0  0:00:01  0:00:01 --:--:-- 10.2M


In [None]:
!mkdir iiit_pet_dataset

In [None]:
H = 256 
W = 256
import os 
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, MaxPool2D, UpSampling2D, Concatenate

In [None]:
def read_data(data_path, file_path):
  df = pd.read_csv(file_path, sep=" ", header=None)
  names = df[0].values

  images = [os.path.join(data_path, f"images/{name}.jpg") for name in names]
  masks = [os.path.join(data_path, f"annotations/trimaps/{name}.png") for name in names]
   
  return images, masks

In [None]:
def load_data(path):
  train_valid_path = os.path.join(path, "annotations/trainval.txt")
  test_path = os.path.join(path, "annotations/test.txt")

  train_x, train_y = read_data(path, train_valid_path)
  test_x, test_y = read_data(path, test_path)

  train_x, valid_x = train_test_split(train_x, test_size=0.2, random_state=42)
  train_y, valid_y = train_test_split(train_y, test_size=0.2, random_state=42)

  return(train_x, train_y), (valid_x, valid_y), (test_x, test_y)

In [None]:
def read_image(x):
  x = cv2.imread(x, cv2.IMREAD_COLOR)
  x = cv2.resize(x, (W, H))
  x = x/255.0
  x = x.astype(np.float32)
  return x

In [None]:
def read_mask(x):
  x = cv2.imread(x, cv2.IMREAD_GRAYSCALE)
  x = cv2.resize(x, (W, H))
  x = x-1
  x = x.astype(np.int32)
  return x

In [None]:
def process(x, y):
  def f(x, y):
    x = x.decode()
    y = y.decode()

    image = read_image(x)
    mask = read_mask(y)

    return image, mask

  image, mask = tf.numpy_function(f, [x, y], [tf.float32, tf.int32])
  mask = tf.one_hot(mask, 3, dtype=tf.int32)
  image.set_shape([H, W, 3])
  mask.set_shape([H, W, 3])

  return image, mask

In [None]:
def tf_dataset(x, y, batch_size=8):
  dataset = tf.data.Dataset.from_tensor_slices((x, y))
  dataset = dataset.shuffle(buffer_size=5000)
  dataset = dataset.map(process)
  dataset = dataset.batch(batch_size)
  dataset = dataset.repeat()
  dataset = dataset.prefetch(2)
  return dataset 

In [None]:
def conv_block(inputs, filters, pooling):
  x = Conv2D(filters, 3, activation="relu", padding='same')(inputs)
  x = BatchNormalization()(x)

  x = Conv2D(filters, 3, activation='relu', padding='same')(inputs)
  x = BatchNormalization()(x)

  if pooling:
    p = MaxPool2D(2,2)(x)
    return x, p
  else:
    return x

In [None]:
def unet_model(shape, num_classes):
  inputs = Input(shape)

  #Encoder
  x1, p1 = conv_block(inputs, 16, pooling=True)
  x2, p2 = conv_block(p1, 32, pooling=True)
  x3, p3 = conv_block(p2, 48, pooling=True)
  x4, p4 = conv_block(p3, 64, pooling=True)

  #Bridge
  b1 = conv_block(p4, 128, pooling=False)

  #Decoder 
  u1 = UpSampling2D((2,2), interpolation="bilinear")(b1)
  c1 = Concatenate()([u1, x4])
  x5 = conv_block(c1, 64, pooling=False)

  u2 = UpSampling2D((2,2), interpolation="bilinear")(x5)
  c2 = Concatenate()([u2, x3])
  x6 = conv_block(c2, 48, pooling=False)

  u3 = UpSampling2D((2,2), interpolation="bilinear")(x6)
  c3 = Concatenate()([u3, x2])
  x7 = conv_block(c3, 32, pooling=False)

  u4 = UpSampling2D((2,2), interpolation="bilinear")(x7)
  c4 = Concatenate()([u4, x1])
  x8 = conv_block(c4, 16, pooling=False)

  output = Conv2D(num_classes, 1, padding='same', activation='softmax')(x8)
  return Model(inputs, output)

In [None]:
np.random.seed(42)
tf.random.set_seed(42)
path = "/content/iiit_pet_dataset"

In [None]:
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(path)
print("no. of training images", len(train_x))
print("no. of validation images", len(valid_x))
print("no. of test images", len(test_x))

no. of training images 2944
no. of validation images 736
no. of test images 3669


In [None]:
shape = (256, 256, 3)
num_classes = 3
learning_rate = 1e-3
batch_size = 32
epochs = 50

train_dataset = tf_dataset(train_x, train_y, batch_size=batch_size)
valid_dataset = tf_dataset(valid_x, valid_y, batch_size=batch_size)

train_steps = len(train_x)//batch_size
valid_steps = len(valid_x)//batch_size

model = unet_model(shape, num_classes)
model.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate), metrics=['accuracy'])

callbacks = [ModelCheckpoint("model.h5", verbose=1, save_best_only=True), 
             ReduceLROnPlateau(monitor="val_loss", patience=8, factor=0.1, verbose=1, min_lr=1e-6)]

In [None]:
model.fit(train_dataset, steps_per_epoch=train_steps, validation_data=valid_dataset,
          validation_steps=valid_steps, epochs=epochs, callbacks=callbacks)

Epoch 1/50
Epoch 00001: val_loss improved from inf to 0.85365, saving model to model.h5
Epoch 2/50
Epoch 00002: val_loss did not improve from 0.85365
Epoch 3/50
Epoch 00003: val_loss did not improve from 0.85365
Epoch 4/50
Epoch 00004: val_loss improved from 0.85365 to 0.71502, saving model to model.h5
Epoch 5/50
Epoch 00005: val_loss improved from 0.71502 to 0.63674, saving model to model.h5
Epoch 6/50
Epoch 00006: val_loss improved from 0.63674 to 0.45467, saving model to model.h5
Epoch 7/50
Epoch 00007: val_loss did not improve from 0.45467
Epoch 8/50
Epoch 00008: val_loss did not improve from 0.45467
Epoch 9/50
Epoch 00009: val_loss improved from 0.45467 to 0.41633, saving model to model.h5
Epoch 10/50
Epoch 00010: val_loss did not improve from 0.41633
Epoch 11/50
Epoch 00011: val_loss did not improve from 0.41633
Epoch 12/50
Epoch 00012: val_loss did not improve from 0.41633
Epoch 13/50
Epoch 00013: val_loss did not improve from 0.41633
Epoch 14/50
Epoch 00014: val_loss did not im

<keras.callbacks.History at 0x7faec84a6750>

In [None]:
!mkdir segmented_images_

In [None]:
from tqdm import tqdm
model = tf.keras.models.load_model("model.h5")

for x, y in tqdm(zip(test_x, test_y), total=len(test_x)):
  name = x.split("/")[-1]

  x = read_image(x)

  # read mask
  y = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
  y = cv2.resize(y, (W, H))
  y = y-1
  y = np.expand_dims(y, axis=-1)
  y = y*(255/num_classes)
  y = y.astype(np.int32)
  y = np.concatenate([y, y, y], axis=2)

  #prediction
  pred = model.predict(np.expand_dims(x, axis=0))[0]
  pred = np.argmax(pred, axis=-1)
  pred = np.expand_dims(pred, axis=-1)
  pred = pred*(255/num_classes)
  pred = pred.astype(np.int32)
  pred = np.concatenate([pred, pred, pred], axis=2)

  x = x*255.0
  x = x.astype(np.int32)

  h, w, _ = x.shape
  line = np.ones((h, 10, 3))*255
  result_image = np.concatenate([x, line, y, line, pred], axis=1)
  cv2.imwrite(f"segmented_images_/{name}", result_image)

100%|██████████| 3669/3669 [05:21<00:00, 11.42it/s]
