In [1]:
! pip install pycocotools scikit-image tensorboard
! pip install roboflow
from roboflow import Roboflow
from pycocotools.coco import COCO
import pycocotools.mask as mask_utils
from pycocotools.cocoeval import COCOeval
import matplotlib.pyplot as plt
import cv2
import seaborn as sns
from skimage import draw as sk_draw
import numpy as np
import json
from tqdm import tqdm
from statistics import mean
import os
import contextlib
import io
from IPython.display import clear_output
clear_output()

In [2]:
# Folder that receives the TensorBoard event files written during training.
LOG_DIR = "metrics/"
# Directory the kernel was started in; a later cell changes back into it.
HOME = os.getcwd()
print(HOME)

/home/junior/Documents/stage-4a-SEDOGBO/model/model1/semi_supervised


In [3]:
#! pip uninstall -y torch torchvision
! pip install torch>=2.0.0 torchvision>=0.10.0

import torch
print(torch.__version__)
print(torch.cuda.is_available())
device = 'cuda' if torch.cuda.is_available() else 'cpu'

from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
import torchvision.models.segmentation
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision import transforms

clear_output()

In [4]:
# The Roboflow API key is read from the environment so the secret is not stored
# in the notebook. NOTE(review): the key that used to be hard-coded here has
# been exposed and should be revoked.
roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY")
if not roboflow_api_key:
    raise RuntimeError("Set the ROBOFLOW_API_KEY environment variable before running this cell.")
rf = Roboflow(api_key=roboflow_api_key)

# Other projects of the same workspace tried previously:
#   project("simplified").version(2) and project("deep_forest").version(1)
project = rf.workspace("insa-3ptmt").project("final-tree-detection")
version = project.version(5)

dataset = version.download("coco-segmentation")
clear_output()

In [5]:
class CustomCOCODataset(Dataset):
    """COCO-segmentation dataset yielding ``(image, target)`` pairs.

    Each image is resized to ``imageSize`` and its annotations (boxes and
    polygons) are rescaled by the same factors, so the targets stay aligned
    with the pixels even when the files on disk have another resolution.

    Args:
        coco_json_path: path to the COCO annotation file.
        transform: stored on the instance; ``__getitem__`` does not apply it.
        image_dir: directory containing the images listed in the JSON file.
        imageSize: target ``(width, height)`` handed to ``cv2.resize``.
    """

    def __init__(self, coco_json_path, transform, image_dir, imageSize : list[int]):
        # pycocotools prints while building its index; silence it.
        with contextlib.redirect_stdout(io.StringIO()):
            self.coco = COCO(coco_json_path)
        self.image_dir = image_dir
        self.transform = transform
        self.image_ids = list(self.coco.imgs.keys())
        self.imageSize = imageSize
        self.annFile = coco_json_path

    def __len__(self):
        """Number of images listed in the annotation file."""
        return len(self.image_ids)

    @staticmethod
    def _scale_bbox(bbox, scale_x, scale_y):
        """Turn a COCO ``[x, y, w, h]`` box into a scaled ``(x1, y1, x2, y2)`` tuple."""
        x_min, y_min, width, height = bbox
        return (x_min * scale_x, y_min * scale_y,
                (x_min + width) * scale_x, (y_min + height) * scale_y)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, data)`` where ``image`` is a float32 tensor of shape
            ``(3, height, width)`` scaled to [0, 1] and ``data`` is a dict with
            ``boxes`` (float32 tensor ``(n, 4)``, x1/y1/x2/y2), ``labels``
            (int64 ones, shape ``(n,)``), ``masks`` (boolean numpy array
            ``(n, height, width)``) and ``image_id``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        img_id = self.image_ids[idx]
        img_info = self.coco.loadImgs(img_id)
        assert len(img_info) == 1, f"Plus d'une annotation pour l'image {img_id}"
        img_info = img_info[0]
        img_path = os.path.join(self.image_dir, img_info['file_name'])

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        orig_height, orig_width = image.shape[:2]
        target_width, target_height = self.imageSize
        scale_x = target_width / orig_width
        scale_y = target_height / orig_height

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # `interpolation` must be given by keyword: the third positional
        # parameter of cv2.resize is `dst`, not the interpolation flag.
        image = cv2.resize(image, (target_width, target_height), interpolation=cv2.INTER_LINEAR)
        image = torch.as_tensor(image, dtype=torch.float32) / 255.0
        image = image.permute(2, 0, 1)  # HWC -> CHW

        ann_ids = self.coco.getAnnIds(img_id)
        anns = self.coco.loadAnns(ann_ids)
        num_objects = len(anns)
        # One binary mask per annotation, laid out as (rows, cols) = (height, width).
        masks = np.zeros((num_objects, target_height, target_width), dtype=bool)
        all_bboxes = torch.zeros([num_objects, 4], dtype=torch.float32)
        for index, ann in enumerate(anns):
            all_bboxes[index] = torch.tensor(self._scale_bbox(ann['bbox'], scale_x, scale_y))
            # Each polygon is a flat [x0, y0, x1, y1, ...] list; an annotation
            # may hold several polygons, all drawn into the same mask.
            for seg in ann['segmentation']:
                rows = np.asarray(seg[1::2], dtype=np.float64) * scale_y
                cols = np.asarray(seg[0::2], dtype=np.float64) * scale_x
                rr, cc = sk_draw.polygon(rows, cols, masks[index].shape)
                masks[index, rr, cc] = True

        data = {"boxes": all_bboxes, "labels": torch.ones((num_objects, ), dtype=torch.int64),
                "masks": masks, "image_id": img_id}

        return image, data

