# Numerical comparison between different methods

**This Colab notebook compares RANSAC, least squares (LS) and ellipse fitting as tests for assessing the direction of the lungs, in order to find out which method works best. The output images are compared with reference images that were manually adjusted. Each result is classified as correct, wrong (the image was adjusted, but incorrectly) or discarded (the algorithm was not able to find a solution).**

In [None]:
# Install the Keras-Applications package into the notebook runtime.
# NOTE(review): presumably a dependency of the BrixIA BSNet code imported below - confirm.
!pip install Keras-Applications

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Clone BrixIA from GitHub

In [None]:
# Clone the BrixIA repository into the current working directory.
# When the cell is re-run in the same runtime, git reports that the destination
# path already exists (see the captured output).
!git clone 'https://github.com/BrixIA/Brixia-score-COVID-19'

fatal: destination path 'Brixia-score-COVID-19' already exists and is not an empty directory.


Include BrixIA in sys.path

In [None]:
import sys

# Make the sources of the cloned BrixIA repository importable (BSNet is imported from there).
# The membership check keeps the cell idempotent: re-running it does not add
# the same entry to sys.path a second time.
if "/content/Brixia-score-COVID-19/src" not in sys.path:
  sys.path.append("/content/Brixia-score-COVID-19/src")

Imports

In [None]:
from BSNet.model import BSNet
from google.colab import drive
import tensorflow as tf
import cv2
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import linear_model
import shutil
import os
from skimage.segmentation import slic
from skimage.draw import polygon_perimeter
from skimage import measure

# Root folder on the mounted Google Drive. The model weights and the image
# folders used by this notebook are addressed relative to this path.
BASE_PATH = "/content/drive/Shareddrives/IDA covidcxr-hackaton/"

Mount GDrive

In [None]:
# Mount Google Drive under /content/drive so that the files below BASE_PATH are reachable.
# NOTE(review): force_remount=True presumably remounts even when a mount already exists - confirm.
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


# Load the model with the weights

In [None]:
# Create the BSNet model with preloaded weights.
# MAE: 0.48
# NOTE(review): the MAE value is presumably the error reported for these
# weights - confirm where it comes from.
#
# The segmentation, alignment and BScore parts are each loaded from an .h5 file
# stored in the 'weights/' folder under BASE_PATH, and each of them is frozen.
# Later cells call model[0].predict(...) to obtain the mask used by the lungs
# direction test.
model = BSNet(backbone_name='resnet18',
              input_shape=(512, 512, 1),
              input_tensor=None,
              encoder_weights=None,
              freeze_encoder=True,
              skip_connections='default',
              decoder_block_type='transpose',
              decoder_filters=(256, 128, 64, 32, 16),
              decoder_use_batchnorm=True,
              n_upsample_blocks=5,
              upsample_rates=(2, 2, 2, 2, 2),
              classes=4,
              activation='sigmoid',
              load_seg_model=True,
              seg_model_weights=BASE_PATH+'weights/segmentation-model.h5',
              freeze_segmentation=True,
              load_align_model=True,
              align_model_weights=BASE_PATH+'weights/alignment-model.h5',
              freeze_align_model=True,
              pretrain_aligment_net=False,
              explict_self_attention=True,
              load_bscore_model=True,
              bscore_model_weights=BASE_PATH+'weights/fpn_4lev_fliplr_ncl_loss03_correct_feat128-16-44.h5',
              )

Loading segmentation model
Loading alignment model
Loading BScore model


# Utility

In [None]:
def showImage(image, title=""):
  """
  Plots an image with matplotlib, without axis ticks, under the given title

  Two-dimensional images are rendered with the gray colormap; any other image
  is rendered with matplotlib's default handling. In both cases bicubic
  interpolation and the 0-255 value range are used.

  Parameters
  ----------
  image: numpy.ndarray
    The image to be shown
  title: string
    The title to be shown for the plotted image

  Returns
  -------
  None
  """

  imshow_options = dict(interpolation = 'bicubic', vmin=0, vmax=255)
  if image.ndim == 2:
    # Single-channel image: force a grayscale rendering
    imshow_options['cmap'] = 'gray'
  plt.imshow(image, **imshow_options)

  plt.title(title)
  # Hide tick values on the X and Y axis
  plt.xticks([])
  plt.yticks([])
  plt.show()

