In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data

import librosa
import soundfile

import numpy as np

import os
from glob import glob
from tqdm import tqdm
import random

In [2]:
# 데이터셋 경로
input_data_dir = "./original"
output_data_dir = "./splitted"

def preprocess_all_wav_files(directory, output_directory, duration=1.0):
    for file_name in tqdm(os.listdir(directory)):
        if file_name.endswith('.m4a'):
            file_path = os.path.join(directory, file_name)
            preprocess_and_split_wav(file_path, output_directory, duration)

# 데이터 전처리 및 분할
def preprocess_and_split_wav(file_path, output_dir, duration=1.0):
    y, sr = librosa.load(file_path, sr=16000, mono=False)
    y = y[:, 160000:16000 * 60 * 32]
    total_samples = len(y[0])
    num_segments = int(np.ceil(total_samples / sr / duration))

    for i in range(num_segments):
        start = int(i * sr * duration)
        end = int(min((i + 1) * sr * duration, total_samples))
       
        segment0 = y[0][start:end]
        if len(segment0) < sr * duration:
            padding = np.zeros(int(sr * duration - len(segment0)))  # 수정된 부분
            segment0 = np.concatenate((segment0, padding))
        
        segment1 = y[1][start:end]
        if len(segment1) < sr * duration:
            padding = np.zeros(int(sr * duration - len(segment1)))  # 수정된 부분
            segment1 = np.concatenate((segment1, padding))
           
        output_file = os.path.join(output_dir, f"{os.path.splitext(os.path.basename(file_path))[0]}_{i}.wav")
        soundfile.write(output_file, np.swapaxes(np.array([segment0, segment1]), 0, 1), sr)

# 모든 wav 파일에 대해 전처리 수행
preprocess_all_wav_files(input_data_dir, output_data_dir, duration=1.0)

  y, sr = librosa.load(file_path, sr=16000, mono=False)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(file_path, sr=16000, mono=False)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(file_path, sr=16000, mono=False)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(file_path, sr=16000, mono=False)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)
  y, sr = librosa.load(file_path, sr=16000, mono=False)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_l

In [2]:
class DirectionDataset(data.Dataset):
    def __init__(self, rpath):
        bg_noises = glob('./bg_noise/TUT-acoustic-scenes-2016-evaluation/audio/*.wav')

        self.noise,_ = librosa.load(bg_noises[0], sr=16000)
        for bg_noise in bg_noises[1:10]:
            tmp, _ = librosa.load(bg_noise, sr=16000)
            self.noise = np.concatenate((self.noise, tmp), axis=0)
        
        self.audio_samples = glob(f"{rpath}/*.wav")
        self.label_map = {
            'left':0,
            'front':1,
            'right':2,
            'back':3,
        }
    
    def __len__(self):
        return len(self.audio_samples)
    
    def __getitem__(self, idx):
        audio, sr = librosa.load(self.audio_samples[idx], sr=16000, mono=False)
        
        for direction in self.label_map.keys():
            if direction in self.audio_samples[idx]:
                label = self.label_map[direction]
                break
        
        if random.random() > 0.3:
            random_sample = int(random.random() * len(self.noise)) - sr
            ramdom_sample = 0 if random_sample <= 0 else random_sample
            noise = self.noise[random_sample:random_sample+sr*1]

            # 원하는 SNR 설정 (예: 10 dB)
            desired_snr_db = random.choices(range(10, 20))[0]

            # SNR 계산
            clean_power = np.mean(audio ** 2)
            noise_power = np.mean(noise ** 2)
            snr_db = 10 * np.log10(clean_power / noise_power)

            # 배경 소음 스케일링
            scaling_factor = 10 ** ((snr_db - desired_snr_db) / 20)
            scaled_noise = noise * scaling_factor

            # 배경 소음을 음성에 추가
            try:
                audio = audio + scaled_noise
            except:
                audio = audio
        
        
        win_length = 320 # 320은 20ms 의미
        
        stft1 = librosa.stft(audio[0], n_fft=512, hop_length=win_length, win_length=win_length)
        stft2 = librosa.stft(audio[1], n_fft=512, hop_length=win_length, win_length=win_length)
        
        feature_set1 = torch.tensor(np.concatenate((stft1.real, stft2.real), axis=0))
        feature_set2 = torch.tensor(np.concatenate((stft1.imag, stft2.imag), axis=0))
        
        feature = torch.concatenate((feature_set1, feature_set2), axis=0)
        
        return feature.unsqueeze(0).to('cuda'), torch.tensor(label).to('cuda')

