In [1]:
import os
import pickle
import shutil
import zipfile

import cv2
from skimage.metrics import structural_similarity as ssim
from tqdm import tqdm

from logging_file import *
from utils.plot_utils import *
from utils.data_manipulation import *
# Imported last, as in the original order, so the star imports above cannot rebind Image.
from PIL import Image

In [2]:
def extract_zip(zip_path, extract_to):
    """Unpack every member of a zip archive into a directory.

    Args:
        zip_path: Path of the archive to read.
        extract_to: Directory that receives the archive's contents.
    """
    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=extract_to)

def make_data_folder(zip_dir='zipped_data/labelized/', data_dir='data/labelized/'):
    """Rebuild ``data_dir`` from the zip archives found in ``zip_dir``.

    ``data_dir`` is deleted if it exists and recreated with an ``images`` and
    an ``annotations`` sub-folder. Each ``*.zip`` in ``zip_dir`` is extracted
    into a scratch folder ``<zip_dir>/tmp``. The archive is expected to hold
    exactly one top-level folder whose sub-folders contain the files: every
    ``.jpg`` is moved to ``images`` and every ``.txt`` to ``annotations``.
    Archives with any other layout are skipped with a message.

    Args:
        zip_dir: Folder containing the zip archives.
        data_dir: Destination folder; its previous content is discarded.
    """
    images_dir = os.path.join(data_dir, "images")
    annotations_dir = os.path.join(data_dir, "annotations")

    if os.path.exists(data_dir):
        shutil.rmtree(data_dir)
    os.makedirs(data_dir)
    os.makedirs(images_dir, exist_ok=True)
    os.makedirs(annotations_dir, exist_ok=True)

    # Sorted so that the processing order does not depend on the file system.
    zip_files = sorted(f for f in os.listdir(zip_dir) if f.endswith(".zip"))
    extract_to = os.path.join(zip_dir, "tmp")

    for zip_file in tqdm(zip_files, desc='Extracting zip files'):
        zip_path = os.path.join(zip_dir, zip_file)

        # Start from an empty scratch folder: leftovers from an interrupted run
        # would otherwise be mistaken for content of this archive.
        if os.path.exists(extract_to):
            shutil.rmtree(extract_to)
        os.makedirs(extract_to)

        try:
            extract_zip(zip_path, extract_to)

            intermediate_folders = os.listdir(extract_to)
            intermediate_folder = (
                os.path.join(extract_to, intermediate_folders[0])
                if len(intermediate_folders) == 1 else None
            )
            if intermediate_folder is None or not os.path.isdir(intermediate_folder):
                tqdm.write(f"Skipping {zip_file}: expected exactly one top-level folder.")
                continue

            for class_dir in os.listdir(intermediate_folder):
                class_path = os.path.join(intermediate_folder, class_dir)
                if not os.path.isdir(class_path):
                    continue

                for file in os.listdir(class_path):
                    file_path = os.path.join(class_path, file)

                    if file.endswith(".jpg"):
                        shutil.move(file_path, os.path.join(images_dir, file))
                    elif file.endswith(".txt"):
                        shutil.move(file_path, os.path.join(annotations_dir, file))
        finally:
            # Runs on the skip path as well, so one odd archive can no longer
            # pollute the scratch folder used by the following archives.
            shutil.rmtree(extract_to, ignore_errors=True)