In [None]:
def denormalizeImage(image):
  """
  Binarizes an image at the 0.5 threshold using the 0-255 pixel value range

  Parameters
  ----------
  image: numpy.ndarray
    The image to be binarized

  Returns
  -------
  numpy.ndarray
    uint8 image holding 0 where the input is below 0.5 and 255 everywhere else
  """
  below_threshold = image < 0.5
  binary_image = np.where(below_threshold, 0, 255)
  return np.uint8(binary_image)

In [None]:
def _lung_pixel_coordinates(labels, label):
  """Returns the (x, -y) coordinates of every pixel that belongs to the region `label`."""
  rows_columns = np.argwhere(labels == label)
  # Image rows grow downwards: negate them so that the fitted lines live in a
  # coordinate system whose y axis points up
  return rows_columns[:, 1], -rows_columns[:, 0]


def _fit_line_least_square(x, y):
  """Fits y = slope * x + intercept with an ordinary least square fit."""
  slope, intercept = np.polyfit(x, y, 1)
  return slope, intercept


def _fit_line_ransac(x, y):
  """Fits y = slope * x + intercept with RANSAC on top of a linear regression."""
  ransac = linear_model.RANSACRegressor(linear_model.LinearRegression())
  ransac.fit(x.reshape(-1,1), y)
  return ransac.estimator_.coef_[0], ransac.predict([[0]])[0]


