In [None]:
# Download the Warwick-QU (GlaS contest) archive into the current directory.
!curl https://warwick.ac.uk/fac/sci/dcs/research/tia/glascontest/download/warwick_qu_dataset_released_2016_07_08.zip --output warwick_qu_dataset_released_2016_07_08.zip

In [None]:
!ls

In [None]:
# Unpack the archive next to the notebook.
!unzip  warwick_qu_dataset_released_2016_07_08.zip

In [None]:
!ls


In [None]:
# Work from inside the extracted folder: later cells open 'Grade.csv' and the
# '*.bmp' images by bare relative file name.
cd 'Warwick QU Dataset (Released 2016_07_08)'

In [None]:
import pandas as pd
from PIL import Image


In [None]:
Image.open('train_2.bmp')

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.pyplot import imread,imshow
from PIL import Image
import torch
from skimage import io, transform
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


In [None]:
data=pd.read_csv('Grade.csv')
len(data)

In [None]:
class Train_dataset(Dataset):
    """Classification dataset over the CSV rows whose name starts with ``train``.

    Each item is ``[image, label]``: ``image`` is the transformed ``<name>.bmp``
    file and ``label`` is 1 when the row's grade is malignant, otherwise 0.
    """

    def __init__(self,csv_path,transform_img):
        """Read the grade CSV and keep only the training rows.

        Args:
            csv_path: Path to a CSV with a ``name`` column and a
                ``' grade (GlaS)'`` column (the header has a leading space).
            transform_img: Callable applied to every opened image.
        """
        self.data=pd.read_csv(csv_path)
        # Literal prefix test. The previous regex 'train*' meant "trai" followed
        # by any number of "n", i.e. glob syntax mistaken for a regex.
        self.train_data=self.data[self.data.name.str.startswith('train', na=False)]
        self.transform_img=transform_img

    def __len__(self):
        """Return the number of training rows."""
        return len(self.train_data)

    def __getitem__(self,index):
        """Return ``[transformed_image, label]`` for the row at position ``index``."""
        image_name=self.train_data.name.iloc[index]+'.bmp'
        image=Image.open(image_name)
        image=self.transform_img(image)
        # Read the grade of the requested row; it used to be hard-coded to row 0,
        # which gave every sample the same label.
        label_name=self.train_data[' grade (GlaS)'].iloc[index]
        # strip() tolerates the space that follows the comma in the CSV.
        label=  1 if str(label_name).strip() == "malignant" else 0
        return [image,label]
    

In [None]:
class Valid_dataset(Dataset):
    """Classification dataset over the CSV rows whose name starts with ``testA``.

    Each item is ``[image, label]``: ``image`` is the transformed ``<name>.bmp``
    file and ``label`` is 1 when the row's grade is malignant, otherwise 0.
    """

    def __init__(self,csv_path,transform_img):
        """Read the grade CSV and keep only the ``testA`` rows.

        Args:
            csv_path: Path to a CSV with a ``name`` column and a
                ``' grade (GlaS)'`` column (the header has a leading space).
            transform_img: Callable applied to every opened image.
        """
        self.data=pd.read_csv(csv_path)
        # Literal prefix test. The previous regex 'testA*' meant "test" followed
        # by zero or more "A", so it also selected every testB_* row.
        self.train_data=self.data[self.data.name.str.startswith('testA', na=False)]
        self.transform_img=transform_img

    def __len__(self):
        """Return the number of validation rows."""
        return len(self.train_data)

    def __getitem__(self, index):
        """Return ``[transformed_image, label]`` for the row at position ``index``."""
        image_name=self.train_data.name.iloc[index]+'.bmp'
        image=Image.open(image_name)
        image=self.transform_img(image)
        # Read the grade of the requested row; it used to be hard-coded to row 0,
        # which gave every sample the same label.
        label_name=self.train_data[' grade (GlaS)'].iloc[index]
        # strip() tolerates the space that follows the comma in the CSV.
        label=  1 if str(label_name).strip() == "malignant" else 0
        return [image,label]
    

