<a href="https://colab.research.google.com/github/A-ManiMekhala/Code_debug/blob/main/Image_Segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [120]:
!git clone https://github.com/Vikramshenoy97/Human-Segmentation-Dataset

fatal: destination path 'Human-Segmentation-Dataset' already exists and is not an empty directory.


In [121]:
import os
import time
import torch

from PIL import Image

from torch import nn
from torchvision import transforms

from torch.utils.data import DataLoader, Dataset
from torch.optim import Adam, AdamW, SGD

In [122]:
class SegmentationDataset(Dataset):
  def __init__(self, image_dir, mask_dir):
    self.image_dir=image_dir
    self.mask_dir=mask_dir
    self.transform = transforms.Compose([
        transforms.Resize((512,512)),
        transforms.ToTensor()
    ])

    valid_extension = {".jpg","jpeg",".png"}
    ##Filter condition
    self.images= [f for f in os.listdir(image_dir) if os.path.splitext(f)[1].lower() in valid_extension]

  def __len__(self):
    return len(self.images)

  def __getitem__(self,idx):
    ##we are retriving images based on index
    img_path= os.path.join(self.image_dir,self.images[idx])
    ##for eg: 1.jpg--> 1, jpg
    name,text= os.path.splitext(self.images[idx])
    ##Iam going to look for this .png in mask directory to get correct mask
    ## when I ask my data loader to get me an index of five in getitem, it will look
    ##for the image with the index of five and that particular name, with that png will be
    ##the mask path.
    mask_path= os.path.join(self.mask_dir,f"{name}.png")

    image=Image.open(img_path).convert("RGB")
    mask= Image.open(mask_path).convert("L") ## Lightness

    image=self.transform(image)
    mask=self.transform(mask)

    ##to get accurate mask
    mask= (mask>0.5).float()
    return image,mask


    ##our data set is ready


In [123]:
# Dataloader: It will help us to retrieve data in batches which we will then use
# for model training.

def get_dataloader(image_dir, mask_dir, batch_size=2, shuffle=True):
    dataset = SegmentationDataset(image_dir, mask_dir)  # ← pass the args here
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)



In [124]:
class DoubleConv(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()
    self.conv_op = nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
    )

  def forward(self, x):
    return self.conv_op(x)


