In [1]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from torchmetrics.functional import accuracy
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms as transform
from torchvision.transforms import InterpolationMode
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from torchmetrics.functional import accuracy
from monai.networks.nets import UNet
from monai.losses import DiceLoss,DiceCELoss ,DiceFocalLoss ,MaskedDiceLoss
from monai.metrics import DiceMetric
import os
import torch.nn.functional as F
from tqdm import tqdm
import tifffile
import random
import cv2
from torchmetrics import Dice

In [2]:
device = "cuda" if torch.cuda.is_available() else "cpu"
num_gpus = torch.cuda.device_count()
device , num_gpus


('cuda', 4)

In [3]:
def binary(org_img):
    #print(np.any(org_img))
    if np.any(org_img):
        normalized_img = (org_img - np.min(org_img)) / (np.max(org_img) - np.min(org_img))
        normalized_img[normalized_img < 0.5] = 0
        normalized_img[normalized_img >= 0.5] = 1
        return normalized_img
    else:
        return org_img
def rgb(org_img):
    normalized_img = (org_img - np.min(org_img)) / (np.max(org_img) - np.min(org_img))
    return normalized_img

In [4]:
class classification_dataset(Dataset):
    def __init__(self,image_filenames,mask_filenames,transforms=None):
        self.image_dir = "/scratch/akaniyar/colonoscopy/images/"
        self.mask_dir = "/scratch/akaniyar/colonoscopy/masks/"
        self.transform = transforms
        #self.mask_transform = transforms.Compose([
        self.mask_transform = transform.Compose([transform.ToTensor()])
    
        self.image_filenames = image_filenames
        self.mask_filenames = mask_filenames
        
    def __len__(self):
        return len(self.image_filenames)
    
    
    def __getitem__(self, idx):
        img = tifffile.imread(os.path.join(self.image_dir, self.image_filenames[idx]))
        label = tifffile.imread(os.path.join(self.mask_dir, self.mask_filenames[idx]))
        # print(np.unique(label))
        if len(np.unique(label))>1:
            label = 1
        else :
            label = 0
        #print(np.unique(img),np.unique(label))
        #label = binary(label.astype(np.float32))
        #img = rgb(img)
        if self.transform is not None:
            img, label = self.transform(img), torch.tensor(label).unsqueeze(0)
            
        label = label.to(torch.float32)
        return img, label
        

In [5]:
# Define the file paths
file1_path = '/home/akaniyar/Colonoscopy/data_load_text/np_filenames.txt'
file2_path = '/home/akaniyar/Colonoscopy/data_load_text/np_maskfilenames.txt'
file3_path = '/home/akaniyar/Colonoscopy/data_load_text/wp_filenames.txt'
file4_path = '/home/akaniyar/Colonoscopy/data_load_text/wp_maskfilenames.txt'

# Create empty lists to store image paths
np_filenames = []
np_maskfilenames = []
wp_filenames = []
wp_maskfilenames = []

#Read image paths from file1.txt and append to np_filenames list
with open(file1_path, 'r') as file1:
    for line in file1:
        np_filenames.append(line.strip())

# Read image paths from file2.txt and append to np_maskfilenames list
with open(file2_path, 'r') as file2:
    for line in file2:
        np_maskfilenames.append(line.strip())

# Read image paths from file3.txt and append to wp_filenames list
with open(file3_path, 'r') as file3:
    for line in file3:
        wp_filenames.append(line.strip())

# Read image paths from file4.txt and append to wp_maskfilenames list
with open(file4_path, 'r') as file4:
    for line in file4:
        wp_maskfilenames.append(line.strip())


In [6]:
print(len(np_filenames),len(np_maskfilenames))
print(len(wp_filenames),len(wp_maskfilenames))

15046 15046
3856 3856


In [7]:
wp_train_size = int(1.0*len(wp_filenames))
np_train_size = int(1.0*len(np_filenames))


image_filenames = wp_filenames[:wp_train_size]+np_filenames[:np_train_size] 
mask_filenames = wp_maskfilenames[:wp_train_size] +  np_maskfilenames[:np_train_size] 
print(len(image_filenames),len(mask_filenames))

18902 18902


