<a href="https://colab.research.google.com/github/SEOUL-ABSS/SHIPSHIP/blob/main/SONAR3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1. 환경 설정 및 필수 라이브러리 임포트

프로젝트에 필요한 라이브러리를 설치/임포트하고, 전역 상수 및 기본 설정을 정의합니다.

In [6]:
# Install tensorflow
!pip install -q tensorflow tensorflow-hub soundfile librosa

# ==============================================================================
# 1. 환경 설정 및 필수 라이브러리 임포트
# ==============================================================================
print("1. 환경 설정 및 라이브러리 임포트 중...")

# 필요한 라이브러리 임포트
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
import librosa
import soundfile as sf # WAV 파일 처리용
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.font_manager as fm # 폰트 관리자 임포트
import requests # MBARI 데이터 다운로드용
import subprocess # 오프라인 환경 패키지 다운로드용 (선택 사항)
# from tensorflow_addons.optimizers import RectifiedAdam # 예시: TFA 옵티마이저 사용 시

print("라이브러리 임포트 완료.")

# ==============================================================================
# Matplotlib 한글 폰트 설정 (오류 수정 및 안정화)
# ==============================================================================
print("\nMatplotlib 한글 폰트 설정 중...")

# Colab 환경에서 나눔고딕 폰트 설치 및 설정 시도
# 설치 오류 방지를 위해 출력을 숨깁니다.
!sudo apt-get update > /dev/null # Ensure apt cache is updated
!sudo apt-get install -y fonts-nanum > /dev/null
!sudo fc-cache -fv > /dev_null

# 폰트 관리자 캐시 재로드 및 폰트 설정
try:
    # 나눔고딕 폰트 파일 경로 (Colab에 일반적으로 설치되는 위치)
    font_path = '/usr/share/fonts/truetype/nanum/NanumGothic.ttf'
    if os.path.exists(font_path):
        fm.fontManager.addfont(font_path)
        plt.rc('font', family='NanumGothic') # 나눔고딕으로 설정
        plt.rcParams['axes.unicode_minus'] = False # 마이너스 기호 깨짐 방지
        print("Matplotlib 폰트 설정 완료: NanumGothic")
    else:
        print(f"경고: 폰트 파일 '{font_path}'를 찾을 수 없습니다. 기본 폰트를 사용합니다.")
except Exception as e:
    print(f"Matplotlib 폰트 설정 중 오류 발생: {e}. 기본 폰트를 사용합니다.")

print("Matplotlib 폰트 설정 완료.")

# ==============================================================================
# 전역 상수 정의
# ==============================================================================
print("\n전역 상수 정의 중...")
YAMNET_SAMPLE_RATE = 16000
YAMNET_EMBEDDING_DIM = 1024 # YAMNet embedding dimension

# VGGish embedding dimension (usually 128)
# Based on TF Hub documentation for vggish/1, the output is 128-dimensional.
VGGISH_EMBEDDING_DIM = 128

# PANNs embedding dimension (using YAMNet placeholder for now)
# If using a real PANNs model, this dimension would need to be updated based on the model's output.
PANNS_EMBEDDING_DIM = YAMNET_EMBEDDING_DIM # Placeholder dimension, assuming similar output shape to YAMNet placeholder

# Dataset Paths
DEEPSHIP_BASE_PATH = '/content/DeepShip'
# Directory where MBARI Noise Data will be stored.
# Assuming this directory will contain .wav files acting as 'noise'.
MBARI_NOISE_BASE_DIR = '/content/MBARI_noise_data'

# List of DeepShip ship classes
DEEPSHIP_CLASSES = ['Cargo', 'Passengership', 'Tanker', 'Tug']

# List of models to process for comparison
MODELS_TO_PROCESS = ['YAMNet', 'PANNs', 'VGGish'] # Use 'PANNs' as the key for the placeholder

print("전역 상수 정의 완료.")

import os

# Assuming DEEPSHIP_BASE_PATH and MBARI_NOISE_BASE_DIR are defined

print("\nDeepShip 데이터 디렉토리 내용 확인:")
if os.path.exists(DEEPSHIP_BASE_PATH):
    print(f"'{DEEPSHIP_BASE_PATH}' 내용:")
    # List contents of the DeepShip base directory, focusing on the expected class folders
    expected_deepship_subdirs = DEEPSHIP_CLASSES # Use the global constant
    found_content = False
    for item in os.listdir(DEEPSHIP_BASE_PATH):
        item_path = os.path.join(DEEPSHIP_BASE_PATH, item)
        if os.path.isdir(item_path):
            print(f"  {item}/")
            # If it's an expected class directory, list some of its contents
            if item in expected_deepship_subdirs:
                 try:
                     files_in_class_dir = os.listdir(item_path)
                     print(f"    ({len(files_in_class_dir)} items)")
                     for f in files_in_class_dir[:5]: # List up to 5 files
                         print(f"      {f}")
                     if len(files_in_class_dir) > 5:
                         print("      ...")
                     found_content = True
                 except Exception as e:
                      print(f"    오류: 디렉토리 내용 확인 중 오류 발생: {e}")
            else:
                 # List contents of unexpected subdirectories briefly
                 try:
                      sub_items = os.listdir(item_path)
                      print(f"    ({len(sub_items)} items)")
                      for f in sub_items[:3]: # List up to 3 items in other subdirs
                          print(f"      {f}")
                      if len(sub_items) > 3:
                          print("      ...")
                 except Exception as e:
                      print(f"    오류: 디렉토리 내용 확인 중 오류 발생: {e}")

        elif os.path.isfile(item_path):
            print(f"  {item}")
            found_content = True

    if not found_content:
        print("  (디렉토리가 비어 있습니다)")

else:
    print(f"경고: DeepShip Base Path '{DEEPSHIP_BASE_PATH}'를 찾을 수 없습니다.")


print("\nMBARI 노이즈 데이터 디렉토리 내용 확인:")
if os.path.exists(MBARI_NOISE_BASE_DIR):
    print(f"'{MBARI_NOISE_BASE_DIR}' 내용:")
    found_noise_content = False
    for root, dirs, files in os.walk(MBARI_NOISE_BASE_DIR):
        level = root.replace(MBARI_NOISE_BASE_DIR, '').count(os.sep)
        indent = '  ' * level
        print(f'{indent}{os.path.basename(root)}/')
        subindent = '  ' * (level + 1)
        if dirs:
             for d in dirs[:5]: # List up to 5 subdirectories
                  print(f'{subindent}{d}/')
             if len(dirs) > 5:
                  print(f'{subindent}...')

        if files:
             print(f'{subindent}파일들 ({len(files)}개):')
             for f in files[:5]: # List up to 5 files
                 print(f'{subindent}{f}')
                 if f.endswith('.wav'):
                      found_noise_content = True # Found at least one wav file
             if len(files) > 5:
                  print(f'{subindent}...')
        if not dirs and not files:
             print(f'{subindent}(비어 있음)')


    if not found_noise_content:
        print("\n경고: MBARI 노이즈 데이터 디렉토리에서 .wav 파일을 찾지 못했습니다.")

else:
    print(f"경고: MBARI Noise Base Directory '{MBARI_NOISE_BASE_DIR}'를 찾을 수 없습니다.")

print("\n데이터 디렉토리 내용 확인 완료.")

# Install boto3 for S3 access
!pip install -q boto3
print("boto3 설치 완료.")

1. 환경 설정 및 라이브러리 임포트 중...
라이브러리 임포트 완료.

Matplotlib 한글 폰트 설정 중...
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Matplotlib 폰트 설정 완료: NanumGothic
Matplotlib 폰트 설정 완료.

전역 상수 정의 중...
전역 상수 정의 완료.

DeepShip 데이터 디렉토리 내용 확인:
'/content/DeepShip' 내용:
  Tug/
    (4 items)
      40.wav
      9.wav
      49.wav
      tug-metafile
  Passengership/
    (21 items)
      27.wav
      23.wav
      1.wav
      50.wav
      17.wav
      ...
  Cargo/
    (13 items)
      27.wav
      78.wav
      110.wav
      cargo-metafile
      38.wav
      ...
  Tanker/
    (29 items)
      19.wav
      24.wav
      50.wav
      43.wav
      47.wav
      ...
  .git/
    (12 items)
      logs
      refs
      objects
      ...
  README.txt

MBARI 노이즈 데이터 디렉토리 내용 확인:
'/content/MBARI_noise_data' 내용:
MBARI_noise_data/
  파일들 (12개):
  MARS-20180109T000000Z-16kHz.wav
  MARS-20180110

## 2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비

DeepShip 데이터셋을 클론하고, MBARI 노이즈 데이터 디렉토리를 준비합니다. 이전에 다운로드된 MBARI 노이즈 샘플 파일이 있다면 해당 디렉토리로 이동시킵니다.

**주의**: 실제 MBARI Pacific Sound 16kHz 데이터셋은 직접 다운로드 또는 접근 설정이 필요할 수 있습니다. 아래 코드는 DeepShip 클론 및 샘플 노이즈 파일 처리를 위한 예시입니다.

In [7]:
# ==============================================================================
# 2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비 (MBARI 다운로드 포함)
# ==============================================================================
import boto3 # Ensure boto3 is imported
from botocore import UNSIGNED # Ensure UNSIGNED config is imported
from botocore.client import Config # Ensure Config is imported
from pathlib import Path # Ensure Path is imported
import io # Ensure io is imported for potential in-memory reads (though we're downloading)
from six.moves.urllib.request import urlopen # Ensure urlopen is imported if still needed (less likely with direct S3 download)
import os # Ensure os is imported
import subprocess # Ensure subprocess is imported

print("\n2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비 중...")

# Check if DeepShip is already cloned
if not os.path.exists(DEEPSHIP_BASE_PATH):
    print(f"DeepShip 데이터셋 클론 중: {DEEPSHIP_BASE_PATH}")
    # Clone the DeepShip repository
    # Use --depth 1 to clone only the latest commit, saving time and space
    try:
        subprocess.run(['git', 'clone', '--depth', '1', 'https://github.com/irfankamboh/DeepShip.git', DEEPSHIP_BASE_PATH], check=True, capture_output=True)
        print("DeepShip 데이터셋 클론 완료.")
    except subprocess.CalledProcessError as e:
        print(f"오류: DeepShip 데이터셋 클론 실패: {e.stderr.decode()}")
        print("수동으로 https://github.com/irfankamboh/DeepShip.git 를 클론하거나 다운로드하여")
        print(f"'{DEEPSHIP_BASE_PATH}' 경로에 위치시켜주세요.")
    except Exception as e:
         print(f"오류: DeepShip 데이터셋 클론 중 예기치 않은 오류 발생: {e}")
else:
    print(f"DeepShip 데이터셋이 이미 존재합니다: {DEEPSHIP_BASE_PATH}")

# Ensure the MBARI noise base directory exists
os.makedirs(MBARI_NOISE_BASE_DIR, exist_ok=True)
print(f"MBARI 노이즈 데이터 디렉토리 확인/생성 완료: {MBARI_NOISE_BASE_DIR}")

# --- Check if MBARI Noise Data already exists ---
# Count the number of .wav files in the MBARI_NOISE_BASE_DIR
existing_noise_files = [f for f in os.listdir(MBARI_NOISE_BASE_DIR) if f.endswith('.wav')]

if existing_noise_files:
    print(f"\nMBARI 노이즈 데이터가 지정된 디렉토리('{MBARI_NOISE_BASE_DIR}')에 이미 존재합니다. 다운로드를 건너뜁니다. (발견된 .wav 파일 수: {len(existing_noise_files)})")
