# **Urban Sound Classification**

In [None]:
import IPython.display as ipd
import librosa
import librosa.display
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data_utils

In [None]:
import os

# Root folder of the course data on Google Drive; the Train/ sub-folder holds the audio clips.
# For a local run, point `path` at the local data folder instead.
path = '/content/drive/MyDrive/Colab Notebooks/디지털사운드/13주차/'
path_train = path + 'Train/'

# The CSV files live on Google Drive, so make sure the drive is mounted before
# reading them; otherwise this cell fails on a fresh kernel ("Restart & Run All").
if not os.path.isdir('/content/drive/MyDrive'):
    from google.colab import drive
    drive.mount('/content/drive')

df = pd.read_csv(path + 'train.csv')
test_df = pd.read_csv(path + 'test.csv')
df

Unnamed: 0,ID,Class
0,0,siren
1,1,street_music
2,2,drilling
3,3,siren
4,4,dog_bark
...,...,...
5430,8725,engine_idling
5431,8726,dog_bark
5432,8727,engine_idling
5433,8728,engine_idling


In [None]:
# Mount Google Drive so that files under /content/drive can be read by the notebook.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **데이터 전처리**

**범주형(Categorical) 데이터셋으로 변환**

In [None]:
# Encode every class name as an integer code (pandas categorical codes) and
# store it in a new 'numeric_class' column; this adds the column to df in place.
df['numeric_class'] = df['Class'].astype('category').cat.codes
df

Unnamed: 0,ID,Class,numeric_class
0,0,siren,8
1,1,street_music,9
2,2,drilling,4
3,3,siren,8
4,4,dog_bark,3
...,...,...,...
5430,8725,engine_idling,5
5431,8726,dog_bark,3
5432,8727,engine_idling,5
5433,8728,engine_idling,5


**Train Dataset과 Validation Dataset으로 나누기**

In [None]:
def train_val_split(df):
    """Split a frame into a leading 80% training part and a trailing 20% validation part.

    The split is positional: rows keep their original order and nothing is
    shuffled, so the validation part is simply the tail of ``df``.

    Args:
        df: DataFrame to split.

    Returns:
        Tuple ``(train_df, val_df)`` whose row counts add up to ``len(df)``.
    """
    # Derive the boundary from the frame itself so both halves always meet at
    # the same row, whatever the size of the input.
    split_at = int(len(df) * 0.8)

    train_df = df.iloc[:split_at]
    val_df = df.iloc[split_at:]

    return train_df, val_df

In [None]:
# Split the labelled frame into training and validation parts and show their shapes.
train_df, val_df = train_val_split(df)
train_df.shape, val_df.shape

((4348, 3), (1087, 3))

**진행 시각화(Visualization)**

In [None]:
import cv2
import sys

def drawProgressBar(current, total, string = '', barLen = 20):
    """Redraw a one-line text progress bar on stdout.

    Args:
        current: Number of steps finished so far.
        total: Total number of steps (must be non-zero).
        string: Extra text appended after the ``current/total`` counter.
        barLen: Width of the bar in characters.
    """
    fraction = current / total
    # The ">" head is dropped once the bar is completely full.
    head = "" if fraction == 1 else ">"
    bar = "=" * int(barLen * fraction) + head

    line = "Progress: [{:<{}}] {}/{}".format(bar, barLen, current, total) + string

    # The carriage return moves the cursor back so the bar overwrites itself.
    sys.stdout.write("\r" + line)
    sys.stdout.flush()

**동일한 크기의 오디오 클립 만들기**

In [None]:
def get_audio_same_len(wav, sr):
    """Return the first four seconds of a clip, reflect-padding clips that are shorter.

    Args:
        wav: 1-D array of audio samples.
        sr: Sample rate, so the target length is ``4 * sr`` samples.

    Returns:
        Array of at most ``4 * sr`` samples.
    """
    target_len = 4 * sr
    missing = target_len - wav.shape[0]

    if missing > 0:
        # Mirror the signal on both sides; each side gets half of the
        # shortfall (rounded up), any surplus sample is trimmed below.
        wav = np.pad(wav, int(np.ceil(missing / 2)), mode = 'reflect')

    # Cut down to four seconds.
    return wav[:target_len]

**스펙트로그램 만들기**

In [None]:
def get_melspectrogram_db(wav, sr):
    """Compute a decibel-scaled mel spectrogram of a clip.

    The clip is first brought to a length of four seconds with
    ``get_audio_same_len``.

    Args:
        wav: 1-D array of audio samples.
        sr: Sample rate of ``wav``.

    Returns:
        2-D array with 128 mel bands as rows, in dB (dynamic range capped at 80 dB).
    """
    wav = get_audio_same_len(wav, sr)

    # The audio and sample rate are passed by keyword: recent librosa releases
    # no longer accept them positionally.
    spec = librosa.feature.melspectrogram(y = wav, sr = sr, n_fft = 2048, hop_length = 512,
                                          n_mels = 128, fmin = 20, fmax = 8300)

    return librosa.power_to_db(spec, top_db = 80)