In [8]:
data_transform = transform.Compose([
            transform.ToPILImage(),
            transform.Resize((128,128),InterpolationMode.BICUBIC),
            transform.ToTensor()
        ])

In [9]:
from sklearn.model_selection import train_test_split

# Splitting the dataset into train and test sets
image_filenames_train, image_filenames_test, mask_filenames_train, mask_filenames_test = train_test_split(
    image_filenames, mask_filenames, test_size=0.2, random_state=42)

# Further splitting the train set into train and validation sets
image_filenames_train, image_filenames_val, mask_filenames_train, mask_filenames_val = train_test_split(
    image_filenames_train, mask_filenames_train, test_size=0.1, random_state=42)

# Creating your segmentation dataset using the split filenames
train_dataset = classification_dataset(image_filenames=image_filenames_train,
                                     mask_filenames=mask_filenames_train,
                                     transforms=data_transform)

val_dataset = classification_dataset(image_filenames=image_filenames_val,
                                   mask_filenames=mask_filenames_val,
                                   transforms=data_transform)

test_dataset = classification_dataset(image_filenames=image_filenames_test,
                                    mask_filenames=mask_filenames_test,
                                    transforms=data_transform)


In [10]:
len(train_dataset),len(val_dataset),len(test_dataset)

(13608, 1513, 3781)

In [11]:
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True , num_workers = num_gpus)
val_loader = DataLoader(val_dataset, batch_size=32,shuffle=True,num_workers = num_gpus)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True,num_workers = num_gpus)

In [12]:
i,l = train_dataset[10000]
print(i.shape ,l.shape)
print(torch.unique(i) )
print(i.dtype,l.dtype)
# l = (l == 1.0000 )
# l2 = (l==1)
# plt.imshow(i.permute(1,2,0))
# plt.imshow(l.permute(1,2,0),alpha = 0.4,cmap = "gray")