else:
    # --- MBARI Noise Data Download ---
    # Use Boto3 to access the public S3 bucket and download a limited number of files
    # Based on the provided documentation example.
    s3_client = boto3.client('s3',
        aws_access_key_id='',
        aws_secret_access_key='',
        config=Config(signature_version=UNSIGNED))

    bucket = 'pacific-sound-16khz'
    # Define a prefix to narrow down the files (e.g., a specific year and month)
    # The documentation example uses '2018/01/'. Let's keep this or choose another if needed.
    prefix = '2018/01/' # Using January 2018 data as example

    # Limit the number of files to download to avoid excessive processing time and storage
    MAX_NOISE_FILES_TO_DOWNLOAD = 10 # Set the limit to 10 as requested

    print(f"\nMBARI 노이즈 데이터 다운로드 시도 중 (S3 버킷: {bucket}, Prefix: {prefix}, 최대 {MAX_NOISE_FILES_TO_DOWNLOAD} 파일):")

    try:
        # List objects in the specified bucket and prefix, potentially in pages
        paginator = s3_client.get_paginator('list_objects_v2')
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

        downloaded_count = 0
        found_any_objects = False # Track if any objects were found at all

        for page in pages:
            if 'Contents' in page:
                found_any_objects = True
                # print(f"  페이지에서 {len(page['Contents'])}개의 파일 발견. 다운로드 가능한 .wav 파일 탐색 중...") # Too verbose

                for obj in page['Contents']:
                    key = obj['Key']
                    # Only download .wav files and avoid directories or empty files
                    if key.endswith('.wav') and obj.get('Size', 0) > 0:
                        # Construct the local file path to save within the MBARI_NOISE_BASE_DIR
                        local_file_path = os.path.join(MBARI_NOISE_BASE_DIR, os.path.basename(key))

                        # Check if the file already exists locally to avoid re-downloading
                        if os.path.exists(local_file_path):
                            # print(f"    파일이 이미 존재합니다. 건너뜁니다: {os.path.basename(key)}") # Too verbose
                            pass # Skip if file already exists
                        else:
                            print(f"    다운로드 중: {os.path.basename(key)}...")
                            try:
                                s3_client.download_file(bucket, key, local_file_path)
                                downloaded_count += 1
                                print(f"      다운로드 완료 ({downloaded_count}/{MAX_NOISE_FILES_TO_DOWNLOAD})")
                            except Exception as download_e:
                                 print(f"    오류: 파일 다운로드 실패 ({os.path.basename(key)}): {download_e}")

                        # Stop downloading once the limit is reached
                        if downloaded_count >= MAX_NOISE_FILES_TO_DOWNLOAD:
                            print(f"\n  최대 다운로드 파일 수({MAX_NOISE_FILES_TO_DOWNLOAD})에 도달했습니다. 다운로드를 중지합니다.")
                            break # Break from the inner loop (files in this page)

                if downloaded_count >= MAX_NOISE_FILES_TO_DOWNLOAD:
                     break # Break from the outer loop (pages)


        if not found_any_objects:
            print(f"  경고: 지정된 Prefix '{prefix}'에서 파일을 찾을 수 없습니다.")
        elif downloaded_count == 0:
             # response might not be defined if no objects were found at all
             num_total_objects = 0
             try:
                  initial_response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1) # Check if any object exists
                  if 'Contents' in initial_response:
                       num_total_objects = len(initial_response['Contents']) # This is just the first page count if MaxKeys > 1, or just 1 if MaxKeys=1
             except Exception:
                  pass # Ignore error if listing fails

             if num_total_objects > 0:
                  print(f"\n  지정된 Prefix '{prefix}'에 파일이 존재하지만, 다운로드 가능한 .wav 파일을 찾지 못했거나 모두 건너뛰었습니다.")
             else:
                  print(f"\n  지정된 Prefix '{prefix}'에 파일을 찾을 수 없습니다.")


        print(f"\n총 {downloaded_count}개의 노이즈 .wav 파일을 다운로드했습니다.")

    except Exception as e:
        print(f"오류: MBARI 노이즈 데이터 다운로드 중 오류 발생: {e}")
        print("S3 버킷 접근 권한, Prefix 설정, 또는 Boto3 설정/설치를 확인해주세요.")


# Note: To get sufficient noise data for meaningful training, you may need to adjust the 'prefix'
# or 'MAX_NOISE_FILES_TO_DOWNLOAD', or implement more complex logic to gather data from multiple
# prefixes/months, depending on your data needs and the S3 bucket structure.
# Ensure that the downloaded data includes enough samples from the 'noise' category.


print("\n2. 데이터 확보 단계 완료.")


2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비 중...
DeepShip 데이터셋이 이미 존재합니다: /content/DeepShip
MBARI 노이즈 데이터 디렉토리 확인/생성 완료: /content/MBARI_noise_data

MBARI 노이즈 데이터가 지정된 디렉토리('/content/MBARI_noise_data')에 이미 존재합니다. 다운로드를 건너뜁니다. (발견된 .wav 파일 수: 12)

2. 데이터 확보 단계 완료.


## 3. 데이터 로드 및 준비 함수 정의

DeepShip 데이터와 노이즈 데이터를 수집하고, 'ship' 및 'noise' 레이블을 할당하며, 훈련 및 테스트 세트로 분할하는 함수를 정의합니다.

In [8]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Assuming DEEPSHIP_BASE_PATH, MBARI_NOISE_BASE_DIR, DEEPSHIP_CLASSES are defined in previous cells.

def load_and_prepare_dataset(deepship_path, noise_data_dir, deepship_classes, test_size=0.2, random_state=42):
    """
    DeepShip 데이터셋과 노이즈 데이터를 로드하고 이진 분류(ship vs noise)를 위해 준비합니다.

    Args:
        deepship_path (str): DeepShip 데이터셋의 기본 경로.
        noise_data_dir (str): 노이즈 오디오 파일이 있는 디렉토리 경로.
        deepship_classes (list): DeepShip 데이터셋의 선박 클래스 이름 목록.
        test_size (float): 테스트 세트의 비율.
        random_state (int): 데이터 분할을 위한 랜덤 시드.

    Returns:
        tuple: (X_train_paths, X_test_paths, y_train_encoded, y_test_encoded, label_encoder, is_data_prepared, num_classes, noise_audio_paths)
               데이터 로드 및 준비 상태에 따라 결과가 달라질 수 있습니다.
               데이터 부족 시 X_train/test_paths, y_train/test_encoded는 빈 리스트/NumPy 배열이 됩니다.
               noise_audio_paths는 수집된 노이즈 파일 경로 목록입니다.
    """
    all_audio_paths = []
    all_labels = []
    noise_audio_paths = [] # List to store noise file paths for augmentation
    is_data_prepared = False
    label_encoder = None
    num_classes = 0

    print("\n데이터셋 로드 및 준비 시작: Ship vs Noise")
    print(f"DeepShip Base Path: {deepship_path}")
    print(f"MBARI Noise Data Directory: {noise_data_dir}")

    # --- 1. Integrate DeepShip Data ('ship') ---
    is_deepship_available = os.path.exists(deepship_path)
    if is_deepship_available:
        print(f"DeepShip 데이터셋에서 'ship' 오디오 파일 수집 중: {deepship_path}")
        found_ship_files = False
        # CORRECTED: Iterate directly through the class subdirectories at the top level of DeepShip
        for class_name in deepship_classes: # Use the provided deepship_classes list
            class_path = os.path.join(deepship_path, class_name)
            if os.path.isdir(class_path):
                print(f"  클래스 폴더 발견: {class_path} -> Class: {class_name}")
                for file_name in os.listdir(class_path):
                    if file_name.endswith('.wav'):
                        audio_path = os.path.join(class_path, file_name)
                        all_audio_paths.append(audio_path)
                        all_labels.append('ship') # Label all DeepShip ship types as 'ship'
                        found_ship_files = True
            else:
                 print(f"  경고: 예상 클래스 폴더 '{class_name}'를 '{deepship_path}'에서 찾을 수 없습니다.")


        if not found_ship_files:
            print(f"경고: '{deepship_path}' 내의 예상 클래스 폴더에서 'ship'으로 사용할 .wav 파일을 찾지 못했습니다. DeepShip 데이터셋 구조를 확인하세요.")
    else:
        print(f"경고: DeepShip 데이터셋을 찾을 수 없어 'ship' 데이터 수집을 건너뜁니다: {deepship_path}")


    # --- 2. Integrate Noise Data ('noise') ---
    is_noise_data_available_dir = os.path.exists(noise_data_dir)
    if is_noise_data_available_dir:
        print(f"노이즈 데이터 수집 중: {noise_data_dir}")
        found_noise_files = False
        # Collect all .wav files under the noise data directory
        for root, _, files in os.walk(noise_data_dir):
             # Exclude the DeepShip directory itself if it was downloaded into the noise dir by mistake
             if root.startswith(deepship_path):
                  continue
             for file_name in files:
                 if file_name.endswith('.wav'):
                     audio_path = os.path.join(root, file_name)
                     # Ensure we don't duplicate paths if DeepShip and downloaded DeepShip point to the same files
                     if audio_path not in all_audio_paths: # Avoid adding DeepShip files if they somehow ended up here
                         all_audio_paths.append(audio_path)
                         all_labels.append('noise') # 모든 노이즈 데이터를 'noise'로 레이블링
                         noise_audio_paths.append(audio_path) # Also add to noise_audio_paths for augmentation
                         found_noise_files = True

        if not found_noise_files:
            print(f"경고: '{noise_data_dir}'에서 'noise'로 사용할 .wav 파일을 찾지 못했습니다.")
    else:
        print(f"경고: 노이즈 데이터 디렉토리 '{noise_data_dir}'를 찾을 수 없어 'noise' 데이터 수집을 건너뜁니다.")
        print("실제 노이즈 데이터를 다운로드하여 이 디렉토리에 위치시켜주세요.")


    # --- 3. Data Preparation and Split ---
    unique_labels = np.unique(all_labels)

    # Check if data for both 'ship' and 'noise' classes is available and sufficient
    # We need at least 2 classes and some data for each class to perform a stratified split
    if len(all_audio_paths) > 0 and len(unique_labels) >= 2:
        # Check if each unique label has at least 2 samples for stratified split
        label_counts = pd.Series(all_labels).value_counts()
        if all(count >= 2 for count in label_counts):
            print(f"\n데이터 수집 완료. 총 샘플 수: {len(all_audio_paths)}")
            print(f"클래스 분포: {label_counts.to_dict()}")

            # 레이블 인코딩 ('ship', 'noise' 등 -> 0, 1 등)
            label_encoder = LabelEncoder()
            encoded_labels = label_encoder.fit_transform(all_labels)
            num_classes = len(label_encoder.classes_)
            print(f"레이블 인코딩 완료. 클래스: {label_encoder.classes_}, 총 {num_classes}개")

            # 데이터셋 분할 (훈련 및 테스트) - Stratified split으로 클래스 비율 유지
            X_train_paths, X_test_paths, y_train_encoded, y_test_encoded = train_test_split(
                all_audio_paths, encoded_labels, test_size=test_size, random_state=random_state, stratify=encoded_labels
            )

            print(f"데이터 분할 완료.")
            print(f"훈련 데이터 샘플 수: {len(X_train_paths)}")
            print(f"테스트 데이터 샘플 수: {len(X_test_paths)}")
            is_data_prepared = True # 데이터 준비 성공 플래그

        else:
             print("\n오류: 각 클래스별 샘플 수가 부족하여 데이터 분할(stratified split)을 수행할 수 없습니다.")
             print(f"클래스별 샘플 수: {label_counts.to_dict()}")
             is_data_prepared = False
             X_train_paths, X_test_paths, y_train_encoded, y_test_encoded = [], [], np.array([]), np.array([])


    else:
        print("\n오류: 'ship'과 'noise' 이진 분류를 위한 데이터가 충분하지 않습니다.")
        print(f"수집된 총 샘플 수: {len(all_audio_paths)}, 확인된 클래스: {unique_labels}")
        print("DeepShip 데이터셋이 올바르게 클론되었는지, 노이즈 데이터가 '{noise_data_dir}'에 충분히 있는지 확인해주세요.")
        is_data_prepared = False
        X_train_paths, X_test_paths, y_train_encoded, y_test_encoded = [], [], np.array([]), np.array([])


    print("\n데이터셋 로드 및 준비 함수 정의 완료.")

    # Return values regardless of success, check is_data_prepared flag later
    # Also return noise_audio_paths for potential augmentation
    return X_train_paths, X_test_paths, y_train_encoded, y_test_encoded, label_encoder, is_data_prepared, num_classes, noise_audio_paths

## 4. 오디오 전처리 및 임베딩 추출 함수 정의

각 오디오 모델(YAMNet, PANNs, VGGish)에 대한 오디오 전처리 및 임베딩 추출 함수를 정의합니다.

In [9]:
import tensorflow as tf # Ensure tf is imported
import tensorflow_hub as hub # Ensure hub is imported
import numpy as np # Ensure numpy is imported
import soundfile as sf # Ensure soundfile is imported
import librosa # Ensure librosa is imported
import os # Ensure os is imported if needed within functions
import random # Import random for selecting noise file

print("\n4. 오디오 전처리 및 임베딩 추출 함수 정의 중 (데이터 증강 포함)...")

# Assuming YAMNET_SAMPLE_RATE is defined in previous cells (should be 16000)
# Assuming global variables like noise_audio_paths (list of paths to noise files)
# will be available for data augmentation, or passed as an argument.
# For this modification, we assume noise_audio_paths is available in the scope where extract_embedding is called.

# Helper function for basic audio loading and resampling to a target sample rate
def load_and_resample_audio(audio_path, target_sample_rate):
    """오디오 파일을 로드하고 지정된 샘플링 레이트로 리샘플링합니다."""
    try:
        # soundfile.read returns waveform and sample rate
        waveform, sr = sf.read(audio_path, dtype='float32')
        if sr != target_sample_rate:
            # Resample if sample rate doesn't match
            # Use 'soxr_vq' backend for potentially faster resampling
            # CORRECTED: Removed invalid res_type='soxr_vq'
            waveform = librosa.resample(y=waveform, orig_sr=sr, target_sr=target_sample_rate)
        if waveform.ndim > 1:
            # Convert stereo to mono by averaging channels
            waveform = np.mean(waveform, axis=1)
        return waveform, target_sample_rate
    except Exception as e:
        # Print specific error for debugging
        print(f"오류: 오디오 파일 로드 및 리샘플링 실패 - {audio_path}, 오류: {e}")
        return None, None