def clean_unmatched_files(image_folder='data/labelized/images/', 
                          annotation_folder='data/labelized/annotations/', 
                          backup_folder="backup_invalid_files"):
    """Move images without an annotation, and annotations without an image, to a backup folder.

    A ``.jpg`` in ``image_folder`` and a ``.txt`` in ``annotation_folder`` are
    considered a pair when their names are equal once the extension is
    stripped. Every unpaired file is moved into ``backup_folder`` and the
    outcome is logged; failures are logged and do not stop the run.

    Args:
        image_folder: Folder holding the ``.jpg`` images.
        annotation_folder: Folder holding the ``.txt`` annotations.
        backup_folder: Destination of the unmatched files (created if missing).
    """
    logger = init_preprocess_logger()
    os.makedirs(backup_folder, exist_ok=True)

    def stems(folder, extension):
        # Strip the extension from the end only; str.replace would also remove
        # an occurrence in the middle of the name and break the path rebuilt below.
        return {
            name[:-len(extension)]
            for name in os.listdir(folder)
            if name.endswith(extension)
        }

    image_files = stems(image_folder, '.jpg')
    annotation_files = stems(annotation_folder, '.txt')

    images_to_delete = image_files - annotation_files
    annotations_to_delete = annotation_files - image_files

    for label, folder, extension, unmatched in (
        ("image", image_folder, '.jpg', images_to_delete),
        ("annotation", annotation_folder, '.txt', annotations_to_delete),
    ):
        # Sorted so the log has a reproducible order.
        for stem in sorted(unmatched):
            path = os.path.join(folder, stem + extension)
            try:
                shutil.move(path, os.path.join(backup_folder, os.path.basename(path)))
                logger.info(f"Moved unmatched {label} {path} to backup folder.")
            except FileNotFoundError:
                logger.error(f"File not found error: {path}")
            except Exception as e:
                logger.error(f"Error moving {path}: {e}")

    print("Cleaning completed")

In [3]:
def group_similar_images(image_folder='data/labelized/images/', 
                         backup_folder="backup_invalid_files", 
                         similarity_threshold=0.8):
    """Group consecutive, visually similar images of ``image_folder``.

    The ``.jpg`` files are sorted by path and each one is compared with the
    most recent image that is still in place. A score at or above
    ``similarity_threshold`` extends the current group; a lower score closes
    it and opens a new one. Images that ``compare_images`` reports as moved
    never appear in a group, and the image that follows a moved one is still
    grouped (it is compared with the last image that remained, or opens a new
    group when none is left).

    The result is also pickled to ``pickle_files/data/groups.pkl``.

    Args:
        image_folder: Folder holding the ``.jpg`` images.
        backup_folder: Folder created if missing and forwarded to ``compare_images``.
        similarity_threshold: Minimum score for two images to share a group.

    Returns:
        A list of groups, each a non-empty list of image paths. Empty when the
        folder holds no ``.jpg`` file.
    """
    logger = init_preprocess_logger()

    os.makedirs(backup_folder, exist_ok=True)

    images = sorted(
        os.path.join(image_folder, img)
        for img in os.listdir(image_folder)
        if img.endswith(".jpg")
    )

    groups = []
    moved_images = set()

    # `anchor` is the image the next candidate is compared with. Whenever it is
    # not None it is the last element of `current_group`.
    current_group = images[:1]
    anchor = images[0] if images else None

    for candidate in tqdm(images[1:], desc="Grouping similar images"):
        if anchor is None:
            # Nothing left to compare with: the candidate opens a new group.
            current_group = [candidate]
            anchor = candidate
            continue

        try:
            score = compare_images(anchor, candidate, logger, backup_folder, moved_images)
        except Exception as e:
            logger.error(f"Error comparing {anchor} and {candidate}: {e}")
            move_image_and_annotation(candidate, logger)
            moved_images.add(candidate)
            # The faulty file is unknown here, so close the group and restart
            # from the next image instead of reusing a possibly bad anchor.
            groups.append(current_group)
            current_group, anchor = [], None
            continue

        if anchor in moved_images:
            # The anchor turned out to be unusable: take it out of its group.
            current_group.pop()
            if current_group:
                groups.append(current_group)
            if candidate in moved_images:
                current_group, anchor = [], None
            else:
                current_group, anchor = [candidate], candidate
            continue

        if score is None or candidate in moved_images:
            # Candidate rejected; keep comparing against the same anchor.
            continue

        if score >= similarity_threshold:
            current_group.append(candidate)
        else:
            groups.append(current_group)
            current_group = [candidate]
        anchor = candidate

    if current_group:
        groups.append(current_group)

    pickle_path = 'pickle_files/data/groups.pkl'
    os.makedirs(os.path.dirname(pickle_path), exist_ok=True)
    with open(pickle_path, 'wb') as file:
        pickle.dump(groups, file)

    return groups