torch.Size([3, 128, 128]) torch.Size([1])
tensor([0.0000, 0.0039, 0.0078, 0.0118, 0.0157, 0.0196, 0.0235, 0.0275, 0.0314,
        0.0353, 0.0392, 0.0431, 0.0471, 0.0510, 0.0549, 0.0588, 0.0627, 0.0667,
        0.0706, 0.0745, 0.0784, 0.0824, 0.0863, 0.0902, 0.0941, 0.0980, 0.1020,
        0.1059, 0.1098, 0.1137, 0.1176, 0.1216, 0.1255, 0.1294, 0.1333, 0.1373,
        0.1412, 0.1451, 0.1490, 0.1529, 0.1569, 0.1608, 0.1647, 0.1686, 0.1725,
        0.1765, 0.1804, 0.1843, 0.1882, 0.1922, 0.1961, 0.2000, 0.2039, 0.2078,
        0.2118, 0.2157, 0.2196, 0.2235, 0.2275, 0.2314, 0.2353, 0.2392, 0.2431,
        0.2471, 0.2510, 0.2549, 0.2588, 0.2627, 0.2667, 0.2706, 0.2745, 0.2784,
        0.2824, 0.2863, 0.2902, 0.2941, 0.2980, 0.3020, 0.3059, 0.3098, 0.3137,
        0.3176, 0.3216, 0.3255, 0.3294, 0.3333, 0.3373, 0.3412, 0.3451, 0.3490,
        0.3529, 0.3569, 0.3608, 0.3647, 0.3686, 0.3725, 0.3765, 0.3804, 0.3843,
        0.3882, 0.3922, 0.3961, 0.4000, 0.4039, 0.4078, 0.4118, 0.4157, 0.4196

In [16]:
# from model import Classification_head
from torch.nn.parallel import DataParallel
# from torchvision.models import swin_b as swin_tiny
from torchvision.models import convnext_base as convb
# from resunet_pretrain import Resenc

In [17]:
from torchsummary import summary

In [18]:
import torch.nn as nn
import torchvision.models
import torch

def convrelu(in_channels, out_channels, kernel, padding):
    return nn.Sequential(
    nn.Conv2d(in_channels, out_channels, kernel, padding=padding),
    nn.ReLU(inplace=True),
  )

class Resenc(nn.Module):
    def __init__(self, n_class):
        super().__init__()

        self.base_model = torchvision.models.resnet18(weights= "DEFAULT")
        self.base_layers = list(self.base_model.children())
        self.layer0 = nn.Sequential(*self.base_layers[:3]) # size=(N, 64, x.H/2, x.W/2)
        
        self.layer1 = nn.Sequential(*self.base_layers[3:5]) # size=(N, 64, x.H/4, x.W/4)

        self.layer2 = self.base_layers[5]  # size=(N, 128, x.H/8, x.W/8)
        self.layer3 = self.base_layers[6]  # size=(N, 256, x.H/16, x.W/16)
        self.layer4 = self.base_layers[7]  # size=(N, 512, x.H/32, x.W/32)
        


        # self.conv_original_size0 = convrelu(3, 64, 3, 1)
        # self.conv_original_size1 = convrelu(64, 64, 3, 1)
        self.average_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, n_class)
        
    def forward(self, input):
        # x_original = self.conv_original_size0(input)
        # x_original = self.conv_original_size1(x_original)

        layer0 = self.layer0(input)
        layer1 = self.layer1(layer0)
        layer2 = self.layer2(layer1)
        layer3 = self.layer3(layer2)
        layer4 = self.layer4(layer3)
        
        class_out = self.average_pool(layer4)
        class_out = class_out.view(class_out.size(0), -1)
        class_out = self.fc(class_out)
        return class_out

In [44]:
# model = Classification_head(n_channels=3,n_classes=1)
model = convb(weights='DEFAULT')
# model.head = nn.Sequential(nn.Linear(in_features=1024, out_features=512, bias=True),
#                            nn.Linear(in_features=512, out_features=256, bias=True),
#                            nn.Linear(in_features=256, out_features=64, bias=True),
#                            nn.Linear(in_features=64, out_features=32, bias=True),
#                            nn.Linear(in_features=32, out_features=1, bias=True))
# model = Resenc(1)
model.classifier[2] = nn.Sequential(nn.Linear(in_features=1024, out_features=1, bias=True))
model = DataParallel(model, device_ids=list(range(num_gpus)), dim=0)
model = model.to("cuda")
# summary(model,(3,256,256))


In [45]:
summary(model,(3,256,256))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 128, 64, 64]           6,272
            Conv2d-2          [-1, 128, 64, 64]           6,272
       LayerNorm2d-3          [-1, 128, 64, 64]             256
       LayerNorm2d-4          [-1, 128, 64, 64]             256
            Conv2d-5          [-1, 128, 64, 64]           6,400
           Permute-6          [-1, 64, 64, 128]               0
            Conv2d-7          [-1, 128, 64, 64]           6,400
         LayerNorm-8          [-1, 64, 64, 128]             256
           Permute-9          [-1, 64, 64, 128]               0
        LayerNorm-10          [-1, 64, 64, 128]             256
           Linear-11          [-1, 64, 64, 512]          66,048
           Linear-12          [-1, 64, 64, 512]          66,048
             GELU-13          [-1, 64, 64, 512]               0
             GELU-14          [-1, 64, 

In [46]:
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(),lr = 0.001)
# dice = Dice(average='micro').to(device)
loss_fn , optimizer

(BCEWithLogitsLoss(),
 Adam (
 Parameter Group 0
     amsgrad: False
     betas: (0.9, 0.999)
     capturable: False
     differentiable: False
     eps: 1e-08
     foreach: None
     fused: None
     lr: 0.001
     maximize: False
     weight_decay: 0
 ))

In [47]:
from train import *

In [None]:
epochs = 30
train(epochs, model, train_loader,val_loader)

  0%|          | 0/30 [00:00<?, ?it/s]

In [None]:
from torchmetrics import Accuracy 
acc = Accuracy(task="binary", num_classes=2).to(device)

