Notebook servant à faire une inférence décomposée (détection, segmentation et regression) de deux images du même instant afin de compter le nombre de rolls de chaque catégorie.

In [1]:
import torchvision.models as models
import torch.nn as nn
import torch.optim
import os
import cv2
from PIL import Image
import torchvision.transforms.functional as F
from torchvision import transforms
import numpy as np
import numbers
import time
import matplotlib.pyplot as plt
from ultralytics import YOLO
%matplotlib inline
import shutil
from scipy.ndimage import label

In [None]:
det_threshold = 0.07 # minimum confidence for detection predictions to keep
seg_threshold = 0.8 # minimum confidence for segmentation predictions to keep
overlap_threshold = 1 # maximum commun part beetween mask before being consedered overlaping
iou = 0.5 # pour NMS detection
margin_y, margin_x = 20 ,1 # min 1
reg_imgsz =[224,224]
save = True # Si True garde les resultats intermédiaires dans le dossier "tmp"
visualize = False # si True garde les images intermédiares des models de detection et segmentation (AI explain) -> lent

front = 'presentation/front.jpg'
top = 'presentation/top.jpg'

In [None]:
# a modifier celon le modèle de regression
def gen_new_model(weights):
    """Génère un modèles de regression et charge les poids pré-entrainés

    Args:
        weights (torch.collections.OrderedDict): Poids du modèle

    Returns:
        nn.Module: Modèle de regression
    """
    model = models.resnet18()

    #modification de la dernière couche
    model.fc = nn.Sequential( 
        nn.Linear(512, 1024),
        nn.BatchNorm1d(1024),
        nn.ReLU(),
        nn.Linear(1024, 1024),
        nn.BatchNorm1d(1024),
        nn.ReLU(),
        nn.Linear(1024, 512),
        nn.BatchNorm1d(512),
        nn.ReLU(),
        nn.Linear(512, 1)
    )
    
    
    model.load_state_dict(weights)
    return model


# Mise en place des models

In [None]:
det_model = YOLO(os.path.join('models', 'det.pt'))
seg_model = YOLO(os.path.join('models', 'seg.pt'))
reg_model = gen_new_model(torch.load(os.path.join('models', 'reg.pt')))

In [None]:
# supprime le dossier temporaire
try:
    shutil.rmtree('tmp')
except WindowsError as error:
    print(error)

In [None]:
def overlap(mask1, mask2, threshold):
    """Détermine si deux masks se superposent

    Args:
        mask1 (np.ndarray): Un mask
        mask2 (np.ndarray): Un mask
        threshold (float): Seuil de superposition toléré

    Returns:
        int:    - 0 si pas de superposition
                - 1 si superposition et mask1 plus petit
                - 2 si superposition et mask2 plus petit
    """
    common = mask1*mask2
    common_size = np.sum(common)
    # trouver le plus petit mask
    size1 = np.sum(mask1)
    size2 = np.sum(mask2)
    if size1 > size2:
        smaller = 2
        smaller_size = size2
    else:
        smaller = 1
        smaller_size = size1
    print(common_size/smaller_size)
    if common_size/smaller_size > threshold:
        return smaller # retourne le plus petit mask qui devra être supprimé
    return 0 # pas d'overlap significatif

def remove_overlap(seg_x_s, seg_conf_s, seg_xyxy_s, seg_mask_s, threshold):
    """Supprime les masks superposés

    Args:
        seg_x_s (list: int): Liste de coordonnées x des détections triées de gauche à droite (croissant)
        seg_conf_s (list: float): Liste des scores de confiance des détections triés de gauche à droite (croissant)
        seg_xyxy_s (list: list):  Liste des coordonnées des boites encadrantes des détections triés de gauche à droite (croissant)
        seg_mask_s (list: np.ndarray):  Liste des masks des détections triés de gauche à droite (croissant)
        threshold (float): Seuil de superposition toléré

    Returns:
        _type_: _description_
    """
    i = 0
    while i != len(seg_mask_s):
        j=i+1
        while j != len(seg_mask_s):
            overlapping_mask = overlap(seg_mask_s[i], seg_mask_s[j], threshold)
            match overlapping_mask:
                case 1: # supprimer le premier mask
                    del(seg_x_s[i])
                    del(seg_conf_s[i])
                    del(seg_xyxy_s[i])
                    del(seg_mask_s[i])
                case 2: # supprimer le deuxième mask
                    del(seg_x_s[j])
                    del(seg_conf_s[j])
                    del(seg_xyxy_s[j])
                    del(seg_mask_s[j])
                case 0:
                    j+=1 # étape suivante (car pas de modification des listes)
        i+=1
    return seg_x_s, seg_conf_s, seg_xyxy_s, seg_mask_s