In [None]:
class Test_dataset(Dataset):
    """Classification dataset over the CSV rows whose name starts with ``testB``.

    Each item is ``[image, label]``: ``image`` is the transformed ``<name>.bmp``
    file and ``label`` is 1 when the row's grade is malignant, otherwise 0.
    """

    def __init__(self,csv_path,transform_img):
        """Read the grade CSV and keep only the ``testB`` rows.

        Args:
            csv_path: Path to a CSV with a ``name`` column and a
                ``' grade (GlaS)'`` column (the header has a leading space).
            transform_img: Callable applied to every opened image.
        """
        self.data=pd.read_csv(csv_path)
        # Literal prefix test. The previous regex 'testB*' meant "test" followed
        # by zero or more "B", so it also selected every testA_* row.
        self.train_data=self.data[self.data.name.str.startswith('testB', na=False)]
        self.transform_img=transform_img

    def __len__(self):
        """Return the number of test rows."""
        return len(self.train_data)

    def __getitem__(self, index):
        """Return ``[transformed_image, label]`` for the row at position ``index``."""
        image_name=self.train_data.name.iloc[index]+'.bmp'
        image=Image.open(image_name)
        image=self.transform_img(image)
        # Read the grade of the requested row; it used to be hard-coded to row 0,
        # which gave every sample the same label.
        label_name=self.train_data[' grade (GlaS)'].iloc[index]
        # strip() tolerates the space that follows the comma in the CSV.
        label=  1 if str(label_name).strip() == "malignant" else 0
        return [image,label]
    


In [None]:
from torchvision import transforms

In [None]:
# Training-time preprocessing and augmentation for the classifier.
transform_img=transforms.Compose([
    transforms.Resize((250,250)),
    transforms.RandomHorizontalFlip(),
    # Kept before ToTensor so it operates on the PIL image. The missing comma
    # after this entry used to make the whole cell a SyntaxError.
    # NOTE(review): called with its default arguments - confirm whether any
    # jitter strength was intended.
    transforms.ColorJitter(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
])

In [None]:
# Deterministic preprocessing for the test split: tensor conversion and
# normalisation only.
# NOTE(review): unlike transform_img there is no Resize here, so test images are
# fed at their native size - confirm this is intended.
transform_test=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
])