# --- Data Augmentation Function: Mixing Noise ---
def mix_audio_with_noise(audio_waveform, audio_sr, noise_audio_paths, noise_level=0.1):
    """
    주어진 오디오 파형에 랜덤한 노이즈 오디오를 섞습니다.

    Args:
        audio_waveform (np.ndarray): 원본 오디오 파형.
        audio_sr (int): 원본 오디오 샘플링 레이트.
        noise_audio_paths (list): 사용 가능한 노이즈 오디오 파일 경로 목록.
        noise_level (float): 노이즈의 상대적인 볼륨 레벨 (0.0 to 1.0).

    Returns:
        np.ndarray: 노이즈가 혼합된 오디오 파형, 또는 혼합 실패 시 원본 파형.
    """
    if not noise_audio_paths:
        # print("경고: 노이즈 오디오 파일 경로 목록이 비어 있어 노이즈 혼합을 수행할 수 없습니다.")
        return audio_waveform # Return original if no noise sources

    try:
        # Select a random noise file
        random_noise_path = random.choice(noise_audio_paths)

        # Load and resample noise audio to match the original audio's sample rate
        noise_waveform, noise_sr = load_and_resample_audio(random_noise_path, audio_sr)

        if noise_waveform is None or noise_sr != audio_sr:
            # print(f"경고: 노이즈 파일 로드 또는 리샘플링 실패 ({os.path.basename(random_noise_path)}). 노이즈 혼합 건너뜜.")
            return audio_waveform # Return original if noise loading fails

        # Ensure noise waveform is at least as long as the audio waveform
        if len(noise_waveform) < len(audio_waveform):
            # Pad the noise or tile it to match the audio length
            # Simple tiling for demonstration
            tile_factor = (len(audio_waveform) // len(noise_waveform)) + 1
            noise_waveform = np.tile(noise_waveform, tile_factor)
            noise_waveform = noise_waveform[:len(audio_waveform)]
        elif len(noise_waveform) > len(audio_waveform):
            # Trim the noise to match the audio length
            noise_waveform = noise_waveform[:len(audio_waveform)]


        # Mix the audio and noise waveforms
        # Normalize both signals to prevent clipping after mixing (optional but recommended)
        # max_amp = max(np.max(np.abs(audio_waveform)), np.max(np.abs(noise_waveform)))
        # if max_amp > 0:
        #     audio_waveform /= max_amp
        #     noise_waveform /= max_amp

        mixed_waveform = audio_waveform + noise_level * noise_waveform

        # Simple clipping to prevent values outside [-1, 1] range
        mixed_waveform = np.clip(mixed_waveform, -1.0, 1.0)

        return mixed_waveform

    except Exception as e:
        print(f"오류: 오디오와 노이즈 혼합 실패 - 오류: {e}")
        return audio_waveform # Return original on error


# 1. Embedding Extraction for YAMNet (with optional augmentation)
def extract_yamnet_embedding(audio_path, yamnet_model, target_sample_rate=YAMNET_SAMPLE_RATE,
                             augment_with_noise=False, noise_audio_paths=None, noise_level=0.1):
    """오디오 파일을 YAMNet 입력에 맞춰 전처리하고 임베딩을 추출합니다 (데이터 증강 포함)."""
    waveform, sr = load_and_resample_audio(audio_path, target_sample_rate)

    if waveform is None or sr is None:
        return None # Return None if basic loading/resampling failed

    # --- Apply Data Augmentation (Noise Mixing) ---
    if augment_with_noise and noise_audio_paths:
        # print(f"  {os.path.basename(audio_path)}에 노이즈 혼합 적용 중...")
        mixed_waveform = mix_audio_with_noise(waveform, sr, noise_audio_paths, noise_level)
        waveform = mixed_waveform # Use the mixed waveform for embedding extraction


    try:
        # YAMNet model expects a tensor input
        scores, embeddings, spectrogram = yamnet_model(tf.constant(waveform, dtype=tf.float32))
        # Average frame-level embeddings to get a single embedding vector per audio file
        mean_embedding = tf.reduce_mean(embeddings, axis=0)
        return mean_embedding.numpy() # Return as NumPy array

    except Exception as e:
        # Print specific error for debugging
        print(f"오류: YAMNet 임베딩 추출 실패 - {audio_path}, 오류: {e}")
        return None


# 2. Embedding Extraction for PANNs (using YAMNet placeholder, with optional augmentation)
def extract_panns_embedding(audio_path, panns_model, target_sample_rate=YAMNET_SAMPLE_RATE,
                            augment_with_noise=False, noise_audio_paths=None, noise_level=0.1):
     """오디오 파일을 PANNs 입력에 맞춰 전처리하고 임베딩을 추출합니다 (YAMNet 플레이스홀더 사용, 데이터 증강 포함)."""
     # 이 함수는 PANNs 플레이스홀더(현재 YAMNet 사용)를 위한 임베딩 추출 함수입니다.
     # 플레이스홀더 모델은 YAMNet과 동일하게 파형을 입력으로 받습니다.
     # In a real scenario with a true PANNs model, this function would be adapted for its specific input/output.
     # We still check for model existence before calling
     if panns_model is None:
         # print(f"경고: PANNs 모델(플레이스홀더)이 로드되지 않았습니다. 임베딩 추출을 건너뜜: {audio_path}")
         return None

     waveform, sr = load_and_resample_audio(audio_path, target_sample_rate)

     if waveform is None or sr is None:
        return None # Return None if basic loading/resampling failed

     # --- Apply Data Augmentation (Noise Mixing) ---
     if augment_with_noise and noise_audio_paths:
         # print(f"  {os.path.basename(audio_path)}에 노이즈 혼합 적용 중 (PANNs placeholder)...")
         mixed_waveform = mix_audio_with_noise(waveform, sr, noise_audio_paths, noise_level)
         waveform = mixed_waveform # Use the mixed waveform for embedding extraction


     try:
        # PANNs placeholder (YAMNet) expects a tensor input
        _, embeddings, _ = panns_model(tf.constant(waveform, dtype=tf.float32))
        mean_embedding = tf.reduce_mean(embeddings, axis=0).numpy() # Average frame-level embeddings
        return mean_embedding

     except Exception as e:
         print(f"오류: PANNs 임베딩 추출 실패 - {audio_path}, 오류: {e}")
         return None


# 3. Embedding Extraction for VGGish (with optional augmentation)
# VGGish expects 16kHz mono float32 waveform and the model handles framing internally.
def extract_vggish_embedding(audio_path, vggish_model, target_sample_rate=YAMNET_SAMPLE_RATE):
    """오디오 파일을 VGGish 입력에 맞춰 전처리하고 임베딩을 추출합니다 (데이터 증강 미포함 - VGGish 특성 고려)."""
    # VGGish는 YAMNet과 다소 다른 전처리 및 특징 추출 방식을 사용하므로,
    # 간단한 오디오 파형 혼합 증강이 VGGish의 내부 스펙트로그램 계산 방식과 잘 맞지 않을 수 있습니다.
    # VGGish에 대한 증강은 좀 더 복잡한 접근 방식이 필요할 수 있으므로 여기서는 제외합니다.
    # 또는 오디오 파형 자체에 노이즈를 섞은 후 VGGish 입력으로 사용할 수도 있습니다.
    # 현재 코드는 YAMNet과 PANNs 플레이스홀더에만 노이즈 혼합 증강을 적용합니다.

    waveform, sr = load_and_resample_audio(audio_path, target_sample_rate)

    if vggish_model is None:
        # print(f"경고: VGGish 모델이 로드되지 않았습니다. 임베딩 추출을 건너뜜: {audio_path}")
        return None


    if waveform is None or sr is None:
        return None # Return None if basic loading/resampling failed

    try:
        # VGGish model directly handles the framing and embedding extraction from the waveform.
        # The model returns frame-level embeddings.
        embeddings = vggish_model(tf.constant(waveform, dtype=tf.float32))
        # For classification, we typically average the frame embeddings
        mean_embedding = tf.reduce_mean(embeddings, axis=0).numpy()
        return mean_embedding

    except Exception as e:
        print(f"오류: VGGish 임베딩 추출 실패 - {audio_path}, 오류: {e}")
        return None


print("오디오 전처리 및 임베딩 추출 함수 정의 완료 (데이터 증강 기능 포함).")


4. 오디오 전처리 및 임베딩 추출 함수 정의 중 (데이터 증강 포함)...
오디오 전처리 및 임베딩 추출 함수 정의 완료 (데이터 증강 기능 포함).


## 5. 오디오 모델 로드 함수 정의

전이 학습에 사용할 YAMNet, PANNs, VGGish 사전 학습 모델을 TensorFlow Hub에서 로드하는 함수를 정의합니다.

In [10]:
import tensorflow_hub as hub
import tensorflow as tf # Ensure tensorflow is imported for model loading

print("\n5. 오디오 모델 로드 함수 정의 중...")

# Define the TensorFlow Hub handles for the models.
# Using the global constants defined in Step 1
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
vggish_model_handle = 'https://tfhub.dev/google/vggish/1'
# Using the YAMNet handle as a placeholder for PANNs as per previous discussion.
panns_model_handle = 'https://tfhub.dev/google/yamnet/1' # Using YAMNet as placeholder for PANNs

def load_audio_models():
    """
    TensorFlow Hub에서 YAMNet, PANNs, VGGish 모델을 로드합니다.

    Returns:
        dict: 로드된 모델 객체를 담고 있는 딕셔너리.
              모델 로드 실패 시 해당 모델 이름에 대해 값은 None이 됩니다.
    """
    models = {}
    are_models_loaded_successfully = True

    print("\n오디오 모델 로드 중...")

    # Load YAMNet
    try:
        print("  YAMNet 모델 로드 중...")
        models['YAMNet'] = hub.load(yamnet_model_handle)
        print("  YAMNet 모델 로드 완료.")
    except Exception as e:
        print(f"  오류: YAMNet 모델 로드 중 오류 발생: {e}")
        models['YAMNet'] = None # Set to None if loading fails
        are_models_loaded_successfully = False

    # Load PANNs (Placeholder)
    try:
        print("  PANNs 모델 로드 중 (YAMNet 플레이스홀더 사용)...")
        models['PANNs'] = hub.load(panns_model_handle)
        print("  PANNs 모델 로드 완료 (YAMNet 플레이스홀더).")
    except Exception as e:
        print(f"  오류: PANNs 모델 로드 중 오류 발생: {e}")
        models['PANNs'] = None # Set to None if loading fails
        are_models_loaded_successfully = False


    # Load VGGish
    try:
        print("  VGGish 모델 로드 중...")
        models['VGGish'] = hub.load(vggish_model_handle)
        print("  VGGish 모델 로드 완료.")
    except Exception as e:
        print(f"  오류: VGGish 모델 로드 중 오류 발생: {e}")
        models['VGGish'] = None # Set to None if loading fails
        are_models_loaded_successfully = False

    # Check if at least one model loaded successfully
    if all(model is None for model in models.values()):
        print("오류: 모든 오디오 모델 로드에 실패했습니다.")
        are_models_loaded_successfully = False # Ensure this is False if all failed
    else:
         print("모델 로드 상태:")
         for name, model in models.items():
             status = "Loaded" if model else "Failed"
             print(f"  {name}: {status}")


    print("\n오디오 모델 로드 함수 정의 완료.")

    # Return the dictionary of models. The caller must check for None values.
    return models, are_models_loaded_successfully

# Example usage (will be called in a later cell):
# loaded_models, are_models_loaded = load_audio_models()


5. 오디오 모델 로드 함수 정의 중...


## 6. 데이터 로드, 임베딩 추출 및 데이터 준비 실행

정의된 함수들을 사용하여 데이터셋을 로드하고, 각 모델별로 임베딩을 추출하며, 훈련 및 테스트를 위한 최종 데이터셋(임베딩 및 레이블)을 준비합니다.

In [11]:
import numpy as np
import tensorflow as tf # Ensure tf is imported for constant conversion and one-hot encoding

# Assuming load_and_prepare_dataset, extract_yamnet_embedding, extract_panns_embedding, extract_vggish_embedding,
# load_audio_models, YAMNET_SAMPLE_RATE, YAMNET_EMBEDDING_DIM, VGGISH_EMBEDDING_DIM, PANNS_EMBEDDING_DIM,
# DEEPSHIP_BASE_PATH, MBARI_NOISE_BASE_DIR, DEEPSHIP_CLASSES are defined and available from previous cells.


# --- 1. Load and Prepare Dataset Paths and Labels ---
# This calls the function defined in Step 3
# Ensure load_and_prepare_dataset is defined and available
if 'load_and_prepare_dataset' not in locals():
    print("오류: 'load_and_prepare_dataset' 함수가 정의되지 않았습니다. 데이터 로드 및 준비를 건너뜀.")
    X_train_paths, X_test_paths, y_train_encoded, y_test_encoded, label_encoder, is_data_prepared, num_classes = [], [], np.array([]), np.array([]), None, False, 0
else:
    X_train_paths, X_test_paths, y_train_encoded, y_test_encoded, label_encoder, is_data_prepared, num_classes = load_and_prepare_dataset(
        deepship_path=DEEPSHIP_BASE_PATH,
        noise_data_dir=MBARI_NOISE_BASE_DIR,
        deepship_classes=DEEPSHIP_CLASSES # Pass the global constant
    )


# --- 2. Load Audio Models ---
# This calls the function defined in Step 5
# Ensure load_audio_models is defined and available
if 'load_audio_models' not in locals():
     print("오류: 'load_audio_models' 함수가 정의되지 않았습니다. 모델 로드를 건너뜀.")
     loaded_models, are_models_loaded = {}, False
else:
    loaded_models, are_models_loaded = load_audio_models()

# --- 3. Model-wise Embedding Extraction and Data Filtering ---
X_train_embeddings = {}
X_test_embeddings = {}
y_train_filtered = {} # Store encoded labels corresponding to successfully extracted train embeddings
y_test_filtered = {}  # Store encoded labels corresponding to successfully extracted test embeddings
y_test_encoded_filtered = {} # Store encoded test labels filtered by successful embedding extraction for evaluation reports

# Dictionary mapping model names to their loaded model object, embedding dimension, and extraction function
# Ensure embedding dimension constants and extraction functions are defined and available
models_info = {}
if are_models_loaded: # Only populate models_info if models were loaded
    if 'extract_yamnet_embedding' in locals() and 'YAMNET_EMBEDDING_DIM' in locals() and loaded_models.get('YAMNet') is not None:
        models_info['YAMNet'] = {'model': loaded_models['YAMNet'], 'dim': YAMNET_EMBEDDING_DIM, 'extract_func': extract_yamnet_embedding}
    else:
        print("경고: YAMNet 모델 정보 또는 함수가 누락되었습니다. YAMNet 임베딩 추출을 건너뜁니다.")
    if 'extract_panns_embedding' in locals() and 'PANNS_EMBEDDING_DIM' in locals() and loaded_models.get('PANNs') is not None:
        models_info['PANNs'] = {'model': loaded_models['PANNs'], 'dim': PANNS_EMBEDDING_DIM, 'extract_func': extract_panns_embedding}
    else:
         print("경고: PANNs 모델 정보 또는 함수가 누락되었습니다. PANNs 임베딩 추출을 건너뜁니다.")
    if 'extract_vggish_embedding' in locals() and 'VGGISH_EMBEDDING_DIM' in locals() and loaded_models.get('VGGish') is not None:
        models_info['VGGish'] = {'model': loaded_models['VGGish'], 'dim': VGGISH_EMBEDDING_DIM, 'extract_func': extract_vggish_embedding}
    else:
         print("경고: VGGish 모델 정보 또는 함수가 누락되었습니다. VGGish 임베딩 추출을 건너뜁니다.")


# Flag to indicate if embeddings were successfully extracted for at least one model with sufficient data
are_embeddings_extracted_successfully = False

if is_data_prepared and are_models_loaded and models_info:
    print("\n모델별 임베딩 추출 시작 (시간이 오래 걸릴 수 있습니다)...")

    for model_name, info in models_info.items():
        model = info['model']
        extract_func = info['extract_func']
        # embedding_dim = info['dim'] # Dim not needed here, used later for model building

        # Check if the model loaded successfully for this specific model_name (already done in models_info population)
        if model is None: # This case should be handled by not adding to models_info, but double check
             print(f"\n--- {model_name} 모델 로드 실패. 임베딩 추출 건너뜀 (내부 오류). ---")
             continue

        print(f"\n--- {model_name} 임베딩 추출 중 ---")
        train_embeddings_list = []
        test_embeddings_list = []
        train_labels_filtered_list = []
        test_labels_filtered_list = []

        # Process training data paths
        print("  훈련 데이터 처리 중...")
        for i, path in enumerate(X_train_paths):
            embedding = extract_func(path, model) # Pass the specific model object
            if embedding is not None:
                train_embeddings_list.append(embedding)
                train_labels_filtered_list.append(y_train_encoded[i]) # Keep the original encoded label

            if (i + 1) % 100 == 0: # Increased interval for less verbose output
                print(f"    {i+1}/{len(X_train_paths)} 훈련 파일 처리 완료.")

        # Process testing data paths
        print("  테스트 데이터 처리 중...")
        for i, path in enumerate(X_test_paths):
            embedding = extract_func(path, model) # Pass the specific model object
            if embedding is not None:
                test_embeddings_list.append(embedding)
                test_labels_filtered_list.append(y_test_encoded[i]) # Keep the original encoded label

            if (i + 1) % 50 == 0: # Increased interval for less verbose output
                print(f"    {i+1}/{len(X_test_paths)} 테스트 파일 처리 완료.")


        # Convert lists to NumPy arrays
        X_train_embeddings[model_name] = np.array(train_embeddings_list)
        X_test_embeddings[model_name] = np.array(test_embeddings_list)
        y_train_filtered[model_name] = np.array(train_labels_filtered_list)
        y_test_filtered[model_name] = np.array(test_labels_filtered_list)
        y_test_encoded_filtered[model_name] = np.array(test_labels_filtered_list) # Store for evaluation reports


        print(f"\n  {model_name} 임베딩 추출 및 필터링 완료.")
        print(f"  훈련 임베딩 형태: {X_train_embeddings[model_name].shape}")
        print(f"  훈련 레이블 형태: {y_train_filtered[model_name].shape}")
        print(f"  테스트 임베딩 형태: {X_test_embeddings[model_name].shape}")
        print(f"  테스트 레이블 형태: {y_test_filtered[model_name].shape}")

        # Check if embeddings were successfully extracted for this model with sufficient samples
        # Need at least 2 samples for training/validation split implicitly in Keras fit
        # And at least 2 classes in the filtered training data for training viability
        if X_train_embeddings[model_name].shape[0] >= 2 and model_name in y_train_filtered and len(np.unique(y_train_filtered[model_name])) >= 2:
            are_embeddings_extracted_successfully = True # Set flag to True if at least one model has data


    print("\n모델별 임베딩 추출 단계 완료.")

else:
    print("\n데이터 로드, 모델 로드, 또는 모델 정보 누락으로 임베딩 추출을 건너뜁니다.")


# --- 4. Prepare Labels for Training (One-Hot Encoding) ---
y_train_one_hot = {}
y_test_one_hot = {}

# Perform one-hot encoding only if embeddings were extracted successfully for at least one model
# and if label_encoder and num_classes are available and valid from data preparation
is_data_ready_for_training = False # Reset and determine based on one-hot encoding success

if are_embeddings_extracted_successfully and 'label_encoder' in locals() and label_encoder is not None and 'num_classes' in locals() and num_classes >= 2:
    print("\n모델별 레이블 One-Hot 인코딩 시작...")
    all_models_encoded_successfully = True # Track if all models that had embeddings are encoded

    for model_name in models_info.keys(): # Iterate through original model names that were intended to be processed
        # Check if filtered labels exist and are not empty for this model, and have at least 2 classes
        if model_name in y_train_filtered and y_train_filtered[model_name].size > 0 and len(np.unique(y_train_filtered[model_name])) >= 2 and \
           model_name in y_test_filtered and y_test_filtered[model_name].size > 0 and len(np.unique(y_test_filtered[model_name])) >= 2:

             print(f"--- {model_name} 레이블 인코딩 중 ---")
             try:
                 # One-hot Encode the filtered labels using the shared label_encoder
                 y_train_one_hot[model_name] = tf.keras.utils.to_categorical(y_train_filtered[model_name], num_classes=num_classes)
                 y_test_one_hot[model_name] = tf.keras.utils.to_categorical(y_test_filtered[model_name], num_classes=num_classes)
                 print(f"  {model_name} 훈련 레이블 형태 (One-Hot): {y_train_one_hot[model_name].shape}")
                 print(f"  {model_name} 테스트 레이블 형태 (One-Hot): {y_test_one_hot[model_name].shape}")
             except Exception as e:
                 print(f"  오 오류: {model_name} 레이블 One-Hot 인코딩 중 오류 발생: {e}")
                 # Initialize empty arrays if encoding fails
                 y_train_one_hot[model_name] = np.array([])
                 y_test_one_hot[model_name] = np.array([])
                 all_models_encoded_successfully = False # Mark failure


        else:
            print(f"  경고: {model_name}에 대한 필터링된 레이블이 부족하거나 클래스가 2개 미만이어서 One-Hot 인코딩을 건너뜁니다.")
            # Initialize empty arrays if filtered labels are missing or empty or insufficient classes
            y_train_one_hot[model_name] = np.array([])
            y_test_one_hot[model_name] = np.array([])
            all_models_encoded_successfully = False # Mark failure


    if all_models_encoded_successfully:
         print("\n모델별 레이블 One-Hot 인코딩 단계 완료.")
         is_data_ready_for_training = True # Set the flag to True if all intended models were encoded successfully
    else:
         print("\n일부 모델의 레이블 One-Hot 인코딩 실패 또는 데이터 부족.")
         is_data_ready_for_training = False # Set the flag to False if any intended model failed encoding

else:
    print("\n임베딩 추출 실패, 클래스 수 부족, 또는 label_encoder 누락으로 레이블 One-Hot 인코딩을 건너뜁니다.")
    is_data_ready_for_training = False # Set the flag to False


if is_data_ready_for_training:
    print("\n데이터 로드, 임베딩 추출 및 데이터셋 준비 단계 성공.")
else:
    print("\n데이터 로드, 임베딩 추출 및 데이터셋 준비 단계 실패: 훈련에 필요한 데이터가 부족합니다.")


데이터셋 로드 및 준비 시작: Ship vs Noise
DeepShip Base Path: /content/DeepShip
MBARI Noise Data Directory: /content/MBARI_noise_data
DeepShip 데이터셋에서 'ship' 오디오 파일 수집 중: /content/DeepShip
  클래스 폴더 발견: /content/DeepShip/Cargo -> Class: Cargo
  클래스 폴더 발견: /content/DeepShip/Passengership -> Class: Passengership
  클래스 폴더 발견: /content/DeepShip/Tanker -> Class: Tanker
  클래스 폴더 발견: /content/DeepShip/Tug -> Class: Tug
노이즈 데이터 수집 중: /content/MBARI_noise_data

데이터 수집 완료. 총 샘플 수: 75
클래스 분포: {'ship': 63, 'noise': 12}
레이블 인코딩 완료. 클래스: ['noise' 'ship'], 총 2개
데이터 분할 완료.
훈련 데이터 샘플 수: 60
테스트 데이터 샘플 수: 15

데이터셋 로드 및 준비 함수 정의 완료.


ValueError: too many values to unpack (expected 7)

## 7. 모델 구축 및 학습 함수 정의

각 모델에서 추출된 임베딩을 입력으로 받아 최종 분류를 수행하는 Keras 모델(분류 헤드)을 구축하고 학습시키는 함수를 정의합니다.

In [None]:
import tensorflow as tf # Ensure tf is imported
from tensorflow.keras.models import Model # Ensure Model is imported
from tensorflow.keras.layers import Input, Dense, Dropout # Ensure layers are imported
from tensorflow.keras.optimizers import Adam # Ensure Adam is imported
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau # Ensure callbacks are imported
import numpy as np # Ensure numpy is imported

print("\n7. 분류기 모델 구축 및 학습 함수 정의 중...")

def build_classifier_model(input_shape, num_classes, learning_rate=0.001):
    """
    임베딩 입력을 위한 분류 헤드 Keras 모델을 구축하고 컴파일합니다.

    Args:
        input_shape (int): 입력 임베딩의 차원.
        num_classes (int): 출력 클래스의 수 (이진 분류의 경우 2).
        learning_rate (float): Adam 옵티마이저의 학습률.

    Returns:
        tensorflow.keras.models.Model: 컴파일된 Keras 모델 객체.
    """
    embedding_input = Input(shape=(input_shape,), name='embedding_input')

    x = Dense(128, activation='relu')(embedding_input)
    x = Dropout(0.4)(x) # Increased dropout for regularization
    x = Dense(64, activation='relu')(x)
    x = Dropout(0.4)(x) # Increased dropout for regularization
    # Output layer for binary classification (num_classes=2) or multi-class
    output = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=embedding_input, outputs=output)

    # Model compilation
    optimizer = Adam(learning_rate=learning_rate)

    # Use categorical_crossentropy loss for multi-class (including binary with 2 classes)
    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    print("  분류 헤드 모델 구축 및 컴파일 완료.")
    model.summary()

    return model