def compare_images(img_path1, img_path2, logger, backup_folder, moved_images):
    """Return the structural similarity of two image files.

    Both files are read with ``cv2.imread``. A file that cannot be read is
    handed to ``move_image_and_annotation`` and recorded in ``moved_images``.
    Images whose array shapes differ are reported as completely dissimilar
    rather than being passed to ``ssim``.

    Args:
        img_path1: Path of the first image.
        img_path2: Path of the second image.
        logger: Logger used here and forwarded to ``move_image_and_annotation``.
        backup_folder: Accepted for interface compatibility; not used here.
        moved_images: Set updated in place with the path of every image handed
            to ``move_image_and_annotation``.

    Returns:
        The SSIM score of the two grayscale images, ``0`` when their shapes
        differ, or ``None`` when at least one image was moved away.
    """
    try:
        img1 = cv2.imread(img_path1)
        img2 = cv2.imread(img_path2)

        # The helper is given the file path, not the decoded array (which is
        # None here).
        # NOTE(review): presumably the helper relocates the image and its
        # annotation to a backup location - confirm in utils.
        if img1 is None:
            move_image_and_annotation(img_path1, logger)
            moved_images.add(img_path1)
            return None

        if img2 is None:
            move_image_and_annotation(img_path2, logger)
            moved_images.add(img_path2)
            return None

        # Compare shapes, not element counts: e.g. 100x200 and 200x100 have the
        # same .size but cannot be compared pixel by pixel.
        if img1.shape != img2.shape:
            return 0

        img1_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        img2_gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

        return ssim(img1_gray, img2_gray)

    except Exception as e:
        # Only the paths are used below: img1/img2 may be unbound at this point.
        logger.error(f"Error comparing {img_path1} and {img_path2}: {e}")
        move_image_and_annotation(img_path1, logger)
        move_image_and_annotation(img_path2, logger)
        moved_images.add(img_path1)
        moved_images.add(img_path2)
        return None

def calculate_surface(bounding_box):
    """Return the area of a five-value bounding box.

    Args:
        bounding_box: Iterable of exactly five values; the last two are used
            as width and height.

    Returns:
        The product of width and height.

    Raises:
        ValueError: If ``bounding_box`` does not hold exactly five values.
    """
    _first, _second, _third, box_width, box_height = bounding_box
    area = box_width * box_height
    return area