In [None]:
# Validation must be deterministic: same 250x250 resize and normalisation as
# training, but without the random augmentation steps of transform_img.
transform_valid=transforms.Compose([
    transforms.Resize((250,250)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
])

train_dataset=Train_dataset('Grade.csv',transform_img)
valid_dataset=Valid_dataset('Grade.csv',transform_valid)
test_dataset=Test_dataset('Grade.csv',transform_test)

In [None]:
# Loader settings shared by all three splits (one image per batch, shuffled).
params = {'batch_size': 1,'shuffle': True}
# NOTE(review): max_epochs is not read by any later cell shown here; the
# training cells pass num_epochs instead.
max_epochs = 10

training_generator = DataLoader(train_dataset, **params)
valid_generator = DataLoader(valid_dataset, **params)
test_generator = DataLoader(test_dataset, **params)

In [None]:

#for local_batch, local_labels in valid_generator:
        

In [None]:
import torchvision.models as models


In [None]:
# Pretrained ResNet-18 used as the classification backbone.
resnet18 = models.resnet18(pretrained=True)


In [None]:
import torch.nn as nn

In [None]:
# Swap the final fully connected layer for a 2-output head (the dataset labels
# are 0 / 1).
resnet18.fc=nn.Linear(in_features=512, out_features=2, bias=True)

In [None]:
model=resnet18

In [None]:
model=model.to(device)

In [None]:
# List the parameters that will receive gradients.
for name,param in model.named_parameters():
    if param.requires_grad == True:
        print(name)

In [None]:
#for name,param in list(model.named_parameters())[-8:]:
#    param.requires_grad = False

In [None]:
#for name,param in model.named_parameters():
#    if param.requires_grad == True:
#        print(name)

In [None]:
# Gather every parameter of the classifier that is still trainable; this list is
# what gets handed to the optimizer.
params_to_update = [
    weight
    for _, weight in model.named_parameters()
    if weight.requires_grad == True
]

In [None]:
import torch.optim as optim

In [None]:
import time
import copy


In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    """Train ``model`` and restore the weights with the lowest validation loss.

    Every epoch runs a ``'train'`` pass followed by a ``'val'`` pass. The loss
    that is printed, recorded and used for model selection is the
    sample-weighted mean over all batches of the pass (previously only the
    loss of the last batch was used).

    Args:
        model: Network to optimise; it is updated in place.
        dataloaders: Mapping with ``'train'`` and ``'val'`` keys, each an
            iterable of ``(inputs, labels)`` batches.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor. The epoch mean assumes a per-batch mean reduction.
        optimizer: Optimizer holding the parameters to update.
        num_epochs: Number of epochs to run.

    Returns:
        ``(model, val_loss_history, train_loss_history)``. ``model`` carries
        the best validation weights. Each history holds one detached 0-dim CPU
        tensor per epoch, so callers can keep using ``h.cpu().numpy()``.
    """
    since = time.time()

    val_loss_history = []
    train_loss_history= []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss_so_far = float('inf')
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train','val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            n_samples = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; track history only in train
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # .item() yields a plain float, so no autograd graph is kept alive.
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                n_samples += batch_size

            # An empty loader yields NaN instead of raising on an undefined loss.
            epoch_loss = running_loss / n_samples if n_samples else float('nan')

            print('{} Loss: {:.4f} '.format(phase, epoch_loss))

            if is_train:
                train_loss_history.append(torch.tensor(epoch_loss))
            else:
                val_loss_history.append(torch.tensor(epoch_loss))
                # deep copy the model whenever validation improves
                if epoch_loss < best_loss_so_far:
                    best_loss_so_far = epoch_loss
                    best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best loss in validation set : {:4f}'.format(best_loss_so_far))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_loss_history,train_loss_history

In [None]:
# Wire the train/validation loaders, optimizer and loss together and run the
# classifier training loop.
dataloaders={'train':training_generator,'val':valid_generator}
# NOTE(review): lr=1e-7 is a very small step size for Adam - confirm it is
# intended (the segmentation run further down uses 1e-4).
optimizer = optim.Adam(params_to_update,lr=1e-7)
criterion = nn.CrossEntropyLoss()

num_epochs=10
model_classification,v_hist,t_hist=train_model(model, dataloaders, criterion, optimizer, num_epochs=num_epochs)

In [None]:
# The histories hold one loss tensor per epoch; detach and move them to the CPU
# before converting them for matplotlib.
vhist = [h.detach().cpu().numpy() for h in v_hist]
thist = [h.detach().cpu().numpy() for h in t_hist]

plt.title("Loss vs. Number of Training Epochs")
plt.xlabel("Training Epochs")
# Both curves share this axis, so it is labelled generically.
plt.ylabel("Loss")
plt.plot(range(1,num_epochs+1),vhist,label="Validation loss ")
plt.plot(range(1,num_epochs+1),thist,label="Train loss")

plt.xticks(np.arange(1, num_epochs+1, 1.0))
plt.legend()
plt.show()

In [None]:
#segmentation

In [None]:
import torch
import torch.nn as nn
from torchvision import models

def convrelu(in_channels, out_channels, kernel, padding):
    """Return a ``Conv2d`` followed by an in-place ``ReLU`` as one ``nn.Sequential``.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        kernel: Kernel size passed to ``nn.Conv2d``.
        padding: Padding passed to ``nn.Conv2d``.
    """
    convolution = nn.Conv2d(in_channels, out_channels, kernel, padding=padding)
    activation = nn.ReLU(inplace=True)
    return nn.Sequential(convolution, activation)


class ResNetUNet(nn.Module):
    """U-Net style segmentation network with a pretrained ResNet-18 encoder.

    The encoder stages are slices of ``torchvision.models.resnet18``'s child
    modules. The decoder repeatedly upsamples by 2, concatenates the matching
    encoder feature map (after a 1x1 conv + ReLU) and fuses them with a 3x3
    conv + ReLU. A final 1x1 convolution produces ``n_class`` channels; no
    activation is applied to that output inside this class.
    """

    def __init__(self, n_class):
        """Build encoder slices, skip-connection 1x1 convs and decoder convs.

        Args:
            n_class: Number of output channels of the final 1x1 convolution.
        """
        super().__init__()

        self.base_model = models.resnet18(pretrained=True)
        # Child modules of the backbone in definition order; the encoder stages
        # below are slices of this list.
        self.base_layers = list(self.base_model.children())

        self.layer0 = nn.Sequential(*self.base_layers[:3]) # size=(N, 64, x.H/2, x.W/2)
        self.layer0_1x1 = convrelu(64, 64, 1, 0)
        self.layer1 = nn.Sequential(*self.base_layers[3:5]) # size=(N, 64, x.H/4, x.W/4)
        self.layer1_1x1 = convrelu(64, 64, 1, 0)
        self.layer2 = self.base_layers[5]  # size=(N, 128, x.H/8, x.W/8)
        self.layer2_1x1 = convrelu(128, 128, 1, 0)
        self.layer3 = self.base_layers[6]  # size=(N, 256, x.H/16, x.W/16)
        self.layer3_1x1 = convrelu(256, 256, 1, 0)
        self.layer4 = self.base_layers[7]  # size=(N, 512, x.H/32, x.W/32)
        self.layer4_1x1 = convrelu(512, 512, 1, 0)

        # One shared x2 bilinear upsampler reused at every decoder step.
        self.upsample = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)

        # Decoder convs: input channels = skip channels + channels coming from
        # the previous (upsampled) decoder stage.
        self.conv_up3 = convrelu(256 + 512, 512, 3, 1)
        self.conv_up2 = convrelu(128 + 512, 256, 3, 1)
        self.conv_up1 = convrelu(64 + 256, 256, 3, 1)
        self.conv_up0 = convrelu(64 + 256, 128, 3, 1)

        # Full-resolution branch applied directly to the 3-channel input.
        self.conv_original_size0 = convrelu(3, 64, 3, 1)
        self.conv_original_size1 = convrelu(64, 64, 3, 1)
        self.conv_original_size2 = convrelu(64 + 128, 64, 3, 1)

        self.conv_last = nn.Conv2d(64, n_class, 1)

    def forward(self, input):
        """Run the encoder/decoder and return the ``n_class``-channel output map.

        Args:
            input: Batch of 3-channel images.
                NOTE(review): presumably H and W must be multiples of 32 so each
                upsampled map matches its skip tensor at ``torch.cat`` - confirm.

        Returns:
            Output of the final 1x1 convolution (no activation applied here).
        """
        # Full-resolution features, concatenated at the very end.
        x_original = self.conv_original_size0(input)
        x_original = self.conv_original_size1(x_original)

        # Encoder: each stage consumes the previous stage's output.
        layer0 = self.layer0(input)
        layer1 = self.layer1(layer0)
        layer2 = self.layer2(layer1)
        layer3 = self.layer3(layer2)
        layer4 = self.layer4(layer3)

        # Decoder: upsample, concatenate the projected skip tensor on the
        # channel axis, then fuse with a 3x3 conv.
        layer4 = self.layer4_1x1(layer4)
        x = self.upsample(layer4)
        layer3 = self.layer3_1x1(layer3)
        x = torch.cat([x, layer3], dim=1)
        x = self.conv_up3(x)

        x = self.upsample(x)
        layer2 = self.layer2_1x1(layer2)
        x = torch.cat([x, layer2], dim=1)
        x = self.conv_up2(x)

        x = self.upsample(x)
        layer1 = self.layer1_1x1(layer1)
        x = torch.cat([x, layer1], dim=1)
        x = self.conv_up1(x)

        x = self.upsample(x)
        layer0 = self.layer0_1x1(layer0)
        x = torch.cat([x, layer0], dim=1)
        x = self.conv_up0(x)

        # Last upsample, then merge with the full-resolution branch.
        x = self.upsample(x)
        x = torch.cat([x, x_original], dim=1)
        x = self.conv_original_size2(x)

        out = self.conv_last(x)

        return out

