# Make YOLO CustomDataSet
- YOLO를 학습하기 위한 데이터를 다운로드 하고 YOLO형식의 어노테이션 파일을 생성한다
  - 개인의 버킷에 보관되어있는 데이터를 다운로드한다.
  



### GCP 버킷에 저장되어 있는 데이터 가져오기
- download_bucket_with_transfer_manager : 버킷에 있는 모든 데이터를 가져온다.
  - bucket_name : GCP 버킷 이름
  - destination_directory : 받아온 데이터를 저장할 로컬 디렉토리 경로
    - 로컬 디렉토리 경로에 버킷의 모든 데이터와 폴더가 생성된다.
  - GCP 버킷 데이터 다운로드에 대한 인증 방법은 GCP API 키를 사용하거나 GCP에서 제공한는 다른방법을 사용한다.

In [None]:
from google.cloud.storage import Client, transfer_manager
bucket_name = "버킷 이름"
destination_directory = "/content/"

def download_bucket_with_transfer_manager(
    bucket_name, destination_directory="", workers=8, max_results=1000
):
    """Download all of the blobs in a bucket, concurrently in a process pool.

    The filename of each blob once downloaded is derived from the blob name and
    the `destination_directory `parameter. For complete control of the filename
    of each blob, use transfer_manager.download_many() instead.

    Directories will be created automatically as needed, for instance to
    accommodate blob names that include slashes.
    """

    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The directory on your computer to which to download all of the files. This
    # string is prepended (with os.path.join()) to the name of each blob to form
    # the full path. Relative paths and absolute paths are both accepted. An
    # empty string means "the current working directory". Note that this
    # parameter allows accepts directory traversal ("../" etc.) and is not
    # intended for unsanitized end user input.
    # destination_directory = ""

    # The maximum number of processes to use for the operation. The performance
    # impact of this value depends on the use case, but smaller files usually
    # benefit from a higher number of processes. Each additional process occupies
    # some CPU and memory resources until finished. Threads can be used instead
    # of processes by passing `worker_type=transfer_manager.THREAD`.
    # workers=8

    # The maximum number of results to fetch from bucket.list_blobs(). This
    # sample code fetches all of the blobs up to max_results and queues them all
    # for download at once. Though they will still be executed in batches up to
    # the processes limit, queueing them all at once can be taxing on system
    # memory if buckets are very large. Adjust max_results as needed for your
    # system environment, or set it to None if you are sure the bucket is not
    # too large to hold in memory easily.
    # max_results=1000


    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)

    blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)]

    results = transfer_manager.download_many_to_path(
        bucket, blob_names, destination_directory=destination_directory, max_workers=workers
    )

    for name, result in zip(blob_names, results):
        # The results list is either `None` or an exception for each blob in
        # the input list, in order.

        if isinstance(result, Exception):
            print("Failed to download {} due to exception: {}".format(name, result))
        else:
            print("Downloaded {} to {}.".format(name, destination_directory + name))

if __name__ == "__main__":
    download_bucket_with_transfer_manager(bucket_name, destination_directory)

### 다운 받은 초기 데이터 셋 폴더 구조
01.데이터/
   - 1.Training/
    - 라벨링데이터/
      - 02. 고추/
        - 0.정상.zip
        - 1.질병.zip
      - 05. 상추/
        - 0.정상.zip
        - 1.질병.zip
      - 11. 토마토/
        - 0.정상.zip
        - 1.질병.zip
    - 원천데이터/
      - 02. 고추/
        - 0.정상.zip
        - 1.질병.zip
      - 05. 상추/
        - 0.정상.zip
        - 1.질병.zip
      - 11. 토마토/
        - 0.정상.zip
        - 1.질병.zip
   - 2.Validation/
    - 라벨링데이터/
      - 02. 고추/
        - 0.정상.zip
        - 1.질병.zip
      - 05. 상추/
        - 0.정상.zip
        - 1.질병.zip
      - 11. 토마토/
        - 0.정상.zip
        - 1.질병.zip
    - 원천데이터/
      - 02. 고추/
        - 0.정상.zip
        - 1.질병.zip
      - 05. 상추/
        - 0.정상.zip
        - 1.질병.zip
      - 11. 토마토/
        - 0.정상.zip
        - 1.질병.zip
        

