In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
import pandas as pd
import torch.nn.functional as F
import cv2
from IPython.display import clear_output
from progressbar import progressbar as pro

In [0]:
class LoadData(torch.utils.data.Dataset):
    
    def __init__(self, csv_file, root_dir):
        self.aptos_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        
    def __len__(self):
        return len(self.aptos_frame)
    
    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir,
                                self.aptos_frame.iloc[idx, 0])
        image = cv2.imread(img_name + ".png")
        image = cv2.resize(image, (254,254))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        image = clahe.apply(image)
        #image = apply_entropy(image)
        image = image[np.newaxis,:,:]
        diagnosis = self.aptos_frame.iloc[idx, 1]
        diagnosis = np.array([diagnosis])

        sample = {'image': image, 'diagnosis': diagnosis}
        
        return sample


In [0]:
csv_file = "/content/gdrive/My Drive/modtrain.csv"
root_dir = "/content/gdrive/My Drive/train"

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [0]:
train_data = LoadData(csv_file, root_dir)
dataloaders = torch.utils.data.DataLoader(train_data, batch_size=64)

In [0]:
from PIL import Image
import torch
import torchvision.transforms as transforms
import os
import matplotlib.cm as mpl_color_map
import copy
# from misc_functions import get_example_params, save_class_activation_images


class CamExtractor():
    """
        Extracts cam features from the model
    """
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None

    def save_gradient(self, grad):
        self.gradients = grad

    def forward_pass_on_convolutions(self, x):
        """
            Does a forward pass on convolutions, hooks the function at given layer
        """
        conv_output = None
        for module_pos, module in self.model.features._modules.items():
            x = module(x)  # Forward
            if int(module_pos) == self.target_layer:
                x.register_hook(self.save_gradient)
                conv_output = x  # Save the convolution output on that layer
        return conv_output, x

    def forward_pass(self, x):
        """
            Does a full forward pass on the model
        """


        # Forward pass on the convolutions
        conv_output, x = self.forward_pass_on_convolutions(x)
        x = x.view(x.size(0), -1)  # Flatten
        # Forward pass on the classifier

        x = self.model.classifier(x)
        return conv_output, x


class GradCam():
    """
        Produces class activation map
    """
    def __init__(self, model, target_layer):
        self.model = model
        self.model.eval()
        # Define extractor
        self.extractor = CamExtractor(self.model, target_layer)

    def generate_cam(self, input_image, target_class=None):
        # Full forward pass
        # conv_output is the output of convolutions at specified layer
        # model_output is the final output of the model (1, 1000)
        conv_output, model_output = self.extractor.forward_pass(input_image)
        if target_class is None:
            target_class = np.argmax(model_output.data.numpy())
        # Target for backprop
        one_hot_output = torch.FloatTensor(1, model_output.size()[-1]).zero_().to(device)
        one_hot_output[0][target_class] = 1
        # Zero grads
        self.model.features.zero_grad()
        self.model.classifier.zero_grad()
        # Backward pass with specified target
        model_output.backward(gradient=one_hot_output, retain_graph=True)
        # Get hooked gradients
        guided_gradients = self.extractor.gradients.data.cpu().numpy()[0]
        # Get convolution outputs
        target = conv_output.data.cpu().numpy()[0]
        # Get weights from gradients
        weights = np.mean(guided_gradients, axis=(1, 2))  # Take averages for each gradient
        # Create empty numpy array for cam
        cam = np.ones(target.shape[1:], dtype=np.float32)
        # Multiply each weight with its conv output and then, sum
        for i, w in enumerate(weights):
            cam += w * target[i, :, :]
        cam = np.maximum(cam, 0)
        cam = (cam - np.min(cam)) / (np.max(cam) - np.min(cam))  # Normalize between 0-1
        cam = np.uint8(cam * 255)  # Scale between 0-255 to visualize
        cam = np.uint8(Image.fromarray(cam).resize((input_image.shape[2],
                       input_image.shape[3]), Image.ANTIALIAS))/255
        # ^ I am extremely unhappy with this line. Originally resizing was done in cv2 which
        # supports resizing numpy matrices with antialiasing, however,
        # when I moved the repository to PIL, this option was out of the window.
        # So, in order to use resizing with ANTIALIAS feature of PIL,
        # I briefly convert matrix to PIL image and then back.
        # If there is a more beautiful way, do not hesitate to send a PR.
        return cam