In [None]:
def dice_loss(pred, target):
    """Return the soft Dice loss between ``pred`` and ``target``.

    Both tensors are flattened and the loss is
    ``1 - (2 * sum(p * t) + 1) / (sum(p * p) + sum(t * t) + 1)``;
    the ``+ 1`` smoothing term keeps the ratio defined for empty masks.
    """
    smooth = 1.

    # contiguous() first: the inputs may be non-contiguous views, which
    # view(-1) cannot flatten.
    p = pred.contiguous().view(-1)
    t = target.contiguous().view(-1)

    overlap = (p * t).sum()
    pred_energy = (p * p).sum()
    target_energy = (t * t).sum()

    numerator = 2. * overlap + smooth
    denominator = pred_energy + target_energy + smooth
    return 1 - (numerator / denominator)

        

In [None]:
# Single-channel segmentation network, moved to the selected device.
model_seg = ResNetUNet(n_class=1)
model_seg=model_seg.to(device)

In [None]:
class my_sigmoid_layer(nn.Module):
    """Parameter-free module that applies an element-wise sigmoid to its input."""
    def __init__(self,):
        super(my_sigmoid_layer,self).__init__()
    def forward(self,input):
        """Return ``sigmoid(input)`` with the same shape as ``input``."""
        # torch.sigmoid rather than F.sigmoid: `F` (torch.nn.functional) is never
        # imported in this notebook, so the old call raised NameError.
        return torch.sigmoid(input)
    