In [3]:
dataset = DirectionDataset('./splitted')
train_loader = data.DataLoader(dataset, batch_size=64, shuffle=True)

In [19]:
idx = 4500
dataset.__getitem__(idx)[1], dataset.audio_samples[idx]

(tensor(0, device='cuda:0'), './splitted/left_concat2_810.wav')

In [5]:
class DOAModel(nn.Module):
    def __init__(self):
        super(DOAModel, self).__init__()
        self.CNNLayer = nn.Sequential(
            nn.Conv2d(1, 64, (2,2), stride=(2,2)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2)),
            
            nn.Conv2d(64, 64, (2,2), stride=(2,2)),
            nn.ReLU(),
            
            nn.Conv2d(64, 64, (2,2), stride=(2,2)),
            nn.ReLU(),
        )
        
        self.flatten = nn.Flatten()
        
        self.LinearLayer = nn.Sequential(
            nn.Linear(12288, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(256, 4),
            nn.Softmax(dim=1),
        )
    
    def forward(self, x):
        conv_output = self.CNNLayer(x)
        flatten = self.flatten(conv_output)
        linear_output = self.LinearLayer(flatten)
        
        return linear_output

In [9]:
model = DOAModel().to('cuda')
model.forward(torch.randn((1, 1, 1028, 51)).to('cuda'))

tensor([[0.2390, 0.2482, 0.2569, 0.2559]], device='cuda:0',
       grad_fn=<SoftmaxBackward0>)

In [10]:
big_loss = 10

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0003)

num_epochs = 100
for epoch in range(num_epochs):
    for batch_x, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    if loss.item() <= big_loss:
        big_loss = loss.item()
        torch.save(model, './best_model.pt')
        print('best model saved. loss : ', big_loss)

Epoch [1/100], Loss: 1.1559
best model saved. loss :  1.1559103727340698
Epoch [2/100], Loss: 1.0744
best model saved. loss :  1.0743571519851685
Epoch [3/100], Loss: 0.8516
best model saved. loss :  0.8515655994415283
Epoch [4/100], Loss: 0.8583
Epoch [5/100], Loss: 0.8163
best model saved. loss :  0.8162583708763123
Epoch [6/100], Loss: 0.8516
Epoch [7/100], Loss: 0.8315
Epoch [8/100], Loss: 0.8885
Epoch [9/100], Loss: 0.8184
Epoch [10/100], Loss: 0.9051
Epoch [11/100], Loss: 0.8340
Epoch [12/100], Loss: 0.8719
Epoch [13/100], Loss: 0.8259
Epoch [14/100], Loss: 0.8066
best model saved. loss :  0.8066262602806091
Epoch [15/100], Loss: 0.7657
best model saved. loss :  0.7657250761985779
Epoch [16/100], Loss: 0.8276
Epoch [17/100], Loss: 0.7561
best model saved. loss :  0.7560704350471497
Epoch [18/100], Loss: 0.8082
Epoch [19/100], Loss: 0.7614
Epoch [20/100], Loss: 0.7665
Epoch [21/100], Loss: 0.7771
Epoch [22/100], Loss: 0.7719
Epoch [23/100], Loss: 0.7660
Epoch [24/100], Loss: 0.839

KeyboardInterrupt: 

In [None]:
model = torch.load(model, './best_model.pt')

In [23]:
audio, sr = librosa.load('./┐╖2.m4a', sr=16000, mono=False)
audio = audio[:, :16000]
#audio = librosa.util.normalize(audio, axis=1, norm=2)

win_length = 320 # 320은 20ms 의미
stft1 = librosa.stft(audio[0], n_fft=512, hop_length=win_length, win_length=win_length)
stft2 = librosa.stft(audio[1], n_fft=512, hop_length=win_length, win_length=win_length)

feature_set1 = torch.tensor(np.concatenate((stft1.real, stft2.real), axis=0))
feature_set2 = torch.tensor(np.concatenate((stft1.imag, stft2.imag), axis=0))
        
feature = torch.concatenate((feature_set1, feature_set2), axis=0)
        
feature = feature.unsqueeze(0).to('cuda')
model.forward(feature.unsqueeze(0))

  audio, sr = librosa.load('./┐╖2.m4a', sr=16000, mono=False)
	Deprecated as of librosa version 0.10.0.
	It will be removed in librosa version 1.0.
  y, sr_native = __audioread_load(path, offset, duration, dtype)


tensor([[9.0302e-01, 6.0750e-02, 3.6184e-02, 4.2839e-05]], device='cuda:0',
       grad_fn=<SoftmaxBackward0>)

In [24]:
torch.save(model.state_dict(), './4direction_best.pt')