In [1]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
from glob import glob
from math import ceil
from skimage import feature
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import jaccard_score

In [2]:
def plot_images(images: list, contours_list: np.array = None, centroids: list = None, n=16):
    
    images = images[:n]
    
    n_images = len(images)
    n_columns = 4
    n_rows = ceil(n_images/n_columns)

    x_size = 12
    y_size = int( n_rows*x_size/n_columns )

    fig, axes = plt.subplots(n_rows, n_columns, figsize=(x_size, y_size), sharex=True, sharey=True)

    plt.subplots_adjust(left=0.0,
                        bottom=0.0,
                        right=1,
                        top=1,
                        wspace=0.05,
                        hspace=0.05)
    
    ax = axes.flatten()

    for axs in ax[n_images:]:
        axs.remove()

    for i, img in enumerate(images):
        ax[i].set_axis_off()

        image = img.copy()
        image = image.astype('uint8')
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        if contours_list is not None:
            img_contours = contours_list[i]
            cv2.drawContours(image, img_contours, -1, (0, 255, 0), 2)  # -1 means draw all contours, (0, 255, 0) is the color, 2 is the thickness
        if centroids is not None:
            for cntr in centroids[i]:
                cv2.circle(image, cntr, radius=3, color=(0,0,255), thickness=-1)

        ax[i].imshow(image)
     
    plt.show()