In [6]:
DATASET_PATH = "final-tree-detection-5/"

# One COCO annotation file per split, located under DATASET_PATH.
train_coco_json_path = f"{DATASET_PATH}train/_annotations.coco.json"
valid_coco_json_path = f"{DATASET_PATH}valid/_annotations.coco.json"
test_coco_json_path = f"{DATASET_PATH}test/_annotations.coco.json"

# Handed to the dataset constructors in the next cell.
transform = transforms.Compose([transforms.ToTensor()])

In [7]:
def collate_fn(batch):
    """Merge dataset samples into one batch.

    Args:
        batch: sequence of ``(image, target)`` pairs.

    Returns:
        ``(images, targets)``: the images stacked along a new leading
        dimension, and the targets as a list in batch order.
    """
    images = torch.stack([sample[0] for sample in batch], dim=0)
    targets = [sample[1] for sample in batch]
    return images, targets
imageSize = [1024, 1024]

# One dataset per split, all resized to the same resolution.
train_dataset = CustomCOCODataset(train_coco_json_path, transform, f"{DATASET_PATH}train", imageSize=imageSize)
valid_dataset = CustomCOCODataset(valid_coco_json_path, transform, f"{DATASET_PATH}valid", imageSize=imageSize)
test_dataset = CustomCOCODataset(test_coco_json_path, transform, f"{DATASET_PATH}test", imageSize=imageSize)

# Settings shared by the three loaders; only the training loader shuffles.
loader_options = dict(batch_size=2, collate_fn=collate_fn)
train_dataloader = DataLoader(train_dataset, shuffle=True, drop_last=False, **loader_options)
valid_dataloader = DataLoader(valid_dataset, shuffle=False, drop_last=False, **loader_options)
test_dataloader = DataLoader(test_dataset, shuffle=False, **loader_options)
#clear_output()

In [8]:
def show_mask(random_color=False, dataloader=None):
    """Show the first image of a batch next to the same image with its ground-truth masks.

    Args:
        random_color: draw every mask in a random colour instead of a fixed blue.
        dataloader: loader the first batch is taken from; defaults to the
            notebook-level ``valid_dataloader``.
    """
    if dataloader is None:
        dataloader = valid_dataloader
    images, targets = next(iter(dataloader))
    print("Images batch shape = ", images.shape)
    image, target = images[0], targets[0]
    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))

    for ax in axes:
        ax.grid(False)
        ax.axis('off')
    background = (image.permute(1, 2, 0).cpu().numpy() * 255.0).astype(np.uint8)

    axes[0].imshow(background)
    # The overlay panel is addressed explicitly rather than through the loop
    # variable `ax` left over from the loop above.
    overlay_ax = axes[1]
    overlay_ax.imshow(background)

    ground_truth_seg = np.asarray(target["masks"])
    h, w = ground_truth_seg.shape[-2:]
    for mask in ground_truth_seg:
        if random_color:
            color = np.concatenate([np.random.random(3), np.array([0.3])], axis=0)
        else:
            color = np.array([30/255, 144/255, 255/255, 0.5])
        # RGBA image that is fully transparent outside the mask.
        mask_image = mask.reshape(h, w, 1) * color.reshape(1, 1, -1)
        overlay_ax.imshow(mask_image)

