## GCP 버킷에서 데이터 다운받기

### 버킷과 같은 GCE 인스턴스에서 실행 중이 아닐경우
- Google Cloud SDK를 활용한 인증 필요

In [1]:
!pip install -q google-cloud-storage
!pip install -q google-auth google-auth-oauthlib google-auth-httplib2
!pip install -q google-colab

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from google.colab import auth
auth.authenticate_user()

#### 버킷 내의 특정 폴더만 다운로드

In [3]:
def download_bucket_folder_with_transfer_manager(
    bucket_name, folder_name, destination_directory="", workers=8, max_results=1000
):
    """Download blobs from a specific folder in a bucket using transfer manager."""
    from google.cloud.storage import Client, transfer_manager

    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)

    # Get blobs with the given folder prefix
    blob_names = [blob.name for blob in bucket.list_blobs(prefix=folder_name, max_results=max_results)]

    results = transfer_manager.download_many_to_path(
        bucket, blob_names, destination_directory=destination_directory, max_workers=workers
    )

    for name, result in zip(blob_names, results):
        if isinstance(result, Exception):
            print("Failed to download {} due to exception: {}".format(name, result))
        else:
            print("Downloaded {} to {}.".format(name, destination_directory + name))

if __name__ == "__main__":
    bucket_name = "버킷 이"
    folder_name = "01.데이터(1000)/"
    destination_directory = "/content/"

    download_bucket_folder_with_transfer_manager(
        bucket_name, folder_name, destination_directory
    )




Downloaded 01.데이터(1000)/1.Training/라벨링데이터/02.고추/0.정상.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/0.정상.zip.
Downloaded 01.데이터(1000)/1.Training/라벨링데이터/02.고추/1.질병.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/1.질병.zip.
Downloaded 01.데이터(1000)/1.Training/라벨링데이터/05.상추/0.정상.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/0.정상.zip.
Downloaded 01.데이터(1000)/1.Training/라벨링데이터/05.상추/1.질병.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/1.질병.zip.
Downloaded 01.데이터(1000)/1.Training/라벨링데이터/11.토마토/0.정상.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/0.정상.zip.
Downloaded 01.데이터(1000)/1.Training/라벨링데이터/11.토마토/1.질병.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/1.질병.zip.
Downloaded 01.데이터(1000)/1.Training/원천데이터/02.고추/0.정상.zip to /content/01.데이터(1000)/1.Training/원천데이터/02.고추/0.정상.zip.
Downloaded 01.데이터(1000)/1.Training/원천데이터/02.고추/1.질병.zip to /content/01.데이터(1000)/1.Training/원천데이터/02.고추/1.질병.zip.
Downloaded 01.데이터(1000)/1.Training/원천데이터/05.상추/0.정상.zip to /content/01.데

## 학습 폴더 구축

### os 관련 함수
- zip 파일 해제 및 삭제
- 특정 단어가 포함된 폴더 경로 찾기 함수
- 학습 폴더 구조 생성
- 폴더 내 파일 목록 생성
- 이미지 및 라벨 데이터를 학습 폴더로 복사
- 특정 파일 복사


In [5]:
# zip 파일 해제
import os
import zipfile

def unzip_files_in_directory(directory):
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".zip"):

                zip_file_path = os.path.join(root, file)
                extract_dir = os.path.splitext(zip_file_path)[0]  # .zip 확장자 제거

                # 디렉토리가 존재하지 않으면 생성
                if not os.path.exists(extract_dir):
                    os.makedirs(extract_dir)

                # ZIP 파일 해제
                with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)
                print(f"Unzipped {zip_file_path} to {extract_dir}")

                # ZIP 파일 삭제
                os.remove(zip_file_path)
                print(f"Deleted {zip_file_path}")

In [6]:
# 특정 단어가 포함된 폴더 경로 찾기
import os

def find_specific_dirs(root_directory, required_sub_folders, optional_sub_folders):
    matched_dirs = []

    for dirpath, dirnames, filenames in os.walk(root_directory):
        if all(sub_folder in dirpath for sub_folder in required_sub_folders) and \
                any(sub_folder in dirpath for sub_folder in optional_sub_folders):
                full_path = os.path.join(dirpath)
                matched_dirs.append(full_path)

    return matched_dirs

In [7]:
# 학습 데이터셋 폴더 구조 생
import os
import shutil