In [None]:
def test():
    test_loss = 0
    test_dice_coefficient = []
    test_dice_loss = 0

    model.eval()
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            image, label = data
            image, label = image.to(device), label.to(device)
            output = model(image)
            loss = loss_fn(output, label)

            # calculate dice coefficient and dice loss
            #pred = (output > 0.5).float()
            # label = label.to(torch.int64)
            testdice = acc(output, label)
            # dice_loss = 1 - testdice

            test_loss += loss.item()
            test_dice_coefficient.append(testdice.item())
            # test_dice_loss += dice_loss.item()

        # calculate average test loss, dice coefficient, and dice loss
        #print(len(test_dice_coefficient))
    test_loss /= len(test_loader)
    avg_test_dice_coefficient = sum(test_dice_coefficient)/len(test_dice_coefficient)
    # test_dice_loss /= len(test_loader)

    print(f"Test Loss: {test_loss:.5f} | Accuracy: {avg_test_dice_coefficient:.5f}")


    
    
test()

In [51]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, accuracy_score

def get_roc_auc(model, test_loader, device):
    model.eval()
    y_true = []
    y_scores = []
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            image, label = data
            image, label = image.to(device), label.to(device)
            output = model(image)
            output = torch.sigmoid(output)  # Apply sigmoid activation for probability
            output = output.detach().cpu().numpy().flatten()
            label = label.detach().cpu().numpy().flatten()

            y_scores.extend(output)
            y_true.extend(label)

    fpr, tpr, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(fpr, tpr)
    print(y_true, y_scores)
#     # Plot ROC curve
#     plt.figure(figsize=(8, 6))
#     plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (AUC = %0.2f)' % roc_auc)
#     plt.plot([0, 1], [0, 1], color='navy', linestyle='--')
#     plt.xlim([0.0, 1.0])
#     plt.ylim([0.0, 1.05])
#     plt.xlabel('False Positive Rate')
#     plt.ylabel('True Positive Rate')
#     plt.title('Receiver Operating Characteristic')
#     plt.legend(loc="lower right")
#     plt.show()

#     # Compute accuracy
#     # y_pred = np.round(y_scores)  # Round probabilities to binary predictions (0 or 1)
#     accuracy = accuracy_score(y_true, y_scores)
#     print('Accuracy: {:.5f}'.format(accuracy))


In [52]:
get_roc_auc(model, test_loader, device)

[1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0,

In [30]:
sample = next(iter(test_loader))
imgs, lbls = sample

actual_imagelabels = lbls[:10].numpy()
actual_imagelabels.squeeze()

array([0., 1., 0., 0., 0., 0., 0., 1., 0., 1.], dtype=float32)

In [31]:

test_output= model(imgs[:10])
x = torch.sigmoid(test_output)>0.5 
x = x.int()

print(f'Prediction labels: {x.cpu().numpy().squeeze()}')
print(f'Actual labels: {actual_imagelabels.squeeze()}')

Prediction labels: [0 0 0 0 0 0 0 0 0 0]
Actual labels: [0. 1. 0. 0. 0. 0. 0. 1. 0. 1.]


In [66]:
model.module.features[7][1].norm1

LayerNorm((1024,), eps=1e-05, elementwise_affine=True)

In [73]:
import warnings
warnings.filterwarnings('ignore')

from torchvision import models
import numpy as np
import cv2
import requests
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image, deprocess_image, preprocess_image
from PIL import Image
import matplotlib.pyplot as plt

#model = models.resnet18(pretrained=True)
model.eval()
import os
path = "/scratch/akaniyar/colonoscopy/images/"
x=[]
files = os.listdir(path)[:10]
for f in files :
    if os.path.isfile(path+'/'+f):
        x.append(f)

# print(ClassifierOutputTarget(0))
targets = [ClassifierOutputTarget(0)]
target_layers = [model.module.features]
y=0
for i in x:
    img = cv2.imread(path+i)
    #print(img)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224, 224))
    img = np.float32(img)/np.max(img)
    #print(img)
    input_tensor = preprocess_image(img, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    #print(input_tensor.shape)
    with GradCAM(model=model, target_layers=target_layers) as cam:
        grayscale_cams = cam(input_tensor=input_tensor, targets=targets)
        cam_image = show_cam_on_image(img, grayscale_cams[0, :], use_rgb=True)
        cam = np.uint8(255*grayscale_cams[0, :])
        cam = cv2.merge([cam, cam, cam])
        images = np.hstack((np.uint8(255*img),cam_image))
        x = Image.fromarray(images)
        x.save(f"/home/akaniyar/Colonoscopy/grad_cam/{i}.png")