<a href="https://colab.research.google.com/github/ZeynepRuveyda/Egg_Counter/blob/main/Egg_counter_UNET.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import zipfile
import cv2
from skimage import io
import tensorflow as tf
from tensorflow.python.keras import Sequential
from tensorflow.keras import layers, optimizers
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.applications.resnet101 import ResNet101
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.initializers import glorot_uniform
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint, LearningRateScheduler
from IPython.display import display
from tensorflow.keras import backend as K
from sklearn.preprocessing import StandardScaler, normalize
import os
import glob
import random
from google.colab import files 
import plotly.graph_objects as go
from sklearn.model_selection import train_test_split
from keras_preprocessing.image import ImageDataGenerator
%matplotlib inline

In [None]:
from google.colab import drive
# Mount Google Drive into the Colab runtime at /content/drive so files stored
# there can be accessed through the normal file system.
drive.mount('/content/drive')

# ResUnet

In [None]:
# Switch the working directory to the dataset folder on the mounted Drive so
# the relative file names used in the following cells resolve against it.
%cd /content/drive/My Drive/eggs_files

In [None]:

# File names follow "<n>_<k>.png" (image) and "<n>_<k>_mask.png" (mask) for
# n = 1..11 and k = 1..2. Both lists are built from the same stems, so the
# entry at position j in one list pairs with position j in the other.
stems = [f"{n}_{k}" for n in range(1, 12) for k in (1, 2)]
image_path = [f"{stem}.png" for stem in stems]
mask_path = [f"{stem}_mask.png" for stem in stems]

In [None]:
# One row per sample: the image file name alongside its mask file name.
df = pd.DataFrame({"image_path": image_path, "mask_path": mask_path})
df = df.reset_index(drop=True)
# Show the first rows as the cell output.
df.head()

In [None]:
def check_size(img):
  """Print the shape of the image stored at path ``img``.

  Args:
    img: Path of the image file; it is passed straight to ``cv2.imread``.

  Returns:
    None. The shape tuple of the decoded array is printed to stdout.

  Raises:
    FileNotFoundError: If ``cv2.imread`` could not load the file.
  """
  pixels = cv2.imread(img)
  # cv2.imread reports failure by returning None rather than raising, which
  # would otherwise surface as an opaque AttributeError on ``.shape``.
  if pixels is None:
    raise FileNotFoundError(f"cv2.imread could not read image: {img!r}")

  print(pixels.shape)

In [None]:
# Print the array shape of every mask file in turn.
for mask_file in mask_path:
  check_size(mask_file)

In [None]:
# Single-row table describing the image that is passed to `prediction` later on.
X_test = pd.DataFrame(
    {
        "id": ["1_test"],
        "image_path": ["1_test.png"],
    }
)

In [None]:
from sklearn.model_selection import train_test_split

# Split the image/mask table into a training part and a validation part,
# requesting 10% of the rows for validation.
# NOTE(review): no random_state is passed, so the split differs from run to
# run — consider fixing a seed if results need to be reproducible.
X_train, X_val = train_test_split(df, test_size=0.10)


In [None]:
# Pull the image and mask file names of each split out of the DataFrames as
# plain Python lists (image list first, mask list second).
path_columns = ("image_path", "mask_path")
train_ids, train_mask = (list(X_train[col]) for col in path_columns)
val_ids, val_mask = (list(X_val[col]) for col in path_columns)

In [None]:
from utilities import DataGenerator

# Build one generator for the training split and one for the validation split;
# each receives the image file names and the matching mask file names.
# NOTE(review): DataGenerator lives in the project-local `utilities` module,
# which is not shown here — batch size, resizing and shuffling behaviour
# should be confirmed there.
training_generator = DataGenerator(train_ids,train_mask)
validation_generator = DataGenerator(val_ids,val_mask)

In [None]:
def resblock(X, f):
  """Residual block with a projected shortcut.

  Main path:     1x1 conv -> batch norm -> ReLU -> 3x3 conv ('same') -> batch norm
  Shortcut path: 1x1 conv -> batch norm
  Both paths are added element-wise and passed through a final ReLU.

  Args:
    X: Input Keras tensor.
    f: Number of filters used by every convolution in the block.

  Returns:
    The output tensor of the block. All convolutions use stride 1, so the
    spatial size of ``X`` is kept and the channel count becomes ``f``.
  """
  shortcut = X

  # Main path, first stage: point-wise convolution to ``f`` channels.
  main = Conv2D(f, kernel_size=(1, 1), strides=(1, 1), kernel_initializer='he_normal')(X)
  main = BatchNormalization()(main)
  main = Activation('relu')(main)

  # Main path, second stage: 3x3 convolution; the ReLU is applied after the add.
  main = Conv2D(f, kernel_size=(3, 3), strides=(1, 1), padding='same', kernel_initializer='he_normal')(main)
  main = BatchNormalization()(main)

  # Shortcut path: project the input to ``f`` channels so the shapes match.
  shortcut = Conv2D(f, kernel_size=(1, 1), strides=(1, 1), kernel_initializer='he_normal')(shortcut)
  shortcut = BatchNormalization()(shortcut)

  merged = Add()([main, shortcut])
  return Activation('relu')(merged)

