In [1]:
#import
import glob
from torch.utils.data import Dataset,DataLoader
import matplotlib.pyplot as plt
import cv2
import pandas as pd
import numpy as np
from transformers import SamProcessor,SamModel
import monai
from datasets import Dataset as dictToDataset

  from .autonotebook import tqdm as notebook_tqdm
2024-02-16 04:44:56.998566: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
images_data_path = "dataset/images"
mask_data_path = "dataset/labels_colored"
labels_path = "dataset/labels_class_dict.csv"

In [3]:
labels = pd.read_csv(labels_path)
labels = labels.loc[labels['class_names'] == 'foreground'].values
to_remove = np.delete(labels,0).astype(np.uint8)

In [4]:
labels = pd.read_csv(labels_path)
input = labels.to_dict()
hashmap = {}

for category,r,g,b in zip(input['class_names'].values(),input['r'].values(),input['g'].values(),input['b'].values()):
    hashmap[(r,g,b)] = category

In [5]:
from PIL import Image
images_path = []
mask_path = []

for path in glob.glob(images_data_path + '/*'):
    images_path.append(path)
    
for path in glob.glob(mask_data_path + '/*'):
    mask_path.append(path)
    
images_path.sort()
mask_path.sort()

dataset_dict = {
    "image":[Image.open(path) for path in images_path ],
    "label":[Image.open(path) for path in mask_path]
}
dataset = dictToDataset.from_dict(dataset_dict)

In [6]:
def get_bounding_box(ground_truth_mask):
    y_indices, x_indices = np.where(ground_truth_mask > 0)
    x_min, x_max = np.min(x_indices), np.max(x_indices)
    y_min, y_max = np.min(y_indices), np.max(y_indices)
    # add perturbation to bounding box coordinates
    H, W = ground_truth_mask.shape
    x_min = max(0, x_min - np.random.randint(0, 20))
    x_max = min(W, x_max + np.random.randint(0, 20))
    y_min = max(0, y_min - np.random.randint(0, 20))
    y_max = min(H, y_max + np.random.randint(0, 20))
    bbox = [x_min, y_min, x_max, y_max]

    return bbox
    

In [7]:
class DataSetLoader(Dataset):
    def __init__(self,dataset,processor):
        self.dataset = dataset
        self.processor = processor
    
    def __len__(self):
        return len(self.dataset["image"])

    def __getitem__(self,idx):
        item = self.dataset[idx]
        
        img = item["image"]
        
        mask = np.array(item["label"]) 
        remove = np.all(mask == (44,222,180),axis=-1)
                
        mask[remove] = [255,255,255]
        mask[~remove] = [0,0,0]
        
        mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY) 
        prompt = get_bounding_box(mask)
        
        inputs = self.processor(img,input_boxes=[[prompt]],return_tensors='pt')
        inputs = {k:v.squeeze(0) for k,v in inputs.items()}
        inputs["ground_truth_mask"] = mask
        
        return inputs

In [8]:
processor = SamProcessor.from_pretrained("facebook/sam-vit-base")
train_dataset = DataSetLoader(dataset=dataset,processor=processor)
train_dataloader = DataLoader(train_dataset,batch_size=2,shuffle=True,drop_last=False)

In [9]:
model = SamModel.from_pretrained("facebook/sam-vit-base")
for name,param in model.named_parameters():
    if name.startswith("vision_encoder") or name.startswith("prompt_encoder"):
        param.requires_grad_(False)

In [10]:
from torch.optim import Adam
optimizer = Adam(model.parameters(),lr=1e-5,weight_decay=0)
seg_loss = monai.losses.DiceCELoss(sigmoid=True, squared_pred=True, reduction='mean')

In [11]:
from tqdm import tqdm
import math
epochs = 1
model.to("cuda")
model.train()

for epoch in range(epochs):
    epoch_losses = []
    for batch in tqdm(train_dataloader):
        outputs = model(pixel_values=batch["pixel_values"].to("cuda"),
                        input_boxes=batch["input_boxes"].to("cuda"),
                        multimask_output=False)
        
        predicted_masks = outputs.pred_masks.squeeze(1)
        ground_truth_masks = batch["ground_truth_mask"].float().to("cuda")
        loss = seg_loss(predicted_masks,ground_truth_masks.unsqueeze(1))
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())
    
    print(f"For Epoch: {epoch} Epoch loss is: {math.mean(epoch_losses)}")
    

RuntimeError: No CUDA GPUs are available