#show_mask(random_color=True)

In [18]:
from scipy.optimize import linear_sum_assignment
def calculate_iou(mask1, mask2):
    """Intersection over union of two binary masks.

    Args:
        mask1, mask2: arrays of broadcast-compatible shape; non-zero entries
            count as foreground.

    Returns:
        The IoU in [0, 1]; 0.0 when both masks are empty (the ratio would
        otherwise be 0/0, i.e. NaN).
    """
    intersection = np.count_nonzero(np.logical_and(mask1, mask2))
    union = np.count_nonzero(np.logical_or(mask1, mask2))
    if union == 0:
        return 0.0
    return intersection / union

def calculate_iou_with_bbox(box1, box2):
    """Intersection over union of two axis-aligned boxes.

    Args:
        box1, box2: ``(x1, y1, x2, y2)`` corner coordinates.

    Returns:
        The IoU; 0.0 when the union has no area (both boxes degenerate), where
        the ratio would otherwise divide by zero.
    """
    x1, y1, x2, y2 = box1
    x1g, y1g, x2g, y2g = box2
    # Corners of the overlap rectangle; it is empty if either side is <= 0.
    inter_width = max(0, min(x2, x2g) - max(x1, x1g))
    inter_height = max(0, min(y2, y2g) - max(y1, y1g))
    inter_area = inter_width * inter_height
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2g - x1g) * (y2g - y1g)
    union_area = box1_area + box2_area - inter_area
    if union_area <= 0:
        return 0.0
    return inter_area / union_area

def match_masks(pred_masks, gt_masks, iou_threshold=0.5):
    """Match predicted masks to ground-truth masks one-to-one by IoU.

    The pairing maximises the total IoU (Hungarian algorithm) after discarding
    every pair whose IoU is below ``iou_threshold``.

    Args:
        pred_masks: sequence of binary predicted masks.
        gt_masks: sequence of binary ground-truth masks.
        iou_threshold: minimum IoU for a pair to count as a match.

    Returns:
        List of ``(gt_index, pred_index, iou)`` tuples, one per accepted match.
    """
    num_pred = len(pred_masks)
    num_gt = len(gt_masks)
    if num_pred == 0 or num_gt == 0:
        return []

    # Mask areas do not depend on the pairing: compute them once and derive
    # the union as |A| + |B| - |A ∩ B| instead of a logical_or per pair.
    gt_areas = [np.count_nonzero(mask) for mask in gt_masks]
    pred_areas = [np.count_nonzero(mask) for mask in pred_masks]

    iou_matrix = np.zeros((num_gt, num_pred))
    for i in range(num_gt):
        for j in range(num_pred):
            intersection = np.count_nonzero(np.logical_and(gt_masks[i], pred_masks[j]))
            union = gt_areas[i] + pred_areas[j] - intersection
            # Two empty masks would give 0/0 = NaN, which linear_sum_assignment
            # rejects; leave the entry at 0 instead.
            if union > 0:
                iou_matrix[i, j] = intersection / union
    iou_matrix[iou_matrix < iou_threshold] = 0

    # linear_sum_assignment minimises the cost, hence the negation.
    row_indices, col_indices = linear_sum_assignment(-iou_matrix)
    return [
        (gt_idx, pred_idx, iou_matrix[gt_idx, pred_idx])
        for gt_idx, pred_idx in zip(row_indices, col_indices)
        if iou_matrix[gt_idx, pred_idx] >= iou_threshold
    ]

def precision_score(groundtruth_mask, pred_mask):
    """Pixel precision of one predicted mask against one ground-truth mask.

    Returns:
        Overlapping pixels divided by predicted pixels, rounded to 3 decimals;
        0.0 when the prediction contains no pixel.
    """
    overlap = np.sum(pred_mask * groundtruth_mask)
    predicted_pixels = np.sum(pred_mask)
    if predicted_pixels == 0:
        return 0.0
    return round(overlap / predicted_pixels, 3)

