# E5-Spectrogram classification 모델 구현

## 1. 데이터 준비하기

### 데이터 다운받기 (1.6G 대용량)

mkdir -p ~/aiffel/speech_recognition/data
mkdir -p ~/aiffel/speech_recognition/models
wget https://aiffelstaticdev.blob.core.windows.net/dataset/speech_wav_8000.npz -P ~/aiffel/speech_recognition/data

In [1]:
import numpy as np
import os

data_path = os.getenv("HOME")+'/aiffel/speech_recognition/data/speech_wav_8000.npz'
speech_data = np.load(data_path)

print("Wave data shape : ", speech_data["wav_vals"].shape) # 50620*8000 행렬이란 소리 아닌가..8000*1사이즈 데이터 50620개
print("Label data shape : ", speech_data["label_vals"].shape) # 50620 by 1 행렬... 

Wave data shape :  (50620, 8000)
Label data shape :  (50620, 1)


### 데이터 확인해 보기

In [2]:
import IPython.display as ipd
import random

# 데이터 선택 (랜덤하게 선택하고 있으니, 여러번 실행해 보세요)
rand = random.randint(0, len(speech_data["wav_vals"]))  # 실행할 때마다 'rand num','label' 바뀜
print("rand num : ", rand)

sr = 8000 # 1초동안 재생되는 샘플의 갯수
data1 = speech_data["wav_vals"]  # 웨이브 데이터??
data1[rand]
data2 = speech_data["label_vals"]
data2[rand]
print("Wave data shape : ", data1[rand].shape)
print("label data shape : ", data2[rand].shape)
print("label : ", data2[rand])

#ipd.Audio(data, rate=sr)

rand num :  13818
Wave data shape :  (8000,)
label data shape :  (1,)
label :  ['right']


### Spectrogram 으로 변환 후, 입력 데이터에 저장

* wav 데이터를 해석하는 방법 중 하나로, 일정 시간동안 wav 데이터 안의 다양한 주파수들이 얼마나 포함되어 있는 지를 보여줌  
* X축은 시간, Y축은 주파수  
* 해당 시간/주파수에서의 음파 강도에 따라 밝은색으로 표현  
* wav 데이터가 단위 시간만큼 Short Time Fourier Transform을 진행해 매 순간의 주파수 데이터들을 얻어서 Spectrogram을 완성

`pip install librosa`   <- `liblosa` 설치

In [3]:
import librosa

# 1차원 웨이브 데이터를 2차원 스펙토그램 데이터로 바꿔서 분류한다고 이해했는데 맞는지 모르겠음...
# 아무튼 일단 웨이브->스펙으로 바꾸는 함수 정의!!
def wav2spec(wav, fft_size=258): # spectrogram shape을 맞추기위해서 size 변형
    D = np.abs(librosa.stft(wav, n_fft=fft_size))
    return D
print("✅")

✅


In [4]:
spec = wav2spec(data1[rand]) # 웨이브 데이터를 스펙토그램으로 변환 (1-dim data -> 2-dim data)
print("Waveform shape : ",data1[rand].shape) #웨이브 데이터 8000개??
print("Spectrogram shape : ",spec.shape)
print(type(spec))

spec = np.array(spec, np.float32)

Waveform shape :  (8000,)
Spectrogram shape :  (130, 126)
<class 'numpy.ndarray'>


In [5]:
spec_data = []
#for wav in data1[0]:  <- 이건 행벡터 하나 뽑아서 예시로 보여준 것인듯... 
#모든 데이터를 스펙으로 바꿔야 한다면 아래 코드가 맞는듯.... 그래야 라벨 데이터랑 수가 맞으니까
for i in range(0,len(data1)):
    spec = wav2spec(data1[i])
    spec_data.append(spec)
    
spec_data = np.array(spec_data)
spec_data.shape

(50620, 130, 126)

### 라벨 데이터 처리

In [6]:
target_list = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']

label_value = target_list
label_value.append('unknown')
label_value.append('silence')

print('LABEL : ', label_value)

new_label_value = dict()
for i, l in enumerate(label_value):
    new_label_value[l] = i
    