In [None]:
def get_padding(image, imgsz):
    """Ajoute des pixels noir pour avoir les bonnes dimensions d'image

    Args:
        image (PIL Image): Image source
        imgsz (list: int(w, h)): Dimension cible

    Returns:
        list: Dimensions pour le rembourrage
    """
    w, h = image.size
    w_padding = max((imgsz[0] - w) / 2, 0)
    h_padding = max((imgsz[1] - h) / 2, 0)
    t_pad = h_padding if h_padding % 1 == 0 else h_padding+0.5
    l_pad = w_padding if w_padding % 1 == 0 else w_padding+0.5
    b_pad = h_padding if h_padding % 1 == 0 else h_padding-0.5
    r_pad = w_padding if w_padding % 1 == 0 else w_padding-0.5
    padding = (int(l_pad), int(t_pad), int(r_pad), int(b_pad))
    return padding

class NewPad(object):
    def __init__(self, fill=0, padding_mode='constant', imgsz = [224,224]):
        assert isinstance(fill, (numbers.Number, str, tuple))
        assert padding_mode in ['constant', 'edge', 'reflect', 'symmetric']
        self.imgsz = imgsz
        self.fill = fill
        self.padding_mode = padding_mode
        
    def __call__(self, img):
        """
        Args:
            img (PIL Image): Image to be padded.

        Returns:
            PIL Image: Padded image.
        """
        return F.pad(img, get_padding(img, self.imgsz), self.fill, self.padding_mode)
    
    def __repr__(self):
        return self.__class__.__name__ + '(padding={0}, fill={1}, padding_mode={2})'.\
            format(self.fill, self.padding_mode)
    
data_transforms = transforms.Compose([
    NewPad(imgsz = reg_imgsz),
    transforms.Resize((reg_imgsz[1], reg_imgsz[0])), # h,w
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])     


In [None]:
def cut_out(mask, img, bbox):
    """Détoure le mask de l'image

    Args:
        mask (np.ndarray): Mask à détourer
        img (np.ndarray): Image source
        bbox (list: int(x_min, y_min, x_max, y_max)): Boite encadrante

    Returns:
        np.ndarray: Image détourée
    """
    x_min, y_min, x_max, y_max = list(map(int, bbox))
    mask = cv2.dilate(mask, np.ones((margin_y, margin_x), np.uint8)) # add margin around mask (for error resilience)
    color = np.array([0,0,0], dtype='uint8')
    cut_out_img = np.where(mask[...,None], img, color)
    h, w, _ = cut_out_img.shape
    cut_out_img = cut_out_img[max(y_min-margin_y, 0):min(y_max+margin_y,h), max(x_min-margin_x, 0):min(x_max+margin_x,w)]
    return cut_out_img

# Detectection

In [None]:
det_result = det_model(front, cfg='cfg_det.yaml', visualize = visualize, conf = det_threshold, save = save, project='tmp', name = 'det', exist_ok=True, agnostic_nms=True, iou=iou)

In [None]:
det_conf = [x for x in det_result[0].boxes.conf.tolist() if x > det_threshold]
det_xyxy = [det_result[0].boxes.xyxy[i].tolist() for i in range(len(det_result[0].boxes.conf.tolist())) if det_result[0].boxes.conf[i] > det_threshold]
det_x = [elmt[0] for elmt in det_xyxy]
det_class = [det_model.names[det_result[0].boxes.cls[i].tolist()] for i in range(len(det_result[0].boxes.conf.tolist())) if det_result[0].boxes.conf[i] > det_threshold]

det_x_s, det_conf_s, det_xyxy_s, det_class_s = map(list, zip(*sorted(zip(det_x, det_conf, det_xyxy, det_class)))) # tri des listes par rapport à det_x (position gauche/droite des boites prédites)

# Segmentation

In [None]:
seg_result = seg_model(top, cfg='cfg_seg.yaml', visualize = visualize, save = save, show_boxes = False, conf = seg_threshold, project='tmp', name = 'seg', exist_ok =True)

In [None]:
seg_conf = [x for x in seg_result[0].boxes.conf.tolist() if x > seg_threshold]
seg_xyxy = [seg_result[0].boxes.xyxy[i].tolist() for i in range(len(seg_result[0].boxes.conf.tolist())) if seg_result[0].boxes.conf[i] > seg_threshold]
seg_x = [elmt[0] for elmt in seg_xyxy]
seg_mask = [seg_result[0].masks.data[i].cpu().numpy() for i in range(len(seg_result[0].boxes.conf.tolist())) if seg_result[0].boxes.conf[i] > seg_threshold]

seg_x_s, seg_conf_s, seg_xyxy_s, seg_mask_s = map(list, zip(*sorted(zip(seg_x, seg_conf, seg_xyxy, seg_mask)))) # tri des listes par rapport à det_x (position gauche/droite des boites prédites)

## Visualisation de la segmentation

In [None]:
def visualize_bbox(img, bbox, color=(255, 0, 0), thickness=2):
    """Ajoute le boite encadrante à l'image

    Args:
        img (np.ndarray): Image source
        bbox (list: int(x_min, y_min, x_max, y_max)): Coordonnées de la boite encadrante
        color (tuple: int, optional): Couleur de la boite. Defaults to (255, 0, 0).
        thickness (int, optional): Epaisseur de la boite. Defaults to 10.

    Returns:
        np.ndarray: Image avec la boite encadrante
    """
    x_min, y_min, x_max, y_max = bbox
    cv2.rectangle(img, (x_min, y_min), (x_max, y_max), color=color, thickness=thickness)
    return img