def train_model(model, X_train, y_train_one_hot, X_test, y_test_one_hot,
                               epochs=50, batch_size=16):
    """
    주어진 데이터로 Keras 모델을 학습시킵니다.

    Args:
        model (tensorflow.keras.models.Model): 학습할 컴파일된 Keras 모델.
        X_train (np.ndarray): 訓練 임베딩 데이터.
        y_train_one_hot (np.ndarray): One-Hot 인코딩된 訓練 레이블.
        X_test (np.ndarray): 테스트 임베딩 데이터 (검증에 사용).
        y_test_one_hot (np.ndarray): One-Hot 인코딩된 테스트 레이블 (검증에 사용).
        epochs (int): 학습 에폭 수.
        batch_size (int): 배치 크기.

    Returns:
        tuple: (trained_model, history) 학습된 모델 객체와 학습 기록.
               データ不足またはエラー発生時 (None, None) 返還。
    """
    print("\n  모델 학습 시작...")

    # Data validation and sufficiency checks
    if X_train.size == 0 or y_train_one_hot.size == 0 or X_test.size == 0 or y_test_one_hot.size == 0:
        print("  경고: 분류기 학습을 위한 훈련 또는 테스트 데이터가 부족합니다. 학습을 건너뜁니다.")
        return None, None

    if X_train.shape[0] < 2 or X_test.shape[0] < 2:
        print("  경고: 분류기 학습을 위한 훈련 또는 테스트 데이터 샘플 수가 2개 미만입니다. 학습을 건너뜁니다.")
        return None, None

    # Ensure there are at least 2 unique classes in the training labels
    # np.unique(np.argmax(y_train_one_hot, axis=1)) gets the unique original encoded labels
    if y_train_one_hot.ndim > 1 and len(np.unique(np.argmax(y_train_one_hot, axis=1))) < 2:
         print("  경고: 분류기 학습을 위한 훈련 레이블에 클래스가 2개 미만입니다. 학습을 건너뜁니다.")
         return None, None
    elif y_train_one_hot.ndim == 1 and len(np.unique(y_train_one_hot)) < 2: # Handle case where labels might not be one-hot yet (should be one-hot here)
         print("  경고: 분류기 학습을 위한 훈련 레이블에 클래스가 2개 미만입니다 (One-Hot 인코딩 전 상태 확인). 학습을 건너뜁니다.")
         return None, None


    # EarlyStopping 및 ReduceLROnPlateau 콜백 설정
    early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=7, min_lr=0.000001)
    callbacks = [early_stopping, reduce_lr]

    # Model training
    try:
        history = model.fit(
            X_train, y_train_one_hot,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(X_test, y_test_one_hot), # Use validation_data for test set evaluation during training
            callbacks=callbacks,
            verbose=1
        )
        print("  모델 학습 완료.")
        return model, history
    except Exception as e:
        print(f"  오류: 모델 학습 중 오류 발생: {e}")
        return None, None