label_value = new_label_value

print('Indexed LABEL : ', new_label_value)

LABEL :  ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go', 'unknown', 'silence']
Indexed LABEL :  {'yes': 0, 'no': 1, 'up': 2, 'down': 3, 'left': 4, 'right': 5, 'on': 6, 'off': 7, 'stop': 8, 'go': 9, 'unknown': 10, 'silence': 11}


In [7]:
temp = []
for v in data2:
    temp.append(data2[rand])
label_data = np.array(temp)

label_data.shape

(50620, 1)

In [8]:
print(len(spec_data))
print(len(label_data))

50620
50620


### 학습을 위한 데이터 분리

* sklearn의 train_test_split 함수 이용 : train data와 test data 분리  
* test_size의 인자를 조절해주면, 설정해 준 값만큼 Test dataset의 비율을 조정 가능

In [9]:
# len(spec_data)와 len(label_data) 같으니까 실행되겠지??
from sklearn.model_selection import train_test_split

sr = 130
sc = 126

train_spec, test_spec, train_label, test_label = train_test_split(spec_data,
                                                                  label_data,
                                                                test_size=0.1,
                                                                shuffle=True)
#print(train_wav)

train_spec = train_spec.reshape([-1, sr, sc,1]) # add channel for CNN
test_spec = test_spec.reshape([-1, sr, sc,1])
print("✅")

✅


* 분리된 데이터셋 확인

In [10]:
print("train data : ", train_spec.shape)
print("train labels : ", train_label.shape)
print("test data : ", test_spec.shape)
print("test labels : ", test_label.shape)
print("✅")

train data :  (45558, 130, 126, 1)
train labels :  (45558, 1)
test data :  (5062, 130, 126, 1)
test labels :  (5062, 1)
✅


### Hyper-parameters setting

* 학습을 위한 하이퍼파라미터 설정  
* 모델 체크포인트 저장을 위한 체크포인트의 경로 설정  
* 후에 모델 체크포인트 Callback 함수를 설정하거나, 모델을 불러올 때 사용

In [11]:
batch_size = 32
max_epochs = 10

# the save point
checkpoint_dir = os.getenv('HOME')+'/aiffel/speech_recognition/models/wav'

checkpoint_dir

'/home/aiffel-dj42/aiffel/speech_recognition/models/wav'

### Data setting

* `tf.data.Dataset`을 이용하여 데이터셋 구성 (`Tensorflow`에 포함된 데이터셋 관리 패키지)  
* 위 패키지는 데이터셋 전처리, 배치처리 등을 쉽게 할 수 있도록 해 줌  
* `tf.data.Dataset.from_tensor_slices` 함수에 return 받길 원하는 데이터를 튜플 (data, label) 형태로 넣어서 사용  
  
* map 함수는 dataset이 데이터를 불러올때마다 동작시킬 데이터 전처리 함수를 매핑해 주는 역할  
* 첫번째 map 함수는 `from_tensor_slice` 에 입력한 튜플 형태로 데이터를 받으며 return 값으로 어떤 데이터를 반환할지 결정  
* map 함수는 중첩해서 사용 가능  
  
* 아래와 같이, map 함수에 넘겨줄 데이터 전처리 함수를 작성해보기

In [12]:
# 여기부턴 진짜 모르겠음... 뭔 소리인지.... 난 최선을 다했다.... ㅠㅠㅠ
def one_hot_label(wav, label):
    wav = 
    label = tf.one_hot(label_data, depth=12)
    return wav, label
print("✅")

✅


* `tf.data.Dataset` 함수 구성  
* `batch`는 dataset에서 제공하는 튜플 형태의 데이터를 얼마나 가져올지 결정하는 함수

In [13]:
import tensorflow as tf

# for train
train_dataset = tf.data.Dataset.from_tensor_slices((train_spec, train_label))
train_dataset = train_dataset.map(one_hot_label)
train_dataset = train_dataset.repeat().batch(batch_size=batch_size)
print(train_dataset)

