# 5-7. 프로젝트: Spectrogram classification 모델 구현

### [루브릭]

평가문항	상세기준
   1. 음성데이터를 2차원 Spectrogram 으로 변환하여 데이터셋을 구성하였다.  
        -스펙트로그램 시각화 및 train/test 데이터셋 구성이 정상진행되었다.
    
    
   2. 1,2차원 데이터를 처리하는 음성인식 모델이 정상 작동한다.  
        -스펙트로그램을 입력받은 모델이 학습과정에서 안정적으로 수렴하며, evaluation/test 단계를 무리없이 진행가능하다.
    
    
   3. 테스트셋 수행결과 음성인식 모델의 Accuracy가 일정 수준에 도달하였다.  
        -evaluation 결과 75% 이상의 정확도를 달성하는 모델이 하나 이상 존재한다.
    
    
### [학습 과정]
1. 데이터 처리와 분류
2. 학습을 위한 하이퍼파라미터 설정
3. 데이터셋 구성
4. 2차원 Spectrogram 데이터를 처리하는 모델 구성
5. 학습 후, 학습이 어떻게 진행됐는지 그래프로 출력
6. Test dataset을 이용해서 모델의 성능을 평가

### [결론 및 회고]

---
### 데이터 처리와 분류
* 라벨 데이터 처리하기
* sklearn의 train_test_split함수를 이용하여 train, test 분리

In [1]:
import numpy as np
import os

# Build the path to the speech archive under the user's home directory
# and open it with NumPy.
home_dir = os.getenv("HOME")
data_path = home_dir + '/aiffel/E05_SpeechToText/data/speech_wav_8000.npz'
speech_data = np.load(data_path)

print("✅")

✅


In [2]:
# Show the shape of the waveform array (inputs) and the label array (targets).
for caption, key in (
    ("Wave data shape : ", "wav_vals"),
    ("Label data shape : ", "label_vals"),
):
    print(caption, speech_data[key].shape)
print("✅")

Wave data shape :  (50620, 8000)
Label data shape :  (50620, 1)
✅


In [3]:
import IPython.display as ipd
import random

# Pick one sample at random. randrange() excludes the upper bound, so the index
# is always valid; randint(0, len(...)) could return len(...) itself and make
# the indexing below raise IndexError.
rand = random.randrange(len(speech_data["wav_vals"]))
print("rand num : ", rand)

sr = 8000   # sample rate: number of samples played back per second
data = speech_data["wav_vals"][rand]
print("Wave data shape : ", data.shape)
print("label : ", speech_data["label_vals"][rand])

# Last expression of the cell: renders an inline audio player for the clip.
ipd.Audio(data, rate=sr)

rand num :  13911
Wave data shape :  (8000,)
label :  ['right']


#### Label data 처리

In [4]:
# The ten command words the classifier has to recognise.
target_list = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']

# label_value is bound to the same list object as target_list, so the two
# catch-all classes appended here are visible through both names.
label_value = target_list
label_value.extend(['unknown', 'silence'])

print('LABEL : ', label_value)

# Map every class name to its position in the list; that position is the
# integer label used for training.
new_label_value = {name: index for index, name in enumerate(label_value)}
label_value = new_label_value

print('Indexed LABEL : ', new_label_value)

LABEL :  ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go', 'unknown', 'silence']
Indexed LABEL :  {'yes': 0, 'no': 1, 'up': 2, 'down': 3, 'left': 4, 'right': 5, 'on': 6, 'off': 7, 'stop': 8, 'go': 9, 'unknown': 10, 'silence': 11}


In [5]:
# Convert every string label to its integer index.
# Each row of "label_vals" is a one-element array holding the class name.
temp = [label_value[row[0]] for row in speech_data["label_vals"]]
label_data = np.array(temp)

label_data

array([ 3,  3,  3, ..., 11, 11, 11])

#### train, test data split