def recall_score(groundtruth_mask, pred_mask):
    """Pixel recall of one predicted mask against one ground-truth mask.

    Returns:
        Overlapping pixels divided by ground-truth pixels, rounded to
        3 decimals; 0.0 when the ground truth contains no pixel.
    """
    overlap = np.sum(pred_mask * groundtruth_mask)
    truth_pixels = np.sum(groundtruth_mask)
    if truth_pixels == 0:
        return 0.0
    return round(overlap / truth_pixels, 3)

def calculate_precision_recall(pred_masks, gt_masks, iou_threshold=0.5, confidence=0.5):
    """Average pixel precision and recall for the masks of one image.

    Both mask arrays are binarised with ``>= confidence`` and matched with
    ``match_masks``. Each match contributes its pixel precision and recall;
    each unmatched prediction adds a precision of 0 and each unmatched ground
    truth adds a recall of 0.

    Returns:
        ``(avg_precision, avg_recall, correspondances)``; an average is 0 when
        it has no term.
    """
    pred_masks = pred_masks >= confidence
    gt_masks = gt_masks >= confidence
    correspondances = match_masks(pred_masks, gt_masks, iou_threshold)

    precisions = [precision_score(gt_masks[g], pred_masks[p]) for g, p, _ in correspondances]
    recalls = [recall_score(gt_masks[g], pred_masks[p]) for g, p, _ in correspondances]

    matched_gt = {g for g, _, _ in correspondances}
    matched_pred = {p for _, p, _ in correspondances}
    # False positives (unmatched predictions) and false negatives (unmatched
    # ground truths) enter the averages as zeros.
    precisions += [0] * (len(pred_masks) - len(matched_pred))
    recalls += [0] * (len(gt_masks) - len(matched_gt))

    avg_precision_value = sum(precisions) / len(precisions) if precisions else 0
    avg_recall_value = sum(recalls) / len(recalls) if recalls else 0

    return avg_precision_value, avg_recall_value, correspondances


In [19]:
# Toy check: two ground-truth squares in the top row of a 4x4 grid and one
# soft prediction whose values above 0.5 lie in the two leftmost columns.
gt_masks = np.zeros((2, 4, 4), dtype=int)
gt_masks[0, :2, :2] = 1
gt_masks[1, :2, 2:] = 1

top_row = [0.9, 0.8, 0.1, 0.0]
bottom_row = [0.2, 0.2, 0.1, 0.0]
pred_masks = np.array([[top_row, top_row, bottom_row, bottom_row]])

precision, recall, _ = calculate_precision_recall(
    pred_masks, gt_masks, iou_threshold=0.5, confidence=0.5
)

print(f'Précision: {precision:.3f}')
print(f'Rappel: {recall:.3f}')


Précision: 1.000
Rappel: 0.500


