In [2]:
DATA_PATH = "D:/NOA/시설작물모델/output/dataset"
OUTPUT_PATH = "D:/NOA/시설작물모델/output"
split_ratio_train = 0.8  # 훈련 데이터 비율
split_ratio_val = 0.2  # 검증 데이터 비율
data_splitter = DataSplitter(DATA_PATH, OUTPUT_PATH, split_ratio_train, split_ratio_val)
data_splitter.split_data()


Splitting data: 1it [00:55, 55.50s/it]


KeyboardInterrupt: 

In [4]:
import os
import random
import shutil
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

class DataSplitter:
    def __init__(self, data_path, output_path, split_ratio_train, split_ratio_val, num_samples=1000):
        self.data_path = data_path
        self.output_path = output_path
        self.split_ratio_train = split_ratio_train
        self.split_ratio_val = split_ratio_val
        self.num_samples = num_samples  # 선택할 이미지의 수
        self.coco_train = {
            "images": [],
            "annotations": [],
            "categories": []
        }
        self.coco_val = {
            "images": [],
            "annotations": [],
            "categories": []
        }
        self.annotation_id = 1

    def split_data(self):
        # 각 데이터 폴더를 생성합니다.
        train_path = os.path.join(self.output_path, "train")
        val_path = os.path.join(self.output_path, "val")
        annotations_train_path = os.path.join(self.output_path, "annotations", "train.json")
        annotations_val_path = os.path.join(self.output_path, "annotations", "val.json")
        
        os.makedirs(train_path, exist_ok=True)
        os.makedirs(val_path, exist_ok=True)
        os.makedirs(os.path.join(self.output_path, "annotations"), exist_ok=True)

        # 이미지 파일 목록을 가져옵니다.
        image_files = []
        for root, dirs, files in os.walk(os.path.join(self.data_path, "images")):
            for file in files:
                if file.lower().endswith((".jpg", ".jpeg", ".png")):
                    img_path = os.path.join(root, file)
                    image_files.append(img_path)

        # 이미지 및 JSON 파일을 사전으로 미리 적재합니다.
        image_json_mapping = {}
        for img_path in image_files:
            json_path = img_path.replace("images", "labels").rsplit(".", 1)[0] + ".json"
            image_json_mapping[img_path] = json_path

        # 이미지를 작물 코드(pl_code)와 카테고리 이름을 기준으로 분류합니다.
        pl_code_to_images = {}
        category_to_images = {}
        for img_path, json_path in tqdm(image_json_mapping.items(), desc="이미지 분류 중"):
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            pl_code = data.get("pl_code", "")
            category_name = data.get("name", "")

            if pl_code not in pl_code_to_images:
                pl_code_to_images[pl_code] = []
            pl_code_to_images[pl_code].append(img_path)

            if category_name not in category_to_images:
                category_to_images[category_name] = []
            category_to_images[category_name].append(img_path)

        # 선택할 이미지를 랜덤하게 고릅니다.
        selected_files = []
        selected_pl_codes = set()
        selected_categories = set()

        pbar = tqdm(total=self.num_samples, desc="이미지 선택 중")
        while len(selected_files) < self.num_samples:
            img_path = random.choice(image_files)
            json_path = image_json_mapping[img_path]
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            pl_code = data.get("pl_code", "")
            category_name = data.get("name", "")

            # 이미지 후보를 선택할 때, 조건에 맞는지 확인합니다.
            if pl_code not in selected_pl_codes and category_name not in selected_categories:
                selected_files.append(img_path)
                selected_pl_codes.add(pl_code)
                selected_categories.add(category_name)
                pbar.update(1)
        
        pbar.close()

        # 선택된 이미지 파일을 처리합니다.
        pbar = tqdm(total=len(selected_files), desc="이미지 처리 중")
        with ThreadPoolExecutor() as executor:
            futures = []
            for img_path in selected_files:
                json_path = image_json_mapping[img_path]
                futures.append(executor.submit(self.process_image, img_path, json_path))

            for future in tqdm(as_completed(futures), total=len(futures), desc="이미지 처리 중"):
                result = future.result()
                if result:
                    coco_dataset, image_info = result
                    if coco_dataset is not None and image_info is not None:
                        if "train" in result:
                            self.coco_train["images"].append(image_info)
                            self.coco_train["annotations"].extend(coco_dataset["annotations"])
                            if not self.coco_train["categories"]:
                                self.coco_train["categories"] = coco_dataset["categories"]
                        elif "val" in result:
                            self.coco_val["images"].append(image_info)
                            self.coco_val["annotations"].extend(coco_dataset["annotations"])
                            if not self.coco_val["categories"]:
                                self.coco_val["categories"] = coco_dataset["categories"]
                pbar.update(1)

        pbar.close()

        # 데이터를 셔플링합니다.
        random.shuffle(self.coco_train["images"])
        random.shuffle(self.coco_val["images"])

        # annotations 파일을 저장합니다.
        with open(annotations_train_path, 'w', encoding='utf-8') as f:
            json.dump(self.coco_train, f, indent=4, ensure_ascii=False)
        
        with open(annotations_val_path, 'w', encoding='utf-8') as f:
            json.dump(self.coco_val, f, indent=4, ensure_ascii=False)

    def process_image(self, img_path, json_path):
        # 무작위로 선택한 비율에 따라 train 또는 val로 분배합니다.
        rand_val = random.random()
        if rand_val < self.split_ratio_train:
            dest_folder = os.path.join(self.output_path, "train")
            coco_dataset = {
                "images": [],
                "annotations": [],
                "categories": []
            }
            dataset_type = "train"
        else:
            dest_folder = os.path.join(self.output_path, "val")
            coco_dataset = {
                "images": [],
                "annotations": [],
                "categories": []
            }
            dataset_type = "val"

        # 이미지 파일을 이동시킵니다.
        os.makedirs(dest_folder, exist_ok=True)
        new_img_path = os.path.join(dest_folder, os.path.basename(img_path))
        shutil.copy(img_path, new_img_path)

        # JSON 파일에서 이미지 정보 가져오기
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        image_info = {
            "id": self.annotation_id,
            "file_name": os.path.basename(img_path),
            "width": data.get("width", 0),
            "height": data.get("height", 0),
            "pl_code": data.get("pl_code", "")
        }

        # 주석(annotations) 정보 추가
        for annotation in data["annotations"]:
            annotation["id"] = self.annotation_id
            annotation["image_id"] = self.annotation_id
            coco_dataset["annotations"].append(annotation)

        # 카테고리 추가 (처음 한 번만 추가)
        if not coco_dataset["categories"]:
            for category in data["categories"]:
                coco_dataset["categories"].append({
                    "id": category["id"],
                    "name": category["name"],
                    "supercategory": category.get("supercategory", "none")
                })

        self.annotation_id += 1

        return dataset_type, coco_dataset, image_info

