<a href="https://colab.research.google.com/github/aysensahin/drain_project/blob/main/model_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Liquid Volume Detection From Drain Images

* Mount Google Drive to access the dataset.

In [None]:
from google.colab import drive
# Mount Google Drive under /content/drive; the training cell reads the dataset
# from /content/drive/MyDrive/drain_train, so this cell has to run first.
drive.mount('/content/drive')

* Model Training

In [None]:
import os
import numpy as np
import cv2
import torchvision.models.segmentation
import torch
import torchvision.transforms as tf

# --- Training configuration ---------------------------------------------------
learning_rate=1e-5  # step size handed to the Adam optimizer
width=height=900 # every image and mask is resized to height x width before batching
batchSize= 3  # number of random samples drawn per training step

train_folder="/content/drive/MyDrive/drain_train"  #Add drive link of the dataset

# Create list of images.
# os.listdir returns every directory entry in arbitrary order, so keep only names
# with an image extension (a hidden file or sub-folder would make cv2.imread
# return None when it is sampled) and sort them for a stable ordering.
_IMAGE_EXTENSIONS=(".jpg",".jpeg",".png",".bmp",".tif",".tiff")
_image_dir=os.path.join(train_folder, "Image")
ListofImages=sorted(
    name for name in os.listdir(_image_dir)
    if name.lower().endswith(_IMAGE_EXTENSIONS)
)
# Fail here with a readable message instead of later inside np.random.randint(0, 0).
if not ListofImages:
    raise FileNotFoundError("No training images found in " + _image_dir)

# Transform image: ndarray -> PIL -> resize -> float tensor -> per-channel
# mean/std normalisation (the values commonly published for ImageNet-pretrained
# torchvision models).
transformImg=tf.Compose([tf.ToPILImage(),tf.Resize((height,width)),tf.ToTensor(),tf.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))])
# Transform annotation: nearest-neighbour resize so class ids are never blended
# into in-between values; no normalisation is applied.
transformAnn=tf.Compose([tf.ToPILImage(),tf.Resize((height,width),tf.InterpolationMode.NEAREST),tf.ToTensor()])

# Read image
def ReadRandom(): # Load random image and its annotation
    """Pick one random training image and build its class-index annotation map.

    Mask files are looked up under ``Segmentation/<class folder>`` using the
    image's base name with a ``.png`` extension. A mask that cannot be read is
    skipped, so that class is simply absent from this sample's annotation.

    Returns:
        tuple: ``(Img, AnnotationMap)`` -- the result of ``transformImg`` on the
        image and of ``transformAnn`` on the float32 label map, where
        0 = background, 1 = Drain1, 2 = Drain2, 3 = Blood.

    Raises:
        OSError: if the selected image file cannot be read by OpenCV.
    """
    idx=np.random.randint(0,len(ListofImages)) # Select random image
    image_name=ListofImages[idx]
    image_path=os.path.join(train_folder, "Image", image_name)

    # cv2.imread signals failure by returning None rather than raising.
    Img=cv2.imread(image_path)
    if Img is None:
        raise OSError("Could not read training image: " + image_path)
    Img=Img[:,:,0:3]
    # NOTE(review): OpenCV loads channels as BGR and the inference cell feeds
    # BGR to the network as well, so both stages agree; change the channel
    # order in both places or not at all.

    # Swap only the extension; str.replace("jpg","png") would also rewrite a
    # "jpg" inside the base name and would miss ".JPG" / ".jpeg" files.
    mask_name=os.path.splitext(image_name)[0]+".png"
    Filled =  cv2.imread(os.path.join(train_folder, "Segmentation/3_Blood", mask_name),0)  #blood
    Vessel1 =  cv2.imread(os.path.join(train_folder, "Segmentation/1_Drain1", mask_name),0)  #jackson pratt drain
    Vessel2 =  cv2.imread(os.path.join(train_folder, "Segmentation/2_Drain2", mask_name),0)  #hemovac drain

    # Paint classes in ascending order so that, where masks overlap, blood
    # overrides either drain. Only mask pixels equal to 1 count as foreground
    # (assumes the masks are stored as 0/1 images -- TODO confirm).
    AnnotationMap = np.zeros(Img.shape[0:2],np.float32)
    if Vessel1 is not None:  AnnotationMap[ Vessel1 == 1 ] = 1
    if Vessel2 is not None:  AnnotationMap[ Vessel2 == 1 ] = 2
    if Filled is not None:  AnnotationMap[ Filled  == 1 ] = 3

    Img=transformImg(Img)
    AnnotationMap=transformAnn(AnnotationMap)
    return Img,AnnotationMap

# Load batch of images
def LoadBatch(): # Load batch of images
    """Assemble one training batch from ``batchSize`` calls to ``ReadRandom``.

    Each sample is drawn independently, so the same image can appear more than
    once in a batch.

    Returns:
        tuple: ``(images, ann)`` with shapes ``[batchSize, 3, height, width]``
        and ``[batchSize, height, width]``.
    """
    # Draw all samples first, then copy them into the preallocated tensors.
    samples = [ReadRandom() for _ in range(batchSize)]
    images = torch.zeros([batchSize, 3, height, width])
    ann = torch.zeros([batchSize, height, width])
    for slot, (picture, labels) in enumerate(samples):
        images[slot] = picture
        ann[slot] = labels
    return images, ann

# Load and set net and optimizer
# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Start from the pretrained DeepLabV3/ResNet-50 and swap the last classifier
# layer for a 1x1 convolution with 4 outputs (blood, drain1, drain2, background).
Net = torchvision.models.segmentation.deeplabv3_resnet50(pretrained=True)
Net.classifier[4] = torch.nn.Conv2d(
    256,
    4,
    kernel_size=(1, 1),
    stride=(1, 1),
)
Net = Net.to(device)