def parse_annotation_file(annotation_path):
    """Read an annotation file into a list of bounding boxes.

    Each non-blank line is split on whitespace and every field is converted
    to ``float``. Blank or whitespace-only lines are ignored, so a trailing
    newline does not produce an empty box.

    Args:
        annotation_path: Path of the text file to read.

    Returns:
        A list with one list of floats per non-blank line, in file order.

    Raises:
        ValueError: If a field cannot be converted to ``float``.
    """
    bounding_boxes = []
    with open(annotation_path, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                continue
            bounding_boxes.append([float(field) for field in fields])
    return bounding_boxes

def keep_best_images(groups):
    """Select the images to keep from each group of similar images.

    For every image the annotation path is derived from the image path
    (``images`` -> ``annotations``, ``.jpg`` -> ``.txt``) and parsed.

    * Images with more than one bounding box are always kept.
    * Among the remaining images of a group, only the one whose single box has
      the largest surface is kept. An image without any box counts as
      surface 0, so it is kept only when nothing better exists in its group.

    The returned list holds image paths exactly as they appear in ``groups``,
    so membership tests against the group entries work. It is also pickled to
    ``pickle_files/data/best_images.pkl``.

    Args:
        groups: List of groups, each a list of image paths.

    Returns:
        The list of image paths to keep.

    Raises:
        FileNotFoundError: If an annotation file is missing.
    """
    best_images = []

    for group in tqdm(groups, desc="Keeping best images"):
        best_image = None
        max_surface = -1

        for image_path in group:
            annotation_path = image_path.replace('images', 'annotations').replace('.jpg', '.txt')

            # Empty entries (produced by blank lines) are not boxes.
            bounding_boxes = [box for box in parse_annotation_file(annotation_path) if box]

            if len(bounding_boxes) > 1:
                # Record the image path, not the annotation path: the result is
                # compared against image paths downstream.
                best_images.append(image_path)
                continue

            total_surface = calculate_surface(bounding_boxes[0]) if bounding_boxes else 0

            if total_surface > max_surface:
                max_surface = total_surface
                best_image = image_path

        if best_image is not None:
            best_images.append(best_image)

    pickle_path = 'pickle_files/data/best_images.pkl'
    os.makedirs(os.path.dirname(pickle_path), exist_ok=True)
    with open(pickle_path, 'wb') as file:
        pickle.dump(best_images, file)

    print("Only the best images have been kept")
    return best_images

def move_non_best_files(groups, best_images, backup_folder="backup_invalid_files"):
    """Move every grouped image that is not in ``best_images`` to the backup folder.

    The matching annotation (path derived with ``images`` -> ``annotations``
    and ``.jpg`` -> ``.txt``) is moved along with the image when it exists.
    Failures are logged with the name of the file being moved and do not stop
    the run; if the image cannot be moved its annotation is left in place.

    Args:
        groups: List of groups, each a list of image paths.
        best_images: Image paths to keep, spelled as in ``groups``.
        backup_folder: Destination of the discarded files (created if missing).
    """
    os.makedirs(backup_folder, exist_ok=True)
    logger = init_preprocess_logger()

    best_images_set = set(best_images)

    for group in tqdm(groups, desc="Moving non best files"):
        for image_path in group:
            if image_path in best_images_set:
                continue

            annotation_path = image_path.replace('images', 'annotations').replace('.jpg', '.txt')

            # Tracks the file being handled so the error names the right one.
            current_path = image_path
            try:
                shutil.move(image_path, os.path.join(backup_folder, os.path.basename(image_path)))
                logger.info(f"Moved image {os.path.basename(image_path)} to backup folder.")

                if os.path.exists(annotation_path):
                    current_path = annotation_path
                    shutil.move(annotation_path,
                                os.path.join(backup_folder, os.path.basename(annotation_path)))
                    logger.info(f"Moved annotation {os.path.basename(annotation_path)} to backup folder")
            except FileNotFoundError:
                logger.error(f"File not found error: {os.path.basename(current_path)}")
            except Exception as e:
                # Include the cause, as the other cleaning step does.
                logger.error(f"Error moving {os.path.basename(current_path)}: {e}")

    print("All files have been moved")

In [8]:
# Full data-preparation run. The order matters: each step works on the files
# or the return value left by the previous one.
# Rebuild data/labelized/ from the zip archives (the existing folder is deleted first).
make_data_folder()
# Move images without annotation, and annotations without image, to the backup folder.
clean_unmatched_files()
# Group similar images; the recorded run below took about 40 minutes for this step.
groups = group_similar_images()
# Choose which images of each group are kept.
best_images = keep_best_images(groups)
# Move every grouped image that was not selected (and its annotation) to the backup folder.
move_non_best_files(groups, best_images)
print("Data preparation is complete!")

Extracting zip files: 100%|██████████████████████████████████████████████████████████████| 4/4 [00:30<00:00,  7.55s/it]


Cleaning completed


Grouping similar images: 100%|█████████████████████████████████████████████████████| 9857/9857 [39:43<00:00,  4.14it/s]
Keeping best images: 100%|████████████████████████████████████████████████████████| 4886/4886 [00:46<00:00, 105.44it/s]


Only the best images have been kept


Moving non best files: 100%|██████████████████████████████████████████████████████| 4886/4886 [00:06<00:00, 740.82it/s]

All files have been moved
Data preparation is complete!





In [None]:
coun