print("\n모델 구축 및 학습 함수 정의 완료.")

## 8. 모델 평가 및 결과 시각화 함수 정의

학습된 모델의 성능을 테스트 데이터에 대해 평가하고, 분류 리포트, 혼동 행렬, 학습 곡선 등을 시각화하는 함수를 정의합니다.

In [None]:
import matplotlib.pyplot as plt # Ensure matplotlib is imported
import seaborn as sns # Ensure seaborn is imported
from sklearn.metrics import classification_report, confusion_matrix # Ensure sklearn metrics are imported
import numpy as np # Ensure numpy is imported

print("\n8. 모델 평가 및 결과 시각화 함수 정의 중...")

# Assuming necessary libraries (matplotlib, seaborn, sklearn.metrics, numpy) are imported.
# Assuming label_encoder is available if evaluation is attempted.

def evaluate_and_visualize_model(model_name, trained_model, X_test, y_test_one_hot, y_test_encoded_filtered,
                                 label_encoder, history):
    """
    학습된 모델의 성능을 평가하고 결과를 시각화합니다.

    Args:
        model_name (str): 모델 이름 (예: 'YAMNet', 'VGGish').
        trained_model (tf.keras.Model): 학습된 모델 객체.
        X_test (np.ndarray): 테스트 임베딩 데이터.
        y_test_one_hot (np.ndarray): One-Hot 인코딩된 테스트 레이블.
        y_test_encoded_filtered (np.ndarray): 필터링된 실제 인코딩 레이블 (혼동 행렬, 리포트용).
        label_encoder (sklearn.preprocessing.LabelEncoder): 레이블 인코더 객체.
        history (tf.keras.callbacks.History): 모델 학습 기록.

    Returns:
        dict: 평가 결과를 담고 있는 딕셔너리 (loss, accuracy).
              평가 실패 시 None 값을 가질 수 있습니다.
    """
    print(f"\n--- {model_name} 모델 성능 평가 및 시각화 시작 ---")
    evaluation_metrics = {'loss': None, 'accuracy': None} # Initialize metrics dictionary

    # --- 1. 데이터 및 모델 유효성 확인 ---
    # Check if the model and data are valid and not empty
    if trained_model is None:
        print(f"  경고: {model_name} 모델이 학습되지 않았습니다. 평가를 건너뜁니다.")
        return evaluation_metrics

    if not isinstance(X_test, np.ndarray) or X_test.size == 0:
        print(f"  경고: {model_name} 모델의 테스트 임베딩 데이터가 유효하지 않거나 비어 있습니다. 평가를 건너뜁니다.")
        return evaluation_metrics

    if not isinstance(y_test_one_hot, np.ndarray) or y_test_one_hot.size == 0:
         print(f"  경고: {model_name} 모델의 One-Hot 테스트 레이블이 유효하지 않거나 비어 있습니다. 평가를 건너뜁니다.")
         return evaluation_metrics

    if not isinstance(y_test_encoded_filtered, np.ndarray) or y_test_encoded_filtered.size == 0:
         print(f"  경고: {model_name} 모델의 필터링된 실제 인코딩 레이블이 유효하지 않거나 비어 있습니다. 평가를 건너뜁니다.")
         return evaluation_metrics

    if label_encoder is None:
        print(f"  경고: 레이블 인코더가 None입니다. 평가 리포트 및 혼동 행렬 레이블이 정확하지 않을 수 있습니다.")
        # Proceed with evaluation but without accurate labels

    # Ensure there are at least 2 unique classes in the filtered test labels for meaningful evaluation
    unique_test_labels = np.unique(y_test_encoded_filtered)
    if len(unique_test_labels) < 2:
         print(f"  경고: 필터링된 테스트 데이터에 클래스가 2개 미만입니다 ({len(unique_test_labels)}개). 평가를 건너뜁니다.")
         return evaluation_metrics

    # Determine the number of classes from the one-hot labels shape or label_encoder
    num_classes = y_test_one_hot.shape[1] if y_test_one_hot.ndim > 1 else 1
    if label_encoder is not None:
        num_classes_from_encoder = len(label_encoder.classes_)
        if num_classes != num_classes_from_encoder:
             print(f"  경고: One-Hot 레이블 형태와 LabelEncoder의 클래스 수가 일치하지 않습니다 ({num_classes} vs {num_classes_from_encoder}).")
             # Use the number from one-hot labels for consistency with model output
             pass # Continue, using num_classes from one-hot shape


    print("  데이터 및 모델 유효성 확인 완료. 평가 진행.")

    # --- 2. 모델 평가 ---
    print("  테스트 데이터로 모델 평가...")
    try:
        loss, accuracy = trained_model.evaluate(X_test, y_test_one_hot, verbose=0)
        print(f"  테스트 세트 손실: {loss:.4f}")
        print(f"  테스트 세트 정확도: {accuracy:.4f}")
        evaluation_metrics['loss'] = loss
        evaluation_metrics['accuracy'] = accuracy
    except Exception as e:
         print(f"  오류: {model_name} 모델 평가 중 오류 발생: {e}")
         # Continue to prediction and reporting if evaluation fails but prediction might work


    # --- 3. 예측 및 리포트 생성 ---
    print("\n  예측 수행 및 리포트 생성...")
    try:
        y_pred_probs = trained_model.predict(X_test)
        y_pred = np.argmax(y_pred_probs, axis=1)

        print("\n  분류 리포트:")
        # Use target_names from label_encoder if available and matches unique test labels
        if label_encoder is not None and len(unique_test_labels) == num_classes:
             print(classification_report(y_test_encoded_filtered, y_pred, target_names=label_encoder.classes_))
        else:
             # Fallback without target names
             print(classification_report(y_test_encoded_filtered, y_pred))
             print("  경고: 레이블 인코더 또는 클래스 불일치로 인해 target_names를 사용할 수 없습니다.")


        # --- 4. 혼동 행렬 시각화 ---
        print("\n  혼동 행렬 시각화:")
        # Ensure the unique classes in the filtered test data match the number of classes for the matrix size
        if len(unique_test_labels) == num_classes:
             # Ensure labels for confusion matrix calculation cover all unique predicted and true labels
             all_possible_labels = np.unique(np.concatenate((y_test_encoded_filtered, y_pred)))
             # If label_encoder is available, use its classes order for consistent matrix
             if label_encoder is not None and hasattr(label_encoder, 'classes_'):
                 labels_for_matrix = np.arange(len(label_encoder.classes_))
                 # Filter labels_for_matrix to only include those present in y_test_encoded_filtered or y_pred for smaller datasets
                 # This avoids plotting empty rows/columns if not all classes are in the test set (post-filtering)
                 present_labels_in_data = np.unique(np.concatenate((y_test_encoded_filtered, y_pred))).tolist()
                 labels_for_matrix = [l for l in labels_for_matrix if l in present_labels_in_data]

                 cm = confusion_matrix(y_test_encoded_filtered, y_pred, labels=labels_for_matrix)

                 plt.figure(figsize=(max(6, len(labels_for_matrix)), max(5, len(labels_for_matrix)))) # Adjust figure size based on num classes
                 # Use label_encoder classes for tick labels if available, filtered to present labels
                 tick_labels = [label_encoder.classes_[i] for i in labels_for_matrix]
                 sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                            xticklabels=tick_labels, yticklabels=tick_labels)
             else:
                  # Fallback without labels if label_encoder or classes don't match
                  # Use unique labels from the data for matrix labels if label_encoder is not fully usable
                  labels_for_matrix = np.unique(np.concatenate((y_test_encoded_filtered, y_pred)))
                  cm = confusion_matrix(y_test_encoded_filtered, y_pred, labels=labels_for_matrix)
                  plt.figure(figsize=(max(6, len(labels_for_matrix)), max(5, len(labels_for_matrix)))) # Adjust figure size
                  sns.heatmap(cm, annot=True, fmt='d', cmap='Blues') # Fallback without labels
             plt.xlabel('예측 레이블')
             plt.ylabel('실제 레이블')
             plt.title(f'{model_name} 혼동 행렬')
             plt.show()
        else:
             print("  경고: 필터링된 테스트 데이터에 모든 클래스가 포함되지 않아 혼동 행렬을 생성할 수 없습니다.")


    except Exception as e:
        print(f"  오류: {model_name} 모델 예측 또는 리포트 생성 중 오류 발생: {e}")


    # --- 5. 학습 과정 시각화 ---
    print("\n  학습 과정 시각화:")
    # Check if history object is valid and has the 'history' attribute
    if history is not None and hasattr(history, 'history'):
        plt.figure(figsize=(12, 5))

        # Accuracy plot
        plt.subplot(1, 2, 1)
        plt.plot(history.history.get('accuracy', []), label='훈련 정확도') # Use .get() for safety
        if 'val_accuracy' in history.history: # Check if validation accuracy exists
             plt.plot(history.history['val_accuracy'], label='검증 정확도')
        plt.xlabel('에폭')
        plt.ylabel('정확도')
        plt.title(f'{model_name} 훈련 및 검증 정확도')
        plt.legend()

        # Loss plot
        plt.subplot(1, 2, 2)
        plt.plot(history.history.get('loss', []), label='훈련 손실') # Use .get() for safety
        if 'val_loss' in history.history: # Check if validation loss exists
             plt.plot(history.history['val_loss'], label='검증 손실')
        plt.xlabel('에폭')
        plt.ylabel('손실')
        plt.title(f'{model_name} 훈련 및 검증 손실')
        plt.legend()

        plt.show()
    else:
        print("  경고: 학습 기록(history)이 없어 그래프를 그릴 수 없습니다.")


    print(f"\n--- {model_name} 모델 성능 평가 및 시각화 완료 ---")
    return evaluation_metrics # Return evaluation results


print("\n모델 평가 및 결과 시각화 함수 정의 완료.")

## 9. 예측 함수 정의

학습된 모델을 사용하여 새로운 오디오 파일에 대한 예측을 수행하는 함수를 정의합니다.

In [None]:
import os # Ensure os is imported
import numpy as np # Ensure numpy is imported

print("\n9. 예측 함수 정의 중...")