def _fit_lines_ellipses(labels, label_1, label_2):
  """
  Fits an ellipse on the contour of each of the two regions and returns the
  (slope, intercept) of each ellipse axis, or None when two ellipses cannot be fitted.
  """
  lungs = np.uint8(np.where(np.logical_or(np.equal(labels, label_1), np.equal(labels, label_2)), 255, 0))
  canny_output = cv2.Canny(lungs, 100, 200)
  contours, _ = cv2.findContours(canny_output, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
  if len(contours) < 2:
    # Two contours are needed to get two lines
    return None
  # Keep just the 2 longest contours
  argsort_contour = np.argsort([len(val) for val in contours])[-2:]
  lines = []
  for i in argsort_contour:
    contour = contours[i]
    if contour.shape[0] <= 5:
      # Too few points to fit an ellipse on this contour
      return None
    (center_x, center_y), _, angle = cv2.fitEllipse(contour)
    slope = np.tan(np.deg2rad(90 - angle))
    # The line goes through the ellipse center (y negated, as for the other modes)
    lines.append((slope, -center_y - slope * center_x))
  return lines


def lungs_direction_test(image, connectedComponents=None, assessment_mode="ellipse_fitting") -> bool:
  """
  Tests the direction of the lungs found in a segmentation mask

  A line is fitted on each of the second and third largest connected
  components (the largest one is skipped), using a coordinate system where x
  is the column index and y is the negated row index. The test passes when the
  two lines intersect at a point with y > -256 and 0 < x < 512.
  NOTE(review): 256 and 512 presumably derive from the 512x512 input of the
  model - confirm before using masks of a different size.

  Parameters
  ----------
  image: numpy.ndarray
    The mask to analyze; it is only used when connectedComponents is None
  connectedComponents: tuple
    The output of cv2.connectedComponentsWithStats, if already computed
  assessment_mode: string
    One among "least_square", "ransac" and "ellipse_fitting"

  Returns
  -------
  bool
    True if the intersection satisfies the conditions above. False otherwise,
    including when fewer than 3 regions exist, when two ellipses cannot be
    fitted, or when the two lines are parallel

  Raises
  ------
  ValueError
    If assessment_mode is not one of the supported values
  """
  if connectedComponents is None:
    num_regions, labels, stats, _ = cv2.connectedComponentsWithStats(image, 8, cv2.CV_32S)
  else:
    num_regions, labels, stats, _ = connectedComponents

  if num_regions < 3:
    return False
  # Sort the regions by the last column of stats, largest first
  argsort = np.argsort(stats[:, -1])[::-1]
  label_1, label_2 = argsort[1], argsort[2]

  if assessment_mode == "least_square":
    lines = [_fit_line_least_square(*_lung_pixel_coordinates(labels, label)) for label in (label_1, label_2)]

  elif assessment_mode == "ransac":
    lines = [_fit_line_ransac(*_lung_pixel_coordinates(labels, label)) for label in (label_1, label_2)]

  elif assessment_mode == "ellipse_fitting":
    lines = _fit_lines_ellipses(labels, label_1, label_2)
    if lines is None:
      # No usable pair of ellipses: the direction cannot be confirmed
      return False

  else:
    raise ValueError('assessment_mode must be either least_square, ransac or ellipse_fitting')

  (slope_1, intercept_1), (slope_2, intercept_2) = lines
  if slope_1 == slope_2:
    # Parallel lines never intersect (this also avoids a division by zero)
    return False

  x_intersection = (intercept_2 - intercept_1) / (slope_1 - slope_2)
  y_intersection = slope_1 * x_intersection + intercept_1

  return bool(y_intersection > -256 and 0 < x_intersection < 512)

In [None]:
#Brute-force lungs segmentation assessment
def adjust_preprocessed_image(image, assessment_mode="ellipse_fitting", num_pixels_lung=10000):
  """
  Searches, by brute force, the rotation/inversion of an image that passes the lungs direction test

  Every combination of rotation (multiples of 90 degrees) and gray level
  inversion is segmented with model[0]; the first combination whose mask has
  two large enough regions besides the largest one, and that passes
  lungs_direction_test, is returned.

  Parameters
  ----------
  image: numpy.ndarray
    The two-dimensional image to adjust, with pixel values in the 0-255 range
  assessment_mode: string
    The mode forwarded to lungs_direction_test
  num_pixels_lung: int
    The size (last column of the connected components stats) that both the
    second and the third largest region must exceed

  Returns
  -------
  numpy.ndarray or None
    The adjusted uint8 image, or None if no combination is accepted
  """
  # Adapt to the dimensions requested by BSNet
  adapted_image = np.expand_dims(np.expand_dims(image / 255, axis=2), axis=0)
  erosion_kernel = np.ones((3,3), np.uint8)
  # most images are not rotated (0), few of them are rotated by just 90 degrees (3, 1),
  # almost none of them are rotated by 180 degrees (2)
  for rotation_count in [0, 3, 1, 2]:
    rotated_image = np.rot90(adapted_image, k=rotation_count, axes=(1,2))
    for pos_neg in [False, True]:
      if pos_neg:
        rotated_image = 1 - rotated_image
      mask = model[0].predict(rotated_image)
      # Go back to the two-dimensional image format
      mask = np.squeeze(np.squeeze(mask, axis=0), axis=2)
      # Use the 0-255 pixel value range
      mask = denormalizeImage(mask)
      # Erode the mask
      mask = cv2.erode(mask, erosion_kernel, iterations=3)
      # Find connected components
      connectedComponents = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)
      num_regions, _, stats, _ = connectedComponents
      if num_regions < 3:
        continue
      # Region sizes (last column of stats), largest first
      region_sizes = np.sort(stats[:, -1])[::-1]
      # The second and the third largest region must both exceed num_pixels_lung
      if region_sizes[1] > num_pixels_lung and region_sizes[2] > num_pixels_lung:
        if lungs_direction_test(mask, connectedComponents=connectedComponents, assessment_mode=assessment_mode):
          # Round before casting: (x / 255) * 255 and (1 - x / 255) * 255 can land
          # just below the intended integer, and a plain uint8 cast truncates,
          # which would return pixel values that are off by one
          restored_image = np.uint8(np.rint(rotated_image * 255))
          return np.squeeze(np.squeeze(restored_image, axis = 0), axis = 2)
  return None

# Read files

In [None]:
# Folder with the preprocessed images given to the algorithm
preprocessed_path = BASE_PATH+"TrainSetPreprocessed/"
# Folder with the reference images; each one is looked up under the same file name
adjusted_path = BASE_PATH+"TrainSetAdjusted/"
# os.listdir returns the entries in arbitrary order: sort them so that every
# run processes (and reports) the images in the same order
images_filenames = sorted(os.listdir(preprocessed_path))
number_total_images = len(images_filenames)

# Least square

In [None]:
# Run the least square test on every image and classify each outcome:
# discarded (no solution), correct (equal to the reference image) or wrong.
number_correct_images = 0
number_discarded_images = 0
number_wrong_images = 0