last_layer=my_sigmoid_layer()   
model_seg = nn.Sequential(model_seg,last_layer)

In [None]:
class Train_dataset_seg(Dataset):
    """Segmentation dataset over the CSV rows whose name starts with ``train``.

    Each item is ``[image, mask]``: ``image`` is the transformed ``<name>.bmp``
    file and ``mask`` is the transformed ``<name>_anno.bmp`` annotation
    binarised to 0.0 / 1.0.
    """

    def __init__(self,csv_path,transform_img,transform_target):
        """Read the CSV and keep only the training rows.

        Args:
            csv_path: Path to a CSV with a ``name`` column.
            transform_img: Callable applied to the opened input image.
            transform_target: Callable applied to the opened annotation image;
                its result is compared against 0, so it must return a tensor.
        """
        self.data=pd.read_csv(csv_path)
        # Literal prefix test. The previous regex 'train*' meant "trai" followed
        # by any number of "n", i.e. glob syntax mistaken for a regex.
        self.train_data=self.data[self.data.name.str.startswith('train', na=False)]
        self.transform_img=transform_img
        self.transform_target=transform_target

    def __len__(self):
        """Return the number of training rows."""
        return len(self.train_data)

    def __getitem__(self,index):
        """Return ``[transformed_image, binary_mask]`` for the row at ``index``."""
        image_name=self.train_data.name.iloc[index]
        image=Image.open(image_name+'.bmp')
        image=self.transform_img(image)
        label=Image.open(image_name+'_anno.bmp')
        label=self.transform_target(label)
        # Every strictly positive annotation value is foreground.
        out = (label > 0).float()
        return [image,out]