In [None]:
def upsample_concat(x, skip):
  """Upsample ``x`` by a factor of 2 in both spatial dimensions and join it with ``skip``.

  Args:
    x: Tensor coming from the deeper (lower-resolution) stage.
    skip: Skip-connection tensor; it must match the upsampled ``x`` in every
      dimension except the one being concatenated.

  Returns:
    The concatenation of the upsampled ``x`` and ``skip`` (Keras
    ``Concatenate`` with its default axis).
  """
  upsampled = UpSampling2D((2,2))(x)
  return Concatenate()([upsampled, skip])

In [None]:
# Build the ResUNet segmentation network: a 6-step encoder (plain conv stage +
# five residual stages, each followed by 2x2 max pooling), a residual
# bottleneck, and a 6-step decoder that upsamples and concatenates the
# matching encoder output before each residual block.
# NOTE: the layers are created in a fixed order; the statements are kept
# exactly as they are so the saved architecture/weights stay compatible.
input_shape = (1024,1024,3)

# Input tensor: 1024 x 1024 images with 3 channels
X_input = Input(input_shape)

# Stage 1: two 3x3 conv + batch-norm layers with 16 filters (1024x1024x16),
# then 2x2 max pooling -> 512x512
conv1_in = Conv2D(16,3,activation= 'relu', padding = 'same', kernel_initializer ='he_normal')(X_input)
conv1_in = BatchNormalization()(conv1_in)
conv1_in = Conv2D(16,3,activation= 'relu', padding = 'same', kernel_initializer ='he_normal')(conv1_in)
conv1_in = BatchNormalization()(conv1_in)
pool_1 = MaxPool2D(pool_size = (2,2))(conv1_in)

# Stage 2: residual block with 32 filters at 512x512, pooled to 256x256
conv2_in = resblock(pool_1, 32)
pool_2 = MaxPool2D(pool_size = (2,2))(conv2_in)

# Stage 3: residual block with 64 filters at 256x256, pooled to 128x128
conv3_in = resblock(pool_2, 64)
pool_3 = MaxPool2D(pool_size = (2,2))(conv3_in)

# Stage 4: residual block with 128 filters at 128x128, pooled to 64x64
conv4_in = resblock(pool_3, 128)
pool_4 = MaxPool2D(pool_size = (2,2))(conv4_in)
# Stage 5: residual block with 256 filters at 64x64, pooled to 32x32
conv5_in = resblock(pool_4, 256)
pool_5 = MaxPool2D(pool_size = (2,2))(conv5_in)
# Stage 6: residual block with 512 filters at 32x32, pooled to 16x16
conv6_in = resblock(pool_5, 512)
pool_6 = MaxPool2D(pool_size = (2,2))(conv6_in)


# Stage 7 (bottleneck): residual block with 1024 filters at 16x16
conv7_in = resblock(pool_6, 1024)

# Upscale stage 1: 16x16 -> 32x32, joined with the stage-6 output, 512 filters
up_1 = upsample_concat(conv7_in, conv6_in)
up_1 = resblock(up_1, 512)

# Upscale stage 2: 32x32 -> 64x64, joined with the stage-5 output, 256 filters
up_2 = upsample_concat(up_1, conv5_in)
up_2 = resblock(up_2, 256)

# Upscale stage 3: 64x64 -> 128x128, joined with the stage-4 output, 128 filters
up_3 = upsample_concat(up_2, conv4_in)
up_3 = resblock(up_3, 128)

# Upscale stage 4: 128x128 -> 256x256, joined with the stage-3 output, 64 filters
up_4 = upsample_concat(up_3, conv3_in)
up_4 = resblock(up_4, 64)

# Upscale stage 5: 256x256 -> 512x512, joined with the stage-2 output, 32 filters
up_5 = upsample_concat(up_4, conv2_in)
up_5 = resblock(up_5, 32)

# Upscale stage 6: 512x512 -> 1024x1024, joined with the stage-1 output, 16 filters
up_6 = upsample_concat(up_5, conv1_in)
up_6 = resblock(up_6, 16)


# Final output: 1x1 convolution to a single channel with a sigmoid, i.e. one
# value in [0, 1] per input pixel
output = Conv2D(1, (1,1), padding = "same", activation = "sigmoid")(up_6)

# Wrap the graph from the input tensor to the sigmoid output as a Keras model
model_seg = Model(inputs = X_input, outputs = output )

In [None]:
# Print the layer-by-layer summary of the segmentation model.
model_seg.summary()

In [None]:
from utilities import focal_tversky, tversky_loss, tversky