# for test
test_dataset = tf.data.Dataset.from_tensor_slices((test_spec, test_label))
test_dataset = test_dataset.map(one_hot_label)
test_dataset = test_dataset.batch(batch_size=batch_size)
print(test_dataset)
print("✅")

TypeError: in user code:

    <ipython-input-12-740a6a900a6a>:3 one_hot_label  *
        label = tf.one_hot(label_data, depth=12)
    /home/aiffel-dj42/anaconda3/envs/aiffel/lib/python3.7/site-packages/tensorflow/python/util/dispatch.py:180 wrapper  **
        return target(*args, **kwargs)
    /home/aiffel-dj42/anaconda3/envs/aiffel/lib/python3.7/site-packages/tensorflow/python/ops/array_ops.py:4010 one_hot
        name)
    /home/aiffel-dj42/anaconda3/envs/aiffel/lib/python3.7/site-packages/tensorflow/python/ops/gen_array_ops.py:6200 one_hot
        off_value=off_value, axis=axis, name=name)
    /home/aiffel-dj42/anaconda3/envs/aiffel/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py:578 _apply_op_helper
        param_name=input_name)
    /home/aiffel-dj42/anaconda3/envs/aiffel/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py:61 _SatisfiesTypeConstraint
        ", ".join(dtypes.as_dtype(x).name for x in allowed_list)))

    TypeError: Value passed to parameter 'indices' has DataType string not in list of allowed values: uint8, int32, int64


## 2. 스펙토그램 classification 모델 구현

### Model

In [None]:
from tensorflow.keras import layers

input_tensor = layers.Input(shape=(sr, sc, 1))  # sc 추가해서 차원 맞춰주고

x = layers.Conv2D(32, (3,3), padding='same', activation='relu')(input_tensor)  # Conv1D는 Conv2D로, 9는 (3,3)으로 변경
x = layers.Conv2D(32, (3,3), padding='same', activation='relu')(x)
x = layers.MaxPool2D()(x)

x = layers.Conv2D(64, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(64, (3,3), padding='same', activation='relu')(x)
x = layers.MaxPool2D()(x)

x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = layers.MaxPool2D()(x)

x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = layers.MaxPool2D()(x)
x = layers.Dropout(0.3)(x)

x = layers.Flatten()(x)
x = layers.Dense(256)(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)

output_tensor = layers.Dense(12)(x)

model_wav = tf.keras.Model(input_tensor, output_tensor)

model_wav.summary()

### Loss

* 현재 라벨이 될 수 있는 12개의 단어 class 가지고 있음  
* 해당 class를 구분하기 위해 `multi-class classification` 필요  
* 이를 수행하기 위한 `Loss`로 `Categorical Cross-Entropy loss`를 사용할 것임

In [None]:
optimizer=tf.keras.optimizers.Adam(1e-4)
model_wav.compile(loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
             optimizer=optimizer,
             metrics=['accuracy'])
print("✅")

### Training

#### Callback

* `model.fit` 함수를 이용할 때, `callback` 함수를 이용하여 학습 중간 중간 원하는 동작을 하도록 설정 가능  
* 모델을 재사용하기 위해 모델 가중치를 저장하는 `callback` 함수 추가  
  
* `Model Checkpoint callback`은 모델을 학습을 진행하며, `fit` 함수 내 다양한 인자를 지정해 모니터하며 동작하게 설정 가능  
* 현재 모델은 `validation loss`를 모니터하며, `loss`가 낮아지면 모델 파라미터를 저장하도록 구성되어 있음

In [None]:
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_dir,
                                                 save_weights_only=True,
                                                 monitor='val_loss',
                                                 mode='auto',
                                                 save_best_only=True,
                                                 verbose=1)
print("✅")

* 아래는 모델 학습 코드  
* 이전 스텝의 하이퍼파라미터 세팅에서 `batch_size=32, max_epochs=10`으로 세팅한 경우라면 30분 가량 소요될 것  
* 메모리 사용량에 주의하며 적절히 하이퍼파라미터 세팅을 조절  
* 메모리가 부족하다면 batch_size를 작게 조절해 주는게 좋음