def set_Folder(name, path):
    # train,valid,test 파일의 경로 저장한 리스트
    result = []
    Project_path = os.path.join(path, name)

    if os.path.exists(Project_path):
      delete_folder(Project_path)

    os.makedirs(Project_path)
    print(f"Destination folder '{Project_path}' created.")

    for folder_name in ["train", "valid", "test"]:
      new_folder_path = os.path.join(Project_path, folder_name)
      result.append(new_folder_path)

      os.makedirs(new_folder_path)
      print(f"Destination folder '{new_folder_path}' created.")

    return result

def delete_folder(path):
    try:
        shutil.rmtree(path)
        print(f"Folder '{path}' deleted successfully.")
    except OSError as e:
        print(f"Error: {path} : {e.strerror}")

In [8]:
# 학습에 활용할 데이터를 목적지 폴더(학습 폴더 구조)에 맞춰 이동하는 함수
def copy_file(source_path, output_path):

  filename = os.path.basename(source_path)

  destination_path = os.path.join(output_path, filename)

  try:
      # Copy the file
      shutil.copy2(source_path, destination_path)
      # shutil.move(source_path, destination_path)
      return 1
  except Exception as e:
      print(f"Error occurred while copying the file: {e}")
      return 0


In [9]:
# 이미지와 라벨데이터를 학습 폴더에 복사
def store_all(images, labels, folder):
  labels_cnt = 0
  images_cnt = 0

  for image in images:
      cnt=copy_file(image,folder)
      images_cnt+=cnt

  for label in labels:
      cnt=copy_file(label,folder)
      labels_cnt += cnt

  print(labels_cnt, images_cnt)

In [10]:
# 폴더 내의 모든 파일명을 리스트로 생성
def get_files_in_folder(folder_path):
    return [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]

### 데이터 분류 및 검증 함수
- 데이터 분류
  - 식물 종류 코드 반환
  - 식물 부위 코드 반환
  - 식물 부위 확인
- 데이터 검증
  - 이미지와 라벨 데이터 비교
  - 폴더 내 이미지와 라벨 파일 이름 일치 확인

In [21]:
import re

# 식물 부위 확인 함수
def is_area(filename,area_code):
    # '_'를 기준으로 분할하고 6번째 요소(식물 부위 코드) 확인 -> 해당 요소가 객체의 위치를 나타난다.
    if get_plant_area_code(filename) == area_code: # 03번은 잎이다.
        return True
    else:
        return False

# 식물 종류 코드 반환 함수
def get_plant_type_code(filename):
    # '_'를 기준으로 분할하고 5번째 요소 식물 종류 코드 반환 -> Class name 설정을 위해
    token = filename.split('_')
    if len(token) >= 7:
        return token[4]
    else:
        return False


# 식물 부위 코드 반환 함수
def get_plant_area_code(filename):
    # '_'를 기준으로 분할하고 6번째 요소 식물 부위 코드 반환
    token = filename.split('_')
    if len(token) >= 7:
        return token[5]
    else:
        return False

In [22]:
# 파일 이름 비교 함수: 파일 확장자를 제거한 이름을 비교하여 동일한지 확인
def is_same_filename(file1, file2):
  # 파일 확장자를 제거한 파일 이름을 가져옴
    base1 = os.path.basename(file1).split('.')[0]
    base2 = os.path.basename(file2).split('.')[0]

    # 확장자를 제거한 파일 이름이 같은지 비교
    return base1 == base2


# 두 폴더를 비교하여 대응되는 이미지와 라벨 데이터를 확인하고,
# 매핑된 리스트와 각각 매칭되지 않은 파일 리스트를 반환
def pair_common_files(folder1, folder2):
    files1 = get_files_in_folder(folder1)
    files2 = get_files_in_folder(folder2)

    paired_list = []
    unmatched_files1 = []
    unmatched_files2 = files2[:]  # 복사하여 나중에 제거할 때 사용

    for file1 in files1:
        matched = False
        for file2 in files2:
            if is_same_filename(file1, file2): # 파일 이름 비교를 위한 함수 호출
                paired_list.append((os.path.join(folder1, file1), os.path.join(folder2, file2)))
                unmatched_files2.remove(file2)
                matched = True
                break
        if not matched:
            unmatched_files1.append(os.path.join(folder1, file1))

    unmatched_files2 = [os.path.join(folder2, file) for file in unmatched_files2]

    return paired_list, unmatched_files1, unmatched_files2

### 실행

In [13]:
# 압축해제
unzip_files_in_directory(root_path)