In [3]:
import os
import random
import shutil
import json
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

class DataSplitter:
    def __init__(self, data_path, output_path, split_ratio_train, split_ratio_val, num_samples=1000):
        self.data_path = data_path
        self.output_path = output_path
        self.split_ratio_train = split_ratio_train
        self.split_ratio_val = split_ratio_val
        self.num_samples = num_samples
        self.coco_train = {"images": [], "annotations": [], "categories": []}
        self.coco_val = {"images": [], "annotations": [], "categories": []}
        self.annotation_id = 1

    def split_data(self):
        train_path = os.path.join(self.output_path, "train")
        val_path = os.path.join(self.output_path, "val")
        annotations_train_path = os.path.join(self.output_path, "annotations", "train.json")
        annotations_val_path = os.path.join(self.output_path, "annotations", "val.json")
        
        os.makedirs(train_path, exist_ok=True)
        os.makedirs(val_path, exist_ok=True)
        os.makedirs(os.path.join(self.output_path, "annotations"), exist_ok=True)

        image_files = [os.path.join(root, file)
                       for root, _, files in os.walk(os.path.join(self.data_path, "images"))
                       for file in files if file.lower().endswith((".jpg", ".jpeg", ".png"))]

        image_json_mapping = {img_path: img_path.replace("images", "labels").rsplit(".", 1)[0] + ".json"
                              for img_path in image_files}

        pl_code_to_images = {}
        category_to_images = {}
        
        with ThreadPoolExecutor() as executor:
            futures = {executor.submit(self.classify_image, img_path, json_path): img_path
                       for img_path, json_path in image_json_mapping.items()}
            
            for future in tqdm(as_completed(futures), total=len(futures), desc="이미지 분류 중"):
                img_path, pl_code, category_name = future.result()
                if pl_code:
                    pl_code_to_images.setdefault(pl_code, []).append(img_path)
                if category_name:
                    category_to_images.setdefault(category_name, []).append(img_path)

        selected_files = self.select_images(pl_code_to_images, category_to_images)

        with ThreadPoolExecutor() as executor:
            futures = [executor.submit(self.process_image, img_path, image_json_mapping[img_path])
                       for img_path in selected_files]
            
            for future in tqdm(as_completed(futures), total=len(futures), desc="이미지 처리 중"):
                result = future.result()
                if result:
                    dataset_type, coco_dataset, image_info = result
                    if dataset_type == "train":
                        self.coco_train["images"].append(image_info)
                        self.coco_train["annotations"].extend(coco_dataset["annotations"])
                        if not self.coco_train["categories"]:
                            self.coco_train["categories"] = coco_dataset["categories"]
                    else:
                        self.coco_val["images"].append(image_info)
                        self.coco_val["annotations"].extend(coco_dataset["annotations"])
                        if not self.coco_val["categories"]:
                            self.coco_val["categories"] = coco_dataset["categories"]

        random.shuffle(self.coco_train["images"])
        random.shuffle(self.coco_val["images"])

        with open(annotations_train_path, 'w', encoding='utf-8') as f:
            json.dump(self.coco_train, f, indent=4, ensure_ascii=False)
        
        with open(annotations_val_path, 'w', encoding='utf-8') as f:
            json.dump(self.coco_val, f, indent=4, ensure_ascii=False)

    def classify_image(self, img_path, json_path):
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        pl_code = data.get("pl_code", "")
        category_name = data.get("name", "")
        return img_path, pl_code, category_name

    def select_images(self, pl_code_to_images, category_to_images):
        selected_files = set()
        selected_pl_codes = set()
        selected_categories = set()

        pbar = tqdm(total=self.num_samples, desc="이미지 선택 중")
        while len(selected_files) < self.num_samples:
            if not pl_code_to_images or not category_to_images:
                break
            
            pl_code = random.choice(list(pl_code_to_images.keys()))
            category = random.choice(list(category_to_images.keys()))

            pl_code_images = pl_code_to_images[pl_code]
            category_images = category_to_images[category]

            candidate_images = set(pl_code_images) & set(category_images)
            if not candidate_images:
                continue

            img_path = random.choice(list(candidate_images))
            if img_path in selected_files:
                continue

            selected_files.add(img_path)
            selected_pl_codes.add(pl_code)
            selected_categories.add(category)

            pl_code_to_images[pl_code].remove(img_path)
            category_to_images[category].remove(img_path)

            if not pl_code_to_images[pl_code]:
                del pl_code_to_images[pl_code]
            if not category_to_images[category]:
                del category_to_images[category]

            pbar.update(1)
        
        pbar.close()
        return list(selected_files)

    def process_image(self, img_path, json_path):
        rand_val = random.random()
        if rand_val < self.split_ratio_train:
            dest_folder = os.path.join(self.output_path, "train")
            coco_dataset = {"images": [], "annotations": [], "categories": []}
            dataset_type = "train"
        else:
            dest_folder = os.path.join(self.output_path, "val")
            coco_dataset = {"images": [], "annotations": [], "categories": []}
            dataset_type = "val"

        os.makedirs(dest_folder, exist_ok=True)
        new_img_path = os.path.join(dest_folder, os.path.basename(img_path))
        shutil.copy(img_path, new_img_path)

        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        image_info = {
            "id": self.annotation_id,
            "file_name": os.path.basename(img_path),
            "width": data.get("width", 0),
            "height": data.get("height", 0),
            "pl_code": data.get("pl_code", "")
        }

        for annotation in data["annotations"]:
            annotation["id"] = self.annotation_id
            annotation["image_id"] = self.annotation_id
            coco_dataset["annotations"].append(annotation)

        if not coco_dataset["categories"]:
            for category in data["categories"]:
                coco_dataset["categories"].append({
                    "id": category["id"],
                    "name": category["name"],
                    "supercategory": category.get("supercategory", "none")
                })

        self.annotation_id += 1

        return dataset_type, coco_dataset, image_info

In [4]:
# 예시 사용법:
data_splitter = DataSplitter(data_path="D:/NOA/시설작물모델/output/dataset",
                             output_path="D:/NOA/시설작물모델/output",
                             split_ratio_train=0.8,
                             split_ratio_val=0.2,
                             num_samples=1000)
data_splitter.split_data()

이미지 분류 중: 100%|██████████| 126677/126677 [00:01<00:00, 96629.11it/s] 
이미지 선택 중:   0%|          | 0/1000 [00:00<?, ?it/s]
이미지 처리 중: 0it [00:00, ?it/s]