#### 압축 해제 함수 구현
- 지정된 디렉토리 내의 모든 ZIP 파일을 찾아 해제하는 함수.
  - 하위 디렉토리도 포함하여 탐색합니다.
  - param
    - directory: ZIP 파일을 찾아 해제할 루트 디렉토리 경로.
  - return
    - 없음
    

In [None]:
import os
import zipfile

init_unzip_flag = False

def unzip_files_in_directory(directory):
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".zip"):

                zip_file_path = os.path.join(root, file)
                extract_dir = os.path.splitext(zip_file_path)[0]  # .zip 확장자 제거

                # 디렉토리가 존재하지 않으면 생성
                if not os.path.exists(extract_dir):
                    os.makedirs(extract_dir)

                # ZIP 파일 해제
                with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)
                print(f"Unzipped {zip_file_path} to {extract_dir}")



#### 라벨링데이터중 모든 데이터폴더 경로 가져오기
- 루트 디렉토리 내의 특정 폴더 아래에서 특정 이름의 디렉토리 경로를 찾는 함수
  - 하위 디렉토리도 포함하여 탐색합니다.
  -param
    - root_directory: 탐색을 시작할 루트 디렉토리
    - required_sub_folders: 필수로 포함되어야 하는 하위 폴더 이름 리스트
    - optional_sub_folders: 어떤 하위 폴더든 하나만 포함되면 되는 하위 폴더 이름 리스트
  - return
    - 디렉토리 경로 리스트
    

In [None]:
import os

def find_specific_dirs(root_directory, required_sub_folders, optional_sub_folders):
    matched_dirs = []

    for dirpath, dirnames, filenames in os.walk(root_directory):
        if all(sub_folder in dirpath for sub_folder in required_sub_folders) and \
                any(sub_folder in dirpath for sub_folder in optional_sub_folders):
                full_path = os.path.join(dirpath)
                matched_dirs.append(full_path)

    return matched_dirs

### 전체 데이터셋에서 필요한 데이터 추출 및 어노테이션 수정
- 활용 데이터 셋 : hub.or.kr/aihubdata/data/view.do?currMenu=115&topMenu=100&aihubDataSe=realm&dataSetSn=153
- 어노테이션 형식 : json -> txt(Yolo)

### 데이터의 작물 부위 확인
- 사용하는 데이터셋에선 줄기 및 열매, 잎 등 여러가지 식물 부위의 데이터가 있다. 우리는 잎 감지를 위해 데이터를 사용하기 때문에 잎 사진 데이터를 확인해야된다.
- 예시)
  - 이미지 파일명: V006_77_0_00_01_01_13_0_c03_20201209_0000_S01_1.jpg
  - 어노테이션 파일명: V006_77_0_00_01_01_13_0_c03_20201209_0000_S01_1.jpg.json
    - 구분자 '_'로 토큰을 나누면 각각의 토큰에 의미를 부여할 수 있다. 이는 데이터 셋 생성자가 정해놓은 규칙으로 작성되어 있는데 중요한 토큰들만 살펴보자
      - token[1] : 세부과제
        - '77': 시설작물 질병, '78':노지작물 해충, '79' :노지작물 질병, '80': 과수화상병
      - token[2] : 데이터 종류
        - '0' : 정상 작물, '1': 질병, '2': 해충, '3':충해
      - token[4] : 작물 코드
        - 별첨 1
      - token[5] : 작물의 부위별 코드
        - 별첨 2

#### 별첨
##### 별첨 1 (식물 종류)
00 작물 없음
01 가지
02 고추
03 단호박
04 딸기
05 상추
06 수박
07 애호박
08 오이
09 쥬키니호박
10 참외
11 토마토
12 포도
##### 별첨 2 (식물 부위)
00 구분 없음
01 열매
02 꽃
03 잎
04 가지
05 줄기
06 뿌리
07 해충

In [None]:
import re

def is_area(filename,area_code):
    # '_'를 기준으로 분할하고 6번째 요소(식물 부위 코드) 확인 -> 해당 요소가 객체의 위치를 나타난다.
    token = filename.split('_')
    #print(parts)
    if len(token) >= 7:
        if token[5] == area_code: # 03번은 잎이다.
            return True
        else:
            return False
    else:
        return False


def get_plant_type_code(filename):
    # '_'를 기준으로 분할하고 5번째 요소 식물 종류 코드 반환 -> Class name 설정을 위해
    token = filename.split('_')
    if len(token) >= 7:
        return token[4]
    else:
        return False