Unzipped /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/0.정상.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/0.정상
Deleted /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/0.정상.zip
Unzipped /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/1.질병.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/1.질병
Deleted /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/1.질병.zip
Unzipped /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/0.정상.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/0.정상
Deleted /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/0.정상.zip
Unzipped /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/1.질병.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/1.질병
Deleted /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/1.질병.zip
Unzipped /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/0.정상.zip to /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/0.정상
Deleted /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/0.정상.zip
Unzipped /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/1.질병.zip to /content/01.데이터(1000)/

In [23]:
root_path = "/content/01.데이터(1000)"
Dataset_name = "CustomDataSet"
Custom_dataset_path = "/content/"
extension_label = ["json","txt"]
extension_image = ["jpg","JPG","png","PNG"]
area_code = "03"

In [24]:
# 이미지 및 라벨 폴더 경로 리스트
train_image_paths = []
train_label_paths = []
validate_image_paths = []
validate_label_paths = []

train_image_paths += find_specific_dirs(root_path, ["1.Training","원천데이터"], ["0.정상","1.질병"])
train_label_paths += find_specific_dirs(root_path, ["1.Training","라벨링데이터"], ["0.정상","1.질병"])
validate_image_paths += find_specific_dirs(root_path, ["2.Validation","원천데이터"], ["0.정상","1.질병"])
validate_label_paths += find_specific_dirs(root_path, ["2.Validation","라벨링데이터"], ["0.정상","1.질병"])

# 특정 디렉토리를 찾는 함수 호출
for i in range(len(train_image_paths)):
  # print(train_image_paths[i])
  # print(train_label_paths[i])
  print(validate_image_paths[i])
  print(validate_label_paths[i])

/content/01.데이터(1000)/2.Validation/원천데이터/11.토마토/1.질병
/content/01.데이터(1000)/2.Validation/라벨링데이터/11.토마토/1.질병
/content/01.데이터(1000)/2.Validation/원천데이터/11.토마토/0.정상
/content/01.데이터(1000)/2.Validation/라벨링데이터/11.토마토/0.정상
/content/01.데이터(1000)/2.Validation/원천데이터/02.고추/1.질병
/content/01.데이터(1000)/2.Validation/라벨링데이터/02.고추/1.질병
/content/01.데이터(1000)/2.Validation/원천데이터/02.고추/0.정상
/content/01.데이터(1000)/2.Validation/라벨링데이터/02.고추/0.정상
/content/01.데이터(1000)/2.Validation/원천데이터/05.상추/1.질병
/content/01.데이터(1000)/2.Validation/라벨링데이터/05.상추/1.질병
/content/01.데이터(1000)/2.Validation/원천데이터/05.상추/0.정상
/content/01.데이터(1000)/2.Validation/라벨링데이터/05.상추/0.정상


In [25]:
# 각 이미지 폴더와 라벨 폴더를 비교하여 대응되는 파일을 찾고 결과 리스트에 추가
import os
train_paired_list = []
validate_paired_list = []

for index, train_image_path in enumerate(train_image_paths):
  paired_list, unmatched_files1, unmatched_files2 = pair_common_files(
      train_image_path,
      train_label_paths[index]
      )
  train_paired_list += paired_list
  print("==========================================")
  print("이미지 폴더 경로:", train_image_paths[index])
  print("라벨링 폴더 경로:", train_label_paths[index])
  print("매칭되지 않은 이미지 파일 개수:", len(unmatched_files1))
  print("매칭되지 않은 라벨 파일 개수:", len(unmatched_files2))


for index, validate_image_path in enumerate(validate_image_paths):
  paired_list, unmatched_files1, unmatched_files2 = pair_common_files(
      validate_image_path,
      validate_label_paths[index]
      )
  validate_paired_list += paired_list
  print("==========================================")
  print("이미지 폴더 경로:", validate_image_paths[index])
  print("라벨링 폴더 경로:", validate_label_paths[index])
  print("매칭되지 않은 이미지 파일 개수:", len(unmatched_files1))
  print("매칭되지 않은 라벨 파일 개수:", len(unmatched_files2))