In [6]:
from sklearn.model_selection import train_test_split

sr = 8000  # samples per clip; used below as the length of the time axis

# Hold out 10% of the clips (shuffled) as the test split.
split = train_test_split(speech_data["wav_vals"], label_data,
                         test_size=0.1, shuffle=True)
X_train, X_test, y_train, y_test = split
print(X_train)

# Append a trailing channel axis so each clip has shape (sr, 1).
X_train = X_train.reshape(-1, sr, 1)
X_test = X_test.reshape(-1, sr, 1)

print("✅")

[[-1.2383825e-04  8.5144420e-04  1.0770605e-03 ... -1.3129720e-03
  -1.0830584e-03  1.8275771e-04]
 [ 3.5200752e-03  5.1622144e-03 -2.6506241e-06 ...  2.8476422e-03
   2.4099741e-03 -1.4102276e-03]
 [ 8.2653237e-04  1.4297181e-03  9.5593289e-04 ...  1.1702370e-03
   1.3654147e-03  1.4000607e-03]
 ...
 [ 3.5496005e-03  5.1288921e-03  4.3123064e-04 ...  2.6259827e-03
   2.4437841e-03 -1.3070083e-03]
 [-1.1166940e-04 -1.2065521e-04 -2.0386340e-04 ...  5.3304429e-05
   7.3213530e-05  1.6120664e-04]
 [-1.1086896e-02 -1.2538433e-03 -1.2884424e-02 ... -6.7341099e-03
  -2.9162155e-02 -5.5060279e-02]]
✅


In [7]:
# Report the shapes of the arrays produced by the train/test split.
for caption, array in (
    ("train data : ", X_train),
    ("train labels : ", y_train),
    ("test data : ", X_test),
    ("test labels : ", y_test),
):
    print(caption, array.shape)
print("✅")

train data :  (45558, 8000, 1)
train labels :  (45558,)
test data :  (5062, 8000, 1)
test labels :  (5062,)
✅


### 학습을 위한 하이퍼파라미터 설정

In [8]:
# Training schedule: mini-batch size and number of epochs.
batch_size, max_epochs = 32, 10

# Location where the model checkpoint is written.
checkpoint_dir = os.getenv('HOME') + '/aiffel/E05_SpeechToText/models/wav'

checkpoint_dir

'/home/ssac18/aiffel/E05_SpeechToText/models/wav'

### 데이터셋 구성

In [9]:
def one_hot_label(wav, label):
    """Return the pair (wav, one-hot encoded label) for a 12-class problem.

    `wav` is passed through unchanged; `label` is expanded with
    ``tf.one_hot(..., depth=12)``. `tf` is resolved when the function is
    called, so TensorFlow must be imported before this function is first used.
    """
    return wav, tf.one_hot(label, depth=12)
print("✅")

✅


In [10]:
import tensorflow as tf

# Training pipeline: one-hot encode the labels, repeat indefinitely, then batch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train))
    .map(one_hot_label)
    .repeat()
    .batch(batch_size=batch_size)
)
print(train_dataset)

# Test pipeline: same encoding and batching, but a single pass (no repeat).
test_dataset = (
    tf.data.Dataset.from_tensor_slices((X_test, y_test))
    .map(one_hot_label)
    .batch(batch_size=batch_size)
)
print(test_dataset)
print("✅")

<BatchDataset shapes: ((None, 8000, 1), (None, 12)), types: (tf.float32, tf.float32)>
<BatchDataset shapes: ((None, 8000, 1), (None, 12)), types: (tf.float32, tf.float32)>
✅


In [11]:
# Drop the reference to the loaded archive; the train/test arrays split from it
# earlier are what the rest of the notebook uses (presumably done to free memory).
del speech_data

### 2차원 Spectrogram 데이터를 처리하는 모델 구성
# 1. 기본 Model ver.

In [22]:
from tensorflow.keras import layers