In [20]:
def evaluate(model, data_loader, device, iou_threshold = 0.5, confidence=0.5):
    """Evaluate ``model`` over ``data_loader`` and return losses and metrics.

    The function makes two passes over ``data_loader``:

    1. a loss pass, with the model in train mode and called with the targets
       so that it returns its loss dict (no gradient is computed);
    2. a metric pass, with the model in eval mode, comparing the predicted
       masks and boxes with the ground truth.

    Args:
        model: detection model; left in eval mode on return.
        data_loader: loader yielding ``(images, targets)`` batches.
        device: device the batches are moved to.
        iou_threshold: minimum mask IoU for a prediction to match a ground truth.
        confidence: threshold used to binarise the predicted soft masks.

    Returns:
        Dict with ``precision``, ``recall``, ``IoU`` (mean over predictions of
        the best box IoU with any ground-truth box) and the mean
        ``val_box_loss``, ``val_seg_loss``, ``val_cls_loss``, ``val_global``.
        A value is NaN when there was nothing to average.
    """
    nan = float("nan")

    #===================================================================#
    # losses #
    #===================================================================#
    model.train()
    batch_losses = {"val_box_loss": [], "val_seg_loss": [], "val_cls_loss": [], "val_global": []}
    with torch.no_grad():
        # Iterate the loader received as argument, not the notebook-level
        # valid_dataloader, so the losses describe the same data as the metrics.
        for images, targets in data_loader:
            images = [image.to(device) for image in images]
            targets = [{k: torch.as_tensor(v).to(device) for k, v in t.items()} for t in targets]
            val_loss_dict = model(images, targets)
            val_losses = sum(loss for loss in val_loss_dict.values())
            # Model output keys : loss_classifier, loss_box_reg, loss_mask, loss_objectness, loss_rpn_box_reg
            batch_losses["val_box_loss"].append(val_loss_dict["loss_box_reg"].item())
            batch_losses["val_seg_loss"].append(val_loss_dict["loss_mask"].item())
            batch_losses["val_cls_loss"].append(val_loss_dict["loss_classifier"].item())
            batch_losses["val_global"].append(val_losses.item())

    # statistics.mean raises on an empty list; report NaN for an empty loader.
    batch_losses = {k: (mean(v) if v else nan) for k, v in batch_losses.items()}

    #===================================================================#
    # metrics #
    #===================================================================#
    model.eval()
    # Detections in COCO result format, kept for the COCOeval step that is
    # currently disabled (see the commented block below).
    coco_results = []
    all_ious = []
    precision_epoch, recall_epoch = [], []
    with torch.no_grad():
        for images, targets in data_loader:
            images = [img.to(device) for img in images]
            targets = [{k: torch.as_tensor(v).to(device) for k, v in t.items()} for t in targets]
            outputs = model(images)
            for i, output in enumerate(outputs):
                boxes = output['boxes'].cpu().numpy()
                scores = output['scores'].cpu().numpy()
                labels = output['labels'].cpu().numpy()
                masks = output['masks'].cpu().numpy()
                gt_boxes = targets[i]['boxes'].cpu().numpy()
                image_id = targets[i]["image_id"].item()

                precision_one_img, recall_one_img, _ = calculate_precision_recall(
                    masks, targets[i]['masks'].cpu().numpy(),
                    iou_threshold=iou_threshold, confidence=confidence)
                precision_epoch.append(precision_one_img)
                recall_epoch.append(recall_one_img)

                for box, score, label, mask in zip(boxes, scores, labels, masks):
                    ious = [calculate_iou_with_bbox(box, gt_box) for gt_box in gt_boxes]
                    if ious:
                        all_ious.append(max(ious))

                    # RLE encoding needs a binary uint8 mask; the raw output is
                    # a soft mask with a leading channel axis.
                    binary_mask = (mask[0] >= confidence).astype(np.uint8)
                    # COCO boxes are [x, y, width, height]; the model emits corners.
                    x1, y1, x2, y2 = box.tolist()
                    coco_results.append({
                        'image_id': image_id,
                        'category_id': int(label),
                        'bbox': [x1, y1, x2 - x1, y2 - y1],
                        'score': float(score),
                        'segmentation': mask_utils.encode(np.asfortranarray(binary_mask))
                    })

    precision_epoch = sum(precision_epoch) / len(precision_epoch) if precision_epoch else nan
    recall_epoch = sum(recall_epoch) / len(recall_epoch) if recall_epoch else nan
    print("Precision = ", precision_epoch, " Recall = ", recall_epoch)
    # COCO metrics evaluation (disabled)
    #with contextlib.redirect_stdout(io.StringIO()):
    #    coco = COCO(data_loader.dataset.annFile)
    #    coco_dt = coco.loadRes(coco_results)
    #    coco_eval = COCOeval(coco, coco_dt, iouType='segm')
    #    coco_eval.evaluate()
    #    coco_eval.accumulate()
    #    coco_eval.summarize()
    #metrics = {"mAP": coco_eval.stats[0], "AP50": coco_eval.stats[1], "IoU": mean_iou}
    mean_iou = np.mean(all_ious) if all_ious else nan
    metrics = {"precision": precision_epoch, "recall": recall_epoch, "IoU": mean_iou}
    metrics.update(batch_losses)
    return metrics