def preprocess_image(img):
    img = cv2.resize(img, (224, 224))
    trans = transforms.Compose([transforms.ToTensor(), transforms.Normalize(
                                                              mean=[0.456],
                                                              std= [0.225])])
    img = trans(img)
    img.unsqueeze_(0)
    return img

def save_class_activation_images(org_img, activation_map, file_name, epo):
    """
        Saves cam activation map and activation map on the original image
    Args:
        org_img (PIL img): Original image
        activation_map (numpy arr): Activation map (grayscale) 0-255
        file_name (str): File name of the exported image
    """
    if not os.path.exists('/content/gdrive/My Drive/models/results'):
        os.makedirs('/content/gdrive/My Drive/models/results')
    # Grayscale activation map
    heatmap, heatmap_on_image = apply_colormap_on_image(org_img, activation_map, 'hsv')
    
    # Save heatmap on iamge
    path_to_file = os.path.join('/content/gdrive/My Drive/models/results', file_name+f'_{epo}_epoch.png')
    save_image(heatmap_on_image, path_to_file)

def apply_colormap_on_image(org_im, activation, colormap_name):
    """
        Apply heatmap on image
    Args:
        org_img (PIL img): Original image
        activation_map (numpy arr): Activation map (grayscale) 0-255
        colormap_name (str): Name of the colormap
    """
    # Get colormap
    org_im = Image.fromarray(org_im).resize((224,224))

    color_map = mpl_color_map.get_cmap(colormap_name)
    no_trans_heatmap = color_map(activation)
    # Change alpha channel in colormap to make sure original image is displayed
    heatmap = copy.copy(no_trans_heatmap)
    heatmap[:, :, 3] = 0.4
    heatmap = Image.fromarray((heatmap*255).astype(np.uint8))
    no_trans_heatmap = Image.fromarray((no_trans_heatmap*255).astype(np.uint8))

    # Apply heatmap on iamge
    heatmap_on_image = Image.new("RGBA", org_im.size)
    
    heatmap_on_image = Image.alpha_composite(heatmap_on_image, org_im.convert('RGBA'))

    heatmap_on_image = Image.alpha_composite(heatmap_on_image, heatmap)
    return no_trans_heatmap, heatmap_on_image

def save_image(im, path):
    """
        Saves a numpy matrix or PIL image as an image
    Args:
        im_as_arr (Numpy array): Matrix of shape DxWxH
        path (str): Path to the image
    """
    if isinstance(im, (np.ndarray, np.generic)):
        im = format_np_output(im)
        im = Image.fromarray(im)
    im.save(path)

def format_np_output(np_arr):
    """
        This is a (kind of) bandaid fix to streamline saving procedure.
        It converts all the outputs to the same format which is 3xWxH
        with using sucecssive if clauses.
    Args:
        im_as_arr (Numpy array): Matrix of shape 1xWxH or WxH or 3xWxH
    """
    # Phase/Case 1: The np arr only has 2 dimensions
    # Result: Add a dimension at the beginning
    if len(np_arr.shape) == 2:
        np_arr = np.expand_dims(np_arr, axis=0)
    # Phase/Case 2: Np arr has only 1 channel (assuming first dim is channel)
    # Result: Repeat first channel and convert 1xWxH to 3xWxH
    if np_arr.shape[0] == 1:
        np_arr = np.repeat(np_arr, 3, axis=0)
    # Phase/Case 3: Np arr is of shape 3xWxH
    # Result: Convert it to WxHx3 in order to make it saveable by PIL
    if np_arr.shape[0] == 3:
        np_arr = np_arr.transpose(1, 2, 0)
    # Phase/Case 4: NP arr is normalized between 0-1
    # Result: Multiply with 255 and change type to make it saveable by PIL
    if np.max(np_arr) <= 1:
        np_arr = (np_arr*255).astype(np.uint8)
    return np_arr