### 데이터 어노테이션 형식 변환
- 기존 데이터셋의 어노테이션은 json형식의 커스텀 어노테이션 형식을 고수한다. 우리는 YOLO모델을 사용하여 학습시키기때문에 YOLO 어노테이션(txt)형식으로 변환해준다.
  - Yolo 어노테이션 형식
    - class_code x_center y_center box_width box_height
    - ex) "0 10 10 2 5"

In [None]:
def convert_to_yolo_annotation(path):
    # JSON 파일 읽기
    with open(path, 'r') as f:
        json_data = json.load(f)
        json_string = json.dumps(json_data)
        data = json.loads(json_string)
        # 이미지 너비와 높이
        image_width = data["description"]["width"]
        image_height = data["description"]["height"]

        # 바운딩 박스 좌표
        points = data["annotations"]["points"][0]

    # center구하고 데이터 정규화
    x_center = (points["xtl"] + points["xbr"]) / 2 / image_width
    y_center = (points["ytl"] + points["ybr"]) / 2 / image_height
    box_width = (points["xbr"] - points["xtl"]) / image_width
    box_height = (points["ybr"] - points["ytl"]) / image_height

    # YOLO 어노테이션 형식 리스트 반환
    return [x_center, y_center, box_width, box_height]

### Custom Dataset 폴더 구조 생성
- Yolo train 에서 지정한 데이터 폴더 구조를 생성한다.
- Yolo train 폴더 구조
  - CustomDataset <br>
  ㄴ train/<br>
  &nbsp;&nbsp;&nbsp; 1.jpg(이미지 파일)<br>
  &nbsp;&nbsp;&nbsp; 1.txt(어노테이션 파일)<br>
  ㄴ valid/<br>
  &nbsp;&nbsp;&nbsp; 2.jpg(이미지 파일)<br>
  &nbsp;&nbsp;&nbsp; 2.txt(어노테이션 파일)<br>
  ㄴ test/<br>
  &nbsp;&nbsp;&nbsp; 3.jpg(이미지 파일)<br>
  &nbsp;&nbsp;&nbsp; 3.txt(어노테이션 파일)<br>
   .yaml<br>

In [None]:
import os
import shutil

def set_Folder(name, path):
    # train,valid,test 파일의 경로 저장한 리스트
    result = []
    Project_path = os.path.join(path, name)

    if os.path.exists(Project_path):
      delete_folder(Project_path)

    os.makedirs(Project_path)
    print(f"Destination folder '{Project_path}' created.")

    for folder_name in ["train", "valid", "test"]:
      new_folder_path = os.path.join(Project_path, folder_name)
      result.append(new_folder_path)

      os.makedirs(new_folder_path)
      print(f"Destination folder '{new_folder_path}' created.")

    return result

def delete_folder(path):
    try:
        shutil.rmtree(path)
        print(f"Folder '{path}' deleted successfully.")
    except OSError as e:
        print(f"Error: {path} : {e.strerror}")

### 파일 저장
- 생성한 어노테이션 정보를 .txt파일의 형태로 저장한다.
- 조건에 만족하는 이미지 파일을 특정 폴더에 복사한다.
  - store_annotation(output_path,basename,data): 생성한 yolo 데이터 어노테이션 형식을 실제 파일로 저장한다.
    - output_path 에 data를 basename으로 저장한다
  - store_images(source_path, output_path): 이미지를 특정 폴더에 복사하여 저장한다.
    - source_path에 있는 이미지파일을 output_path경로에 저장한다.
  - store_all(images, labels, folder): 이미지와 어노테이션 파일을 특정 폴더에 저장한다.
    - images_list와 labels_list에 있는 데이터를 folder에 파일로 저장한다.

In [None]:
import json
plant_code = {"02":"0","05":"1","11":"2"}

def store_annotation(output_folder,basename,data):
    # 최종 어노테이션 문자열을 저장하는 list
    result = []

    bbox_string = " ".join([str(x) for x in data])
    # 반환받은 식물 코드는 YOLO class로 변환하여 기입한다.
    result.append(f"{plant_code[get_plant_type_code(basename)]} {bbox_string}")
    if result:
      # 확장자 제거
      basename = basename.split(".")[0]
      with open(os.path.join(output_folder, f"{basename}.txt"), "w", encoding="utf-8") as f:
          f.write("\n".join(result))
      return 1
    return 0


