## 소리 데이터에 대한 이해

In [None]:
import librosa

In [None]:
import sounddevice as sd

In [None]:
example_file_path = librosa.ex('trumpet')
print(f'예제 파일은 다음 위치에 있습니다\n파일 경로: {example_file_path}')

In [None]:
y, sr = librosa.load(example_file_path)

In [None]:
sd.play(y)

In [None]:
print(f'y : {y}\nsampling_rate : {sr}')

In [None]:
print(f'y는 {len(y)} 길이의 벡터')

In [None]:
print(f'y의 앞 20개 값은?\n{y[:20]}')

In [None]:
print(f'y는 {len(y) / sr} 초의 신호 벡터!')

In [None]:
from matplotlib import pyplot as plt

In [None]:
sr

In [None]:
x = [i / sr for i in range(len(y))]

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x, y)
plt.title('Audio Example Data (trumpet sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x[:sr], y[:sr])
plt.title('Audio Example Data (trumpet sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x[: sr // 10], y[: sr // 10])
plt.title('Audio Example Data (trumpet sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x[: sr // 100], y[: sr // 100])
plt.title('Audio Example Data (trumpet sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.show()

In [None]:
# Replay the clip at the rate it was loaded with; omitting the rate lets
# sounddevice use its own default, which can change speed and pitch.
sd.play(y, sr)

## Frame Processing

In [None]:
import librosa
import pandas as pd
import numpy as np

In [None]:
example_audio_path = './example/꿩+klankbeeld.wav'
example_label_path = './example/꿩+klankbeeld.txt'

frame_size = 1
stride = 0.2

In [None]:
example_audio, example_sr = librosa.load("./example/꿩+klankbeeld.wav")

In [None]:
print(
    f'audio 벡터 길이 : {len(example_audio)}\
    \naudio 파일 길이 : {len(example_audio) / example_sr} 초'
    
)

In [None]:
example_label = pd.read_csv(
    example_label_path, sep='\t', names=['start(s)', 'end(s)', 'label']
)

In [None]:
print(
    f'label :\
    \n{example_label}'
)

In [None]:
idx = example_label['end(s)'] - example_label['start(s)'] > frame_size  # (1)

example_label = example_label[idx]

In [None]:
each_n_frames = (
    np.floor(
        (
            (
                (
                    example_label['end(s)'] - example_label['start(s)']
                ) - frame_size
            ) / stride
        ) + 1
    )
).astype(int).values

In [None]:
total_n_frames = sum(each_n_frames)

In [None]:
print(f'각각의 구간마다 프레임 개수 : {each_n_frames}')
print(f'총 구간의 프레임 개수 : {total_n_frames}')

In [None]:
frame_vectors = np.zeros((int(total_n_frames), frame_size * example_sr))
target_vectors = np.zeros((int(total_n_frames)))

In [None]:
print(
    f'frame_vectors의 shape : {frame_vectors.shape}\
    \ntarget_vectors의 shape : {target_vectors.shape}'
)

In [None]:
for i in range(len(frame_vectors)):
    print(f'{i}-th frame vector[:5] : {frame_vectors[i][:5]}')
print(f'target_vectors : {target_vectors}')

In [None]:
print(
    f'frame vectors의 shape : {frame_vectors.shape}'
)

In [None]:
# Cut every kept segment into overlapping frames and store each frame with
# its segment's label.
current_idx = 0
samples_per_frame = frame_size * example_sr

# Pair each label row with its frame count BY POSITION. The label table was
# filtered earlier without resetting its index, so the index labels returned
# by iterrows() need not be 0..k-1, while each_n_frames is a plain positional
# array. Indexing it with those labels can pick the wrong count or raise
# IndexError. The loop variable is named `label` so it does not overwrite the
# global waveform `y`.
segments = example_label.itertuples(index=False, name=None)
for (start, _end, label), n_frames in zip(segments, each_n_frames):
    for j in range(n_frames):
        # The j-th window starts j * stride seconds into the segment.
        start_idx = int((start + j * stride) * example_sr)
        end_idx = start_idx + samples_per_frame
        frame_vectors[current_idx] = example_audio[start_idx:end_idx]
        target_vectors[current_idx] = label
        current_idx += 1

In [None]:
print(
    f'frame_vectors의 shape : {frame_vectors.shape}\
    \ntarget_vectors의 shape : {target_vectors.shape}'
)

In [None]:
for i in range(len(frame_vectors)):
    print(f'{i}-th frame vector[:5] : {frame_vectors[i][:5]}')
print(f'target_vectors : {target_vectors}')

## Feature Extraction: Mel Spectrogram

In [None]:
from matplotlib import pyplot as plt
import sounddevice as sd
from librosa.display import specshow

In [None]:
y = frame_vectors[1]

In [None]:
sd.play(y)

In [None]:
print(
    f'벡터의 shape : {y.shape}\
    \n벡터의 대략적인 모습은.. :\n {y}'
)

In [None]:
x = [t / example_sr for t in range(len(y))]

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x, y)
plt.title('Example Data (bird sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.show()


In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x, y)
plt.title('Example Data (bird sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.axvspan(xmin=0, xmax=2048 / example_sr, color='gray', alpha=0.5)
plt.axvspan(
    xmin=512 / example_sr, xmax=(2048 + 512) / example_sr,
    color='orange', alpha=0.5
)
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(x, y)
plt.title('Example Data (bird sound)', fontsize=20)
plt.xlabel('Time (sec)', fontsize=16)
plt.ylabel('Signal Amplitude', fontsize=16)
plt.axvspan(
    xmin=0, xmax=2048 / example_sr, color='gray', alpha=0.5
)
plt.axvspan(
    xmin=512 / example_sr, xmax=(2048 + 512 ) / example_sr,
    color='orange', alpha=0.5
)
plt.axvspan(
    xmin=(example_sr - 2048) / example_sr, xmax=example_sr / example_sr,
    color='darkgoldenrod', alpha=0.5
)
plt.show()

In [None]:
S = librosa.feature.melspectrogram(y=y, sr=example_sr)

In [None]:
S.shape

In [None]:
fig, ax = plt.subplots()
S_dB = librosa.power_to_db(S, ref=np.max)
img = specshow(
    S_dB, x_axis='time', y_axis='mel',
    sr=example_sr, fmax=8000, ax=ax
)
fig.colorbar(img, ax=ax, format='%+2.0f dB')
ax.set(title='Mel-frequency spectrogram')
plt.show()

In [None]:
fig, ax = plt.subplots()
img = specshow(
    S, x_axis='time', y_axis='mel',
    sr=example_sr, fmax=8000, ax=ax
)
fig.colorbar(img, ax=ax, format='%+2.0f')
ax.set(title='Mel-frequency spectrogram')
plt.show()

In [None]:
print(
    f'1초 신호 데이터는 다음과 같이 변합니다.\
    \nBefore shape (signal) : {y.shape}\
    \nAfter shape (Mel Spectrogram) : {S_dB.shape}'
)

In [None]:
S_power = librosa.db_to_power(S_dB)

In [None]:
sd.play(y, 22050)

In [None]:
sd.play(librosa.feature.inverse.mel_to_audio(S_power), 22050)

In [None]:
sd.play(librosa.feature.inverse.mel_to_audio(S_dB), 22050)

## 레이블 데이터 정리

In [None]:
from os import listdir
from os.path import join, splitext
import pandas as pd

In [None]:
label_path = './data/data/labels/'
label_files = [
    f for f in listdir(label_path)
    if splitext(join(label_path, f))[-1] == '.txt'
]

In [None]:
print(
    f'label file 목록 예시 : {label_files[:3]}'
)

In [None]:
# Collect every label file into one workbook: sheet '<n>' holds the labels of
# the n-th recording and sheet 'list' maps n to the audio file name.
list_file_fn = 'file_list.xlsx'  # (1)
raw_data_path = './data/data/raw_data/'

# Scan the audio directory once and use a set for O(1) membership tests,
# instead of calling listdir() twice for every label file.
available_audio = set(listdir(raw_data_path))

# Validate all label/audio pairs BEFORE opening the workbook, so a missing
# recording does not leave a half-written file behind.
audio_names = []  # (3)
for file_name in label_files:
    audio_name = splitext(file_name)[0] + '.wav'
    if audio_name not in available_audio:
        raise FileNotFoundError('file not match')
    audio_names.append(audio_name)  # (5)

with pd.ExcelWriter(list_file_fn) as writer:  # (2)
    for file_n, file_name in enumerate(label_files):
        temp = pd.read_csv(join(label_path, file_name), sep='\t', header=None)
        temp.columns = ['start(s)', 'end(s)', 'label']
        temp.to_excel(writer, index=False, sheet_name=str(file_n))  # (4)

    # (6) Written once after the loop, so the sheet also exists when there
    # are no label files at all.
    pd.DataFrame(
        audio_names, columns=['audio_name']
    ).to_excel(writer, sheet_name='list')

# Number of label sheets written (same final value as the original counter).
file_n = len(audio_names)

## 전체 데이터 전처리

In [None]:
import math
import time

import librosa
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

In [None]:
def load_label_data(file_path, i, frame_size):
    """Read one label sheet and keep the segments longer than a frame.

    Args:
        file_path: Workbook to read; passed straight to ``pd.read_excel``.
        i: Sheet number; the sheet named ``str(i)`` is read.
        frame_size: Minimum duration, in the same unit as the start/end
            columns; rows must be strictly longer than this to be kept.

    Returns:
        A 2-D NumPy array of the kept rows. Column 0 and 1 are used as the
        segment start and end, and the 'label' column has been reduced to an
        integer.
    """
    def _leading_digit(raw):
        # Only the first character of the label's string form is used.
        return int(str(raw)[0])

    sheet = pd.read_excel(file_path, sheet_name=str(i))  # (1)
    sheet['label'] = sheet['label'].map(_leading_digit)

    rows = np.array(sheet)  # (2)
    durations = rows[:, 1] - rows[:, 0]
    return rows[durations > frame_size]  # (3)
 
def load_sound_data(file_path, sampling_rate):
    """Load an audio file with librosa at the requested sampling rate.

    Args:
        file_path: Path of the audio file.
        sampling_rate: Forwarded to ``librosa.load`` as ``sr``.

    Returns:
        The ``(audio, sr)`` pair produced by ``librosa.load``.
    """
    # (4)
    return librosa.load(file_path, sr=sampling_rate)

In [None]:
def process_frame(audio, sr, label, frame_size=1, stride=0.2):
    """Cut labelled segments of a recording into overlapping fixed-size frames.

    Every row of ``label`` describes one segment as ``(start, end, target)``
    with start/end in seconds. A window of ``frame_size`` seconds is slid over
    each segment in steps of ``stride`` seconds, giving
    ``floor((end - start - frame_size) / stride + 1)`` frames per segment.
    Segments shorter than one frame contribute no frames.

    Args:
        audio: 1-D array of samples.
        sr: Sampling rate of ``audio`` in samples per second.
        label: 2-D array whose columns 0, 1 and 2 are start, end and target.
        frame_size: Window length in seconds.
        stride: Hop between consecutive windows in seconds.

    Returns:
        ``(frame_vector, target_vector)``: arrays of shape
        ``(n_frames, int(frame_size * sr))`` and ``(n_frames, 1)``.

    Raises:
        ValueError: Expected from NumPy when a window runs past the end of
            ``audio`` and the slice cannot fill a full frame row.
    """
    # int() keeps the array shape valid when frame_size is a float.
    frame_len = int(frame_size * sr)

    # One shared per-segment count for both the allocation and the fill loop.
    # The original sized the arrays with "+ frame_size" but filled them with
    # "+ 1", which only agree when frame_size == 1.
    durations = label[:, 1] - label[:, 0]
    counts = np.floor((durations - frame_size) / stride + 1).astype(int)
    # A too-short segment gives a negative count; clamp it so it cannot
    # shrink the allocation needed by the other segments.
    counts = np.maximum(counts, 0)
    total = int(counts.sum())  # (1)

    frame_vector = np.zeros((total, frame_len))  # (2)
    target_vector = np.zeros((total, 1))  # (3)

    row = 0
    for segment, count in zip(label, counts):  # (4)
        start = segment[0]  # (5)
        target = segment[2]  # (7)
        for j in range(count):  # (9)
            start_idx = int((start + j * stride) * sr)
            end_idx = start_idx + frame_len
            frame_vector[row] = audio[start_idx:end_idx]  # (10)
            target_vector[row] = target  # (11)
            row += 1
    return frame_vector, target_vector

In [None]:
def extract_mel_feature(frame_vector, sr, n_mels, st=512):
    """Return the dB-scaled mel spectrogram of one frame.

    Args:
        frame_vector: Audio samples of the frame.
        sr: Sampling rate of the samples.
        n_mels: Number of mel bands.
        st: Hop length in samples between analysis windows.

    Returns:
        The mel spectrogram converted to dB relative to its own maximum.
    """
    power_spec = librosa.feature.melspectrogram(
        y=frame_vector, sr=sr, n_mels=n_mels, hop_length=st
    )  # (1)
    # (2) ref=np.max scales each frame against its own peak value.
    return librosa.core.power_to_db(power_spec, ref=np.max)

In [None]:
def extract_mel_power_feature(frame_vector, sr, n_mels, st=512):
    """Return the mel spectrogram of one frame without dB conversion.

    Args:
        frame_vector: Audio samples of the frame.
        sr: Sampling rate of the samples.
        n_mels: Number of mel bands.
        st: Hop length in samples between analysis windows.

    Returns:
        The array produced by ``librosa.feature.melspectrogram``.
    """
    # (1)
    return librosa.feature.melspectrogram(
        y=frame_vector, sr=sr, n_mels=n_mels, hop_length=st
    )

In [122]:
# Preprocessing configuration.
sr = 22050
frame_size = 1
stride = 0.2
n_mels = 64
n_feature = 34
short_time = 512

list_file_path = './file_list.xlsx'
raw_data_dir = './data/data/raw_data/'

frame_vectors = []
label_vectors = []
idx_count = []  # number of frames contributed by each file, in file order

# Open the workbook once and reuse the handle for every sheet. Passing the
# path to pd.read_excel each time re-opens and re-parses the whole workbook
# per file. load_label_data forwards its first argument to pd.read_excel,
# which also accepts an open ExcelFile.
with pd.ExcelFile(list_file_path) as workbook:
    file_list = pd.read_excel(workbook, sheet_name='list')  # (1)

    for file_i, file_name in enumerate(file_list['audio_name']):  # (2)
        if file_i % 10 == 0:
            print(f'{file_i}-th 파일을 처리하고 있습니다.')

        label_data_temp = load_label_data(
            workbook, file_i, frame_size
        )  # (3)

        # (4) No segment is long enough for a frame: record 0 and skip the
        # audio loading entirely.
        if len(label_data_temp) == 0:
            idx_count.append(0)
            continue

        audio_path = raw_data_dir + file_name
        audio_data_temp, _ = load_sound_data(audio_path, sr)  # (5)

        frame_vector, label_vector = process_frame(
            audio=audio_data_temp, sr=sr, label=label_data_temp,
            frame_size=frame_size, stride=stride
        )  # (6)
        frame_vectors.append(frame_vector)
        label_vectors.append(label_vector)

        idx_count.append(frame_vector.shape[0])

# np.concatenate raises an opaque ValueError on an empty list; fail with a
# message that names the actual problem instead.
if not frame_vectors:
    raise ValueError('no labelled segment longer than frame_size was found')

frame_vectors = np.concatenate(frame_vectors)  # (7)
label_vectors = np.concatenate(label_vectors)

0-th 파일을 처리하고 있습니다.


In [None]:
# Count the frames per class. np.sum reduces the whole (N, 1) array to a
# scalar; the builtin sum() returned a 1-element array, and converting such
# an array with int() is deprecated in recent NumPy releases.
print(
    f'label vectors 의 shape: {label_vectors.shape}\
    \n비명 갯수 : {int(np.sum(label_vectors == 1))}\
    \n비명 아닌 갯수 : {int(np.sum(label_vectors == 0))}'
)

In [None]:
# One dB-scaled mel spectrogram per frame, stacked along a new first axis.
mel_features = np.stack([
    extract_mel_feature(frame_vector=frame, sr=sr, n_mels=n_mels)
    for frame in frame_vectors
])
print(f'mel spectrogram shape: {mel_features.shape}')

In [None]:
train_ratio = 0.7  # (1)

total_n = frame_vectors.shape[0]  # (2)
train_n = int(total_n * train_ratio)  # (3)

train_idxes = np.random.choice(total_n, train_n, replace=False)  # (4)

train_mask = np.zeros(shape=total_n, dtype=bool)  # (5)
train_mask[train_idxes] = True  # (6)

In [None]:
print(train_mask[:10])

In [None]:
train_idxes[:10]

In [None]:
# idxes = []
# now = 0
# for i, count in enumerate(idx_count):
#     idxes.append([j for j in range(now, now + count)])
#     now += count

In [None]:
# import random
# random.shuffle(idxes)
# # idxes[:5]

In [None]:
# train_idxes = []
# for idx in idxes:
#     train_idxes.extend(idx)
#     if len(train_idxes) > train_idx:
#         break 

In [None]:
# len(train_idxes)

In [None]:
train_mel_features = mel_features[train_mask]
valid_mel_features = mel_features[~train_mask]

train_label_vectors = label_vectors[train_mask]
valid_label_vectors = label_vectors[~train_mask]

In [None]:
print(sum(train_label_vectors==0))
print(sum(train_label_vectors==1))

In [None]:
print(sum(valid_label_vectors==0))
print(sum(valid_label_vectors==1))

## PyTorch 데이터셋 & 로더 구현

In [None]:
class ScreamDataset(Dataset):
    """Dataset pairing per-frame audio features with their labels.

    Args:
        x: Indexable collection of feature arrays, one per frame.
        y: Indexable collection of labels, aligned with ``x``.

    Raises:
        ValueError: If ``x`` and ``y`` do not have the same length.
    """

    def __init__(self, x, y):
        if len(x) != len(y):
            raise ValueError(
                f'features and labels differ in length: {len(x)} != {len(y)}'
            )
        self.audio_features = x
        self.labels = y

    def __len__(self):
        """Return the number of frames."""
        return len(self.audio_features)

    def __getitem__(self, idx):
        """Return ``(feature, label)`` for frame ``idx`` as float32 tensors.

        The feature gets a leading axis of size 1, which the convolutional
        model consumes as its single input channel.
        """
        audio_feature = torch.as_tensor(
            self.audio_features[idx], dtype=torch.float32
        ).unsqueeze(0)
        # Return the label as float32 as well. Left as a float64 NumPy value
        # it is collated into a double tensor, which does not match the
        # float32 model output in the loss and can raise a dtype error
        # depending on the PyTorch version.
        label = torch.as_tensor(self.labels[idx], dtype=torch.float32)
        return (audio_feature, label)

In [None]:
batch_size = 128
learning_rate = 0.001
epochs = 10

In [None]:
train_dataset = ScreamDataset(
    train_mel_features, train_label_vectors
)  # (1)
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True
)  # (2)

In [None]:
valid_dataset = ScreamDataset(
    valid_mel_features, valid_label_vectors
)  # (1)
valid_loader = DataLoader(
    valid_dataset, batch_size=batch_size, shuffle=False
)  # (2)

## 모델 클래스 구현

In [None]:
import torch
from torch import nn

In [None]:
# Run on the GPU when one is available, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Binary classifier over mel-spectrogram frames. The layer order must stay as
# it is: nn.Sequential names parameters by position, and the cells that reload
# 'test.pth' rebuild exactly this sequence.
model = nn.Sequential(
    # |x| = (n, 1, 64, 44)
    # The (64, 1) kernel spans the full height of the input, collapsing that
    # axis to size 1 while leaving the width untouched.
    nn.Conv2d(
        in_channels=1,
        out_channels=32,
        kernel_size=(64, 1),
    ),
    # |x| = (n, 32, 1, 44)
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.Dropout2d(p=0.3),
    # Width-only convolution: 9 columns per kernel, moving 4 columns at a time
    # (the height is already 1).
    nn.Conv2d(
        in_channels=32,
        out_channels=64,
        kernel_size=(1, 9),
        stride=4
    ),
    # |x| = (n, 64, 1, 9)
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.Dropout2d(p=0.3),
    nn.Flatten(),
    # |x| = (n, 64 * 1 * 9)
    # Single output without an activation: the model emits a raw logit.
    nn.Linear(64 * 1 * 9, 1),
    # |x| = (n, 1)
).to(device)

## 모델 학습

In [None]:
import torch
from torch import optim
from sklearn.metrics import f1_score, accuracy_score

In [None]:
# Binary cross-entropy computed on raw logits; the model ends in a plain
# Linear layer, so no sigmoid has been applied before the loss.
criterion = nn.BCEWithLogitsLoss()  # (1)
# Adam over all model parameters, using the learning_rate set earlier.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)  # (2)

In [None]:
train_n = len(train_dataset)
valid_n = len(valid_dataset)


def _summarize_epoch(loss_sum, labels, preds, n_samples):
    """Return formatted (loss, acc, f1) strings for one pass over a dataset.

    Accuracy and F1 are computed once over ALL predictions of the epoch.
    Averaging per-batch F1 scores (as done before) is not the F1 of the
    dataset and reports 0 for every batch that contains no positive sample.
    """
    labels = np.concatenate(labels).ravel()
    preds = np.concatenate(preds).ravel()
    return (
        f'{loss_sum / n_samples : .4f}',
        f'{accuracy_score(labels, preds) : .3f}',
        f'{f1_score(labels, preds) : .3f}',
    )


# NOTE(review): the epoch count is hard-coded to 100 although an `epochs`
# hyperparameter is defined next to batch_size / learning_rate. Kept at 100 to
# preserve the training behaviour - confirm which value is intended.
for e in range(100):
    # init accumulators for this epoch  # (1)
    train_loss = 0.0
    train_labels, train_preds = [], []
    valid_loss = 0.0
    valid_labels, valid_preds = [], []

    # train part
    model.train()  # (2)
    for audio_feature, label in train_loader:
        audio_feature = audio_feature.to(device)  # (3)
        # Match the float32 model output; double targets can trigger a dtype
        # mismatch in the loss depending on the PyTorch version.
        label = label.to(device).float()
        optimizer.zero_grad()  # (4)

        pred = model(audio_feature)  # (5)

        loss = criterion(pred, label)  # (6)
        loss.backward()  # (7)
        optimizer.step()  # (8)

        # (9) Threshold the sigmoid output at 0.5 to get hard 0/1 predictions.
        pred = torch.round(torch.sigmoid(pred))
        train_preds.append(pred.cpu().detach().numpy())  # (10)
        train_labels.append(label.cpu().detach().numpy())

        # (12) Weight the batch-mean loss by the batch size so the epoch
        # average is correct even when the last batch is smaller.
        train_loss += loss.item() * len(label)

    # validation part
    model.eval()
    with torch.no_grad():
        for audio_feature, label in valid_loader:
            audio_feature = audio_feature.to(device)
            label = label.to(device).float()

            pred = model(audio_feature)

            loss = criterion(pred, label)

            pred = torch.round(torch.sigmoid(pred))
            valid_preds.append(pred.cpu().numpy())  # (13)
            valid_labels.append(label.cpu().numpy())

            valid_loss += loss.item() * len(label)

    # calculate metrics for this epoch  # (14)
    train_loss, train_acc, train_f1_score = _summarize_epoch(
        train_loss, train_labels, train_preds, train_n
    )
    valid_loss, valid_acc, valid_f1_score = _summarize_epoch(
        valid_loss, valid_labels, valid_preds, valid_n
    )

    # log metrics
    print(f'Epoch {e: 03}')
    print('      |   loss  |   acc  |   f1   |')
    print(f'TRAIN | {train_loss} | {train_acc} | {train_f1_score} |')
    print(f'VALID | {valid_loss} | {valid_acc} | {valid_f1_score} |')
    print('----------------------------------')
    print()

## 모델 저장

In [None]:
# Save CPU copies of the weights so the file loads on machines without a GPU.
# model.cpu() moves the live model in place, so move it back afterwards;
# otherwise re-running the training cell on a GPU would mix devices.
torch.save(model.cpu().state_dict(), 'test.pth')
model = model.to(device)

## 모델 로드

In [None]:
from torch import nn
import torch

In [None]:
model = nn.Sequential(
    nn.Conv2d(
        in_channels=1,
        out_channels=32,
        kernel_size=(64, 1),
    ),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.Dropout2d(p=0.3),
    nn.Conv2d(
        in_channels=32,
        out_channels=64,
        kernel_size=(1, 9),
        stride=4
    ),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.Dropout2d(p=0.3),
    nn.Flatten(),
    nn.Linear(64 * 1 * 9, 1),
)

In [None]:
model.load_state_dict(torch.load('test.pth', map_location='cpu'))

## 데모 실행

In [None]:
import sys
import torch
import torch.nn as nn
from PyQt5 import QtCore
from PyQt5.QtWidgets import QApplication
from demo import MyWindow, MicrophoneRecorder

sampling_rate = 22050  # Hz
chunk_size = 22050  # samples

# Rebuild the trained architecture and load its weights on the CPU.
model_dir = 'test.pth'
model = nn.Sequential(
    nn.Conv2d(
        in_channels=1,
        out_channels=32,
        kernel_size=(64, 1),
    ),
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.Dropout2d(p=0.3),
    nn.Conv2d(
        in_channels=32,
        out_channels=64,
        kernel_size=(1, 9),
        stride=4
    ),
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.Dropout2d(p=0.3),
    nn.Flatten(),
    nn.Linear(64 * 1 * 9, 1),
)
model.load_state_dict(torch.load(model_dir, map_location='cpu'))
# Inference only: without eval() the Dropout layers stay active and BatchNorm
# normalizes with the statistics of the current input instead of the learned
# running statistics, which makes live predictions noisy.
model.eval()

prediction_i = 0
predictions_collection = []

app = QApplication(sys.argv)
myWindow = MyWindow(model=model)
mic = MicrophoneRecorder(myWindow.read_collected)

# Timer period: the duration of one chunk, converted to milliseconds.
interval = int(1000 * chunk_size / sampling_rate)
t = QtCore.QTimer()
t.timeout.connect(mic.read)
t.start(interval)  # fire mic.read once per chunk duration

myWindow.show()
app.exec_()
