1. 전처리
2. 학습
3. 검증

In [None]:
from google.colab import drive
drive.mount('/content/drive/')  # 구글 드라이브 마운트
import os

# 데이터셋이 저장된 디렉토리 경로 설정
directory = '/content/drive/MyDrive/'
for filename in os.listdir(directory):
    f = os.path.join(directory, filename)
    # 파일 이름 출력
    print(filename)


In [None]:

# TensorFlow 및 필요한 패키지 설치
!pip install -U -q tensorflow tensorflow_datasets
!apt install —allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2


In [None]:

# 필요한 라이브러리 불러오기
import os
import pathlib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display

# 시드 값 설정 (재현성 보장)
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# 데이터셋 경로 설정
DATASET_PATH = 'data/sounds'
data_dir = pathlib.Path(DATASET_PATH)

# 데이터셋이 없으면 구글 드라이브에서 다운로드하여 압축 해제
if not data_dir.exists():
  tf.keras.utils.get_file(
      'soundsources.zip',
      origin="file:///content/drive/MyDrive/soundsource2.zip",
      extract=True,
      cache_dir='.', cache_subdir='data')


In [None]:

# 추가 라이브러리 설치 및 불러오기
from pathlib import Path
%pip install tensorflow-io
import tensorflow_io as tfio

# 데이터셋에서 파일을 불러와서 오디오 데이터를 처리
directory = 'data/sounds'
files = Path(directory).glob('*')
sample = []
x = []
y = []


In [None]:

# 오디오 파일을 불러와서 L/R 채널 분리 및 전처리
for file in files:
    path = str(file)
    if path[-5] == 'L':
      aziangle =path.split("azi_")[1].split("_")[0]  # 방위각 추출
      polangle = path.split("pol_")[1].split("L")[0]  # 고도각 추출
      side = 'L'
      fileR = path[:-5] + "R.wav"  # 오른쪽 채널 파일 경로 생성
      audioLarray = np.array(tfio.audio.AudioIOTensor(path).to_tensor())  # 왼쪽 채널 오디오 데이터 불러오기
      audioRarray = np.array(tfio.audio.AudioIOTensor(fileR).to_tensor())  # 오른쪽 채널 오디오 데이터 불러오기
      audioLarray = audioLarray.transpose()[0]
      audioRarray = audioRarray.transpose()[0]
      x.append(np.array([audioLarray, audioRarray]))  # L/R 채널 오디오 데이터 저장
      y.append(np.array([int(aziangle), int(polangle)]))  # 방위각 및 고도각 저장


In [None]:

# STFT 변환 함수 정의
def get_stft(waveform):
  spectrogram = tf.signal.stft(
      waveform, frame_length=255, frame_step=128)  # STFT 변환
  spectrogram = tf.abs(spectrogram)  # STFT의 절대값 (크기)
  spectrogram = spectrogram[..., tf.newaxis]  # 채널 차원 추가 (CNN 입력을 위해)
  return spectrogram


In [None]:

# 입력 데이터의 형태를 정의
input_shape = x[0][0].shape
norm_layer = layers.Normalization()  # 정규화 레이어
stft_layer = layers.Lambda(lambda x : get_stft(x))  # STFT 레이어

# 활성화 함수 설정
acti  = "relu"

# 모델 입력층 정의 (왼쪽 및 오른쪽 채널)
inputL = layers.Input(shape = input_shape, name = "inputL")
inputR = layers.Input(shape = input_shape, name = "inputR")

# STFT 변환 및 크기 조정
stft_transformL = stft_layer(inputL)
stft_transformR = stft_layer(inputR)
resizelyr_L = layers.Resizing(32, 32)(stft_transformL)
resizelyr_R = layers.Resizing(32, 32)(stft_transformR)

# 정규화 레이어 적용
norm_layerL = norm_layer(resizelyr_L)
norm_layerR = norm_layer(resizelyr_R)

# CNN 층 구성 (컨볼루션, 풀링 등)
convlayer1L = layers.Conv2D(32, 3, activation=acti)(norm_layerL)
convlayer1R = layers.Conv2D(32, 3, activation=acti)(norm_layerR)
maxpool1L = layers.AveragePooling2D()(convlayer1L)
maxpool1R = layers.AveragePooling2D()(convlayer1R)
convlayer2L = layers.Conv2D(64, 3, activation=acti)(maxpool1L)
convlayer2R = layers.Conv2D(64, 3, activation=acti)(maxpool1R)
maxpool2L = layers.AveragePooling2D()(convlayer2L)
maxpool2R = layers.AveragePooling2D()(convlayer2R)