In [None]:
# 학습에 활용할 이미지 데이터를 목적지 폴더(학습 폴더 구조)에 맞춰 이동하는 함수
def store_images(source_path, output_path):

  filename = os.path.basename(source_path)

  destination_path = os.path.join(output_path, filename)

  try:
      # Copy the file
      # shutil.copy2(source_path, destination_path)
      shutil.move(source_path, destination_path)
      return 1
  except Exception as e:
      print(f"Error occurred while copying the file: {e}")
      return 0


In [None]:
def store_all(images, labels, folder):
  labels_cnt = 0
  images_cnt = 0

  for image in images:
      cnt=store_images(image,folder)
      images_cnt+=cnt

  for label in labels:
    basename= os.path.basename(label)
    cnt = store_annotation(folder,basename,convert_to_yolo_annotation(label))
    labels_cnt += cnt

  print(labels_cnt, images_cnt)

### 구축 시 검증을 위한 함수들

In [None]:
# 파일 이름 비교 함수: 파일 확장자를 제거한 이름을 비교하여 동일한지 확인
def is_same_filename(file1, file2):
  # 파일 확장자를 제거한 파일 이름을 가져옴
    base1 = os.path.basename(file1).split('.')[0]
    base2 = os.path.basename(file2).split('.')[0]

    # 확장자를 제거한 파일 이름이 같은지 비교
    return base1 == base2

# 폴더 내의 모든 파일명을 리스트로 생성
def get_files_in_folder(folder_path):
    return [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]

# 두 폴더를 비교하여 대응되는 이미지와 라벨 데이터를 확인하고,
# 매핑된 리스트와 각각 매칭되지 않은 파일 리스트를 반환
def pair_common_files(folder1, folder2):
    files1 = get_files_in_folder(folder1)
    files2 = get_files_in_folder(folder2)

    paired_list = []
    unmatched_files1 = []
    unmatched_files2 = files2[:]  # 복사하여 나중에 제거할 때 사용

    for file1 in files1:
        matched = False
        for file2 in files2:
            if is_same_filename(file1, file2): # 파일 이름 비교를 위한 함수 호출
                paired_list.append((os.path.join(folder1, file1), os.path.join(folder2, file2)))
                unmatched_files2.remove(file2)
                matched = True
                break
        if not matched:
            unmatched_files1.append(os.path.join(folder1, file1))

    unmatched_files2 = [os.path.join(folder2, file) for file in unmatched_files2]

    return paired_list, unmatched_files1, unmatched_files2

### 실행

##### 환경변수 설정

- root_path: 오리지널 데이터셋이 위치한 경로
- Dataset_name: 새로 생성할 커스텀 데이터셋의 이름을 설정합니다.
- Custom_dataset_path: 커스텀 데이터셋을 저장할 폴더의 경로를 지정합니다.
- extension_label: 라벨 데이터가 가지고 있는 확장자를 모아둔 리스트입니다.
- extension_image: 이미지 데이터가 가지고 있는 확장자를 모아둔 리스트입니다.
- area_code : 어떤 식물 부위를 사용할지 지정하는 변수입니다.

In [None]:
root_path = "/content/01.데이터/"
Dataset_name = "YOLO_CustomDataSet_V2"
Custom_dataset_path = "/content/"
extension_label = ["json","txt"]
extension_image = ["jpg","JPG","png","PNG"]
area_code = "03"

In [None]:
# 모든 zip 파일 압축해제
# 여러번 압축해제하는 실수를 방지
if !init_unzip_flag:
  unzip_files_in_directory(root_path)
  init_unzip_flag = True

Unzipped /content/drive/MyDrive/Colab Notebooks/Project/Chungbuk University/Capstone Design/AI_Model/PlantDiseaseDetection/data/raw/01.데이터/1.Training/라벨링데이터/2.고추/0.정상.zip to /content/drive/MyDrive/Colab Notebooks/Project/Chungbuk University/Capstone Design/AI_Model/PlantDiseaseDetection/data/raw/01.데이터/1.Training/라벨링데이터/2.고추/0.정상
Unzipped /content/drive/MyDrive/Colab Notebooks/Project/Chungbuk University/Capstone Design/AI_Model/PlantDiseaseDetection/data/raw/01.데이터/1.Training/라벨링데이터/2.고추/1.질병.zip to /content/drive/MyDrive/Colab Notebooks/Project/Chungbuk University/Capstone Design/AI_Model/PlantDiseaseDetection/data/raw/01.데이터/1.Training/라벨링데이터/2.고추/1.질병
Unzipped /content/drive/MyDrive/Colab Notebooks/Project/Chungbuk University/Capstone Design/AI_Model/PlantDiseaseDetection/data/raw/01.데이터/1.Training/라벨링데이터/5.상추/0.정상.zip to /content/drive/MyDrive/Colab Notebooks/Project/Chungbuk University/Capstone 

