1. 데이터 구조 정리
2. 라벨링 품질 확인
3. 이미지 형식 통일(jpg), 손상 파일 제거
4. 파일명 변환

In [1]:
import cv2
import matplotlib.pyplot as plt
import os
import shutil
from pathlib import Path
from PIL import Image, ImageDraw
from MaskRecog.preprocess.preprocessor import load_args, load_image, load_yolo_label, draw_bbox
from tqdm import tqdm

# 데이터셋 1 (args["data_dir_1"])

## 1. 데이터 구조 정리

In [2]:
# Load the project configuration once; later cells read the dataset root
# directories from it by key (e.g. args["data_dir_2"]).
args = load_args()

In [3]:
def get_unique_path(dst_path: Path) -> Path:
    '''
    Return a destination path that does not collide with an existing file.

    If nothing exists at ``dst_path`` it is returned unchanged. Otherwise a
    counter is appended to the original stem -- ``name(1).ext``,
    ``name(2).ext``, ... -- until an unused path is found.

    Args:
        dst_path: Desired destination path.
    Return:
        ``dst_path`` itself when it is free, else the first free numbered
        variant in the same directory.
    '''
    candidate = dst_path
    counter = 1
    while candidate.exists():
        # Always derive the name from the ORIGINAL stem. Building on the
        # previous candidate would stack suffixes ("name(1)(2).ext").
        candidate = dst_path.with_name(
            f"{dst_path.stem}({counter}){dst_path.suffix}"
        )
        counter += 1

    return candidate

In [4]:
def data_structure_setup(directory: Path) -> None:
    '''
    Flatten the per-split folders into a single YOLO-style layout.

    Every label (.txt) and image file found directly inside
    ``directory/original/{train,valid,test}`` is moved to:

    directory/
    └─ original/
       ├─ images/
       └─ labels/

    File name clashes are resolved with get_unique_path().

    Args:
        directory: Dataset root that contains the "original" folder.
    '''
    splits = ("train", "valid", "test")
    label_ext = ".txt"
    image_exts = {".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff"}

    original_dir = directory / "original"
    data_original_images = original_dir / "images"
    data_original_labels = original_dir / "labels"
    data_original_images.mkdir(parents=True, exist_ok=True)
    data_original_labels.mkdir(parents=True, exist_ok=True)

    for split in splits:
        split_dir = original_dir / split
        if not split_dir.is_dir():
            # A dataset may not ship all three splits; iterdir() on a missing
            # folder would raise FileNotFoundError.
            continue

        # Snapshot the listing before moving anything: entries are removed
        # from split_dir while we loop over it.
        for file_path in list(split_dir.iterdir()):
            if not file_path.is_file():
                continue

            suffix = file_path.suffix.lower()
            if suffix == label_ext:
                target_dir = data_original_labels
            elif suffix in image_exts:
                target_dir = data_original_images
            else:
                continue

            # NOTE(review): images and labels are de-duplicated independently,
            # so a clash that affects only one file of a pair would break the
            # stem-based image/label pairing -- confirm splits have no such case.
            dst_path = get_unique_path(target_dir / file_path.name)
            shutil.move(str(file_path), str(dst_path))

In [None]:
# data_structure_setup(Path(args["data_dir_1"]))

## 2. 라벨링 품질 확인

In [5]:
def corresponding_image_path(txt_path: Path) -> Path | None:
    '''
    Find corresponding image file for given txt path.
    And return if it exist.

    Args:
        txt_path: Path object for the label (.txt) file
    Return:
        Path object of the corresponding image file, or None if not found
    '''
    image_dir = txt_path.parent.parent / 'images'
    base_name = txt_path.stem
    possible_exts = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']

    for ext in possible_exts:
        image_path = image_dir / f"{base_name}{ext}"
        if image_path.exists():
            return image_path

    return None

In [6]:
def sample_printing(label_path: Path, iteration: int = 10) -> None:
    '''
    Draw bounding boxes for a few labelled images as a visual quality check.

    Args:
        label_path: Directory that contains the label (.txt) files. The
            matching images are located with corresponding_image_path().
        iteration: Maximum number of samples to draw.
    '''
    shown = 0
    for txt_path in label_path.glob("*.txt"):
        # Counting from 0 draws exactly `iteration` samples.
        if shown >= iteration:
            break

        image_path = corresponding_image_path(txt_path)
        if image_path is None:
            # Nothing to draw on; report it and do not count it as a sample.
            print(f"Corresponding image file does not exist: {txt_path}")
            continue

        draw_bbox(image_path, txt_path)
        shown += 1