# Baseline CNN: four stages of stacked 3x3 convolutions, each followed by
# 2x2 max-pooling and dropout, then a dense classification head.
# NOTE(review): the declared input is (sr, 24, 1), whereas the datasets built
# earlier in this notebook yield (8000, 1) waveforms - confirm that a
# spectrogram conversion producing this shape is applied before training.
input_tensor = layers.Input(shape=(sr, 24, 1))

# (number of filters, number of stacked conv layers) for each stage.
stage_configs = ((32, 2), (64, 2), (128, 3), (256, 3))

x = input_tensor
for n_filters, n_convs in stage_configs:
    for _ in range(n_convs):
        x = layers.Conv2D(n_filters, (3,3), padding='same', activation='relu')(x)
    x = layers.MaxPool2D()(x)
    x = layers.Dropout(0.3)(x)

# Classification head: flatten -> dense -> batch norm -> ReLU.
x = layers.Flatten()(x)
x = layers.Dense(256)(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)

# Final output is 12-dimensional: one logit per target class.
output_tensor = layers.Dense(12)(x)

model_wav = tf.keras.Model(input_tensor, output_tensor)

model_wav.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_11 (InputLayer)        [(None, 8000, 24, 1)]     0         
_________________________________________________________________
conv2d_52 (Conv2D)           (None, 8000, 24, 32)      320       
_________________________________________________________________
conv2d_53 (Conv2D)           (None, 8000, 24, 32)      9248      
_________________________________________________________________
max_pooling2d_22 (MaxPooling (None, 4000, 12, 32)      0         
_________________________________________________________________
dropout_14 (Dropout)         (None, 4000, 12, 32)      0         
_________________________________________________________________
conv2d_54 (Conv2D)           (None, 4000, 12, 64)      18496     
_________________________________________________________________
conv2d_55 (Conv2D)           (None, 4000, 12, 64)      36928 

In [23]:
# Adam with learning rate 1e-4. The model's last layer has no softmax, so the
# categorical cross-entropy is told to expect raw logits.
optimizer = tf.keras.optimizers.Adam(1e-4)
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model_wav.compile(loss=loss_fn, optimizer=optimizer, metrics=['accuracy'])

✅


In [24]:
# Checkpoint callback: saves weights only, and only when the monitored
# validation loss improves on the best value seen so far.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_dir,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

In [None]:
# Train the baseline model. The training dataset repeats forever, so the number
# of steps per epoch is given explicitly. The step counts are derived from
# X_train / X_test, the arrays created by the train/test split (the names
# train_wav / test_wav used previously are never defined in this notebook).
# NOTE(review): the datasets yield (8000, 1) waveforms while model_wav declares
# a (sr, 24, 1) input - confirm the inputs are converted before running this.
history_wav = model_wav.fit(train_dataset, epochs=max_epochs,
                    steps_per_epoch=len(X_train) // batch_size,
                    validation_data=test_dataset,
                    validation_steps=len(X_test) // batch_size,
                    callbacks=[cp_callback]
                    )

In [None]:
import matplotlib.pyplot as plt

# Pull the per-epoch curves recorded during training.
curves = history_wav.history
acc, val_acc = curves['accuracy'], curves['val_accuracy']
loss, val_loss = curves['loss'], curves['val_loss']

epochs_range = range(len(acc))

# Two side-by-side panels: accuracy on the left, loss on the right.
panels = (
    (1, acc, 'Training Accuracy', val_acc, 'Validation Accuracy',
     'lower right', 'Training and Validation Accuracy'),
    (2, loss, 'Training Loss', val_loss, 'Validation Loss',
     'upper right', 'Training and Validation Loss'),
)

plt.figure(figsize=(8, 8))
for position, train_curve, train_label, val_curve, val_label, legend_loc, title in panels:
    plt.subplot(1, 2, position)
    plt.plot(epochs_range, train_curve, label=train_label)
    plt.plot(epochs_range, val_curve, label=val_label)
    plt.legend(loc=legend_loc)
    plt.title(title)
plt.show()

#### evaluation

In [None]:
# Restore the weights stored at checkpoint_dir, the path given to the
# ModelCheckpoint callback used during training.
model_wav.load_weights(checkpoint_dir)

In [None]:
# Evaluate on the test dataset; the cell below reads results[0] as the loss
# and results[1] as the accuracy.
results = model_wav.evaluate(test_dataset)

In [None]:
# Report the evaluation loss and the accuracy (as a percentage).
loss_value = results[0]
accuracy_percent = results[1]*100
print("loss value: {:.3f}".format(loss_value))
print("accuracy value: {:.4f}%".format(accuracy_percent))

#### Model test

In [None]:
# Reverse lookup: integer label -> class name.
inv_label_value = {v: k for k, v in label_value.items()}

# Draw one random index into the held-out split. X_test / y_test are the arrays
# created by the train/test split (test_wav / test_label are never defined in
# this notebook and would raise NameError).
batch_index = np.random.choice(len(X_test), size=1, replace=False)

batch_xs = X_test[batch_index]
batch_ys = y_test[batch_index]
y_pred_ = model_wav(batch_xs, training=False)

print("label : ", str(inv_label_value[batch_ys[0]]))

# Last expression of the cell: inline audio player for the selected clip.
ipd.Audio(batch_xs.reshape(8000,), rate=8000)

In [None]:
# Compare the highest-scoring class with the true label and report the outcome.
predicted_index = np.argmax(y_pred_)
verdict = '(Correct!)' if predicted_index == batch_ys[0] else '(Incorrect!)'
print("y_pred: " + str(inv_label_value[predicted_index]) + verdict)

# 2. Skip Connection Model ver.

In [None]:
# Skip-connection CNN over a 2-D input.
# The input declared here is 4-D (batch, sr, 24, 1) and the kernels are (3, 3),
# so the 2-D layers (Conv2D / MaxPool2D) are required; Conv1D / MaxPool1D do not
# accept a two-element kernel size. Skip branches are merged with
# layers.Concatenate, the Keras layer for joining functional-API tensors.
# NOTE(review): the datasets built earlier in this notebook yield (8000, 1)
# waveforms - confirm that inputs are converted to this shape before training.
input_tensor = layers.Input(shape=(sr, 24, 1))

# Stage 1: 32 filters. The pooled output doubles as the first skip branch.
x = layers.Conv2D(32, (3,3), padding='same', activation='relu')(input_tensor)
x = layers.Conv2D(32, (3,3), padding='same', activation='relu')(x)
skip_1 = layers.MaxPool2D()(x)

# Stage 2: 64 filters, concatenated with skip_1 along the channel axis.
# padding='same' keeps the spatial size, so the two tensors line up.
x = layers.Conv2D(64, (3,3), padding='same', activation='relu')(skip_1)
x = layers.Conv2D(64, (3,3), padding='same', activation='relu')(x)
x = layers.Concatenate(axis=-1)([x, skip_1])
skip_2 = layers.MaxPool2D()(x)

# Stage 3: 128 filters, concatenated with skip_2.
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(skip_2)
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(128, (3,3), padding='same', activation='relu')(x)
x = layers.Concatenate(axis=-1)([x, skip_2])
skip_3 = layers.MaxPool2D()(x)

# Stage 4: 256 filters, concatenated with skip_3, then pooled and regularised.
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(skip_3)
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = layers.Conv2D(256, (3,3), padding='same', activation='relu')(x)
x = layers.Concatenate(axis=-1)([x, skip_3])
x = layers.MaxPool2D()(x)
x = layers.Dropout(0.3)(x)

# Classification head: flatten -> dense -> batch norm -> ReLU -> 12 logits.
x = layers.Flatten()(x)
x = layers.Dense(256)(x)
x = layers.BatchNormalization()(x)
x = layers.Activation('relu')(x)

output_tensor = layers.Dense(12)(x)

model_wav_skip = tf.keras.Model(input_tensor, output_tensor)

model_wav_skip.summary()

In [None]:
# Adam with learning rate 1e-4; the loss expects raw logits because the
# model's final Dense layer has no softmax activation.
optimizer = tf.keras.optimizers.Adam(1e-4)
loss_fn = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
model_wav_skip.compile(loss=loss_fn, optimizer=optimizer, metrics=['accuracy'])

In [None]:
# Checkpoint location for the skip-connection model.
checkpoint_dir = os.getenv('HOME') + '/aiffel/E05_SpeechToText/models/wav_skip'

# Save weights only, and only when the validation loss improves.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    checkpoint_dir,
    monitor='val_loss',
    mode='auto',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

In [None]:
# Train the skip-connection model. The training dataset repeats forever, so the
# number of steps per epoch is given explicitly. The step counts are derived
# from X_train / X_test, the arrays created by the train/test split (the names
# train_wav / test_wav used previously are never defined in this notebook).
history_wav_skip = model_wav_skip.fit(train_dataset, epochs=max_epochs,
                    steps_per_epoch=len(X_train) // batch_size,
                    validation_data=test_dataset,
                    validation_steps=len(X_test) // batch_size,
                    callbacks=[cp_callback]
                    )

In [None]:
import matplotlib.pyplot as plt

# Pull the per-epoch curves recorded while training the skip-connection model.
curves = history_wav_skip.history
acc, val_acc = curves['accuracy'], curves['val_accuracy']
loss, val_loss = curves['loss'], curves['val_loss']

epochs_range = range(len(acc))

# Two side-by-side panels: accuracy on the left, loss on the right.
panels = (
    (1, acc, 'Training Accuracy', val_acc, 'Validation Accuracy',
     'lower right', 'Training and Validation Accuracy'),
    (2, loss, 'Training Loss', val_loss, 'Validation Loss',
     'upper right', 'Training and Validation Loss'),
)

plt.figure(figsize=(8, 8))
for position, train_curve, train_label, val_curve, val_label, legend_loc, title in panels:
    plt.subplot(1, 2, position)
    plt.plot(epochs_range, train_curve, label=train_label)
    plt.plot(epochs_range, val_curve, label=val_label)
    plt.legend(loc=legend_loc)
    plt.title(title)
plt.show()

In [None]:
# Evaluation: restore the checkpointed weights, then score the test dataset.
model_wav_skip.load_weights(checkpoint_dir)
results = model_wav_skip.evaluate(test_dataset)

# Report the loss and the accuracy (as a percentage).
loss_value = results[0]
accuracy_percent = results[1]*100
print("loss value: {:.3f}".format(loss_value))
print("accuracy value: {:.4f}%".format(accuracy_percent))


In [None]:
# Test: run the skip-connection model on one randomly chosen held-out clip.

# Reverse lookup: integer label -> class name.
inv_label_value = {v: k for k, v in label_value.items()}

# X_test / y_test are the arrays created by the train/test split (test_wav /
# test_label are never defined in this notebook and would raise NameError).
batch_index = np.random.choice(len(X_test), size=1, replace=False)

batch_xs = X_test[batch_index]
batch_ys = y_test[batch_index]
y_pred_ = model_wav_skip(batch_xs, training=False)

print("label : ", str(inv_label_value[batch_ys[0]]))

# Last expression of the cell: inline audio player for the selected clip.
ipd.Audio(batch_xs.reshape(8000,), rate=8000)

In [None]:
# Compare the highest-scoring class with the true label and report the outcome.
predicted_index = np.argmax(y_pred_)
verdict = '(Correct!)' if predicted_index == batch_ys[0] else '(Incorrect!)'
print("y_pred: " + str(inv_label_value[predicted_index]) + verdict)