In [13]:
def fine_tune(model, optimizer, writer, num_epochs, train_dataloader, valid_dataloader = None):
    """Train ``model`` for ``num_epochs`` epochs and log one set of scalars per epoch.

    Args:
        model: detection model returning a loss dict when called with targets.
        optimizer: optimizer bound to the model parameters.
        writer: object exposing ``add_scalar(tag, value, step)`` and ``flush()``.
        num_epochs: number of passes over ``train_dataloader``.
        train_dataloader: loader yielding ``(images, targets)`` batches.
        valid_dataloader: optional loader; when given, ``evaluate`` is run on
            it after every epoch and its metrics are logged too.

    Returns:
        Dict mapping every logged scalar name to the list of its per-epoch
        values, in epoch order.
    """
    # Batches follow the model's own device rather than a notebook global.
    run_device = next(model.parameters()).device
    history = {}
    for epoch in range(num_epochs):
        batch_losses = {"box_loss": [], "seg_loss": [], "cls_loss": [], "global": []}
        print(f'Epoch [{epoch+1}/{num_epochs}]')
        for images, targets in tqdm(train_dataloader, colour='green'):
            # evaluate() leaves the model in eval mode, so re-arm train mode.
            model.train()
            optimizer.zero_grad()
            images = [image.to(run_device) for image in images]
            targets = [{k: torch.as_tensor(v).to(run_device) for k, v in t.items()} for t in targets]
            loss_dict = model(images, targets)

            losses = sum(loss for loss in loss_dict.values())
            # Model output keys : loss_classifier, loss_box_reg, loss_mask, loss_objectness, loss_rpn_box_reg
            batch_losses["box_loss"].append(loss_dict["loss_box_reg"].item())
            batch_losses["seg_loss"].append(loss_dict["loss_mask"].item())
            batch_losses["cls_loss"].append(loss_dict["loss_classifier"].item())
            batch_losses["global"].append(losses.item())
            losses.backward()
            optimizer.step()

        if not batch_losses["global"]:
            # Empty loader: nothing was trained, so nothing is logged.
            continue

        # End-of-epoch logging happens after the batch loop instead of behind
        # an index test inside it.
        epoch_scalars = {k: mean(v) for k, v in batch_losses.items()}
        if valid_dataloader is not None:
            epoch_scalars.update(evaluate(model, valid_dataloader, run_device))
        for name, value in epoch_scalars.items():
            writer.add_scalar(name, value, epoch)
            history.setdefault(name, []).append(value)

    writer.flush()
    return history

In [14]:
# Mask R-CNN with a ResNet-50 FPN backbone, loaded with the COCO_V1 weights.
model = torchvision.models.detection.maskrcnn_resnet50_fpn(
    weights=torchvision.models.detection.MaskRCNN_ResNet50_FPN_Weights.COCO_V1
)

In [15]:
# Replace the box classification head with a 2-class one. The dataset labels
# every object 1, so presumably class 0 is the background.
# NOTE(review): only the box predictor is replaced here; the mask predictor is
# kept as loaded — confirm that this is intended.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor=FastRCNNPredictor(in_features,num_classes=2)
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)
# TensorBoard scalars are written under LOG_DIR.
writer = SummaryWriter(log_dir=LOG_DIR)
model.to(device)
clear_output()

In [21]:
num_epochs = 2
# Train on the training split and validate on the validation split after each epoch.
fine_tune(
    model=model,
    optimizer=optimizer,
    writer=writer,
    num_epochs=num_epochs,
    train_dataloader=train_dataloader,
    valid_dataloader=valid_dataloader,
)

Epoch [1/2]