In [3]:
class CoringDetector:

    def __init__(self, images: np.array) -> None:
        self.input_images = images
        self.all_transforms = {"input_images": self.input_images}

    def apply_transforms(
        self,
        canny_sigma: float = 2.7,
        N1: int = 19,
        N2: int = 9,
        N3: int = 5,
    ):
        """
        Aplica serie de transformações para encontrar e fechar os contornos
        que representam os boreholes.
        """
        images = self.input_images.copy()

        # Calcular bordas com canny
        images = [feature.canny(img, sigma=canny_sigma) * 255.0 for img in self.input_images]
        self.all_transforms["canny"] = images

        # Fechar bordas encontradas com fechamento (para criar blobs)
        kernel1 = np.ones((N1, N1), np.uint8)
        images = [cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel1) for img in images]
        self.all_transforms["closing"] = images

        # Aplicando abertura com kernel vertical para remover ruidos horizontais
        kernel2 = np.ones((N2, 1), np.uint8)
        images = [cv2.morphologyEx(img, cv2.MORPH_OPEN, kernel2) for img in images]
        self.all_transforms["open"] = images

        # Aplicando dilatação para juntar ruidos proximos em grandes regiões, pode facilitar
        # na filtragem por área, mas talvez não seja necessario.
        kernel3 = np.ones((N3, N3), np.uint8)
        images = [cv2.morphologyEx(img, cv2.MORPH_DILATE, kernel3) for img in images]
        self.all_transforms["dilate"] = images

        return images

    @staticmethod
    def find_contours(images: list):
        """
        Usa OpenCV para encotrar os blobs
        das images pre processadas.
        """

        converted_images = []
        for image in np.array(images):
            # Ensure that the image is in uint8 format (CV_8U)
            if image.max() == 1:
                image *= 255
            if image.dtype != np.uint8:
                image = image.astype(np.uint8)
            # Convert to CV_8UC1 format
            if len(image.shape) > 2:
                image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            converted_images.append(image)
        
        contours = []
        for img in converted_images:

            contours_list,_ = cv2.findContours(img, cv2.CHAIN_APPROX_SIMPLE, cv2.CHAIN_APPROX_NONE)
            contours.append(contours_list)

        return contours
    
    def apply_thresholds(self, contours: np.array, 
                         min_area : int = 20, 
                         max_area : int = 400,
                         min_round_ratio : float = 0.5,
                         max_round_ratio: float = 2.0):
        """
        Filtra thresholds para filtrar contornos com base
        em algumas propriedades.
        """
        
        filtered_contours = []       
        for contours_list in contours:
            for contours_list in contours:

                new_contours_list = []
                for cntr in contours_list:
                    
                    area = cv2.contourArea(cntr)
                    # contour_areas.append(area)

                    arclength = cv2.arcLength(cntr, True)
                    # arclengths.append(arclength)
                    
                    round_ratio =  4 * np.pi * area / ( arclength**2 )
                    # roundness.append(round_ratio)

                    if (round_ratio < max_round_ratio) and (round_ratio > min_round_ratio) and (area > min_area) and (area < max_area):
                        new_contours_list.append(cntr)
                
            
                filtered_contours.append(new_contours_list)        

        return filtered_contours
    
    def get_filtered_blobs(self, filtered_contours: list):
        """
        Usa os contornos filtrados para gerar a imges finais
        que representam os boreholes.
        """
        
        N_images = len(self.input_images)
        input_images_shape = (N_images, *(self.input_images[0].shape))
        masks = np.zeros(input_images_shape, dtype='uint8')

        for i, image in enumerate(masks):
                
            img_contours = filtered_contours[i]
            cv2.drawContours(image, img_contours, -1, (255, 255, 255), cv2.FILLED)  # -1 means draw all contours, (0, 255, 0) is the color, 2 is the thickness
    
        return masks

    @staticmethod
    def get_centroids(filtered_contours: list):

        centroids = []
        for contour_list in filtered_contours:
            centroids_list = []
            for cntr in contour_list:

                M = cv2.moments(cntr)
                if M['m00'] > 0:
                    cx = int(M['m10']/M['m00'])
                    cy = int(M['m01']/M['m00'])
                    centroid = np.array([cx, cy])
                    centroids_list.append(centroid)
            
            centroids.append(centroids_list)
        
        
        return centroids

    def evaluate_centroids(self, true_centroids, predicted_centroids, threshold=10):

        # Asserting compatible lengths to test masks
        try:
            assert len(true_centroids) == len(predicted_centroids)
        except Exception:
            print(f"Mask and contours have incompatible lengths: {len(true_centroids)} and {len(predicted_centroids)}.")

    
        true_positives = 0
        false_positives = 0
        false_negatives = 0

        for i in range(len(true_centroids)):
            for pred_centroid in predicted_centroids[i]:
                match_found = False
                for true_centroid in true_centroids[i]:
                    distance = np.linalg.norm(pred_centroid-true_centroid)
                    if distance < threshold:
                        match_found = True
                        break
                if match_found:
                    true_positives += 1
                else:
                    false_positives += 1
                
                print(true_positives)

        false_negatives = len(true_centroids) - true_positives

        precision = true_positives / (true_positives + false_positives)
        recall = true_positives / (true_positives + false_negatives)
        f1_score = 2 * (precision * recall) / (precision + recall)
        

        return precision, recall, f1_score


    def evaluate_masks(self, masks_pred : list,
                       masks_true : list):
        # Asserting compatible lengths to test masks
        try:
            assert len(masks_pred) == len(masks_true)
        except Exception:
            print(f"Mask and contours have incompatible lengths: {len(masks_pred)} and {len(masks_true)}.")

        # Calculating IoU
        iou_scores = []
        for mask in range(len(masks_pred)):
            iou_score = jaccard_score(masks_true[mask], masks_pred[mask], average='micro')
            iou_scores.append(iou_score)
            
        return iou_scores
    
    def optimize_detector_with_centroids(
            self, 
            train_dataset : list,
            centroids_true : list,
            transforms_param_grid : list = [{'canny_sigma' : 2.7, 'N1' : 19, 'N2': 9, 'N3' : 5}],
            threshold_param_grid : list = [{'min_area': 0.5, 'max_area': 450, 'min_round_ratio': 0.1, 'max_round_ratio': 1.0}]):
    
        best_tf_params = None
        best_ts_params = None
        best_score = float('-inf')

        for tf_params in tqdm(transforms_param_grid, desc="Optmizing Tansforms Parameters Set", unit="param set"):        
            for ts_params in threshold_param_grid:
                transformed_images  = self.apply_transforms(**tf_params)
                contours = self.find_contours(transformed_images)
                filtered_contours = self.apply_thresholds(contours, **ts_params)
                centroids_pred = self.get_centroids(filtered_contours)
                
                _, _, evaluation_metric = self.evaluate_centroids(centroids_true, centroids_pred)
    
                if evaluation_metric > best_score:
                    best_score = evaluation_metric
                    best_tf_params = tf_params
                    best_ts_params = ts_params

        return best_score, best_tf_params, best_ts_params

    def optimize_detector(self, train_dataset : list,
                                  masks_true : list,
                                  transforms_param_grid : list = [{'canny_sigma' : 2.7, 'N1' : 19, 'N2': 9, 'N3' : 5}],
                                  threshold_param_grid : list = [{'min_area': 0.5, 'max_area': 450, 'min_round_ratio': 0.1, 'max_round_ratio': 1.0}]):
        best_tf_params = None
        best_ts_params = None
        best_iou_score = float('-inf')

        for tf_params in tqdm(transforms_param_grid, desc="Optmizing Tansforms Parameters Set", unit="param set"):        
            for ts_params in threshold_param_grid:
                transformed_images  = self.apply_transforms(**tf_params)
                contours = self.find_contours(transformed_images)
                filtered_contours = self.apply_thresholds(contours, **ts_params)
                masks_pred = self.get_filtered_blobs(filtered_contours)
                
                evaluation_metric = self.evaluate_masks(masks_pred, masks_true)
                evaluation_mean = np.mean(evaluation_metric)
    
                if evaluation_mean > best_iou_score:
                    best_iou_score = evaluation_mean
                    best_tf_params = tf_params
                    best_ts_params = ts_params

        return best_iou_score, best_tf_params, best_ts_params