def predict_on_new_audio(model, yamnet_model, label_encoder, audio_path):
    """
    새로운 오디오 파일에 대해 학습된 모델로 예측을 수행합니다.

    Args:
        model (tensorflow.keras.models.Model): 예측에 사용할 학습된 Keras 모델.
        yamnet_model: YAMNet 모델 객체 (오디오 전처리 및 임베딩 추출에 사용).
        label_encoder (sklearn.preprocessing.LabelEncoder): 레이블 인코더 객체.
        audio_path (str): 예측할 오디오 파일 경로.

    Returns:
        str: 예측된 레이블 문자열, 또는 예측 실패 시 None.
    """
    print(f"\n새 오디오 파일('{os.path.basename(audio_path)}') 예측 시작...")

    if not os.path.exists(audio_path):
        print(f"오류: 예측할 오디오 파일을 찾을 수 없습니다: {audio_path}")
        return None

    # Use the defined YAMNet embedding extraction function for preprocessing
    # Ensure extract_yamnet_embedding is defined and available
    if 'extract_yamnet_embedding' not in locals():
        print("오류: 'extract_yamnet_embedding' 함수가 정의되지 않았습니다. 예측을 건너뜁니다.")
        return None

    new_audio_embedding = extract_yamnet_embedding(audio_path, yamnet_model)

    if new_audio_embedding is None:
        print("새 오디오 파일 임베딩 추출에 실패했습니다.")
        return None

    # Reshape the embedding to match the model's expected input shape (add batch dimension)
    new_audio_embedding = np.expand_dims(new_audio_embedding, axis=0)

    # Perform prediction
    try:
        prediction_probs = model.predict(new_audio_embedding)[0]
        predicted_class_idx = np.argmax(prediction_probs)

        # Convert predicted index back to label using the label encoder
        if label_encoder is not None and hasattr(label_encoder, 'inverse_transform'):
            predicted_label = label_encoder.inverse_transform([predicted_class_idx])[0]
        else:
            print("경고: label_encoder가 없거나 inverse_transform 메서드를 사용할 수 없습니다. 예측된 인덱스만 반환합니다.")
            predicted_label = predicted_class_idx # Return index if label_encoder is not usable


        print(f"\n예측 결과 for '{os.path.basename(audio_path)}':")
        if label_encoder is not None and hasattr(label_encoder, 'classes_'):
            # Print probabilities for each class
            for i, class_name in enumerate(label_encoder.classes_):
                print(f"  {class_name}: {prediction_probs[i]*100:.2f}%")
        else:
             # Print probabilities by index if class names are not available
             for i in range(len(prediction_probs)):
                  print(f"  Class {i} (Index {i}): {prediction_probs[i]*100:.2f}%")

        print(f"최종 예측: {predicted_label}")
        return predicted_label

    except Exception as e:
        print(f"오류: 예측 수행 중 오류 발생: {e}")
        return None


print("예측 함수 정의 완료.")

## 10. 모델별 전체 파이프라인 실행 및 결과 비교

정의된 함수들을 사용하여 각 모델(YAMNet, PANNs, VGGish)에 대해 데이터 준비(임베딩 추출 포함), 학습, 평가 과정을 순차적으로 실행하고, 최종 성능을 비교합니다.

In [None]:
import numpy as np
import tensorflow as tf
import pandas as pd
import os # Ensure os is imported
import matplotlib.pyplot as plt # Ensure matplotlib is imported
import seaborn as sns # Ensure seaborn is imported
from sklearn.metrics import classification_report, confusion_matrix # Ensure sklearn metrics are imported
import random # Ensure random is imported for augmentation


print("\n10. 모델별 전체 파이프라인 실행 및 결과 비교 시작...")

# --- 1. 데이터 로드 및 준비 ---
# Calls the function defined in Step 3 (corrected version)
# Ensure load_and_prepare_dataset is defined and available
if 'load_and_prepare_dataset' not in locals():
    print("오류: 'load_and_prepare_dataset' 함수가 정의되지 않았습니다. 파이프라인 실행을 중단합니다.")
    is_data_prepared = False
    X_train_paths, X_test_paths, y_train_encoded, y_test_encoded, label_encoder, num_classes, noise_audio_paths = [], [], np.array([]), np.array([]), None, 0, [] # Initialize noise_audio_paths
else:
    # Modified to also return noise_audio_paths
    X_train_paths, X_test_paths, y_train_encoded, y_test_encoded, label_encoder, is_data_prepared, num_classes, noise_audio_paths = load_and_prepare_dataset(
        deepship_path=DEEPSHIP_BASE_PATH,
        noise_data_dir=MBARI_NOISE_BASE_DIR,
        deepship_classes=DEEPSHIP_CLASSES # Pass the global constant
    )

# --- 2. 오디오 모델 로드 ---
# Calls the function defined in Step 5
# Ensure load_audio_models is defined and available
if 'load_audio_models' not in locals():
     print("오류: 'load_audio_models' 함수가 정의되지 않았습니다. 파이프라인 실행을 중단합니다.")
     are_models_loaded = False
     loaded_models = {}
else:
    loaded_models, are_models_loaded = load_audio_models()


# --- 3. 모델별 임베딩 추출 및 데이터 필터링 (배치 처리 및 증강 포함) ---
X_train_embeddings = {}
X_test_embeddings = {}
y_train_filtered = {} # Store encoded labels corresponding to successfully extracted train embeddings
y_test_filtered = {}  # Store encoded labels corresponding to successfully extracted test embeddings
y_test_encoded_filtered = {} # Store encoded test labels filtered by successful embedding extraction for evaluation reports

# Dictionary mapping model names to their loaded model object, embedding dimension, and extraction function
# Ensure embedding dimension constants and extraction functions are defined and available
models_info = {}
if are_models_loaded: # Only populate models_info if models were loaded
    if 'extract_yamnet_embedding' in locals() and 'YAMNET_EMBEDDING_DIM' in locals() and loaded_models.get('YAMNet') is not None:
        models_info['YAMNet'] = {'model': loaded_models['YAMNet'], 'dim': YAMNET_EMBEDDING_DIM, 'extract_func': extract_yamnet_embedding}
    else:
        print("경고: YAMNet 모델 정보 또는 함수가 누락되었습니다. YAMNet 처리를 건너뜁니다.")
    if 'extract_panns_embedding' in locals() and 'PANNS_EMBEDDING_DIM' in locals() and loaded_models.get('PANNs') is not None:
        models_info['PANNs'] = {'model': loaded_models['PANNs'], 'dim': PANNS_EMBEDDING_DIM, 'extract_func': extract_panns_embedding}
    else:
         print("경고: PANNs 모델 정보 또는 함수가 누락되었습니다. PANNs 처리를 건너뜁니다.")
    if 'extract_vggish_embedding' in locals() and 'VGGISH_EMBEDDING_DIM' in locals() and loaded_models.get('VGGish') is not None:
        models_info['VGGish'] = {'model': loaded_models['VGGish'], 'dim': VGGISH_EMBEDDING_DIM, 'extract_func': extract_vggish_embedding}
    else:
         print("경고: VGGish 모델 정보 또는 함수가 누락되었습니다. VGGish 처리를 건너뜁니다.")


# Flag to indicate if embeddings were successfully extracted for at least one model with sufficient data
are_embeddings_extracted_successfully = False

# Define batch size for embedding extraction
EMBEDDING_EXTRACTION_BATCH_SIZE = 32 # Adjust based on available RAM

if is_data_prepared and are_models_loaded and models_info:
    print("\n모델별 임베딩 추출 시작 (배치 처리 및 증강 포함 - 시간이 오래 걸릴 수 있습니다)...")

    # Combine train and test paths and labels for easier iteration
    all_paths = X_train_paths + X_test_paths
    all_encoded_labels = list(y_train_encoded) + list(y_test_encoded)
    split_indices = {'train': (0, len(X_train_paths)), 'test': (len(X_train_paths), len(all_paths))}


    for model_name, info in models_info.items():
        model = info['model']
        extract_func = info['extract_func']

        if model is None:
             print(f"\n--- {model_name} 모델 로드 실패. 임베딩 추출 건너뜁니다. ---")
             # Initialize empty arrays for this model to avoid errors later
             X_train_embeddings[model_name] = np.array([])
             X_test_embeddings[model_name] = np.array([])
             y_train_filtered[model_name] = np.array([])
             y_test_filtered[model_name] = np.array([])
             y_test_encoded_filtered[model_name] = np.array([]) # Also for evaluation reports
             continue # Skip to the next model


        print(f"\n--- {model_name} 임베딩 추출 중 ---")
        current_model_embeddings_list = []
        current_model_labels_filtered_list = []


        # Process all data in batches for embedding extraction
        for i in range(0, len(all_paths), EMBEDDING_EXTRACTION_BATCH_SIZE):
            batch_paths = all_paths[i:i + EMBEDDING_EXTRACTION_BATCH_SIZE]
            batch_labels = all_encoded_labels[i:i + EMBEDDING_EXTRACTION_BATCH_SIZE]

            print(f"  처리 중: 배치 {i // EMBEDDING_EXTRACTION_BATCH_SIZE + 1} / {len(all_paths) // EMBEDDING_EXTRACTION_BATCH_SIZE + (1 if len(all_paths) % EMBEDDING_EXTRACTION_BATCH_SIZE > 0 else 0)}")


            for j, audio_path in enumerate(batch_paths):
                 original_label_encoded = batch_labels[j]
                 original_label_str = label_encoder.inverse_transform([original_label_encoded])[0] if label_encoder is not None and hasattr(label_encoder, 'inverse_transform') else str(original_label_encoded)


                 # Apply augmentation (mix noise) only for 'ship' samples during training data processing
                 # Check if this sample is originally from the training set
                 is_training_sample = i + j < split_indices['train'][1]

                 # Determine if augmentation should be applied
                 # Apply augmentation if it's a training sample, the original label is 'ship', and noise data is available
                 apply_augmentation = is_training_sample and original_label_str == 'ship' and noise_audio_paths

                 # Extract embedding, potentially with augmentation
                 # Pass noise_audio_paths to the extract function if augmentation is needed
                 embedding = extract_func(
                     audio_path,
                     model,
                     augment_with_noise=apply_augmentation,
                     noise_audio_paths=noise_audio_paths,
                     noise_level=0.1 # Define noise level
                 )


                 if embedding is not None:
                    current_model_embeddings_list.append(embedding)
                    current_model_labels_filtered_list.append(original_label_encoded) # Keep the original encoded label

            # Optional: Add augmented samples for 'ship' class in training set
            # This is a simple approach; more sophisticated augmentation strategies exist.
            if is_training_sample and original_label_str == 'ship' and noise_audio_paths:
                 # Augment the current batch of ship training samples
                 for j, audio_path in enumerate(batch_paths):
                     if label_encoder.inverse_transform([batch_labels[j]])[0] == 'ship':
                         # Re-extract embedding with augmentation
                         augmented_embedding = extract_func(
                             audio_path,
                             model,
                             augment_with_noise=True, # Force augmentation
                             noise_audio_paths=noise_audio_paths,
                             noise_level=0.1 # Define noise level
                         )
                         if augmented_embedding is not None:
                              current_model_embeddings_list.append(augmented_embedding)
                              current_model_labels_filtered_list.append(batch_labels[j]) # Original label for augmented sample


        # Convert list to NumPy array after processing all batches for this model
        all_model_embeddings = np.array(current_model_embeddings_list)
        all_model_labels_filtered = np.array(current_model_labels_filtered_list)

        # Split the combined embeddings and labels back into train and test sets
        # This requires re-splitting based on the original distribution or tracking which samples were original/augmented
        # A simpler approach for demonstration is to re-split the combined filtered data
        # However, this loses the original train/test split integrity if samples failed extraction or were augmented.
        # A more robust approach would be to build train/test lists separately during batch processing.

        # Let's rebuild train/test lists separately during batch processing for correctness
        train_embeddings_list = []
        test_embeddings_list = []
        train_labels_filtered_list = []
        test_labels_filtered_list = []


        print("\n  훈련 데이터 임베딩 추출 및 필터링 (재처리, 증강 포함)...")
        for i, path in enumerate(X_train_paths):
            original_label_encoded = y_train_encoded[i]
            original_label_str = label_encoder.inverse_transform([original_label_encoded])[0] if label_encoder is not None and hasattr(label_encoder, 'inverse_transform') else str(original_label_encoded)

            # Extract original embedding
            embedding = extract_func(path, model, augment_with_noise=False) # No augmentation for original samples

            if embedding is not None:
                 train_embeddings_list.append(embedding)
                 train_labels_filtered_list.append(original_label_encoded)

            # Add augmented samples for 'ship' class if noise data is available
            if original_label_str == 'ship' and noise_audio_paths:
                 augmented_embedding = extract_func(
                      path,
                      model,
                      augment_with_noise=True, # Apply augmentation
                      noise_audio_paths=noise_audio_paths,
                      noise_level=0.1
                 )
                 if augmented_embedding is not None:
                      train_embeddings_list.append(augmented_embedding)
                      train_labels_filtered_list.append(original_label_encoded) # Augmented sample keeps original label


            if (i + 1) % 50 == 0:
                 print(f"    {i+1}/{len(X_train_paths)} 훈련 파일 처리 완료 (증강 포함).")


        print("  테스트 데이터 임베딩 추출 및 필터링...")
        for i, path in enumerate(X_test_paths):
            original_label_encoded = y_test_encoded[i]
            # No augmentation for test samples
            embedding = extract_func(path, model, augment_with_noise=False)

            if embedding is not None:
                test_embeddings_list.append(embedding)
                test_labels_filtered_list.append(original_label_encoded)

            if (i + 1) % 20 == 0:
                print(f"    {i+1}/{len(X_test_paths)} 테스트 파일 처리 완료.")


        # Convert lists to NumPy arrays
        X_train_embeddings[model_name] = np.array(train_embeddings_list)
        X_test_embeddings[model_name] = np.array(test_embeddings_list)
        y_train_filtered[model_name] = np.array(train_labels_filtered_list)
        y_test_filtered[model_name] = np.array(test_labels_filtered_list)
        y_test_encoded_filtered[model_name] = np.array(test_labels_filtered_list) # Store for evaluation reports


        print(f"\n  {model_name} 임베딩 추출 및 필터링 완료.")
        print(f"  훈련 임베딩 형태 (증강 포함): {X_train_embeddings[model_name].shape}")
        print(f"  훈련 레이블 형태 (증강 포함): {y_train_filtered[model_name].shape}")
        print(f"  테스트 임베딩 형태: {X_test_embeddings[model_name].shape}")
        print(f"  테스트 레이블 형태: {y_test_filtered[model_name].shape}")

        # Check if embeddings were successfully extracted for this model with sufficient samples
        # Need at least 2 samples for training/validation split implicitly in Keras fit
        # And at least 2 classes in the filtered training data for training viability
        if X_train_embeddings[model_name].shape[0] >= 2 and model_name in y_train_filtered and len(np.unique(y_train_filtered[model_name])) >= 2:
            are_embeddings_extracted_successfully = True # Set flag to True if at least one model has data


    print("\n모델별 임베딩 추출 단계 완료.")