**표준화와 정규화**

In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler

def standard_norm(spec):
    """Standardise a spectrogram, rescale it to [0, 1] and stretch it to [0, 255].

    Both scikit-learn scalers are fitted on ``spec`` itself and work column by
    column.

    Args:
        spec: 2-D spectrogram array.

    Returns:
        Array of the same shape with values between 0 and 255.
    """
    standardized = StandardScaler().fit_transform(spec)
    unit_range = MinMaxScaler().fit_transform(standardized)

    # Scale to the usual 8-bit image intensity range.
    return unit_range * 255

In [None]:
# Number of samples per mini-batch when the DataLoaders are built.
BATCH_SIZE = 32

**음성 데이터 로딩(loading)**

In [None]:
def load_data(df, audio_dir = '/content/drive/MyDrive/Colab Notebooks/data/Train/'):
    """Load the audio clips listed in ``df`` and bring them to a common length.

    Each row's ``ID`` names a ``<ID>.wav`` file inside ``audio_dir``. Clips that
    cannot be read are reported and skipped; a keyboard interrupt stops loading
    early and returns what was loaded so far.

    Args:
        df: DataFrame with an ``ID`` column and a ``numeric_class`` column.
        audio_dir: Folder that contains the ``.wav`` files.

    Returns:
        Tuple ``(audio, sample_rates, labels)`` of NumPy arrays with one entry
        per successfully loaded clip. ``audio`` is 2-D (clips x samples) and is
        an empty ``(0, 0)`` array when no clip could be loaded.
    """
    import os

    audio_data = []
    sample_rates = []
    labels = []

    tot = len(df)
    curr = 0

    for idx in df.index:
        # Look the row up before touching the file so a missing column fails
        # loudly instead of being reported as an unreadable file.
        clip_id = df.loc[idx, 'ID']
        label = df.loc[idx, 'numeric_class']

        try:
            wav, sr = librosa.load(os.path.join(audio_dir, str(clip_id) + '.wav'))
            wav = get_audio_same_len(wav, sr)

        # Allow the user to stop a long load and keep the partial result.
        except KeyboardInterrupt:
            print('KeyBoardInterrupt')
            break

        except Exception as exc:
            print("Couldn't read file", clip_id, '-', exc)
            curr += 1
            continue

        # Append all three values together so the lists can never get out of step.
        audio_data.append(wav)
        sample_rates.append(sr)
        labels.append(label)

        curr += 1
        drawProgressBar(curr, tot, barLen = 40)

    print('\n')

    # np.stack refuses an empty list, so return an empty 2-D array instead.
    if audio_data:
        audio = np.stack(audio_data, axis = 0)
    else:
        audio = np.empty((0, 0), dtype = np.float32)

    # Three results: audio data | sample rates | labels
    return audio, np.array(sample_rates), np.array(labels)

In [None]:
# Load the training clips: waveforms, sample rates and labels.
train_data, train_sr, train_labels = load_data(train_df)
# load_data returns three arrays; they are unpacked into separate variables.
val_data, val_sr, val_labels = load_data(val_df)





In [None]:
train_data.shape, val_data.shape

# 88200 = 4 * sampling rate = number of time samples per clip (ANN & DNN)

((4348, 88200), (1087, 88200))

**데이터 변환(Conversion)과 Tensor Dataset 구축**

In [None]:
# Convert the NumPy arrays to torch tensors (labels as 64-bit integers).
train_data = torch.from_numpy(train_data)
train_labels = torch.from_numpy(train_labels).long()

val_data = torch.from_numpy(val_data)
val_labels = torch.from_numpy(val_labels).long()

# Pair waveforms with labels in TensorDatasets (no DataLoader is created here).
# NOTE: train_data / val_data are rebound to a different type, so this cell
# cannot be run twice without re-running the loading cell first.
train_data = data_utils.TensorDataset(train_data, train_labels)
val_data = data_utils.TensorDataset(val_data, val_labels)

## **Convolutional Neural Network(CNN)** on Spectrogram Images

In [None]:
# Show the distinct sample rates collected for each split.
set(train_sr), set(val_sr)

({22050}, {22050})

In [None]:
# Replace the per-clip sample-rate arrays with one scalar rate per split.
train_sr = 22050
val_sr = 22050

**DataLoader 구축하기**

