In [None]:
import ast

In [None]:
class Image_label_Dataset(Dataset):
    def __init__(self, images_dir, csv_file, fallback_csv, transform=None, split="train", seed=42):
        """
        Args:
            images_dir (string): Dossier contenant les images.
            csv_file (string): Chemin vers le fichier CSV contenant les informations principales.
            fallback_csv (string): Chemin vers le fichier CSV contenant image_id + bbox + final_label.
            transform (callable, optional): Transformations à appliquer aux images.
            split (string): "train" ou "test" pour choisir le dataset.
            seed (int): Pour rendre la répartition fixe.
        """
        
        self.images_dir = images_dir
        self.transform = transform
        self.split = split
        self.seed = seed
        
        self.data = pd.read_csv(csv_file)
        self.fallback_data = pd.read_csv(fallback_csv)
        self.boxes = self._create_boxes_list()

        self._split_data()
        self.class_counts = self.count_classes()

    def _create_boxes_list(self):
        """
        Crée une liste de dictionnaires où chaque boîte est une entrée unique.
        """
        boxes = []
        for _, row in self.data.iterrows():
            image_id = str(row['id'])
            bbox = [row['xc'], row['yc'], row['w'], row['h']]
            avis = row['avis']
            label = self.avis_majoritaire(avis)
            
            if label is None:
                label = self.get_fallback_label(image_id, bbox)
            
            if label not in [None, 8]:  
                boxes.append({
                    'image_id': image_id,
                    'bbox': bbox,
                    'label': label
                })
        return boxes

    def get_fallback_label(self, image_id, bbox):
        """
        Cherche le label dans le fichier CSV de fallback si avis_majoritaire retourne None.
        """
        # Convertir la chaîne de caractères du 'bbox' du fichier fallback en tuple
        self.fallback_data['bbox_tuple'] = self.fallback_data['bbox'].apply(lambda x: ast.literal_eval(x))

        # Arrondir le bbox à 5 décimales pour correspondre au format du fichier fallback
        bbox_round = tuple(round(val, 5) for val in bbox)

        # Recherche d'une correspondance
        match = self.fallback_data[(self.fallback_data['idx'] == image_id) & 
                                    (self.fallback_data['bbox_tuple'] == bbox_round)]

        if not match.empty:
            return int(match.iloc[0]['final_label'])
        
        return None

    def _split_data(self):
        """
        Effectue le split de l'ensemble de données en train et test.
        """
        labels = [box["label"] for box in self.boxes]
        train_data, test_data = train_test_split(self.boxes, test_size=0.4, random_state=self.seed, stratify=labels)

        if self.split == "train":
            self.data_split = train_data
        elif self.split == "test":
            self.data_split = test_data
        else:
            raise ValueError("Split must be one of ['train', 'test']")

    def count_classes(self):
        """
        Compte le nombre d'instances pour chaque classe dans le dataset actuel.
        """
        class_counts = defaultdict(int)
        for annotation in self.data_split:
            label = annotation['label']
            class_counts[label] += 1
        return dict(class_counts)

    def avis_majoritaire(self, avis, min_count=4):
        """Calcule l'avis majoritaire uniquement s'il dépasse un seuil minimal."""
        parts = avis.split('_')
        count = Counter(parts)
        
        majoritaire, occurrences = max(count.items(), key=lambda x: x[1])
        
        if occurrences >= min_count:
            return int(majoritaire)
        
        return None

    def __len__(self):
        return len(self.data_split)

    def __getitem__(self, idx):
        """
        Retourne une image découpée selon la boîte englobante et son label.
        """
        annotation = self.data_split[idx]
        bbox = annotation['bbox']
        label = annotation['label']

        image_path = os.path.join(self.images_dir, f"{annotation['image_id']}.jpg")
        image = Image.open(image_path).convert("RGB")

        xc, yc, w, h = bbox
        x_min = int((xc - w / 2) * image.width)
        x_max = int((xc + w / 2) * image.width)
        y_min = int((yc - h / 2) * image.height)
        y_max = int((yc + h / 2) * image.height)

        cropped_image = image.crop((x_min, y_min, x_max, y_max))

        if self.transform:
            cropped_image = self.transform(cropped_image)
            
        label = torch.tensor(label, dtype=torch.long)
            
        return cropped_image, label


In [None]:
train_dataset_label = Image_label_Dataset(images_dir=img_dir, csv_file=csv_file, fallback_csv=label_csv, transform=transform, split='train')
test_dataset_label = Image_label_Dataset(images_dir=img_dir, csv_file=csv_file, fallback_csv=label_csv, transform=transform, split='test')

train_background = BackgroundDataset(img_dir, csv_file, transform=transform, num_samples=int(mean(train_dataset.class_counts.values())))
test_background = BackgroundDataset(img_dir, csv_file, transform=transform, num_samples=int(mean(test_dataset.class_counts.values())))

In [None]:
train_data_background = torch.utils.data.ConcatDataset([train_dataset_label, train_background])
test_data_background = torch.utils.data.ConcatDataset([test_dataset_label, test_background])

In [None]:
full_data_background = torch.utils.data.ConcatDataset([train_data_background, test_data_background])