In [4]:
imgs_dir = '../data/train/image/*'
imgs_paths = glob(imgs_dir)
imgs_paths = sorted(imgs_paths)


# Create list with all images in gray scale
images_gray = [ cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) for image_path in imgs_paths]
# images_gray = images_gray[:42]
# Visualizando imagens
# plot_images(images_gray)

In [5]:
detector = CoringDetector(images_gray[:42])
transformed_images  = detector.apply_transforms()
contours = detector.find_contours(transformed_images)
filtered_contours = detector.apply_thresholds(contours)

# Essas são as images usadas para testar contra os dados de treino.
blobs = detector.get_filtered_blobs(filtered_contours)

In [6]:
# plot_images(blobs)


In [7]:
# for stage in detector.all_transforms:

#     print(stage)
#     plot_images(detector.all_transforms[stage])
    

## Detector parameters optmization

In [8]:
import random

def plot_random_iou(masks_true, masks_preds):
    random.seed(42)
    
    idx = random.sample(range(len(masks_true)),1)[0]

    iou_score = jaccard_score(masks_true[idx], masks_preds[idx], average='micro')

    plt.imshow(images_gray[idx], cmap='gray')
    plt.imshow(masks_true[idx], cmap='Blues', alpha=0.5)
    plt.imshow(masks_preds[idx], cmap='Reds', alpha=0.3)
    plt.title("IoU: {:.2f}".format(iou_score))
    plt.axis('off')
    plt.show()

def plot_random_iou_grid(masks_true, masks_preds, grid_size=(5, 5), figsize=(15, 15), save_path=None):
    assert len(masks_true) == len(masks_preds), "Input lists must have the same length"

    random.seed(42)

    fig, axes = plt.subplots(*grid_size, figsize=figsize)

    for i in range(grid_size[0]):
        for j in range(grid_size[1]):
            idx = random.randint(0, len(masks_true) - 1)

            iou_score = jaccard_score(masks_true[idx], masks_preds[idx], average='micro')

            # axes[i, j].imshow(images_gray[idx], cmap='gray')
            axes[i, j].imshow(masks_true[idx], cmap='Blues', alpha=0.7)
            axes[i, j].imshow(masks_preds[idx], cmap='Reds', alpha=0.5)
            axes[i, j].set_title("IoU: {:.2f}".format(iou_score))
            axes[i, j].axis('off')

    plt.tight_layout()

    if save_path:
        plt.savefig(save_path, format='pdf', dpi=300)
    else:
        plt.show()

### Preprocessing data

#### Training images

In [9]:
N_images = 50

In [10]:
imgs_dir = '../data/train/image/*'
imgs_paths = glob(imgs_dir)
imgs_paths = sorted(imgs_paths)
imgs_paths = imgs_paths[:N_images]

train_dataset = [ cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) for image_path in imgs_paths]

In [11]:
# Padding images to 256x256

# Find the maximum shape among all arrays
max_shape = max(arr.shape for arr in train_dataset)

# Pad or resize each array to the maximum shape
padded_arrays = [np.pad(arr, [(0, max_shape[0] - arr.shape[0]), (0, max_shape[1] - arr.shape[1])], mode='constant', constant_values=int(np.mean(arr))) for arr in train_dataset]

# Convert the list of arrays to a single NumPy array
train_dataset = np.array(padded_arrays)