In [None]:
#30분 내외 소요 (메모리 사용량에 주의해 주세요.)
history_wav = model_wav.fit(train_dataset, epochs=max_epochs,
                    steps_per_epoch=len(train_spec) // batch_size,
                    validation_data=test_dataset,
                    validation_steps=len(test_spec) // batch_size,
                    callbacks=[cp_callback]
                    )
print("✅")

### 학습 결과 Plot

* `model.fit` 함수는 학습 동안의 결과를 return  
* return 값을 기반으로 loss와 accuracy를 그래프로 표현  
* `fit` 함수에서 전달 받은 Loss와 Accuracy의 값을 이용해 모델이 어떻게 학습되고 있는지 확인 가능  
* `train loss`와 `val_loss`의 차이가 커지는 경우 오버피팅이 일어나는 것이기 때문에 이를 수정할 필요가 있음  
  
* 출력된 그래프를 기반으로 모델의 학습이 어떻게 진행됐는지 확인

In [None]:
import matplotlib.pyplot as plt

acc = history_wav.history['accuracy']
val_acc = history_wav.history['val_accuracy']

loss=history_wav.history['loss']
val_loss=history_wav.history['val_loss']

epochs_range = range(len(acc))

plt.figure(figsize=(8, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()
print("✅")

### Evaluation

* Test dataset을 이용해서 모델의 성능 평가  
  
* 실습삼아 `checkpoint callback` 함수가 저장한 weight를 다시 불러와서 테스트 준비

In [None]:
model_wav.load_weights(checkpoint_dir)
print("✅")

* Test data을 이용하여 모델의 예측값과 실제값이 얼마나 일치하는지 확인

In [None]:
results = model_wav.evaluate(test_dataset)
print("✅")

In [None]:
# loss
print("loss value: {:.3f}".format(results[0]))
# accuracy
print("accuracy value: {:.4f}%".format(results[1]*100))
print("✅")

### Model Test

* Test data 셋을 골라 직접 들어보고 모델의 예측이 맞는지 확인

In [None]:
inv_label_value = {v: k for k, v in label_value.items()}
batch_index = np.random.choice(len(test_wav), size=1, replace=False)

batch_xs = test_wav[batch_index]
batch_ys = test_label[batch_index]
y_pred_ = model_wav(batch_xs, training=False)

print("label : ", str(inv_label_value[batch_ys[0]]))

ipd.Audio(batch_xs.reshape(8000,), rate=8000)

* 위에서 확인해본 테스트셋의 라벨과 우리 모델의 실제 prediction 결과를 비교

In [None]:
if np.argmax(y_pred_) == batch_ys[0]:
    print("y_pred: " + str(inv_label_value[np.argmax(y_pred_)]) + '(Correct!)')
else:
    print("y_pred: " + str(inv_label_value[np.argmax(y_pred_)]) + '(Incorrect!)')
print("✅")

## 3. Skip-Connection model을 추가해보자

### Skip-Connection model 구현

* 기존의 모델을 `skip-connection`이 추가된 모델로 변경해 학습을 진행  
* 위쪽의 데이터가 레이어를 뛰어넘어 레이어를 통과한 값에 더해주는 형식으로 구현  
* Concat을 이용한 방식으로 구현  
  
* `tf.concat([#layer output tensor, layer output tensor#], axis=#)`  
  
* 우리가 사용하는 데이터가 1차원 audio 데이터이기 때문에 1차원 데이터를 처리하는 모델을 구성

In [None]:
input_tensor = layers.Input(shape=(sr, sc, 1))  # 여기 마찬가지로 sc 추가

x = layers.Conv2D(32, (3,3), padding='same', activation='relu')(input_tensor) # Conv1D는 Conv2D로, 9는 (3,3)으로 변경
x = layers.Conv2D(32, (3,3), padding='same', activation='relu')(x)
skip_1 = layers.MaxPool2D()(x)

x = layers.Conv2D(64, (3,3), padding='same', activation='relu')(skip_1)
x = layers.Conv2D(64, (3,3), padding='same', activation='relu')(x)
x = tf.concat([x, skip_1], -1)
skip_2 = layers.MaxPool2D()(x)

x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(skip_2)
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = tf.concat([x, skip_2], -1)
skip_3 = layers.MaxPool2D()(x)

x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(skip_3)
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = tf.concat([x, skip_3], -1)
x = layers.MaxPool2D()(x)
x = layers.Dropout(0.3)(x)

x = layers.Flatten()(x)
x = layers.Dense(256)(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)

output_tensor = layers.Dense(12)(x)

model_wav_skip = tf.keras.Model(input_tensor, output_tensor)

model_wav_skip.summary()

* 모델 구성만 달라졌을 뿐, 그 외 Task구성이나 데이터셋 구성, 훈련 과정은 동일

In [None]:
optimizer=tf.keras.optimizers.Adam(1e-4)
model_wav_skip.compile(loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
             optimizer=optimizer,
             metrics=['accuracy'])
print("✅")

In [None]:
# the save point
checkpoint_dir = os.getenv('HOME')+'/aiffel/speech_recognition/models/wav_skip'

cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_dir,
                                                 save_weights_only=True,
                                                 monitor='val_loss',
                                                 mode='auto',
                                                 save_best_only=True,
                                                 verbose=1)
print("✅")

In [None]:
#30분 내외 소요
history_wav_skip = model_wav_skip.fit(train_dataset, epochs=max_epochs,
                    steps_per_epoch=len(train_wav) // batch_size,
                    validation_data=test_dataset,
                    validation_steps=len(test_wav) // batch_size,
                    callbacks=[cp_callback]
                    )
print("✅")

### 학습결과의 시각화 및 evaluation

In [None]:
import matplotlib.pyplot as plt

acc = history_wav_skip.history['accuracy']
val_acc = history_wav_skip.history['val_accuracy']

loss=history_wav_skip.history['loss']
val_loss=history_wav_skip.history['val_loss']

epochs_range = range(len(acc))

plt.figure(figsize=(8, 8))
plt.subplot(1, 2, 1)
plt.plot(epochs_range, acc, label='Training Accuracy')
plt.plot(epochs_range, val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.title('Training and Validation Accuracy')

plt.subplot(1, 2, 2)
plt.plot(epochs_range, loss, label='Training Loss')
plt.plot(epochs_range, val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.title('Training and Validation Loss')
plt.show()
print("✅")

In [None]:
# Evaluation 

model_wav_skip.load_weights(checkpoint_dir)
results = model_wav_skip.evaluate(test_dataset)

# loss
print("loss value: {:.3f}".format(results[0]))
# accuracy
print("accuracy value: {:.4f}%".format(results[1]*100))
print("✅")

In [None]:
# Test 

inv_label_value = {v: k for k, v in label_value.items()}
batch_index = np.random.choice(len(test_wav), size=1, replace=False)

batch_xs = test_wav[batch_index]
batch_ys = test_label[batch_index]
y_pred_ = model_wav_skip(batch_xs, training=False)

print("label : ", str(inv_label_value[batch_ys[0]]))

ipd.Audio(batch_xs.reshape(8000,), rate=8000)

### 위에서 확인해본 테스트셋의 라벨과 우리 모델의 실제 prediction 결과 비교

In [None]:
if np.argmax(y_pred_) == batch_ys[0]:
    print("y_pred: " + str(inv_label_value[np.argmax(y_pred_)]) + '(Correct!)')
else:
    print("y_pred: " + str(inv_label_value[np.argmax(y_pred_)]) + '(Incorrect!)')
print("✅")

## 4. 회고

데이터 준비하기부터 이해가 안 가서 전 과정이 이해가 안 간다.  
1차원 데이터를 2차원 데이터로 변경한 후 분류하는 과정이라고 이해했는데, 내가 지금 대체 데이터 수를 제대로 이해했는지도 모르겠고, 전반적인 이해도 프로젝트를 고민할 시간도 많이 부족했던 것 같다.  
다시 만나고 싶지 않은 음성데이터... 최선을 다했음... 미래의 나에게 나머지 부분을 맡긴다.