In [None]:
class Valid_dataset_seg(Dataset):
    """Segmentation dataset over the CSV rows whose name starts with ``testA``.

    Each item is ``[image, mask]``: ``image`` is the transformed ``<name>.bmp``
    file and ``mask`` is the transformed ``<name>_anno.bmp`` annotation
    binarised to 0.0 / 1.0.
    """

    def __init__(self,csv_path,transform_img,transform_target):
        """Read the CSV and keep only the ``testA`` rows.

        Args:
            csv_path: Path to a CSV with a ``name`` column.
            transform_img: Callable applied to the opened input image.
            transform_target: Callable applied to the opened annotation image;
                its result is compared against 0, so it must return a tensor.
        """
        self.data=pd.read_csv(csv_path)
        # Literal prefix test. The previous regex 'testA*' meant "test" followed
        # by zero or more "A", so it also selected every testB_* row.
        self.train_data=self.data[self.data.name.str.startswith('testA', na=False)]
        self.transform_img=transform_img
        self.transform_target=transform_target

    def __len__(self):
        """Return the number of validation rows."""
        return len(self.train_data)

    def __getitem__(self,index):
        """Return ``[transformed_image, binary_mask]`` for the row at ``index``."""
        image_name=self.train_data.name.iloc[index]
        image=Image.open(image_name+'.bmp')
        image=self.transform_img(image)
        label=Image.open(image_name+'_anno.bmp')
        label=self.transform_target(label)
        # Every strictly positive annotation value is foreground.
        out = (label > 0).float()
        return [image,out]

In [None]:
# Preprocessing for the segmentation task. This rebinds transform_img, which
# earlier cells used for the classifier's augmentation pipeline.
transform_img=transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])
])
# Annotation masks get the same 224x224 resize but no normalisation; the
# dataset classes binarise the result with a "> 0" threshold.
# NOTE(review): Resize is used with its default interpolation here - confirm
# that is acceptable for label images.
transform_target=transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
])

In [None]:
# Segmentation datasets and loaders. These rebind train_dataset, valid_dataset,
# params and both *_generator names that the classification cells defined.
train_dataset=Train_dataset_seg('Grade.csv',transform_img,transform_target)
valid_dataset=Valid_dataset_seg('Grade.csv',transform_img,transform_target)

params = {'batch_size': 2,'shuffle': True}

training_generator = DataLoader(train_dataset, **params)
valid_generator = DataLoader(valid_dataset, **params)


In [None]:
pip install torchsummary

In [None]:
from torchsummary import summary
summary(model_seg, input_size=(3, 224, 224))

In [None]:
def train_model_seg(model, dataloaders, criterion, optimizer, num_epochs=25):
    """Train a segmentation ``model`` and restore its best validation weights.

    The optimised loss is ``criterion(outputs, labels) + BCELoss(outputs, labels)``.
    Every epoch runs a ``'train'`` pass followed by a ``'val'`` pass. The loss
    that is printed, recorded and used for model selection is the
    sample-weighted mean over all batches of the pass (previously only the
    loss of the last batch was used).

    Args:
        model: Network to optimise; it is updated in place. Its outputs are
            passed straight to ``nn.BCELoss``, so they must already be
            probabilities in ``[0, 1]`` (e.g. the model ends in a sigmoid).
        dataloaders: Mapping with ``'train'`` and ``'val'`` keys, each an
            iterable of ``(inputs, labels)`` batches.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor that is added to the BCE term.
        optimizer: Optimizer holding the parameters to update.
        num_epochs: Number of epochs to run.

    Returns:
        ``(model, val_loss_history, train_loss_history)``. ``model`` carries
        the best validation weights. Each history holds one detached 0-dim CPU
        tensor per epoch, so callers can keep using ``h.cpu().numpy()``.
    """
    since = time.time()

    val_loss_history = []
    train_loss_history= []

    # Built once instead of once per batch.
    bce_loss = nn.BCELoss()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss_so_far = float('inf')
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train','val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            n_samples = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; track history only in train
                with torch.set_grad_enabled(is_train):
                    # The former `outputs = m(outputs)` step is gone: `m` was
                    # never defined (NameError), and the notebook's inference
                    # cells use the model output directly.
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)+bce_loss(outputs, labels)

                    # backward + optimize only if in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # .item() yields a plain float, so no autograd graph is kept alive.
                batch_size = inputs.size(0)
                running_loss += loss.item() * batch_size
                n_samples += batch_size

            # An empty loader yields NaN instead of raising on an undefined loss.
            epoch_loss = running_loss / n_samples if n_samples else float('nan')

            print('{} Loss: {:.4f} '.format(phase, epoch_loss))

            if is_train:
                train_loss_history.append(torch.tensor(epoch_loss))
            else:
                val_loss_history.append(torch.tensor(epoch_loss))
                # deep copy the model whenever validation improves
                if epoch_loss < best_loss_so_far:
                    best_loss_so_far = epoch_loss
                    best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best loss in validation set : {:4f}'.format(best_loss_so_far))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_loss_history,train_loss_history