In [None]:
# data_dir_1 = Path(args["data_dir_1"]) / "original" / "labels"
# sample_printing(data_dir_1)

In [None]:
# processed_data = Path(args["data_dir_1"]) / "processed" / "labels"
# sample_printing(processed_data)

## 3. 이미지 형식 통일(.jpg), 손상 파일 제거

1. 열리지 않는 파일 삭제
2. jpg가 아닌 이미지 파일 jpg로 변환
3. 변환된 이미지의 원본 파일 삭제

In [9]:
def delete_files(txt_path: Path=None, image_path: Path=None) -> None:
    '''
    Remove a label file, an image file, or both.

    Each given path is handled independently. The outcome (success, or the
    error that was raised) is printed; errors are never propagated.

    Args:
        txt_path: Label file to remove; skipped when not given.
        image_path: Image file to remove; skipped when not given.
    '''
    # Label first, then image.
    for target in (txt_path, image_path):
        if not target:
            continue
        try:
            os.remove(target)
            print(f"Successfully deleted: {target}")
        except Exception as err:
            print(f"{err}: failed to remove {target}")


In [10]:
def convert_to_jpg(image_path: Path) -> None:
    '''
    Normalise an image file to the ".jpg" extension.

    * ".jpg" / ".jpeg" files (any letter case) are renamed so that the suffix
      is exactly ".jpg".
    * Any other file is loaded with load_image(), converted to RGB and saved
      as a JPEG next to the source file.
    * If any step raises, the image and its label file
      (``<image dir>/../labels/<stem>.txt``) are removed via delete_files().

    Args:
        image_path: Path of the image file to normalise.
    '''
    # The label shares the image's stem: "<stem>.txt" is one file name,
    # not a "<stem>" directory containing ".txt".
    txt_path = image_path.parent.parent / 'labels' / f"{image_path.stem}.txt"

    try:
        new_path = image_path.with_suffix('.jpg')
        if image_path.suffix.lower() in ('.jpg', '.jpeg'):
            image_path.rename(new_path)
        else:
            image = load_image(image_path)
            # convert() returns a new image instead of modifying in place, so
            # its result must be kept (assumes load_image returns a PIL image).
            image = image.convert("RGB")
            image.save(new_path, "JPEG")
            # NOTE(review): the non-JPEG source file is left in place; remove
            # it here if the converted copy is meant to replace it.

    except Exception as e:
        print(f"{e}: Failed to convert image extension {image_path}")
        delete_files(txt_path, image_path)

1. txt에 대응되는 이미지 파일이 없는 경우 삭제
2. txt나 이미지가 열리지 않는 경우 삭제
3. 이미지 확장자가 jpg가 아닌 경우 jpg로 변환
4. jpg로 변환이 안 되는 경우 -> 이미지, txt 삭제 
* 주의) 빈 txt 파일은 유효한 데이터임
5. txt가 YOLO 포맷이 아닌 경우 -> 이미지, txt 삭제