#### 이미지 및 라벨 폴더 경로 리스트 생성 및 검증

In [None]:
# 이미지 및 라벨 폴더 경로 리스트
image_folder_paths = []
label_folder_paths = []

image_folder_paths += find_specific_dirs(root_path, ["원천데이터"], ["0.정상","1.질병"])
label_folder_paths += find_specific_dirs(root_path, ["라벨링데이터"], ["0.정상","1.질병"])

# 특정 디렉토리를 찾는 함수 호출
for i in range(len(image_folder_paths)):
  print(image_folder_paths[i])
  print(label_folder_paths[i])

In [None]:
# 각 이미지 폴더와 라벨 폴더를 비교하여 대응되는 파일을 찾고 결과 리스트에 추가
import os
result_paired_list = []

#
for index, image_folder_path in enumerate(image_folder_paths):
  paired_list, unmatched_files1, unmatched_files2 = pair_common_files(
      image_folder_path,
      label_folder_paths[index]
      )
  result_paired_list += paired_list
  print("==========================================")
  print("이미지 폴더 경로:", image_folder_path)
  print("라벨링 폴더 경로:", label_folder_paths[index])
  print("매칭되지 않은 이미지 파일 개수:", len(unmatched_files1))
  print("매칭되지 않은 라벨 파일 개수:", len(unmatched_files2))


In [None]:
# 매핑된 리스트를 이미지 리스트와 라벨 리스트로 분리
def unzip_tuples(paired_list):
    list1 = [item[0] for item in paired_list]
    list2 = [item[1] for item in paired_list]
    return list1, list2

all_images, all_labels = unzip_tuples(result_paired_list)
print("매칭된 이미지 파일 개수:", len(all_images))
print("매칭된 라벨 파일 개수:", len(all_labels))

#### 필요한 식물 부위 데이터만 선별
  - 식물 부위 코드 비교를 통해 데이터를 선별

In [None]:
select_labels_path_list = list()
select_images_path_list = list()

for i in all_images:
    basename= os.path.basename(i)
    if is_area(basename,area_code):
      select_images_path_list.append(i)

for i in all_labels:
    basename= os.path.basename(i)
    if is_area(basename,area_code):
      select_labels_path_list.append(i)

##### 선별된 데이터들을 검증

In [None]:
error = 0
true= 0
for index, value in enumerate(select_images_path_list):
  if is_same_filename(value,select_labels_path_list[index]):
    true +=1
  else:
    error += 1
print(true,error)

#### 선별된 데이터를 커스텀 데이터셋 폴더 구조에 맞게 저장

In [None]:
# 훈련 데이터 95%, 검증데이터 3%, 테스트 데이터 2%
split_index1 = int(len(select_images_path_list) * 0.95)
split_index2 = int(len(select_images_path_list) * 0.98)
train_images = select_images_path_list[:split_index1]
train_labels = select_labels_path_list[:split_index1]
valid_images = select_images_path_list[split_index1:split_index2]
valid_labels = select_labels_path_list[split_index1:split_index2]
test_images = select_images_path_list[split_index2:]
test_labels = select_labels_path_list[split_index2:]

# 커스텀 데이터셋을 관리할 폴더 구조를 생성한다.
folder_list = set_Folder(Dataset_name, Custom_dataset_path)

# folder_list = [train경로, valid경로, test경로]
for index, folder in enumerate(folder_list):
  if index == 0: # train
    store_all(train_images,train_labels,folder)
  if index == 1: # vaild
    store_all(valid_images,valid_labels,folder)
  if index == 2: # test
    store_all(test_images,test_labels,folder)

Folder '/content/YOLO_CustomDataSet_V2' deleted successfully.
Destination folder '/content/YOLO_CustomDataSet_V2' created.
Destination folder '/content/YOLO_CustomDataSet_V2/train' created.
Destination folder '/content/YOLO_CustomDataSet_V2/valid' created.
Destination folder '/content/YOLO_CustomDataSet_V2/test' created.
114 114
3 3
3 3