In [None]:
# Gather every parameter of the segmentation model that is still trainable;
# this list is what gets handed to the optimizer.
params_to_update = [
    weight
    for _, weight in model_seg.named_parameters()
    if weight.requires_grad == True
]

In [None]:
# Wire the segmentation loaders, optimizer and loss together and run training.
dataloaders={'train':training_generator,'val':valid_generator}
optimizer = optim.Adam(params_to_update,lr=1e-4)
criterion = dice_loss # + bce loss in model_seg loss calculation
num_epochs=50
model_best,v_hist,t_hist=train_model_seg(model_seg, dataloaders, criterion, optimizer, num_epochs=num_epochs)

In [None]:

# Show the predicted mask for the first image of one validation batch.
model_best.eval()
for inputs, labels in valid_generator:
    inputs=inputs.to(device)

    # Inference only: without no_grad the output tracks gradients and cannot be
    # converted to a NumPy array for plotting.
    with torch.no_grad():
        output=model_best(inputs).cpu()

    # channels-first -> channels-last, then drop the single channel axis
    img = output[0].permute(1, 2, 0).squeeze().numpy()
    plt.imshow(img)
    plt.show()
    break

#annotation on validation set

In [None]:
output[0]

In [None]:
# The histories hold one loss tensor per epoch; detach and move them to the CPU
# before converting them for matplotlib.
vhist = [h.detach().cpu().numpy() for h in v_hist]
thist = [h.detach().cpu().numpy() for h in t_hist]

plt.title("Loss vs. Number of Training Epochs")
plt.xlabel("Training Epochs")
# Both curves share this axis, so it is labelled generically.
plt.ylabel("Loss")
plt.plot(range(1,num_epochs+1),vhist,label="Validation loss ")
plt.plot(range(1,num_epochs+1),thist,label="Train loss")

plt.xticks(np.arange(1, num_epochs+1, 1.0))
plt.legend()
plt.show()

In [None]:
Image.open('testA_5.bmp')

In [None]:
#original annotation  for testA_5
# Display the ground-truth annotation of testA_5, resized by transform_target
# and binarised the same way the dataset classes do (scaled to 0 / 255 for display).
image_name='testA_5_anno'
image=Image.open(image_name+'.bmp')
label=transform_target(image)
t = torch.Tensor([0])  # threshold
out = (label > t).float() * 255
# Reorder axes (1, 2, 0), then drop size-1 axes for imshow.
img = np.transpose(out, (1,2,0))
img=img.squeeze()
plt.imshow(img)
plt.show()

In [None]:
# Predict and display the mask for testA_5.
image_name='testA_5'
image=Image.open(image_name+'.bmp')
image=transform_img(image)
image=image.unsqueeze(0)  # add the batch dimension the model expects
image=image.to(device)
model_best.eval()
# Inference only: without no_grad the output tracks gradients and cannot be
# converted to a NumPy array for plotting.
with torch.no_grad():
    output=model_best(image)
output=output.cpu()
# channels-first -> channels-last, then drop the single channel axis
img = output[0].permute(1, 2, 0).squeeze().numpy()
plt.imshow(img)
plt.show()
#predicted testA_5 annotation