# Adam over every parameter of the network.
optimizer = torch.optim.Adam(params=Net.parameters(), lr=learning_rate)

# Train
NUM_ITERATIONS = 10000    # total optimisation steps
CHECKPOINT_EVERY = 1000   # write the weights every this many steps

# The loss object is stateless, so build it once instead of once per step.
criterion = torch.nn.CrossEntropyLoss()
Net.train()  # make the training mode explicit (dropout / batch-norm behaviour)

for itr in range(NUM_ITERATIONS): # Training loop
    images, ann = LoadBatch() # Load training batch
    images = images.to(device)
    ann = ann.to(device)

    Net.zero_grad()  # clear gradients left over from the previous step
    Prediction = Net(images)['out'] # per-pixel class scores
    Loss = criterion(Prediction, ann.long()) # cross entropy needs integer class ids
    Loss.backward() # Backpropagate loss
    optimizer.step() # Apply gradient descent change to weight

    print(itr,") Loss=",Loss.item())
    # Checkpoints go to the current working directory as "<itr>.torch"; with
    # these settings the last one written by the loop is 9000.torch.
    if itr % CHECKPOINT_EVERY == 0: #Save model weights
        print("Saving Model" +str(itr) + ".torch")
        torch.save(Net.state_dict(),   str(itr) + ".torch")

In [None]:
# Save the weights as they are after the training loop finished; the loop itself
# only checkpoints on multiples of 1000, so the final state is written here.
# The file lands in the current working directory, while the inference cell
# loads from a Drive path, so it has to be copied there first.
torch.save(Net.state_dict(), "9999.torch")

* Obtaining segmented output images using the trained model

In [None]:
import cv2
import torchvision.models.segmentation
import torch
import torchvision.transforms as tf
import matplotlib.pyplot as plt

modelPath = "/content/drive/MyDrive/trainedmodel.torch"   #Path to trained model (9999.torch)
imagePath = "/content/image.jpg"   #Path of original image
height=width=900
# Same preprocessing as in training: resize, convert to tensor, normalise.
transformImg = tf.Compose([tf.ToPILImage(), tf.Resize((height, width)), tf.ToTensor(),tf.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Build the network exactly as in the training cell so the saved state-dict
# keys match; the pretrained weights are overwritten by load_state_dict below.
Net = torchvision.models.segmentation.deeplabv3_resnet50(pretrained=True)
Net.classifier[4] = torch.nn.Conv2d(256, 4, kernel_size=(1, 1), stride=(1, 1))
Net = Net.to(device)  # Set net to GPU or CPU
# map_location lets a checkpoint saved on a GPU runtime load on a CPU-only one.
Net.load_state_dict(torch.load(modelPath, map_location=device)) # Load trained model
Net.eval() # Set to evaluation mode

Img = cv2.imread(imagePath) # load test image
# cv2.imread returns None instead of raising when the file cannot be read.
if Img is None:
    raise OSError("Could not read image: " + imagePath)
height_orgin , width_orgin ,d = Img.shape # Get image original size
plt.imshow(Img[:,:,::-1])  # Show image (channel order reversed for display)
plt.show()  # flush this figure so the segmentation below does not draw over it

Img = transformImg(Img)  # Transform to pytorch
Img = Img.to(device).unsqueeze(0)  # add the batch dimension
with torch.no_grad():
    Prediction = Net(Img)['out']  # Run net
# resize to original size
Prediction = tf.Resize((height_orgin,width_orgin))(Prediction[0])
# Convert per-class scores to a class map
seg = torch.argmax(Prediction, 0).cpu().detach().numpy()
plt.imshow(seg)  # display image
# NOTE(review): imsave is called without vmin/vmax, so the colour assigned to a
# class presumably depends on which classes occur in `seg`; the pixel-counting
# cell matches exact RGBA values -- confirm the colours for each image.
plt.imsave('/content/imagesegmented.png', seg)
plt.show()



*   Detect volume with pixel counting



In [None]:
import numpy as np
import cv2
from matplotlib import pyplot as plt
from PIL import Image
import math

# NOTE(review): the inference cell writes '/content/imagesegmented.png'; this
# path points at a differently named file -- confirm it is the intended input.
SEGMENTED_IMAGE_PATH = '/content/IMG_0408segmented.png'
DRAIN_COLOR = (48, 103, 141, 255)  # (32, 144, 140, 255) drain2 // (48, 103, 141, 255) drain1
BLOOD_COLOR = (253, 231, 36, 255)
DRAIN_CAPACITY_ML = 267  #390 for drain2 without bottom //  267 for drain1

# Close the file handle as soon as the pixels are copied out. Converting to
# RGBA guarantees 4-channel pixels so they are comparable to the 4-tuples above.
with Image.open(SEGMENTED_IMAGE_PATH) as img:
    pixels = np.array(img.convert("RGBA"))

# Count exact colour matches in one vectorised pass instead of a Python loop.
drain = int(np.all(pixels == DRAIN_COLOR, axis=-1).sum())
blood = int(np.all(pixels == BLOOD_COLOR, axis=-1).sum())

# Without any drain or blood pixel the fill ratio is undefined (0 / 0).
if blood + drain == 0:
    raise ValueError("No drain or blood pixels found in " + SEGMENTED_IMAGE_PATH)

# Blood share of the (blood + empty drain) area, scaled by the drain capacity.
volume = math.floor(blood / (blood + drain) * DRAIN_CAPACITY_ML)
print(f"Estimated volume of blood is {volume}ml")