In [None]:
def get_spectrogram_loader(audio_data, sr, batch_size, shuffle = False):
    """Turn a dataset of waveforms into a TensorDataset of spectrogram images.

    Despite the name, this returns a dataset, not a DataLoader; the caller
    wraps the result in a DataLoader.

    Args:
        audio_data: Iterable of ``(waveform tensor, label)`` pairs that also
            supports ``len()``.
        sr: Sample rate shared by all waveforms.
        batch_size: Unused; kept so existing calls keep working.
        shuffle: Unused; kept so existing calls keep working.

    Returns:
        ``TensorDataset`` of ``(image, label)`` pairs. Each image is a float
        tensor with a leading channel axis and values in [0, 1]; labels are
        64-bit integers.
    """
    spec_images = []
    labels = []
    tot = len(audio_data)

    for curr, (wav, label) in enumerate(audio_data, start = 1):
        # Build the spectrogram, then standardise and normalise it.
        spec_img = standard_norm(get_melspectrogram_db(wav.numpy(), sr))
        # Add a leading channel axis so every image is (1, height, width).
        spec_images.append(np.expand_dims(spec_img, axis = 0))
        labels.append(int(label))

        drawProgressBar(curr, tot, barLen = 40)

    if spec_images:
        # Stack in NumPy first: building a tensor straight from a Python list
        # of arrays is far slower.
        audio_spec_img = torch.from_numpy(np.stack(spec_images, axis = 0)).float()
    else:
        audio_spec_img = torch.empty(0)
    # standard_norm scales to 0..255; bring the images back to 0..1.
    audio_spec_img = audio_spec_img / 255

    # Build integer labels directly instead of going through a float tensor.
    labels = torch.tensor(labels, dtype = torch.long)

    # Bundle spectrogram images and labels into one dataset.
    return data_utils.TensorDataset(audio_spec_img, labels)

In [None]:
# NOTE(review): BATCH_SIZE and shuffle are passed here, but get_spectrogram_loader never reads them.
train_spec_dataset = get_spectrogram_loader(train_data, train_sr, BATCH_SIZE, shuffle = True)

# train_data (a tensor dataset of waveforms) comes back as a tensor dataset that holds spectrogram images.



In [None]:
train_spec_dataset[0][0].size()

# [channels, height, width]: each sample is now an image (audio_spec_img) and this is the model input.
# The ANN took raw time samples as input; the CNN takes these images instead.
# Channels: 1 -- a spectrogram needs no RGB colour, so it is stored as a single grayscale channel.

torch.Size([1, 128, 173])

In [None]:
# Wrap the training dataset in a DataLoader, the batched form the model consumes.
# Training batches are reshuffled every epoch so the optimiser does not always
# see the samples in the same order.
train_loader = data_utils.DataLoader(train_spec_dataset, batch_size = BATCH_SIZE, shuffle = True)

In [None]:
# Build the validation dataset with the same procedure as the training one.
val_spec_dataset  = get_spectrogram_loader(val_data, val_sr, BATCH_SIZE)



In [None]:
# Shape of one validation sample.
val_spec_dataset[0][0].size()

torch.Size([1, 128, 173])

In [None]:
# The validation loader must draw from the held-out validation spectrograms;
# feeding it the training set would report training accuracy as validation accuracy.
val_loader = data_utils.DataLoader(val_spec_dataset, batch_size = BATCH_SIZE, shuffle = False)

**CNN 모델 구축하기**

In [None]:
# How to work out the layer parameters: push a dummy image through the layers
# and read off the resulting shape.
input = torch.zeros(1, 1, 128, 173)  # dummy batch: one single-channel 128 x 173 image
# 1 channel in, 8 channels out, kernel_size (5, 6) = (height, width);
# a square kernel can be given as a single number instead of a tuple.
conv1 = nn.Conv2d(1, 8, (5, 6))
# in_channels must equal the number of channels the previous layer produced.
conv2 = nn.Conv2d(8, 16, 3)
pool = nn.MaxPool2d(2)  # pooling gathers the features and shrinks the image
out = conv1(input)
out = pool(out)

out.shape
# (1, 128, 173) -> (8, 62, 84)
# The remaining layers are worked out the same way.

torch.Size([1, 8, 62, 84])

In [None]:
NUM_CLASSES = 10
LEARNING_RATE = 0.001
EPOCHS = 10