이미지 폴더 경로: /content/01.데이터(1000)/1.Training/원천데이터/11.토마토/1.질병
라벨링 폴더 경로: /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/1.질병
매칭되지 않은 이미지 파일 개수: 0
매칭되지 않은 라벨 파일 개수: 0
이미지 폴더 경로: /content/01.데이터(1000)/1.Training/원천데이터/11.토마토/0.정상
라벨링 폴더 경로: /content/01.데이터(1000)/1.Training/라벨링데이터/11.토마토/0.정상
매칭되지 않은 이미지 파일 개수: 0
매칭되지 않은 라벨 파일 개수: 0
이미지 폴더 경로: /content/01.데이터(1000)/1.Training/원천데이터/02.고추/1.질병
라벨링 폴더 경로: /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/1.질병
매칭되지 않은 이미지 파일 개수: 0
매칭되지 않은 라벨 파일 개수: 0
이미지 폴더 경로: /content/01.데이터(1000)/1.Training/원천데이터/02.고추/0.정상
라벨링 폴더 경로: /content/01.데이터(1000)/1.Training/라벨링데이터/02.고추/0.정상
매칭되지 않은 이미지 파일 개수: 0
매칭되지 않은 라벨 파일 개수: 0
이미지 폴더 경로: /content/01.데이터(1000)/1.Training/원천데이터/05.상추/1.질병
라벨링 폴더 경로: /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/1.질병
매칭되지 않은 이미지 파일 개수: 0
매칭되지 않은 라벨 파일 개수: 0
이미지 폴더 경로: /content/01.데이터(1000)/1.Training/원천데이터/05.상추/0.정상
라벨링 폴더 경로: /content/01.데이터(1000)/1.Training/라벨링데이터/05.상추/0.정상
매칭되지 않은 이미지 파일 개수: 0
매칭되지 않은 라벨 파일 개수: 0
이미지 폴더 경로: /

In [26]:
# 매핑된 리스트를 이미지 리스트와 라벨 리스트로 분리
def unzip_tuples(paired_list):
    list1 = [item[0] for item in paired_list]
    list2 = [item[1] for item in paired_list]
    return list1, list2

all_train_images, all_train_labels = unzip_tuples(train_paired_list)
print("train 매칭된 이미지 파일 개수:", len(all_train_images))
print("train 매칭된 라벨 파일 개수:", len(all_train_labels))


all_validate_images, all_validate_labels = unzip_tuples(validate_paired_list)
print("validate 매칭된 이미지 파일 개수:", len(all_validate_images))
print("validate 매칭된 라벨 파일 개수:", len(all_validate_labels))

train 매칭된 이미지 파일 개수: 6000
train 매칭된 라벨 파일 개수: 6000
validate 매칭된 이미지 파일 개수: 2791
validate 매칭된 라벨 파일 개수: 2791


In [27]:
select_train_labels_path_list = list()
select_train_images_path_list = list()
select_validate_labels_path_list = list()
select_validate_images_path_list = list()

for i in all_train_images:
    basename= os.path.basename(i)
    if is_area(basename,area_code):
      select_train_images_path_list.append(i)

for i in all_train_labels:
    basename= os.path.basename(i)
    if is_area(basename,area_code):
      select_train_labels_path_list.append(i)

for i in all_validate_images:
    basename= os.path.basename(i)
    if is_area(basename,area_code):
      select_validate_images_path_list.append(i)

for i in all_validate_labels:
    basename= os.path.basename(i)
    if is_area(basename,area_code):
      select_validate_labels_path_list.append(i)

In [28]:
error = 0
true= 0
for index, value in enumerate(select_train_images_path_list):
  if is_same_filename(value,select_train_labels_path_list[index]):
    true +=1
  else:
    error += 1
print("train 데이터:",true,error)

error = 0
true= 0
for index, value in enumerate(select_validate_images_path_list):
  if is_same_filename(value,select_validate_labels_path_list[index]):
    true +=1
  else:
    error += 1
print("validate 데이터:",true,error)

train 데이터: 3998 0
validate 데이터: 2561 0


In [29]:
train_images = select_train_images_path_list
train_labels = select_train_labels_path_list
valid_images = select_validate_images_path_list
valid_labels = select_validate_labels_path_list

# 커스텀 데이터셋을 관리할 폴더 구조를 생성한다.
folder_list = set_Folder(Dataset_name, Custom_dataset_path)

# folder_list = [train경로, valid경로, test경로]
for index, folder in enumerate(folder_list):
  if index == 0: # train
    store_all(train_images,train_labels,folder)
  if index == 1: # vaild
    store_all(valid_images,valid_labels,folder)
  # if index == 2: # test
  #   store_all(test_images,test_labels,folder)

Folder '/content/CustomDataSet' deleted successfully.
Destination folder '/content/CustomDataSet' created.
Destination folder '/content/CustomDataSet/train' created.
Destination folder '/content/CustomDataSet/valid' created.
Destination folder '/content/CustomDataSet/test' created.
3998 3998
2561 2561