In [0]:
def train_model(model, criterion, optimizer, num_epochs=25):
    since = time.time()
    model.to(device)
    model.train()
    overall_acc ={}
    
    for j in range(num_epochs):
      train_loss = 0
      correct = 0
      acc_data_train = []
      train_losses = []
      for i, sample_batched in enumerate(dataloaders,1):
          clear_output()
          print(f"{i} of {len(dataloaders)}")

          inputs = sample_batched['image'].to(device)
          labels = sample_batched['diagnosis'].squeeze(1).to(device)

          # zero the parameter gradients
          optimizer.zero_grad()

          # forward + backward + optimize
          outputs = model(inputs.float())

          loss = criterion(F.log_softmax(outputs, dim=1), labels)
          loss.backward()
          optimizer.step()
          train_loss+= loss.item()
          pred = outputs.data.max(1, keepdim=True)[1].int()
          labels = labels.int()
          #print(labels)
          correct += pred.eq(labels.data.view_as(pred)).sum()       



      accuracy = float(100*float(correct)/3662)

      acc_data_train.append([accuracy])
      train_loss=float(train_loss)/float(i)
      train_losses.append([optimizer, lr,w,train_loss])
      # print statistics
      print('Train Epoch:{}  Accuracy: ({}/{}) {:.2f}%   Average Loss: {:.2f} \n'.
            format(j, correct, 3662, accuracy, train_loss))
      overall_acc[f'epoch_{j}'] = {}
      overall_acc[f'epoch_{j}']['acc'] = accuracy
      overall_acc[f'epoch_{j}']['loss'] = train_loss
      
      for layer in [2,8,13,20]:
      
        if __name__ == '__main__':
          # Get params
          target_example = 0  # Snake
          #(original_image, prep_img, target_class, file_name_to_export, pretrained_model) =\
              #torch.load("alexnet_model.pth")
          # Grad cam

          original_image = cv2.imread("/content/gdrive/My Drive/train/ff0740cb484a.png", 0)

          # prep_img
          prep_img = preprocess_image(original_image).to(device)
          target_class = 0

          grad_cam = GradCam(model, target_layer=layer)
          # Generate cam mask
          cam = grad_cam.generate_cam(prep_img, target_class)
          # Save mask

          save_class_activation_images(original_image, cam, f"grad_entropy_cam_layer_{layer}",j)

    return model, overall_acc

In [0]:
model_ft = models.vgg11(pretrained=True, progress=True)
#model_ft

In [0]:
for param in model_ft.parameters():
    param.requires_grad = False
#Adjust Model can read gray scale images
model_ft.features[0] = nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1,1), bias=False)
#Adjust Output
model_ft.classifier[6] = nn.Linear(in_features=4096, out_features=2, bias=True)

In [0]:
num_ftrs = model_ft.classifier[6].in_features
lr= 0.0003
w = 6.7752e-06
optimizer_ft = optim.Adam(model_ft.parameters(), lr=lr,weight_decay=w, amsgrad=True)
criterion = nn.CrossEntropyLoss()

In [0]:
model_ft, overall_acc = train_model(model_ft, criterion, optimizer_ft,
                       num_epochs=25)
torch.save(model_ft, "/content/gdrive/My Drive/models/vgg_11_filter_entropy.pth")
import pickle
f = open("/content/gdrive/My Drive/models/results_entropy_dict.pkl","wb")
pickle.dump(overall_acc,f)
f.close()

58 of 58
Train Epoch:24  Accuracy: (3456/3662) 94.37%   Average Loss: 0.16 



In [0]:
import torch
import pandas as pd
from sklearn.model_selection import train_test_split

In [0]:
csv_file = "/content/gdrive/My Drive/models/validation_01.csv"
root_dir = "/content/gdrive/My Drive/train"

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [0]:
train_data = LoadData(csv_file, root_dir)
dataloaders = torch.utils.data.DataLoader(train_data, batch_size=1)

In [0]:
labels = []
model.to(device)
for data in dataloaders:
  labels.append(model(data['image'].to(device).float()))

In [0]:
data = pd.read_csv(csv_file)

In [0]:
labels = []
model.to(device)
for id_ in data.id_code:
  image = cv2.imread(root_dir+"/"+id_+".png")
  image = cv2.resize(image, (254,254))
  image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
  image = clahe.apply(image)
  #image = apply_entropy(image)
  image = image[np.newaxis,:,:]
  image = torch.Tensor(image).to(device)
  labels.append(model(image.float()))  