# 왼쪽과 오른쪽 채널의 특성 맵을 병합
concat = layers.concatenate([maxpool2L, maxpool2R])

# 드롭아웃 및 밀집층 구성
dropout = layers.Dropout(0.25)(concat)
flattt = layers.Flatten()(dropout)
dense1 = layers.Dense(128, activation= acti)(flattt)
dropout1 = layers.Dropout(0.3)(dense1)
dense2 = layers.Dense(64, activation = acti)(dense1)
dropout2 = layers.Dropout(0.3)(dense2)
dense3 = layers.Dense(32, activation = acti)(dense2)
dropout3 = layers.Dropout(0.3)(dense3)
dense4 = layers.Dense(8, activation = acti)(dense3)

# 최종 출력층 (방위각 및 고도각 예측)
output = layers.Dense(2, activation = 'tanh', name = "output")(dense4)
model = tf.keras.Model(inputs = [[inputL], [inputR]], outputs = [output])

# 모델 컴파일 (손실 함수 및 옵티마이저 설정)
model.compile(loss = 'mse', optimizer = tf.keras.optimizers.SGD(learning_rate = 0.00001), metrics = ['accuracy'])


In [None]:

# 데이터 전처리 (L/R 채널 분리 및 정규화)
x0 = []
x1 = []
for lists in x : 
  x0.append(lists[0])
  x1.append(lists[1])
x0 = np.array(x0)
x1 = np.array(x1)

y_ = []
for lists in y : 
  y_.append([lists[0]/360, lists[1]/180])  # 방위각 및 고도각 정규화
x = np.array(x)
y = np.array(y)
y_ = np.array(y_)


In [None]:

# 모델 학습
EPOCHS = 20
history = model.fit((x0, x1), y_, validation_split = 0.2,
    epochs=EPOCHS,
    callbacks=tf.keras.callbacks.EarlyStopping(monitor='val_loss',verbose=1, patience=2),  # 조기 종료 콜백
)


In [None]:

# 학습 결과 시각화
metrics = history.history
plt.figure(figsize=(16,6))
plt.subplot(1,2,1)
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch')
plt.ylabel('Loss [MSE]')

plt.subplot(1,2,2)
plt.plot(history.epoch, 100*np.array(metrics['accuracy']), 100*np.array(metrics['val_accuracy']))
plt.legend(['accuracy', 'val_accuracy'])
plt.ylim([0, 100])
plt.xlabel('Epoch')
plt.ylabel('Accuracy [%]')


In [None]:

# 최종 출력층 (방위각 및 고도각 예측)
output = layers.Dense(2, activation = 'tanh', name = "output")(dense4)
model = tf.keras.Model(inputs = [[inputL], [inputR]], outputs = [output])

# 모델 컴파일 (손실 함수 및 옵티마이저 설정)
model.compile(loss = 'mse', optimizer = tf.keras.optimizers.SGD(learning_rate = 0.00001), metrics = ['accuracy'])

# 데이터 전처리 (L/R 채널 분리 및 정규화)
x0 = []
x1 = []
for lists in x : 
  x0.append(lists[0])
  x1.append(lists[1])
x0 = np.array(x0)
x1 = np.array(x1)

y_ = []
for lists in y : 
  y_.append([lists[0]/360, lists[1]/180])  # 방위각 및 고도각 정규화
x = np.array(x)
y = np.array(y)
y_ = np.array(y_)

# 모델 학습
EPOCHS = 20
history = model.fit((x0, x1), y_, validation_split = 0.2,
    epochs=EPOCHS,
    callbacks=tf.keras.callbacks.EarlyStopping(monitor='val_loss',verbose=1, patience=2),  # 조기 종료 콜백
)

# 학습 결과 시각화
metrics = history.history
plt.figure(figsize=(16,6))
plt.subplot(1,2,1)
plt.plot(history.epoch, metrics['loss'], metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch')
plt.ylabel('Loss [MSE]')

plt.subplot(1,2,2)
plt.plot(history.epoch, 100*np.array(metrics['accuracy']), 100*np.array(metrics['val_accuracy']))
plt.legend(['accuracy', 'val_accuracy'])
plt.ylim([0, 100])
plt.xlabel('Epoch')
plt.ylabel('Accuracy [%]')