# Check the shape of the resulting array
print(train_dataset.shape)

(50, 256, 256)


#### Masks

In [12]:
masks_dir = '../data/train/mask/*'
masks_paths = glob(masks_dir)
masks_paths = sorted(masks_paths)
masks_paths = masks_paths[:N_images]

# Create list with all images in gray scale
masks_gray = [ cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE) for mask_path in masks_paths]

In [13]:
# Padding masks to 256x256 

# Find the maximum shape among all arrays
max_shape = max(arr.shape for arr in masks_gray)

# Pad or resize each array to the maximum shape
padded_arrays = [np.pad(arr, [(0, max_shape[0] - arr.shape[0]), (0, max_shape[1] - arr.shape[1])], mode='constant') for arr in masks_gray]

# Convert the list of arrays to a single NumPy array
train_masks = np.array(padded_arrays)

# Get the true centroids for the maks
masks_contours = CoringDetector.find_contours(train_masks)
masks_centroids = CoringDetector.get_centroids(masks_contours)

# plot_images(train_masks, masks_contours, masks_centroids)

# Check the shape of the resulting array
print(train_masks.shape)

(50, 256, 256)


### Create params grids

In [14]:
# Define the parameter ranges and steps
min_area_range = np.arange(1, 10, 5.)  # from 10 to 50, steps of 10
max_area_range = np.arange(400, 450, 50)  # from 250 to 450, steps of 50
min_round_ratio_range = np.arange(0.1, 0.2, 0.1)  # from 0.25 to 0.75, steps of 0.125
max_round_ratio_range = np.arange(0.5, 1.5, 0.5)  # from 1.0 to 3.0, steps of 0.5

# Construct the param_grid
ts_param_grid = [
    {'min_area': min_area, 'max_area': max_area, 'min_round_ratio': min_round_ratio, 'max_round_ratio': max_round_ratio}
    for min_area in min_area_range
    for max_area in max_area_range
    for min_round_ratio in min_round_ratio_range
    for max_round_ratio in max_round_ratio_range
]

print(f"A total of {len(ts_param_grid)} parameters set were created.")

A total of 4 parameters set were created.


In [15]:
# Define the parameter ranges and steps
canny_sigma_range = np.arange(1, 5, 0.5)
N1_range = np.arange(11, 32, step=2)
N2_fixed = 9
N3_fixed = 5

# Construct the param_grid
tf_param_grid = [
    {'canny_sigma': canny_sigma, 'N1': N1, 'N2': N2_fixed, 'N3': N3_fixed}
    for canny_sigma in canny_sigma_range
    for N1 in N1_range
]

print(f"A total of {len(tf_param_grid)} parameters set were created.")

A total of 88 parameters set were created.


### Brute force params optimizing

In [16]:
#  optimize using IoU

# detector = CoringDetector(train_dataset)
# best_score, best_tf_params, best_ts_params = detector.optimize_detector(train_dataset, train_masks, transforms_param_grid=tf_param_grid, threshold_param_grid=ts_param_grid)

In [17]:
#  optimize using f1_score

detector = CoringDetector(train_dataset)
best_score, best_tf_params, best_ts_params = detector.optimize_detector_with_centroids(
    train_dataset, 
    masks_centroids, 
    transforms_param_grid=tf_param_grid,
    threshold_param_grid=ts_param_grid)

Optmizing Tansforms Parameters Set:   0%|          | 0/88 [00:00<?, ?param set/s]

Optmizing Tansforms Parameters Set:   0%|          | 0/88 [00:00<?, ?param set/s]

Mask and contours have incompatible lengths: 50 and 2500.
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0





ZeroDivisionError: float division by zero

In [None]:
print(best_score, best_tf_params, best_ts_params)

In [None]:
# Detector optimized with best_params
optm_detector = CoringDetector(train_dataset)
transformed_images  = optm_detector.apply_transforms(**best_tf_params)
contours = optm_detector.find_contours(transformed_images)
filtered_contours = optm_detector.apply_thresholds(contours, **best_params)

optm_blobs = optm_detector.get_filtered_blobs(filtered_contours)

In [None]:
plot_images(optm_blobs)

## Plotting results

In [None]:
# plot_random_iou(train_masks, optm_blobs)      

In [None]:
# plot_random_iou_grid(train_masks,optm_blobs)