In [None]:
def maskVisualize(image, mask):
    """Ajoute le mask à l'image

    Args:
        img (np.ndarray): Image source
        mask (np.ndarray): Mask à visualiser

    Returns:
        np.ndarray: Image avec le mask
    """
    color = np.array([255,0,0], dtype='uint8')
    mask = cv2.dilate(mask, kernel = np.ones((margin_y, margin_x), np.uint8)) # déforme le mask pour prendre de la marge
    masked_img = np.where(mask[...,None], color, image) # image avec mask plein
    image = cv2.addWeighted(image, 0.8, masked_img, 0.2,0) # image avec mask dilué
    return image

In [None]:
image = cv2.imread(top)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
w, h, _ = image.shape
for i in range(len(seg_conf_s)):
    image = maskVisualize(image, cv2.resize(seg_mask_s[i], (h,w), interpolation =cv2.INTER_LANCZOS4))# cv2.INTER_LINEAR))
    #image = visualize_bbox(image, seg_xyxy_s[i])
plt.imshow(image)
plt.show()
if save:
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    cv2.imwrite('tmp/seg/overlap.jpg', image)
    

In [None]:
for mask in seg_mask_s:
    print(mask.dtype)

In [None]:
"""Sépare les mask non continues

Il faudra alors voir qu'est ce qui est gardé (sur cette image on a deux fois des segmentation qui prend les deux rangées mais qui se concentre sur chaque fois une des 2 (en entière) et l'autre est coupée)
Sur une des autres images on avais un petit bout de mask non continu mais sans aucun sens...
à priori si une segmentation a plusierus parties continues, un des partie est la vrai cible complete... et l'autre (plus petite) est à ignorer/supprimer, si l'autre est une autre rangée elle devrai être
segmenter aussi (séparement)
"""
for id, mask in enumerate(seg_mask_s):
    mask_copy= mask.copy()
    labeled_array, num_features = label(mask_copy)
    size_max = 0
    for i in range(1, num_features+1):    
        feature_mask = np.array(labeled_array, dtype='float32')
        feature_mask[feature_mask!=i]=0 # cache les autres parties continues
        feature_mask[feature_mask!=0]=1.0
        size=feature_mask.sum()
        print(feature_mask.dtype)
        if size_max < size:
            size_max=size
            seg_mask_s[id]=feature_mask.copy()
# marche sur cet exemple, à vérifier sur d'autre ou continuer avec le model "triche"
       


In [None]:
seg_x_s, seg_conf_s, seg_xyxy_s, seg_mask_s = remove_overlap(seg_x_s, seg_conf_s, seg_xyxy_s, seg_mask_s, overlap_threshold)

In [None]:
image = cv2.imread(top)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
w, h, _ = image.shape
for i in range(len(seg_conf_s)):
    image = maskVisualize(image, cv2.resize(seg_mask_s[i], (h,w), interpolation =cv2.INTER_LANCZOS4))# cv2.INTER_LINEAR))
    #image = visualize_bbox(image, seg_xyxy_s[i])
plt.imshow(image)
plt.show()
if save:
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    cv2.imwrite('tmp/seg/no_overlap.jpg', image)
    

# cropping & regression

In [None]:
# créer dossier temporaire
try:
    os.makedirs('tmp/cut_out')
except OSError:
    pass
# préparer le model de regression
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
reg_model.to(device)
reg_model.eval()
reg_s = []
# Découpages puis regressions
image = cv2.imread(top)
w, h, _ = image.shape
for i in range(len(seg_conf_s)):
    # cropping & save for debug
    img = cut_out(cv2.resize(seg_mask_s[i], (h,w), interpolation = cv2.INTER_LINEAR), image, seg_xyxy_s[i])
    cv2.imwrite("tmp/cut_out/{}.jpg".format(i), img)
    # regression
    img = Image.open("tmp/cut_out/{}.jpg".format(i)).convert("RGB")
    img = data_transforms(img)
    if save:
        transform = transforms.ToPILImage()
        imgpil = transform(img)
        imgpil.save(f'tmp/cut_out/transformed{i}.jpg')
    img = torch.unsqueeze(img, dim=0)
    img = img.to(device)
    pred = reg_model(img)
    pred = round(pred[0][0].item())
    print(pred)
    reg_s.append(pred)

In [None]:
"""reg_model = gen_new_model(torch.load('best.pt'))
reg_model.to('cuda')
reg_model.eval()
img = Image.open("1.jpg").convert("RGB")
img.show()
img = data_transforms(img)
img = torch.unsqueeze(img, dim=0)
img = img.to(device)
pred = reg_model(img)
print(pred[0][0].item())
pred = round(pred[0][0].item())
print(pred)"""

# Mise en commun

In [None]:
label_set = set(det_model.names.values())
common = dict(zip(det_class_s, reg_s))
results={x:0 for x in label_set}
for i in range(len(det_class_s)):
    results[det_class_s[i]]+=reg_s[i]
print(results)

In [None]:
if not save:
    shutil.rmtree('tmp')