In [125]:
class DownSample(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()
    self.conv= DoubleConv(in_channels,out_channels)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

  def forward(self,x):
    down= self.conv(x)
    p= self.pool(down)

    return down,p



In [126]:
class UpSample(nn.Module):
  def __init__(self, in_channels, out_channels):
    super().__init__()

    self.up= nn.ConvTranspose2d(in_channels, in_channels//2, kernel_size=2, stride=2)
    self.conv=DoubleConv(in_channels, out_channels)

  def forward(self,x1,x2):
      ##Here we are utilizing 2 parts --> previous encoder part and also the upsampling part.
    x1= self.up(x1)
      ##x2--> belongs to the down part that returned by the downsample.

      ## Now concatinating x1, x2
    x = torch.cat([x1,x2],1)
    return self.conv(x)








In [127]:
class UNet(nn.Module):
  ## arranging operations line by line
  def __init__(self, in_channels,num_classes):
    super().__init__()
    self.down_conv_1= DownSample(in_channels,64)
    self.down_conv_2= DownSample(64,128)
    self.down_conv_3= DownSample(128,256)
    self.down_conv_4= DownSample(256,512)

    self.bottle_neck = DoubleConv(512, 1024)

    self.up_conv_1= UpSample(1024,512)
    self.up_conv_2= UpSample(512,256)
    self.up_conv_3= UpSample(256,128)
    self.up_conv_4= UpSample(128,64)

    self.out= nn.Conv2d(in_channels=64, out_channels=num_classes, kernel_size=1)

  def forward(self,x):
    down_1, p1 = self.down_conv_1(x)
    down_2, p2 = self.down_conv_2(p1)
    down_3, p3 = self.down_conv_3(p2)
    down_4, p4 = self.down_conv_4(p3)

    b= self.bottle_neck(p4)

    up_1= self.up_conv_1(b, down_4)
    up_2= self.up_conv_2(up_1,down_3)
    up_3= self.up_conv_3(up_2, down_2)
    up_4= self.up_conv_4(up_3, down_1)

    out= self.out(up_4)
    return out









In [128]:
##loss function
class DiceLoss(nn.Module):
  def __init__(self, smooth=1e-6):   ## smoooth --> because , we cannot get zero division error
    super(DiceLoss, self).__init__()
    self.smooth= smooth

  def forward(self,inputs, targets):
    inputs=inputs.view(-1)
    targets= targets.view(-1)

    intersection = (inputs* target).sum()
    dice_score = (2. * intersection + self. smooth)/ (inputs.sum()+ targets.sum() + self.smooth)


    return 1- dice_score


In [129]:
class BCEWithDiceLoss(nn.Module):
  def __init__ (self, smooth=1e-6):
    super(BCEWithDiceLoss, self).__init__()
    self.bce= nn.BCEWithLogitsLoss()
    self.dice= DiceLoss()

  def forward(self, inputs, targets):
    bce_loss= self.bce(inputs, targets)
    dice_loss= self.dice(inputs, targets)
    return 0.5 * bce_loss + dice_loss

In [130]:
#training loop
def train(model, dataloader, epochs=2, lr=0.01,save_path="unet_model", load_path=None):
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  if load_path and os.path.exists(load_path):
    print(f"Loading model weights from {load_path}")
    model.load_state_dict(torch.load(load_path, map_location=device))
  else:
    print(f"No checkpoint found, training from scratch.")

  print(device)
  model.to(device)

  criterion = BCEWithDiceLoss()
  criterion = nn.BCEWithLogitsLoss()

  optimizer = SGD(model.parameters(), lr=lr)

  for epoch in range(epochs):
    model.train()
    epoch_loss=0

    for images, masks in dataloader:
      images, masks = images.to(device), masks.to(device)
      optimizer.zero_grad()

      output = model(images)

      loss = criterion(output, masks)
      loss.backward()
      optimizer.step()

      epoch_loss += loss.item()

    avg_loss = epoch_loss /len(dataloader)
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}, LR:{lr} ")

    if epoch %10 ==0 and epoch >0:
      torch.save(model.state_dict(), f"{save_path}.pth")

  torch.save(model.state_dict(), f"{save_path}_final.pth")
  print(f"Model Saved to {save_path}")

In [131]:
dataloader = get_dataloader("/content/Human-Segmentation-Dataset/Training_Images", "/content/Human-Segmentation-Dataset/Ground_Truth", batch_size=8, shuffle=True)


In [132]:
model = UNet(in_channels=3, num_classes=1)

In [137]:
train(model, dataloader, epochs=2, lr=0.001)

No checkpoint found, training from scratch.
cuda
Epoch [1/2], Loss: 0.6602, LR:0.001 
Epoch [2/2], Loss: 0.6590, LR:0.001 
Model Saved to unet_model


In [135]:
import numpy as np

# Load model and predict with stats
def predict(model_path, input_image_path):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load model
    model = UNet(in_channels=3, num_classes=1)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()

    # Track start time
    total_start_time = time.time()

    # Image preprocessing
    preprocess_start_time = time.time()
    image = Image.open(input_image_path).convert("RGB")
    transform = transforms.Compose([
        transforms.Resize((512, 512)),
        transforms.ToTensor(),
    ])
    image_tensor = transform(image).unsqueeze(0).to(device)
    preprocess_end_time = time.time()

    # Model inference
    inference_start_time = time.time()
    with torch.no_grad():
        output = model(image_tensor)
        output = torch.sigmoid(output)
    inference_end_time = time.time()

    # Postprocessing
    postprocess_start_time = time.time()
    mask = output.squeeze(0).squeeze(0).cpu().numpy()
    mask = (mask > 0.4).astype(np.uint8) * 255
    mask_image = Image.fromarray(mask)

    combined = Image.new("RGB", (512 * 2, 512))
    combined.paste(image.resize((512, 512)), (0, 0))
    combined.paste(mask_image.convert("RGB"), (512, 0))
    combined.save("output.jpg")
    postprocess_end_time = time.time()

    # Calculate timing stats
    total_end_time = time.time()

    preprocess_time = preprocess_end_time - preprocess_start_time
    inference_time = inference_end_time - inference_start_time
    postprocess_time = postprocess_end_time - postprocess_start_time
    total_time = total_end_time - total_start_time

    # Print stats
    print("\nPrediction completed! Stats:")
    print(f"  Image Preprocessing Time: {preprocess_time:.4f} seconds")
    print(f"  Model Inference Time: {inference_time:.4f} seconds")
    print(f"  Postprocessing Time: {postprocess_time:.4f} seconds")
    print(f"  Total Prediction Time: {total_time:.4f} seconds")
    print("Prediction saved as output.jpg")



In [136]:
predict(model_path="/content/unet_model_final.pth", input_image_path="/content/Human-Segmentation-Dataset/Training_Images/101.jpg")

Using device: cuda

Prediction completed! Stats:
  Image Preprocessing Time: 0.0100 seconds
  Model Inference Time: 0.0022 seconds
  Postprocessing Time: 0.1153 seconds
  Total Prediction Time: 0.1275 seconds
Prediction saved as output.jpg


In [None]:
predict(model_path="/content/unet_model_80.pth", input_image_path="/content/Human-Segmentation-Dataset/Training_Images/101.jpg")