100%|[32m██████████[0m| 17/17 [02:24<00:00,  8.53s/it]


Precision =  0.1812475  Recall =  0.06391744791666668
Epoch [2/2]


100%|[32m██████████[0m| 17/17 [02:25<00:00,  8.53s/it]

Precision =  0.1881125  Recall =  0.06860213541666667





In [None]:
%cd {HOME}
torch.save(model.state_dict(), f"model{num_epochs}.torch")

In [None]:
%tensorboard --logdir=LOG_DIR

In [None]:
def draw_metrics(metrics : dict):
    """Plot every metric against the epoch index, one panel per metric.

    Args:
        metrics: mapping from metric name to its sequence of per-epoch values.
            The sequences may have different lengths; an empty mapping draws
            nothing.
    """
    if not metrics:
        return
    sns.set_theme(style="darkgrid")
    # squeeze=False always returns a 2-D array of axes, so a single metric
    # needs no special case.
    fig, axs = plt.subplots(nrows=1, ncols=len(metrics), figsize=(10, 3), squeeze=False)
    for ax, (metric, values) in zip(axs[0], metrics.items()):
        # The x axis is sized per metric instead of from the first one only.
        sns.lineplot(x=np.arange(len(values)), y=values, ax=ax, marker='o', linestyle='-', color="blue")
        ax.set_title(metric.capitalize())
        ax.set_xlabel("Epoch")
        ax.grid(True)
    plt.tight_layout()
    plt.show()

# NOTE(review): `metrics` is not assigned at notebook scope in any visible
# cell, so these two calls presumably raise NameError on a fresh kernel —
# confirm where the per-epoch lists are meant to come from.
# First figure: training losses; second figure: validation losses.
draw_metrics({"box_loss": metrics["box_loss"], "seg_loss": metrics["seg_loss"], "cls_loss": metrics["cls_loss"], "global": metrics["global"]})
draw_metrics({"val_box_loss": metrics["val_box_loss"], "val_seg_loss": metrics["val_seg_loss"], "val_cls_loss": metrics["val_cls_loss"], "val_global": metrics["val_global"]})

In [None]:
SAVES_PATH = "runs/"
# Unlike `! mkdir runs`, this does not fail when the folder already exists, so
# the cell can be re-run.
os.makedirs(SAVES_PATH, exist_ok=True)
def visual_inference(image, prediction: dict, batch_index : int, confidence_threshold : float = 0.8,
                     prediction_path: str = None):
    """Save (and show) an image next to the same image with its predicted masks painted.

    Args:
        image: tensor indexed as (channel, row, column) with values in [0, 1].
        prediction: dict holding ``masks`` (indexed ``[i, 0]`` to get a 2-D
            soft mask) and ``scores``.
        batch_index: used to build the default file name ``batch_<index>.png``.
        confidence_threshold: only detections scoring strictly above it are drawn.
        prediction_path: file name under ``SAVES_PATH``; replaces the default
            name when given.
    """
    fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))

    for ax in axs:
        ax.grid(False)
        ax.axis('off')
    bg = image.permute(1, 2, 0).detach().cpu().numpy()
    bg = (bg * 255.0).astype(np.uint8)
    seg_im = bg.copy()
    overlay_color = (100, 151, 177)
    for soft_mask, score in zip(prediction['masks'], prediction['scores']):
        if score.item() > confidence_threshold:
            # Binarise the soft mask once and paint the three channels together.
            foreground = soft_mask[0].detach().cpu().numpy() >= 0.5
            seg_im[foreground] = overlay_color
    axs[0].imshow(bg)
    axs[1].imshow(seg_im)

    file_name = prediction_path if prediction_path else f'batch_{batch_index}.png'
    fig.savefig(SAVES_PATH + file_name, bbox_inches='tight')
    # Show the figure, then release it: this function is called once per
    # image, and figures left open accumulate in memory.
    plt.show()
    plt.close(fig)

In [None]:
model.eval()
precision, recall = 0.0, 0.0
with torch.no_grad():
    for index, (images, targets) in enumerate(tqdm(test_dataloader, colour='green')):
        # model output : boxes, labels, scores, masks
        predictions = model(images.to(device))
        for i in range(len(images)):
            # One file per image: with the default name, every image of a
            # batch was saved to the same batch_<index>.png and only the last
            # one survived.
            visual_inference(images[i], predictions[i], index, confidence_threshold=0.5,
                             prediction_path=f"batch_{index}_img_{i}.png")

In [None]:
import random
def predict_non_annotated_image(file_id:str):
    """Run the model on ``data/<file_id>.jpeg`` and save the overlay as ``<file_id>.png``.

    The image is prepared the same way as in the dataset class: BGR -> RGB,
    resized to ``imageSize``, scaled to [0, 1] and moved to channel-first.

    Raises:
        FileNotFoundError: if the image file cannot be read.
    """
    file_path = f"data/{file_id}.jpeg"
    image = cv2.imread(file_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {file_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # `interpolation` must be given by keyword: the third positional parameter
    # of cv2.resize is `dst`, not the interpolation flag.
    image = cv2.resize(image, tuple(imageSize), interpolation=cv2.INTER_LINEAR)
    image = torch.as_tensor(image, dtype=torch.float32) / 255.0
    image = image.permute(2, 0, 1)  # HWC -> CHW

    model.eval()
    with torch.no_grad():
        prediction = model(image.unsqueeze(0).to(device))
    # The batch index only feeds the default file name, which is not used here
    # because an explicit output name is passed.
    visual_inference(image, prediction[0], 0, confidence_threshold=0.2, prediction_path=file_id+".png")

# Identifiers of the non-annotated images to run through the model.
files = [
    "0_17", "0_6", "1_19", "1_20",
    "3_10", "2_15", "1_17",
]
for file in files:
    predict_non_annotated_image(file)