In [29]:
def data_cleaning(data_path: Path, start_file_num: int=1) -> None:
    '''
    Build a cleaned, sequentially numbered copy of a dataset.

    Reads label/image pairs from ``data_path/original/{labels,images}`` and
    writes them to ``data_path/cleaned/{images,labels}`` as ``00001.jpg`` /
    ``00001.txt``, ``00002.jpg`` / ``00002.txt``, ... The source files are
    never modified or deleted.

    A pair is skipped (with a printed message) when:
      * no image matches the label file,
      * the image or the label cannot be opened / parsed,
      * the image extension is not supported,
      * the image cannot be written as JPEG.

    Args:
        data_path: Dataset root that contains the "original" folder.
        start_file_num: Number used for the first output file.
    '''
    label_path = data_path / "original" / "labels"
    # Sorted so the output numbering is reproducible between runs
    # (glob order is not guaranteed).
    txt_paths = sorted(label_path.glob("*.txt"))

    # make YOLO format directory structure
    new_image_path = data_path / "cleaned" / "images"
    new_label_path = data_path / "cleaned" / "labels"
    os.makedirs(new_image_path, exist_ok=True)
    os.makedirs(new_label_path, exist_ok=True)

    jpeg_exts = (".jpg", ".jpeg")
    convertible_exts = (".png", ".bmp", ".tif", ".tiff")

    i = 0  # filename index; advances only when a pair is written
    for txt_path in tqdm(txt_paths):
        # (image, txt) pairing check
        image_path = corresponding_image_path(txt_path)
        if image_path is None:
            print(f"Corresponding image file does not exist: {txt_path}")
            continue

        # file open check (labels first, so a parse failure cannot leave an
        # opened image handle behind)
        try:
            labels = load_yolo_label(txt_path)
            image = Image.open(image_path)
        except Exception as e:
            print(f"File open error: {e} for {image_path} or {txt_path}")
            continue

        # file name -> 00001.jpg
        file_num = i + start_file_num
        name = f"{file_num:05}"
        new_image_file = new_image_path / f"{name}.jpg"
        new_label_file = new_label_path / f"{name}.txt"

        # The context manager closes the source image on every exit path.
        with image:
            # image extension check and convert
            ext = image_path.suffix.lower()
            if ext not in jpeg_exts and ext not in convertible_exts:
                print(f"extension of {image_path} is incorrect.")
                continue

            try:
                # JPEG sources keep their colour mode; other formats are
                # converted to RGB before being written as JPEG.
                converted = image if ext in jpeg_exts else image.convert("RGB")
                converted.save(new_image_file, "JPEG")
            except Exception as e:
                print(f"Image converting error {e}: {image_path}")
                # Do not leave a partially written image behind.
                new_image_file.unlink(missing_ok=True)
                continue

        with open(new_label_file, 'w') as f:
            f.writelines(' '.join(map(str, line)) + '\n' for line in labels)

        i += 1

In [12]:
# data_cleaning(Path(args["data_dir_1"]))

# 데이터셋 2 (args["data_dir_2"])

## 1. 라벨링 품질 확인

In [None]:
# Visual spot-check of dataset 2: draw boxes for samples taken from its
# original label directory.
label_dir_2 = Path(args["data_dir_2"]) / "original" / "labels"
sample_printing(label_dir_2)

## 2. 데이터 정리

In [30]:
# Run the cleaning step on dataset 2 with the default start_file_num.
data_dir_2 = Path(args["data_dir_2"])
data_cleaning(data_dir_2)

  0%|          | 0/853 [00:00<?, ?it/s]

100%|██████████| 853/853 [01:15<00:00, 11.28it/s]


'maksssksksss0.txt'

In [None]:
# Scratch check: load one original label file and look at its first row.
# NOTE(review): the error recorded under this cell names a path without the
# "/workspaces" prefix, so the saved output looks stale -- confirm the root.
a = Path("/workspaces/mask_recognition/data/dataset_kaggle_Face_Mask_Detection/original/labels/maksssksksss0.txt")
a_labels = load_yolo_label(a)
a_labels[0]

FileNotFoundError: [Errno 2] No such file or directory: '/mask_recognition/data/dataset_kaggle_Face_Mask_Detection/original/labels/maksssksksss0.txt'

In [26]:
# Unpack the first label row into its five fields and inspect the Python
# types of the first two.
cls, cx, cy, bw, bh = a_labels[0]
type(cls), type(cx)

(int, float)

In [27]:
# Prototype of the class remapping: class 1 becomes 0, every other class
# becomes 1; the remaining four fields are copied unchanged.
cleaned = []
for label in a_labels:
    cls, cx, cy, bw, bh = label
    new_cls = 0 if cls == 1 else 1
    cleaned.append(f"{new_cls} {cx} {cy} {bw} {bh}\n")

# Display the remapped rows.
cleaned

['0 0.183594 0.337432 0.058594 0.072266\n',
 '1 0.401367 0.333333 0.080078 0.085938\n',
 '0 0.668945 0.315574 0.068359 0.099609\n']

