In [None]:
import torch

def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    numpy.random.seed(worker_seed)
    random.seed(worker_seed)

g = torch.Generator()
g.manual_seed(2)

In [None]:
import cv2
import nibabel as nib
from scipy import ndimage
import random
from get_lungs import get_lungs_for_lesion


def read_nifti_file(filepath):
    """Read and load volume"""
    # Read file
    scan = nib.load(filepath)
    # Get raw data
    scan = scan.get_fdata()        
        
    return scan


def normalize_nift(volume):
    """Normalize the volume"""
    min = -1250
    max = 250
    volume[volume < min] = min
    volume[volume > max] = max
    volume = (volume - min) / (max - min)
    volume = (volume*255).astype(np.uint8)
    return volume


def resize_volume_nift(img):
    """Resize across z-axis"""
    # Set the desired depth
    desired_depth = img.shape[-1]
    #desired_depth = 16
    desired_width = 512
    desired_height = 512
    # Get current depth
    current_depth = img.shape[-1]
    current_width = img.shape[0]
    current_height = img.shape[1]
    # Compute depth factor
    depth = current_depth / desired_depth
    width = current_width / desired_width
    height = current_height / desired_height
    depth_factor = 1 / depth
    width_factor = 1 / width
    height_factor = 1 / height
    # Rotate
    img = ndimage.rotate(img, 90, reshape=False)
    # Resize across z-axis
    img = ndimage.zoom(img, (width_factor, height_factor, depth_factor), order=1)
        
    new_img = []
    for i in range(0, np.shape(img)[2]):

        new_img.append(cv2.cvtColor(img[:,:,i].astype(np.uint8), cv2.COLOR_GRAY2RGB))
      
    return np.array(new_img)


def process_scan(path):
    """Read and resize volume"""
    # Read scan
    volume = read_nifti_file(path)
    # Normalize
    volume = normalize_nift(volume)
    # Resize width, height and depth
    volume = resize_volume_nift(volume)
    # get only lungs
    return volume

import os
import numpy as np

normal_path = '/media/riccelli/Disco 2/datasets_covid/mosmed/mosmed/MosMedData Chest CT Scans with COVID-19 Related Findings COVID19_1110 1.0/studies/Normal'
ct1_path = '/media/riccelli/Disco 2/datasets_covid/mosmed/mosmed/MosMedData Chest CT Scans with COVID-19 Related Findings COVID19_1110 1.0/studies/CT-1'
ct2_path = '/media/riccelli/Disco 2/datasets_covid/mosmed/mosmed/MosMedData Chest CT Scans with COVID-19 Related Findings COVID19_1110 1.0/studies/CT-2'
ct3_path = '/media/riccelli/Disco 2/datasets_covid/mosmed/mosmed/MosMedData Chest CT Scans with COVID-19 Related Findings COVID19_1110 1.0/studies/CT-3'
ct4_path = '/media/riccelli/Disco 2/datasets_covid/mosmed/mosmed/MosMedData Chest CT Scans with COVID-19 Related Findings COVID19_1110 1.0/studies/CT-4'

normal_scan_paths = [
    os.path.join(os.getcwd(), normal_path, x)
    for x in os.listdir(normal_path)
]
# Folder "CT-23" consist of CT scans having several ground-glass opacifications,
# involvement of lung parenchyma.
ct1_scan_paths = [
    os.path.join(os.getcwd(), ct1_path, x)
    for x in os.listdir(ct1_path)
]

ct2_scan_paths = [
    os.path.join(os.getcwd(), ct2_path, x)
    for x in os.listdir(ct2_path)
]

ct3_scan_paths = [
    os.path.join(os.getcwd(), ct3_path, x)
    for x in os.listdir(ct3_path)
]

ct4_scan_paths = [
    os.path.join(os.getcwd(), ct4_path, x)
    for x in os.listdir(ct4_path)
]

print("CT scans with normal lung tissue: " + str(len(normal_scan_paths)))
print("CT scans with CT1: " + str(len(ct1_scan_paths)))
print("CT scans with CT2: " + str(len(ct2_scan_paths)))
print("CT scans with CT3: " + str(len(ct3_scan_paths)))
print("CT scans with CT4: " + str(len(ct4_scan_paths)))

class_names = ['Normal', 'CT-1', 'CT-2', 'CT-3', 'CT-4']
idx_to_class = {i:j for i, j in enumerate(class_names)}
class_to_idx = {value:key for key,value in idx_to_class.items()}

from torchvision import transforms

data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch
import random