else:
    print("\n데이터 로드, 모델 로드, 또는 모델 정보 누락으로 임베딩 추출을 건너뜁니다.")


# --- 4. Prepare Labels for Training (One-Hot Encoding) ---
y_train_one_hot = {}
y_test_one_hot = {}

# Perform one-hot encoding only if embeddings were extracted successfully for at least one model
# and if label_encoder and num_classes are available and valid from data preparation
is_data_ready_for_training = False # Reset and determine based on one-hot encoding success

if are_embeddings_extracted_successfully and 'label_encoder' in locals() and label_encoder is not None and 'num_classes' in locals() and num_classes >= 2:
    print("\n모델별 레이블 One-Hot 인코딩 시작...")
    all_models_encoded_successfully = True # Track if all models that had embeddings are encoded

    for model_name in models_info.keys(): # Iterate through original model names that were intended to be processed
        # Check if filtered labels exist and are not empty for this model, and have at least 2 classes
        if model_name in y_train_filtered and y_train_filtered[model_name].size > 0 and len(np.unique(y_train_filtered[model_name])) >= 2 and \
           model_name in y_test_filtered and y_test_filtered[model_name].size > 0 and len(np.unique(y_test_filtered[model_name])) >= 2:

             print(f"--- {model_name} 레이블 인코딩 중 ---")
             try:
                 # One-hot Encode the filtered labels using the shared label_encoder
                 y_train_one_hot[model_name] = tf.keras.utils.to_categorical(y_train_filtered[model_name], num_classes=num_classes)
                 y_test_one_hot[model_name] = tf.keras.utils.to_categorical(y_test_filtered[model_name], num_classes=num_classes)
                 print(f"  {model_name} 훈련 레이블 형태 (One-Hot): {y_train_one_hot[model_name].shape}")
                 print(f"  {model_name} 테스트 레이블 형태 (One-Hot): {y_test_one_hot[model_name].shape}")
             except Exception as e:
                 print(f"  오 오류: {model_name} 레이블 One-Hot 인코딩 중 오류 발생: {e}")
                 # Initialize empty arrays if encoding fails
                 y_train_one_hot[model_name] = np.array([])
                 y_test_one_hot[model_name] = np.array([])
                 all_models_encoded_successfully = False # Mark failure


        else:
            print(f"  경고: {model_name}에 대한 필터링된 레이블이 부족하거나 클래스가 2개 미만이어서 One-Hot 인코딩을 건너뜁니다.")
            # Initialize empty arrays if filtered labels are missing or empty or insufficient classes
            y_train_one_hot[model_name] = np.array([])
            y_test_one_hot[model_name] = np.array([])
            all_models_encoded_successfully = False # Mark failure


    if all_models_encoded_successfully:
         print("\n모델별 레이블 One-Hot 인코딩 단계 완료.")
         is_data_ready_for_training = True # Set the flag to True if all intended models were encoded successfully
    else:
         print("\n일부 모델의 레이블 One-Hot 인코딩 실패 또는 데이터 부족.")
         is_data_ready_for_training = False # Set the flag to False if any intended model failed encoding

else:
    print("\n임베딩 추출 실패, 클래스 수 부족, 또는 label_encoder 누락으로 레이블 One-Hot 인코딩을 건너뜁니다.")
    is_data_ready_for_training = False # Set the flag to False


if is_data_ready_for_training:
    print("\n데이터 로드, 임베딩 추출 및 데이터셋 준비 단계 성공.")
else:
    print("\n데이터 로드, 임베딩 추출 및 데이터셋 준비 단계 실패: 훈련에 필요한 데이터가 부족합니다.")


# --- 5. 모델 구축, 학습 및 평가 실행 ---
# Ensure build_and_train_classifier, evaluate_and_visualize_model are defined
if 'build_and_train_classifier' not in locals():
     print("오류: 'build_and_train_classifier' 함수가 정의되지 않았습니다. 모델 학습/평가를 건너뜁니다.")
     are_models_trained = False
     trained_models = {}
     training_histories = {}
     evaluation_results = {}
elif 'evaluate_and_visualize_model' not in locals():
     print("오류: 'evaluate_and_visualize_model' 함수가 정의되지 않았습니다. 모델 평가를 건너뜁니다.")
     # Proceed with training if possible, but evaluation will be skipped/fail
     pass # Let the code below handle training if possible

else: # Functions are defined, proceed with training and evaluation if data is ready
    print("\n모델 구축, 학습 및 평가 실행 시작...")

    # Dictionaries to store trained models, histories, and evaluation results
    trained_models = {}
    training_histories = {}
    evaluation_results = {} # To store metrics for comparison

    # Ensure embedding_dims is defined (from Step 1 constants)
    if 'embedding_dims' not in locals():
         print("오류: 'embedding_dims' 전역 변수가 정의되지 않았습니다. 모델 구축/학습을 건너뜁니다.")
         are_models_trained = False
    else: # embedding_dims is defined
        are_models_trained = False # Initialize flag

        if is_data_ready_for_training:
            print("\n모델별 분류기 구축, 학습 및 평가 진행 중...")

            # Use the keys from models_info to iterate through the models we intended to process
            models_to_process_keys = models_info.keys() if 'models_info' in locals() else []

            if not models_to_process_keys:
                print("오류: 처리할 모델 정보가 누락되었습니다 (models_info).")

            for model_name in models_to_process_keys:
                print(f"\n--- {model_name} 모델 처리 중 ---")

                # Check if all necessary data for training and evaluation exists and is not empty for this specific model
                # These checks should align with the validation in build_and_train_classifier and evaluate_and_visualize_model
                if model_name in X_train_embeddings and X_train_embeddings[model_name].size > 0 and \
                   model_name in y_train_one_hot and y_train_one_hot[model_name].size > 0 and \
                   model_name in X_test_embeddings and X_test_embeddings[model_name].size > 0 and \
                   model_name in y_test_one_hot and y_test_one_hot[model_name].size > 0 and \
                   model_name in y_test_filtered and y_test_filtered[model_name].size > 0 and \
                   model_name in y_test_encoded_filtered and y_test_encoded_filtered[model_name].size > 0 and \
                   model_name in embedding_dims and 'num_classes' in locals() and num_classes >= 2 and \
                   'label_encoder' in locals() and label_encoder is not None:

                    # Get embedding dimension for the current model
                    embedding_dim = embedding_dims[model_name]
                    print(f"  {model_name} 임베딩 차원: {embedding_dim}")

                    # Build and train the classifier model
                    model, history = build_and_train_classifier(
                        model_name=model_name,
                        X_train=X_train_embeddings[model_name],
                        y_train_one_hot=y_train_one_hot[model_name],
                        X_test=X_test_embeddings[model_name], # Use X_test for validation during training
                        y_test_one_hot=y_test_one_hot[model_name], # Use y_test_one_hot for validation during training
                        embedding_dim=embedding_dim,
                        num_classes=num_classes,
                        epochs=50, # Define epochs and batch_size
                        batch_size=16
                    )

                    # Store the trained model and history if training was successful
                    if model is not None and history is not None:
                        trained_models[model_name] = model
                        training_histories[model_name] = history
                        print(f"  {model_name} 모델 학습 및 결과 저장 완료.")
                        are_models_trained = True # Set flag to True if at least one model trained

                        # Evaluate the model
                        print(f"\n  --- {model_name} 모델 평가 ---")
                        # Ensure correct arguments are passed to evaluate_and_visualize_model
                        evaluation_metrics = evaluate_and_visualize_model(
                            model_name=model_name,
                            trained_model=trained_models[model_name],
                            X_test=X_test_embeddings[model_name],
                            y_test_one_hot=y_test_one_hot[model_name],
                            y_test_encoded_filtered=y_test_encoded_filtered[model_name],
                            label_encoder=label_encoder,
                            history=training_histories[model_name]
                        )
                        evaluation_results[model_name] = evaluation_metrics # Store evaluation metrics
                        print(f"\n  {model_name} 평가 완료.")

                    else:
                        print(f"  {model_name} 모델 학습 실패. 평가를 건너뜁니다.")
                        trained_models[model_name] = None
                        training_histories[model_name] = None
                        evaluation_results[model_name] = {'loss': None, 'accuracy': None} # Store None for metrics if skipped


                else:
                    print(f"경고: {model_name} 모델 학습/평가를 위한 데이터 또는 필수 변수가 부족합니다. 건너뜁니다.")
                    trained_models[model_name] = None
                    training_histories[model_name] = None
                    evaluation_results[model_name] = {'loss': None, 'accuracy': None} # Store None for metrics if skipped


            print("\n모델 구축, 학습 및 평가 실행 단계 완료.")

        else:
            print("\n데이터 준비 실패로 모델 학습 및 평가를 건너뜁니다.")
            are_models_trained = False # Ensure flag is False if data not ready


# --- 6. 모델 성능 비교 (요약) ---
print("\n6. 모델 성능 비교 (요약) 중...")
print("\n--- 모델별 최종 성능 비교 ---")
if evaluation_results and any(metrics['accuracy'] is not None for metrics in evaluation_results.values()):
    # Sort results by accuracy (optional, but helps in comparison)
    sorted_results = sorted(evaluation_results.items(), key=lambda item: item[1].get('accuracy') if item[1].get('accuracy') is not None else -1, reverse=True)

    for model_name, metrics in sorted_results:
        print(f"  {model_name}:")
        print(f"    테스트 손실: {metrics['loss']:.4f}" if metrics['loss'] is not None else "    테스트 손실: N/A")
        print(f"    테스트 정확도: {metrics['accuracy']:.4f}" if metrics['accuracy'] is not None else "    테스트 정확도: N/A")
else:
    print("평가 결과가 없어 모델 성능 비교를 수행할 수 없습니다.")


# --- 7. 새로운 오디오 파일에 대한 예측 (예시) ---
print("\n7. 새로운 오디오 파일 예측 예시 중...")
print("\n--- 새로운 오디오 파일 예측 예시 ---")

# Choose one of the trained models for prediction, e.g., the best performing one or YAMNet
# If trained_models is not empty, attempt prediction
if trained_models:
    # Choose the first successfully trained model for prediction example
    model_to_predict_name = next(iter(trained_models)) # Get the name of the first key
    model_to_predict = trained_models[model_to_predict_name]
    print(f"\n예측에 사용할 모델: {model_to_predict_name}")


    # Need a sample audio file path for prediction
    # Use the first path from the original combined list if available, or a dummy file
    predict_audio_path = None

    # Attempt to use the first path from the original combined list if it was populated
    if 'all_audio_paths' in locals() and all_audio_paths:
        predict_audio_path = all_audio_paths[0]
        print(f"예측을 위해 데이터셋에서 샘플 파일 선택: {predict_audio_path}")
    elif 'DEEPSHIP_BASE_PATH' in locals() and os.path.exists(os.path.join(DEEPSHIP_BASE_PATH, 'Cargo', '103.wav')): # Fallback to a known DeepShip file
        predict_audio_path = os.path.join(DEEPSHIP_BASE_PATH, 'Cargo', '103.wav')
        print(f"예측을 위해 DeepShip에서 샘플 파일 선택: {predict_audio_path}")
    # Add other fallbacks here if needed

    # Ensure the chosen model, label_encoder, and yamnet_model are available for prediction
    if predict_audio_path and os.path.exists(predict_audio_path) and \
       model_to_predict is not None and \
       'label_encoder' in locals() and label_encoder is not None and \
       'yamnet_model' in locals() and yamnet_model is not None: # yamnet_model is needed for preprocessing in predict_on_new_audio

        # Ensure predict_on_new_audio is defined and available
        if 'predict_on_new_audio' not in locals():
             print("오류: 'predict_on_new_audio' 함수가 정의되지 않았습니다. 예측을 건너뜁니다.")
        else:
            # Call the predict_on_new_audio function
            predicted_label = predict_on_new_audio(model_to_predict, yamnet_model, label_encoder, predict_audio_path)
            # The predicted label is printed inside the function

    else:
        print("\n예측을 수행할 수 없습니다:")
        if not predict_audio_path or not os.path.exists(predict_audio_path):
             print("  예측할 오디오 파일을 찾을 수 없습니다.")
        if model_to_predict is None:
             print(f"  학습된 '{model_to_predict_name}' 모델이 없습니다.")
        if 'label_encoder' not in locals() or label_encoder is None:
             print("  LabelEncoder가 없습니다.")
        if 'yamnet_model' not in locals() or yamnet_model is None:
             print("  YAMNet 모델이 없습니다 (예측 전처리용).")