for index, image_filename in enumerate(images_filenames):
  image = cv2.imread(preprocessed_path+image_filename, cv2.IMREAD_GRAYSCALE)
  corrected_image = cv2.imread(adjusted_path+image_filename, cv2.IMREAD_GRAYSCALE)
  output_image = adjust_preprocessed_image(image, assessment_mode="least_square")

  if output_image is None:
    number_discarded_images += 1
  elif np.array_equal(output_image, corrected_image):
    number_correct_images += 1
  else:
    number_wrong_images += 1

  # The leading "\r" rewrites the same output line at every iteration
  progress = f"\r{index+1}/{number_total_images}"
  progress += f";  Correct images = {number_correct_images}"
  progress += f";  Discarded images = {number_discarded_images}"
  progress += f";  Wrong images = {number_wrong_images}"
  print(progress, end="")

1103/1103;  Correct images = 955;  Discarded images = 91;  Wrong images = 57

# RANSAC

In [None]:
# Run the RANSAC test on every image and classify each outcome:
# discarded (no solution), correct (equal to the reference image) or wrong.
number_correct_images = 0
number_discarded_images = 0
number_wrong_images = 0

for index, image_filename in enumerate(images_filenames):
  image = cv2.imread(preprocessed_path+image_filename, cv2.IMREAD_GRAYSCALE)
  corrected_image = cv2.imread(adjusted_path+image_filename, cv2.IMREAD_GRAYSCALE)
  output_image = adjust_preprocessed_image(image, assessment_mode="ransac")

  if output_image is None:
    number_discarded_images += 1
  elif np.array_equal(output_image, corrected_image):
    number_correct_images += 1
  else:
    number_wrong_images += 1

  # The leading "\r" rewrites the same output line at every iteration
  progress = f"\r{index+1}/{number_total_images}"
  progress += f";  Correct images = {number_correct_images}"
  progress += f";  Discarded images = {number_discarded_images}"
  progress += f";  Wrong images = {number_wrong_images}"
  print(progress, end="")

1103/1103;  Correct images = 917;  Discarded images = 121;  Wrong images = 65

# Ellipses fitting

In [None]:
# Run the ellipse fitting test on every image and classify each outcome:
# discarded (no solution), correct (equal to the reference image) or wrong.
number_correct_images = 0
number_discarded_images = 0
number_wrong_images = 0

# File names of the discarded and of the wrong images, kept for a later inspection
bad_images = []

for index, image_filename in enumerate(images_filenames):
  image = cv2.imread(preprocessed_path+image_filename, cv2.IMREAD_GRAYSCALE)
  corrected_image = cv2.imread(adjusted_path+image_filename, cv2.IMREAD_GRAYSCALE)
  output_image = adjust_preprocessed_image(image, assessment_mode="ellipse_fitting")

  if output_image is None:
    number_discarded_images += 1
    bad_images.append(image_filename)
  elif np.array_equal(output_image, corrected_image):
    number_correct_images += 1
  else:
    number_wrong_images += 1
    bad_images.append(image_filename)

  # The leading "\r" rewrites the same output line at every iteration
  progress = f"\r{index+1}/{number_total_images}"
  progress += f";  Correct images = {number_correct_images}"
  progress += f";  Discarded images = {number_discarded_images}"
  progress += f";  Wrong images = {number_wrong_images}"
  print(progress, end="")

903/1103;  Correct images = 798;  Discarded images = 77;  Wrong images = 28



1103/1103;  Correct images = 971;  Discarded images = 101;  Wrong images = 31

In [None]:
# Visual inspection of the images that the ellipse fitting run discarded or got wrong.
# Each plot is titled with the file name, and the output of the algorithm is
# shown next to its input whenever a solution exists (it was previously
# computed but never displayed).
for image_filename in bad_images:
  image = cv2.imread(preprocessed_path+image_filename, cv2.IMREAD_GRAYSCALE)
  output_image = adjust_preprocessed_image(image, assessment_mode="ellipse_fitting")
  if output_image is None:
    # No solution found: only the input can be shown
    showImage(image, image_filename+" (discarded)")
  else:
    showImage(image, image_filename+" (input)")
    showImage(output_image, image_filename+" (wrong output)")