In [None]:
# Adam optimizer with a small step size for training the segmentation network.
# `learning_rate` is the supported keyword: the old `lr` alias is deprecated
# in TF 2.x and rejected outright by newer Keras releases.
adam = tf.keras.optimizers.Adam(learning_rate = 4.0000e-05)
# Train on the focal Tversky loss and report the Tversky score as the metric
# (both come from the project-local `utilities` module).
model_seg.compile(optimizer = adam, loss = focal_tversky, metrics = [tversky])

In [None]:
# Stop training once val_loss has failed to improve for 20 epochs.
earlystopping = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=20,
    verbose=1,
)
# Write the model to seg-weights.hdf5 only when the monitored quantity
# improves (ModelCheckpoint monitors val_loss unless told otherwise).
checkpointer = ModelCheckpoint(
    filepath="seg-weights.hdf5",
    save_best_only=True,
    verbose=1,
)

In [None]:
# Train for at most 300 epochs, validating after every epoch; the returned
# History object is kept in `seg` for plotting.
seg = model_seg.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=300,
    callbacks=[checkpointer, earlystopping],
)

In [None]:
# Two side-by-side panels, each showing the training curve and the matching
# validation curve ("val_" + key) from the fit history.
# Each entry: (subplot position, history key, panel title, y-axis label)
panels = [
    (1, 'loss', "SEG Model focal tversky Loss", "focal tversky loss"),
    (2, 'tversky', "SEG Model tversky score", "tversky Accuracy"),
]

plt.figure(figsize=(12,5))
for position, metric, panel_title, y_label in panels:
  plt.subplot(1, 2, position)
  plt.plot(seg.history[metric])
  plt.plot(seg.history['val_' + metric])
  plt.title(panel_title)
  plt.ylabel(y_label)
  plt.xlabel("Epochs")
  plt.legend(['train', 'val'])

In [None]:
# Serialise the network architecture to a JSON string and store it in
# seg-model.json so the model can be rebuilt later with model_from_json.
model_json = model_seg.to_json()
with open("seg-model.json","w") as json_file:
  json_file.write(model_json)

In [None]:
from utilities import focal_tversky, tversky_loss, tversky

# Read back the architecture that was exported to seg-model.json.
with open('seg-model.json', 'r') as json_file:
    json_savedModel= json_file.read()

# Rebuild the network from the JSON description, then restore the weights
# from the checkpoint file written during training.
model_seg = tf.keras.models.model_from_json(json_savedModel)
model_seg.load_weights('seg-weights.hdf5')
# `learning_rate` is the supported keyword: the old `lr` alias is deprecated
# in TF 2.x and rejected outright by newer Keras releases.
adam = tf.keras.optimizers.Adam(learning_rate = 0.05, epsilon = 0.1)
model_seg.compile(optimizer = adam, loss = focal_tversky, metrics = [tversky])

In [None]:
from utilities import prediction

# Run the segmentation model on the test table; the helper returns two values
# that are used below as the image paths and the predicted masks.
# NOTE(review): `prediction` is defined in the project-local `utilities`
# module, not shown here — its preprocessing and the exact shape of each
# returned mask should be confirmed there.
image_id, mask= prediction(X_test, model_seg)

In [None]:
# Collect the values returned by `prediction` in a table with one column for
# the image path and one for the predicted mask.
df_pred = pd.DataFrame({'image_path': image_id,'predicted_mask': mask})
# Display the table as the cell output.
df_pred

In [None]:
# Attach the predictions to the test table by matching rows on image_path
# (pandas performs an inner join when `how` is not given).
df_pred = X_test.merge(df_pred, on = 'image_path')
df_pred.head()

In [None]:
# One column per predicted image: the source picture on top and the predicted
# mask below. With a single row in df_pred this is the same 2x1 layout as
# before; with more rows every image gets its own column instead of all of
# them being drawn onto the same two axes.
n_images = max(len(df_pred), 1)  # keep a valid (empty) grid when there is nothing to show
fig, axs = plt.subplots(2, n_images, figsize=(30 * n_images, 50), squeeze=False)

# Iterate over the column values directly so the loop does not depend on the
# DataFrame having a 0..n-1 index.
for col, (path, raw_mask) in enumerate(zip(df_pred.image_path, df_pred.predicted_mask)):

  # skimage's io.imread already returns the channels in RGB order, which is
  # what imshow expects; an extra BGR->RGB conversion would swap red and blue.
  img = io.imread(path)
  axs[0][col].title.set_text("eggs")
  axs[0][col].imshow(img)

  # Obtain the predicted mask for the image: take the first entry, drop the
  # singleton axes and round the values to the nearest integer.
  predicted_mask = np.asarray(raw_mask)[0].squeeze().round()
  axs[1][col].title.set_text("AI Predicted Mask")
  axs[1][col].imshow(predicted_mask)


fig.tight_layout()