class CustomExternalMosmedDataset(Dataset):
    def __init__(self, paths, transform=None, target_transform=None):
        self.paths = paths
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        path = self.paths[idx]
        #print(path)
        exam = process_scan(path)
        label = path.split('/')[-2]
        label = class_to_idx[label]
                
        if self.transform:
            exam = self.target_transform(exam)
            
        if self.target_transform:
            label = self.target_transform(label)
        return exam, label, path

mosmed_dataset = np.concatenate((np.asarray(normal_scan_paths), np.asarray(ct1_scan_paths),
                                np.asarray(ct2_scan_paths), np.asarray(ct3_scan_paths), 
                                 np.asarray(ct4_scan_paths)))
#mosmed_dataset = np.asarray(abnormal_scan_paths)
mosmed_dataset = CustomExternalMosmedDataset(mosmed_dataset)
mosmed_dataloader = DataLoader(mosmed_dataset, batch_size=1, shuffle=True, worker_init_fn=seed_worker,
    generator=g)

In [None]:
import torchvision
from matplotlib import pyplot as plt
import numpy as np

#plt.rcParams['figure.figsize'] = [30, 20]

def imshow_img(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    #mean = np.array([0.485, 0.456, 0.406])
    #std = np.array([0.229, 0.224, 0.225])
    #inp = std * inp + mean
    #inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated
    
def imshow_mask(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Get a batch of training data
inputs, labels, path = next(iter(mosmed_dataloader))
#inputs = inputs.numpy().squeeze().transpose(0,3,1,2)
inputs = inputs.squeeze().permute(0,3,1,2)
# Make a grid from batch
out = torchvision.utils.make_grid(inputs)
print(labels)

imshow_img(out)

In [None]:
%%time

from get_lungs_vh_paper import get_lungs
from get_lesions_vh_paper import get_lesions
import re

plt.rcParams['figure.figsize'] = [20, 10]

k = 1

y_true = []
y_pred = []
perc_true = []
perc_pred = []
severity_true = []
severity_pred = []
images_with_lesion_pred = []
exam_lesion_pred = []
lesion_pixel_per_exam = []

for exam, label, path in iter(mosmed_dataloader):
    exam = exam.numpy().squeeze()#.transpose(0,3,1,2)
    
        # para cada batch segmenta o pulmão
    segmented_lung_inputs, predicted_lung_inputs = get_lungs(exam, 
                                                             resize=False, 
                                                             return_masks=True,
                                                             weight=2)
    
    segmented_lesion_inputs, predicted_lesion_inputs = get_lesions(segmented_lung_inputs, 
                                                             resize=False, 
                                                             return_masks=True,
                                                             weight='Mobilenet FPN',
                                                             thresh=0.5)
    biggest_lesion_image_idx = None
    biggest_lesion_area = 0.
    total_left_lung_area = 0
    total_right_lung_area = 0
    total_left_lesion_area = 0
    total_right_lesion_area = 0
    left_lung_per_image = 0
    right_lung_per_image = 0
    left_per_exam = 0
    right_per_exam = 0
    
    left_lung_per_image_perc = [] 
    right_lung_per_image_perc = []
    left_lesion_per_image_perc = []
    right_lesion_per_image_perc = []
    
    for i in range(exam.shape[0]):
        lung_mask = predicted_lung_inputs[i]
        
        lesion_mask = predicted_lesion_inputs[i,:,:,0]
        lesion_mask = np.expand_dims(lesion_mask, axis=2)
        contours_lung, hierarchy = cv2.findContours(lung_mask[:,:,0], cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        contours_lesion, hierarchy = cv2.findContours(lesion_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        left_lung = np.zeros((lung_mask.shape[0],lung_mask.shape[1],3),dtype = np.uint8)
        right_lung = np.zeros((lung_mask.shape[0],lung_mask.shape[1],3),dtype = np.uint8)
        left_lesion = np.zeros((lesion_mask.shape[0],lesion_mask.shape[1],3),dtype = np.uint8)
        right_lesion = np.zeros((lesion_mask.shape[0],lesion_mask.shape[1],3),dtype = np.uint8)
        left_lung_area = 0
        right_lung_area = 0
        left_lesion_area = 0
        right_lesion_area = 0
        
        if len(contours_lung) > 0:
            for cnt in contours_lung:
                leftmost = tuple(cnt[cnt[:,:,0].argmin()][0])
                rightmost = tuple(cnt[cnt[:,:,0].argmax()][0])

                left_dist = abs(leftmost[0] - 0)
                right_dist = abs(lung_mask.shape[1] - rightmost[0])

                if left_dist < right_dist:
                    left_lung = cv2.drawContours(left_lung, [cnt], 0, (255,0,0), -1)
                    left_lung_area += cv2.contourArea(cnt)
                    total_left_lung_area += left_lung_area
                else:
                    right_lung = cv2.drawContours(right_lung, [cnt], 0, (0,255,0), -1)
                    right_lung_area += cv2.contourArea(cnt)
                    total_right_lung_area += right_lung_area
                    
        if len(contours_lesion) > 0:
            for cnt in contours_lesion:
                leftmost = tuple(cnt[cnt[:,:,0].argmin()][0])
                rightmost = tuple(cnt[cnt[:,:,0].argmax()][0])

                left_dist = abs(leftmost[0] - 0)
                right_dist = abs(lesion_mask.shape[1] - rightmost[0])

                if left_dist < right_dist:
                    left_lesion = cv2.drawContours(left_lesion, [cnt], 0, (255,0,0), -1)
                    left_lesion_area += cv2.contourArea(cnt)
                    total_left_lesion_area += left_lesion_area
                
                else:
                    right_lesion = cv2.drawContours(right_lesion, [cnt], 0, (0,255,0), -1)
                    right_lesion_area += cv2.contourArea(cnt)
                    total_right_lesion_area += right_lesion_area
                    
                if (left_lesion_area+right_lesion_area) > biggest_lesion_area:
                    biggest_lesion_image_idx = i
                    biggest_lesion_area = left_lesion_area+right_lesion_area
            
            if right_lesion_area > 0:
                right_lesion_per_image_perc.append(right_lesion_area)
            if left_lesion_area > 0:
                left_lesion_per_image_perc.append(left_lesion_area)
            
            if left_lung_area > 500 and right_lung_area > 500:
                if left_lesion_area > 0:
                    left_lung_per_image_perc.append(left_lung_area)
                if right_lesion_area > 0:
                    print('right_lung area: ', right_lung_area)
                    right_lung_per_image_perc.append(right_lung_area)
            
            # pulmoes ficaram colados
            elif right_lung_area > 500: # left_lung_area é zero 
                right_lung_per_image_perc.append(right_lung_area/2)
                if left_lesion_area > 0:
                    left_lung_per_image_perc.append(right_lung_area/2)
                
            # pulmoes ficaram colados
            elif left_lung_area > 500:
                left_lung_per_image_perc.append(left_lung_area/2)
                if right_lesion_area > 0:
                    print('left_lung_area: ', left_lung_area/2)
                    right_lung_per_image_perc.append(left_lung_area/2)
            print(right_lung_per_image_perc)
                                
#         plt.subplot(161)
#         #plt.imshow(lung_mask)
#         plt.imshow(exam[i])
#         plt.subplot(162)
#         plt.imshow(left_lung)
#         plt.subplot(163)
#         plt.imshow(right_lung)
#         plt.subplot(164)
#         plt.imshow(lesion_mask)
#         plt.subplot(165)
#         plt.imshow(left_lesion)
#         plt.subplot(166)
#         plt.imshow(right_lesion)
#         plt.show()
    
    left_quant = round(100*total_left_lesion_area/total_left_lung_area,2)
    right_quant = round(100*total_right_lesion_area/total_right_lung_area,2)
    mean_quant = (left_quant + right_quant)
    
    lesion_pixel_per_exam.append(total_left_lesion_area + total_right_lesion_area)
    
    left_aux = [100*(i / j) for i, j in zip(left_lesion_per_image_perc, left_lung_per_image_perc)]
    right_aux = [100*(i / j) for i, j in zip(right_lesion_per_image_perc, right_lung_per_image_perc)]
    
    left_aux = [i for i in left_aux if i != 0]
    right_aux = [i for i in right_aux if i != 0]
    
    if len(left_aux)>0 and len(right_aux)>0:            
        max_aux = (np.max(left_aux) + np.max(right_aux))
        
    elif len(left_aux)>0:
        max_aux = (np.max(left_aux))
        
    elif len(right_aux)>0:
        max_aux = (np.max(right_aux))
    else:
        max_aux = 0
        
    if max_aux > 75:
        pred_label = 4
    elif (max_aux > 50 and max_aux <= 75):
        pred_label = 3
    elif (max_aux > 25 and max_aux <= 50):
        pred_label = 2
    elif (max_aux > 0 and max_aux <= 25):
        pred_label = 1
    else:
        pred_label = 0
    
#     if (left_quant > 75) or (right_quant > 75) or (mean_quant > 75):
#         pred_label = 4
#     elif (left_quant > 50 and left_quant <= 75) or (right_quant > 50 and right_quant <= 75) or (mean_quant > 50 and mean_quant <= 75):
#         pred_label = 3
#     elif (left_quant > 25 and left_quant <= 50) or (right_quant > 25 and right_quant <= 50) or (mean_quant > 25 and mean_quant <= 50):
#         pred_label = 2
#     elif (left_quant > 0 and left_quant <= 25) or (right_quant > 0 and right_quant <= 25) or (mean_quant > 0 and mean_quant <= 25):
#         pred_label = 1
#     else:
#         pred_label = 0

    print(f'=========================Exam {k}=========================')
    print(f'Total left lung area is: {total_left_lung_area}, Total left lesion area is: {total_left_lesion_area}')
    print(f'Total Right lung area is: {total_right_lung_area}, Total right lesion area is: {total_right_lesion_area}')
    print(f'Total left % is {round(100*total_left_lesion_area/total_left_lung_area,2)}%')
    print(f'Total right % is {round(100*total_right_lesion_area/total_right_lung_area,2)}%')
    print(f'Pred label is {pred_label}')
    print(f'True label is {label.item()}')
#     if left_lung_per_image!=0:
#         print(f'Left lesion per image {100*(total_left_lesion_area/left_lung_per_image)}')
#     if right_lung_per_image!=0:
#         print(f'and Right {100*(total_right_lesion_area/right_lung_per_image)}')
    #print(f'Left lesion per exam {100*np.mean(left_per_exam_perc)} and Right {100*np.mean(right_per_exam_perc)}')
    
    print(f'Left: {left_aux} Right: {right_aux}')
    if len(left_aux)>0:
        print(f'Left mean {np.mean(left_aux)}, left max {np.max(left_aux)}')
    if len(right_aux)>0:
        print(f' Right mean {np.mean(right_aux)}, right max {np.max(right_aux)}')
    
    if biggest_lesion_image_idx != None:
        plt.subplot(141)
        plt.axis('off')
        plt.imshow(exam[biggest_lesion_image_idx,:,:,:])
        plt.subplot(142)
        plt.axis('off')
        plt.imshow(segmented_lung_inputs[biggest_lesion_image_idx,:,:,:])
        plt.subplot(143)
        plt.axis('off')
        plt.imshow(predicted_lesion_inputs[biggest_lesion_image_idx,:,:,:])
        
        plt.subplot(144)
        img = (predicted_lesion_inputs[biggest_lesion_image_idx,:,:,:][:,:,0]).astype(np.uint8)
        contours, hierarchy = cv2.findContours(img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        painted_img = cv2.drawContours(exam[biggest_lesion_image_idx,:,:,:].copy(), contours, -1, (255,0,0), 2)
        img2 = (predicted_lung_inputs[biggest_lesion_image_idx,:,:,:][:,:,0]).astype(np.uint8)
        contours, hierarchy = cv2.findContours(img2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        painted_img = cv2.drawContours(painted_img, contours, -1, (0,255,0), 2)
        plt.axis('off')
        plt.imshow(painted_img)
        plt.show()
    
    print(f'==========================================================')
    y_pred.append(pred_label)
    y_true.append(label.item())
    k += 1
#     if k > 200:
#         break

In [None]:
from sklearn.metrics import classification_report, accuracy_score, f1_score, precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay, mean_squared_error, mean_absolute_error

acc = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average='weighted')
prec = precision_score(y_true, y_pred, average='weighted')
rec = recall_score(y_true, y_pred, average='weighted')
#spec = recall_score(y_true, y_pred, pos_label=0)
print(f"Resnext101 Unet++ & {round(acc*100,2)} & {round(f1*100,2)} & {round(prec*100,2)} & {round(rec*100,2)} \\\\")  

confusion_matrix = confusion_matrix(y_true, y_pred)
cm_display = ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = ['Normal', 'CT-1', 'CT-2', 'CT-3', 'CT-4'])
cm_display.plot(cmap="Blues")
plt.show()

In [None]:
print(classification_report(y_true, y_pred))

In [None]:
from sklearn.metrics import classification_report, accuracy_score, f1_score, precision_score, recall_score, confusion_matrix, ConfusionMatrixDisplay, mean_squared_error, mean_absolute_error

y_true2 = [int(x) for x in np.array(y_true)>0]
y_pred2 = [int(x) for x in np.array(y_pred)>0]

acc = accuracy_score(y_true2, y_pred2)
f1 = f1_score(y_true2, y_pred2)
prec = precision_score(y_true2, y_pred2)
rec = recall_score(y_true2, y_pred2)
spec = recall_score(y_true2, y_pred2, pos_label=0)
print(f"Resnet50 FPN & {round(acc*100,2)} & {round(f1*100,2)} & {round(prec*100,2)} & {round(rec*100,2)} & {round(spec*100,2)} \\\\")  

confusion_matrix = confusion_matrix(y_true2, y_pred2)
cm_display = ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = ['Normal', 'Covid'])
cm_display.plot(cmap="Blues")
plt.show()