else: # No models were trained
    print("\n학습된 모델이 없어 예측을 수행할 수 없습니다.")


print("\n모델별 전체 파이프라인 실행 및 결과 비교 단계 완료.")

# Subtask is completed.

## Summary:

### Data Analysis Key Findings

*   The refactoring process successfully defined functions for initial setup, data loading/preparation, audio preprocessing/embedding extraction, model loading, classifier building/training, evaluation/visualization, and prediction.
*   The data loading and preparation step (`load_and_prepare_dataset`) was executed but failed to collect any audio files from either the cloned DeepShip directory or the specified MBARI noise data directory.
*   This lack of data (0 samples collected, 0 classes identified) prevented the subsequent steps of embedding extraction, label one-hot encoding, model training, and evaluation from executing, as designed by the implemented data availability checks.
*   The pipeline correctly identified the data insufficiency and skipped the computationally intensive steps.

### Insights or Next Steps

*   **Critical Data Acquisition:** The most critical next step is to acquire the necessary audio data. This includes:
    *   Verifying the structure of the cloned DeepShip repository (`/content/DeepShip`) to understand why the `load_and_prepare_dataset` function is not finding the `.wav` files within the expected class subdirectories. Manual inspection of the `/content/DeepShip` directory contents might be necessary.
    *   Acquiring a substantial amount of MBARI noise data (or other relevant noise data) and placing it in the designated `MBARI_NOISE_BASE_DIR` (`/content/MBARI_noise_data`). The previous attempts to download sample files were not sufficient or successful in populating this directory with enough noise data.
*   **Re-run Pipeline:** Once sufficient data for both 'ship' and 'noise' classes (at least 2 samples per class for stratified splitting) is placed in the correct directories, the entire pipeline (starting from Step 6 which executes data loading, embedding, training, evaluation) needs to be re-executed. The existing code is designed to handle the process once data is available.
*   **Review Data Loading Logic:** If DeepShip files are still not found after verifying the directory structure, the `load_and_prepare_dataset` function's DeepShip traversal logic may need further debugging or adjustment based on the actual file paths.

## 데이터 디렉토리 구조 및 내용 확인

DeepShip 데이터셋 및 MBARI 노이즈 데이터 디렉토리의 실제 내용을 확인하여 데이터 로드 문제의 원인을 파악합니다.

In [None]:
import os

# Assuming DEEPSHIP_BASE_PATH and MBARI_NOISE_BASE_DIR are defined

print("\nDeepShip 데이터 디렉토리 내용 확인:")
if os.path.exists(DEEPSHIP_BASE_PATH):
    print(f"'{DEEPSHIP_BASE_PATH}' 내용:")
    # List contents of the DeepShip base directory, focusing on the expected class folders
    expected_deepship_subdirs = DEEPSHIP_CLASSES # Use the global constant
    found_content = False
    for item in os.listdir(DEEPSHIP_BASE_PATH):
        item_path = os.path.join(DEEPSHIP_BASE_PATH, item)
        if os.path.isdir(item_path):
            print(f"  {item}/")
            # If it's an expected class directory, list some of its contents
            if item in expected_deepship_subdirs:
                 try:
                     files_in_class_dir = os.listdir(item_path)
                     print(f"    ({len(files_in_class_dir)} items)")
                     for f in files_in_class_dir[:5]: # List up to 5 files
                         print(f"      {f}")
                     if len(files_in_class_dir) > 5:
                         print("      ...")
                     found_content = True
                 except Exception as e:
                      print(f"    오류: 디렉토리 내용 확인 중 오류 발생: {e}")
            else:
                 # List contents of unexpected subdirectories briefly
                 try:
                      sub_items = os.listdir(item_path)
                      print(f"    ({len(sub_items)} items)")
                      for f in sub_items[:3]: # List up to 3 items in other subdirs
                          print(f"      {f}")
                      if len(sub_items) > 3:
                          print("      ...")
                 except Exception as e:
                      print(f"    오류: 디렉토리 내용 확인 중 오류 발생: {e}")

        elif os.path.isfile(item_path):
            print(f"  {item}")
            found_content = True

    if not found_content:
        print("  (디렉토리가 비어 있습니다)")

else:
    print(f"경고: DeepShip Base Path '{DEEPSHIP_BASE_PATH}'를 찾을 수 없습니다.")


print("\nMBARI 노이즈 데이터 디렉토리 내용 확인:")
if os.path.exists(MBARI_NOISE_BASE_DIR):
    print(f"'{MBARI_NOISE_BASE_DIR}' 내용:")
    found_noise_content = False
    for root, dirs, files in os.walk(MBARI_NOISE_BASE_DIR):
        level = root.replace(MBARI_NOISE_BASE_DIR, '').count(os.sep)
        indent = '  ' * level
        print(f'{indent}{os.path.basename(root)}/')
        subindent = '  ' * (level + 1)
        if dirs:
             for d in dirs[:5]: # List up to 5 subdirectories
                  print(f'{subindent}{d}/')
             if len(dirs) > 5:
                  print(f'{subindent}...')

        if files:
             print(f'{subindent}파일들 ({len(files)}개):')
             for f in files[:5]: # List up to 5 files
                 print(f'{subindent}{f}')
                 if f.endswith('.wav'):
                      found_noise_content = True # Found at least one wav file
             if len(files) > 5:
                  print(f'{subindent}...')
        if not dirs and not files:
             print(f'{subindent}(비어 있음)')


    if not found_noise_content:
        print("\n경고: MBARI 노이즈 데이터 디렉토리에서 .wav 파일을 찾지 못했습니다.")

else:
    print(f"경고: MBARI Noise Base Directory '{MBARI_NOISE_BASE_DIR}'를 찾을 수 없습니다.")

print("\n데이터 디렉토리 내용 확인 완료.")

In [None]:
# Install boto3 for S3 access
!pip install -q boto3
print("boto3 설치 완료.")

## 2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비

DeepShip 데이터셋을 클론하고, MBARI 노이즈 데이터 디렉토리를 준비합니다. 이전에 다운로드된 MBARI 노이즈 샘플 파일이 있다면 해당 디렉토리로 이동시킵니다.

**주의**: 실제 MBARI Pacific Sound 16kHz 데이터셋은 직접 다운로드 또는 접근 설정이 필요할 수 있습니다. 아래 코드는 DeepShip 클론 및 샘플 노이즈 파일 처리를 위한 예시입니다.

In [None]:
# ==============================================================================
# 2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비 (MBARI 다운로드 포함)
# ==============================================================================
import boto3 # Ensure boto3 is imported
from botocore import UNSIGNED # Ensure UNSIGNED config is imported
from botocore.client import Config # Ensure Config is imported
from pathlib import Path # Ensure Path is imported
import io # Ensure io is imported for potential in-memory reads (though we're downloading)
from six.moves.urllib.request import urlopen # Ensure urlopen is imported if still needed (less likely with direct S3 download)
import os # Ensure os is imported
import subprocess # Ensure subprocess is imported

print("\n2. 데이터 확보: DeepShip 클론 및 노이즈 데이터 준비 중...")

# Check if DeepShip is already cloned
if not os.path.exists(DEEPSHIP_BASE_PATH):
    print(f"DeepShip 데이터셋 클론 중: {DEEPSHIP_BASE_PATH}")
    # Clone the DeepShip repository
    # Use --depth 1 to clone only the latest commit, saving time and space
    try:
        subprocess.run(['git', 'clone', '--depth', '1', 'https://github.com/irfankamboh/DeepShip.git', DEEPSHIP_BASE_PATH], check=True, capture_output=True)
        print("DeepShip 데이터셋 클론 완료.")
    except subprocess.CalledProcessError as e:
        print(f"오류: DeepShip 데이터셋 클론 실패: {e.stderr.decode()}")
        print("수동으로 https://github.com/irfankamboh/DeepShip.git 를 클론하거나 다운로드하여")
        print(f"'{DEEPSHIP_BASE_PATH}' 경로에 위치시켜주세요.")
    except Exception as e:
         print(f"오류: DeepShip 데이터셋 클론 중 예기치 않은 오류 발생: {e}")
else:
    print(f"DeepShip 데이터셋이 이미 존재합니다: {DEEPSHIP_BASE_PATH}")

# Ensure the MBARI noise base directory exists
os.makedirs(MBARI_NOISE_BASE_DIR, exist_ok=True)
print(f"MBARI 노이즈 데이터 디렉토리 확인/생성 완료: {MBARI_NOISE_BASE_DIR}")


# --- MBARI Noise Data Download ---
# Use Boto3 to access the public S3 bucket and download a limited number of files
# Based on the provided documentation example.
s3_client = boto3.client('s3',
    aws_access_key_id='',
    aws_secret_access_key='',
    config=Config(signature_version=UNSIGNED))

bucket = 'pacific-sound-16khz'
# Define a prefix to narrow down the files (e.g., a specific year and month)
# The documentation example uses '2018/01/'. Let's keep this or choose another if needed.
prefix = '2018/01/' # Using January 2018 data as example

# Limit the number of files to download to avoid excessive processing time and storage
MAX_NOISE_FILES_TO_DOWNLOAD = 200 # Set a reasonable limit for demonstration

print(f"\nMBARI 노이즈 데이터 다운로드 시도 중 (S3 버킷: {bucket}, Prefix: {prefix}, 최대 {MAX_NOISE_FILES_TO_DOWNLOAD} 파일):")

try:
    # List objects in the specified bucket and prefix, potentially in pages
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

    downloaded_count = 0
    found_any_objects = False # Track if any objects were found at all

    for page in pages:
        if 'Contents' in page:
            found_any_objects = True
            # print(f"  페이지에서 {len(page['Contents'])}개의 파일 발견. 다운로드 가능한 .wav 파일 탐색 중...") # Too verbose

            for obj in page['Contents']:
                key = obj['Key']
                # Only download .wav files and avoid directories or empty files
                if key.endswith('.wav') and obj.get('Size', 0) > 0:
                    # Construct the local file path to save within the MBARI_NOISE_BASE_DIR
                    local_file_path = os.path.join(MBARI_NOISE_BASE_DIR, os.path.basename(key))

                    # Check if the file already exists locally to avoid re-downloading
                    if os.path.exists(local_file_path):
                        # print(f"    파일이 이미 존재합니다. 건너뜁니다: {os.path.basename(key)}") # Too verbose
                        pass # Skip if file already exists
                    else:
                        print(f"    다운로드 중: {os.path.basename(key)}...")
                        try:
                            s3_client.download_file(bucket, key, local_file_path)
                            downloaded_count += 1
                            print(f"      다운로드 완료 ({downloaded_count}/{MAX_NOISE_FILES_TO_DOWNLOAD})")
                        except Exception as download_e:
                             print(f"    오류: 파일 다운로드 실패 ({os.path.basename(key)}): {download_e}")

                    # Stop downloading once the limit is reached
                    if downloaded_count >= MAX_NOISE_FILES_TO_DOWNLOAD:
                        print(f"\n  최대 다운로드 파일 수({MAX_NOISE_FILES_TO_DOWNLOAD})에 도달했습니다. 다운로드를 중지합니다.")
                        break # Break from the inner loop (files in this page)

            if downloaded_count >= MAX_NOISE_FILES_TO_DOWNLOAD:
                 break # Break from the outer loop (pages)


    if not found_any_objects:
        print(f"  경고: 지정된 Prefix '{prefix}'에서 파일을 찾을 수 없습니다.")
    elif downloaded_count == 0:
         print(f"\n  총 {len(response.get('Contents', []))}개 이상의 파일이 S3 버킷에 있지만, 지정된 Prefix에서 다운로드 가능한 .wav 파일을 찾지 못했거나 모두 건너뛰었습니다.")


    print(f"\n총 {downloaded_count}개의 노이즈 .wav 파일을 다운로드했습니다.")

except Exception as e:
    print(f"오류: MBARI 노이즈 데이터 다운로드 중 오류 발생: {e}")
    print("S3 버킷 접근 권한, Prefix 설정, 또는 Boto3 설정/설치를 확인해주세요.")


# Note: To get sufficient noise data for meaningful training, you may need to adjust the 'prefix'
# or 'MAX_NOISE_FILES_TO_DOWNLOAD', or implement more complex logic to gather data from multiple
# prefixes/months, depending on your data needs and the S3 bucket structure.
# Ensure that the downloaded data includes enough samples from the 'noise' category.
print("\n2. 데이터 확보 단계 완료.")