In [28]:
# Write the remapped rows (each already ends with a newline) to a file in
# the test directory.
with open("/mask_recognition/test/000.txt", "w") as f:
    f.writelines(cleaned)

In [None]:
def label_cleaning(txt_path: Path) -> None:
    '''
    Rewrite a label file in place with binary class ids.

    Class 1 becomes 0 and every other class becomes 1; the four remaining
    fields of each row are written back unchanged.

    NOTE: the mapping is not idempotent -- running it a second time on the
    same file swaps the two classes again.

    Args:
        txt_path: Label (.txt) file to rewrite.
    '''
    # Build every output row before the file is reopened for writing.
    remapped = [
        f"{0 if cls == 1 else 1} {cx} {cy} {bw} {bh}\n"
        for cls, cx, cy, bw, bh in load_yolo_label(txt_path)
    ]

    with open(txt_path, 'w') as f:
        f.writelines(remapped)

In [23]:
# Remap the class ids of every cleaned label file of dataset 2.
# Only "*.txt" entries are matched so that anything else in the folder
# (sub-directories, hidden files) is never handed to label_cleaning().
txt_path_gen = data_dir_2 / "cleaned" / "labels"
for txt_path in txt_path_gen.glob("*.txt"):
    label_cleaning(txt_path)

00060.txt before

1 0.15375 0.6725 0.0125 0.0175
0 0.21375 0.6525 0.0175 0.0175
0 0.2575 0.72 0.01 0.01
0 0.28375 0.675 0.0125 0.015
0 0.3225 0.6725 0.015 0.0175
0 0.34125 0.685 0.0125 0.015
0 0.365 0.6825 0.015 0.0125
2 0.4375 0.735 0.025 0.025
0 0.575 0.555 0.025 0.03
0 0.7075 0.5325 0.025 0.0325

In [31]:
# Apply label_cleaning() to a single label file in the test directory.
t_60 = Path("/mask_recognition/test/00060.txt")
label_cleaning(t_60)

In [33]:
# Same remapping as label_cleaning(), done step by step on t_60 so the
# resulting rows can be inspected.
# NOTE(review): if the cells run in the listed order, t_60 was already
# rewritten by label_cleaning() and is remapped a second time here -- confirm.
labels = load_yolo_label(t_60)
cleaned = []

for label in labels:
    cls, cx, cy, bw, bh = label
    new_cls = 0 if cls == 1 else 1
    cleaned.append(f"{new_cls} {cx} {cy} {bw} {bh}\n")

# Display the remapped rows.
cleaned

['0 0.15375 0.6725 0.0125 0.0175\n',
 '1 0.21375 0.6525 0.0175 0.0175\n',
 '1 0.2575 0.72 0.01 0.01\n',
 '1 0.28375 0.675 0.0125 0.015\n',
 '1 0.3225 0.6725 0.015 0.0175\n',
 '1 0.34125 0.685 0.0125 0.015\n',
 '1 0.365 0.6825 0.015 0.0125\n',
 '1 0.4375 0.735 0.025 0.025\n',
 '1 0.575 0.555 0.025 0.03\n',
 '1 0.7075 0.5325 0.025 0.0325\n']

In [34]:
# Overwrite t_60 with the remapped rows built in the previous cell.
with open(t_60, 'w') as f:
    f.writelines(cleaned)

In [6]:
# "cleaned" folder of the YOLO-format face mask dataset; the next cell reads
# its "images" and "labels" sub-directories.
src_dir = Path("/workspaces/mask_recognition/data/dataset_kaggle_Face_Mask_Dataset_YOLO_Format/cleaned")

In [12]:
image_dir = src_dir / "images"
label_dir = src_dir / "labels"
image_paths = sorted(image_dir.glob("*"))

# Pair every image with the label file that has the same stem.
# Only the file NAME is appended to label_dir: joining a full (absolute)
# image path would discard label_dir and point back into the images folder.
data_paths = []
for image_path in image_paths:
    label_path = label_dir / image_path.with_suffix(".txt").name
    data_paths.append((image_path, label_path))

# Preview the label path computed for the first image.
label_dir / image_paths[0].with_suffix(".txt").name

PosixPath('/workspaces/mask_recognition/data/dataset_kaggle_Face_Mask_Dataset_YOLO_Format/cleaned/labels/00001.txt')