class ConvNet(nn.Module):
    """CNN that classifies single-channel 128 x 173 spectrogram images into NUM_CLASSES classes.

    The feature maps have to shrink in a consistent way from layer to layer, so
    the input shape (1, 128, 173), the flattened size 64 * 10 * 15 and
    NUM_CLASSES depend on each other and must be changed together. PyTorch does
    not infer these sizes; they are worked out by hand.
    """

    def __init__(self):
        super().__init__()

        # Block 1: (1, 128, 173) -> (8, 62, 84)
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 8, (5, 6)),
            nn.ReLU(),  # the activation leaves the feature-map size unchanged
            nn.MaxPool2d((2, 2)),
        )

        # Block 2: (8, 62, 84) -> (16, 30, 41)
        self.layer2 = nn.Sequential(
            nn.Conv2d(8, 16, (3, 3)),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
        )

        # Block 3 (two convolutions, one pooling): (16, 30, 41) -> (64, 10, 15)
        self.layer3 = nn.Sequential(
            nn.Conv2d(16, 32, (6, 7)),
            nn.ReLU(),
            nn.Conv2d(32, 64, (6, 6)),
            nn.ReLU(),
            nn.MaxPool2d((2, 2)),
        )

        # Classifier head applied to the flattened feature maps:
        # 64 * 10 * 15 -> 512 -> 256 -> 128 -> NUM_CLASSES
        self.fc1 = nn.Linear(64 * 10 * 15, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        # Final classification layer.
        self.fc4 = nn.Linear(128, NUM_CLASSES)

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        for conv_block in (self.layer1, self.layer2, self.layer3):
            x = conv_block(x)

        # Flatten everything except the batch axis.
        x = x.view(-1, self.num_flat_features(x))

        for hidden in (self.fc1, self.fc2, self.fc3):
            x = F.relu(hidden(x))

        return self.fc4(x)

    def num_flat_features(self, x):
        """Return the number of values per sample, i.e. the product of all non-batch dimensions."""
        return x.size()[1:].numel()

**CNN 클래스 불러오기**

In [None]:
# Create the network with freshly initialised weights.
model = ConvNet()

# Cross-entropy loss on the class scores, optimised with Adam at LEARNING_RATE.
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE)

**학습(Training)**

In [None]:
# Train the model for EPOCHS passes over train_loader, showing the loss and
# accuracy of the current batch in the progress bar.
THRESHOLD = 0.001  # NOTE(review): not referenced anywhere in this cell
num_train_batches = len(train_loader)

for epoch in range(EPOCHS):
    print("Epoch " + str(epoch + 1) + ":")

    # Make sure the model is in training mode, even if an evaluation cell
    # (which switches it to eval mode) was run before this one.
    model.train()

    for i, batch in enumerate(train_loader):

        data, labels = batch

        # Forward pass and loss.
        outputs = model(data)
        loss = loss_fn(outputs, labels)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accuracy of this batch only; no gradients are needed for it.
        with torch.no_grad():
            total = labels.size(0)
            _, predicted = torch.max(outputs, dim = 1)
            correct = (predicted == labels).sum().item()
            accuracy = correct / total

        drawProgressBar((i + 1), num_train_batches,
                        '\t loss: {:.4f} \t acc: {:.4f}'.format(round(loss.item(), 4), round(accuracy, 4)))

    print('\n\n')

Epoch 1:


Epoch 2:


Epoch 3:


Epoch 4:


Epoch 5:


Epoch 6:


Epoch 7:


Epoch 8:


Epoch 9:


Epoch 10:




**평가**

In [None]:
def evaluate(model, test_loader, criterion = None):
    """Measure accuracy and average batch loss of ``model`` on ``test_loader``.

    The model is switched to evaluation mode and left in that mode.

    Args:
        model: Network to evaluate.
        test_loader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss function; defaults to the notebook-level ``loss_fn``.

    Returns:
        Tuple ``(accuracy, test_loss)``: the fraction of correctly classified
        samples and the mean of the per-batch losses. Both are NaN when the
        loader yields no samples.
    """
    if criterion is None:
        criterion = loss_fn

    model.eval()
    num_test_batches = len(test_loader)

    correct = 0
    total = 0
    total_loss = 0.0

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(test_loader, start = 1):
            outputs = model(inputs)
            # The predicted class is the index of the largest score.
            _, predicted = torch.max(outputs, dim = 1)
            total_loss += criterion(outputs, labels).item()

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            drawProgressBar(i, num_test_batches)

    # Nothing was evaluated: report "undefined" instead of dividing by zero.
    if total == 0:
        return float('nan'), float('nan')

    accuracy = correct / total
    test_loss = total_loss / num_test_batches

    return accuracy, test_loss

In [None]:
# Evaluate the trained model on val_loader and print its accuracy and mean batch loss.
val_acc, val_loss = evaluate(model, val_loader)

print("\n\nValidation accuracy: {:.4f}".format(round(val_acc, 4)))
print("Validation loss: {:.4f}".format(round(val_loss, 4)))

# Accuracy: 20% -> 90%
# NOTE(review): the run that produced the 20% figure is not shown in this notebook.


Validation accuracy: 0.9121
Validation loss: 0.2446
