# Speaker Recognition

이 [예제](https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/audio/ipynb/speaker_recognition_using_cnn.ipynb#scrollTo=U2huHC_4Fycs)는 FFT(고속 푸리에 변환)를 통해 얻은 음성 녹음의 주파수 영역 표현에서 화자를 분류하는 모델을 만드는 방법을 보여줍니다.

__Process:__<br>
1. 화자의 이름을 레이블로 하여 서로 다른 화자의 음성 샘플 데이터 세트를 준비합니다.
2. 데이터를 보강하기 위해 이 샘플에 배경 잡음을 추가합니다.
3. 샘플의 FFT를 취합니다.
4. 잡음이 있는 FFT 음성 샘플이 주어지면 label을 예측하기 위해 1D convnet을 훈련합니다.

> Fourier Transform
    푸리에 변환은 기본적으로 시간 도메인을 주파수 도메인으로 변환시키는, 시간에 대한 함수를 구성하고 있는 주파수 성분으로 분해하는 과정입니다.

__Data__<br>
- 저명한 지도자들의 연설 데이터
- 각 폴더 명에 지도자들의 이름이 명시
- 각 오디오 파일은 1초 16000 샘플 레이트 PCM으로 인코딩
(참조:[https://ospace.tistory.com/101](https://ospace.tistory.com/101))
- background_noise 폴더에는 화자의 주변에서 찾을 수 있는 오디오가 포함되어 있습니다.(청중의 웃음이나 박수 등)

## Setup

In [1]:
import os
import shutil
import numpy as np

import tensorflow as tf
from tensorflow import keras

from pathlib import Path
from IPython.display import display, Audio

# Get the data from https://www.kaggle.com/kongaevans/speaker-recognition-dataset/download
# and save it to the 'Downloads' folder in your HOME directory
DATASET_ROOT = os.path.join(os.getenv('HOME') + "/aiffel/16000_pcm_speeches")

# The folders in which we will put the audio samples and the noise samples
AUDIO_SUBFOLDER = "audio"
NOISE_SUBFOLDER = "noise"

DATASET_AUDIO_PATH = os.path.join(DATASET_ROOT, AUDIO_SUBFOLDER)
DATASET_NOISE_PATH = os.path.join(DATASET_ROOT, NOISE_SUBFOLDER)

# Percentage of samples to use for validation
VALID_SPLIT = 0.1

# Seed to use when shuffling the dataset and the noise
SHUFFLE_SEED = 43

# The sampling rate to use.
# This is the one used in all of the audio samples.
# We will resample all of the noise to this sampling rate.
# This will also be the output size of the audio wave samples
# (since all samples are of 1 second long)
SAMPLING_RATE = 16000

# The factor to multiply the noise with according to:
#   noisy_sample = sample + noise * prop * scale
#      where prop = sample_amplitude / noise_amplitude
SCALE = 0.5

BATCH_SIZE = 128
EPOCHS = 100

## Data preparation

데이터 세트는 2개의 그룹으로 나누어진 7개의 폴더로 구성됩니다.

- **Speech samples**, 5명의 다른 스피커를 위한 5개의 폴더 포함. 각 폴더에는 1500개의 오디오 파일이 포함되어 있으며 각각 1초 길이이며 16000Hz에서 샘플링됩니다.
- **Background noise samples**, 2개의 폴더와 총 6개의 파일이 있습니다. 이 파일은 1초보다 깁니다(원래 16000Hz에서 샘플링되지 않았지만 16000Hz로 다시 샘플링합니다). 이 6개의 파일을 사용하여 훈련에 사용할 1초 길이의 노이즈 샘플 354개를 만듭니다.

이 2개의 범주를 2개의 폴더로 정렬해 보겠습니다.

- 화자별 음성 샘플 폴더가 모두 포함될 'audio' 폴더
- 모든 노이즈 샘플이 포함될 'noise' 폴더

In [2]:
# If folder `audio`, does not exist, create it, otherwise do nothing
if os.path.exists(DATASET_AUDIO_PATH) is False:
    os.makedirs(DATASET_AUDIO_PATH)

# If folder `noise`, does not exist, create it, otherwise do nothing
if os.path.exists(DATASET_NOISE_PATH) is False:
    os.makedirs(DATASET_NOISE_PATH)

for folder in os.listdir(DATASET_ROOT):
    if os.path.isdir(os.path.join(DATASET_ROOT, folder)):
        if folder in [AUDIO_SUBFOLDER, NOISE_SUBFOLDER]:
            # If folder is `audio` or `noise`, do nothing
            continue
        elif folder in ["other", "_background_noise_"]:
            # If folder is one of the folders that contains noise samples,
            # move it to the `noise` folder
            shutil.move(
                os.path.join(DATASET_ROOT, folder),
                os.path.join(DATASET_NOISE_PATH, folder),
            )
        else:
            # Otherwise, it should be a speaker folder, then move it to
            # `audio` folder
            shutil.move(
                os.path.join(DATASET_ROOT, folder),
                os.path.join(DATASET_AUDIO_PATH, folder),
            )

작업할 디렉토리들은 아래와 같은 구조를 가지게됩니다.

main_directory/<br>
...audio/<br>
......speaker_a/<br>
......speaker_b/<br>
......speaker_c/<br>
......speaker_d/<br>
......speaker_e/<br>
...noise/<br>
......other/<br>
......_background_noise_/<br>

## Noise preparation

노이즈 샘플을 초당 16000개 샘플 덩어리로 분할합니다.

In [3]:
# Get the list of all noise files
noise_paths = []
for subdir in os.listdir(DATASET_NOISE_PATH):
    subdir_path = Path(DATASET_NOISE_PATH) / subdir
    if os.path.isdir(subdir_path):
        noise_paths += [
            os.path.join(subdir_path, filepath)
            for filepath in os.listdir(subdir_path)
            if filepath.endswith(".wav")
        ]

print(
    "Found {} files belonging to {} directories".format(
        len(noise_paths), len(os.listdir(DATASET_NOISE_PATH))
    )
)

Found 6 files belonging to 2 directories


노이즈 샘플들을 16000 Hz로 리샘플링 해줍니다.

In [4]:
command = (
    "for dir in `ls -1 " + DATASET_NOISE_PATH + "`; do "
    "for file in `ls -1 " + DATASET_NOISE_PATH + "/$dir/*.wav`; do "
    "sample_rate=`ffprobe -hide_banner -loglevel panic -show_streams "
    "$file | grep sample_rate | cut -f2 -d=`; "
    "if [ $sample_rate -ne 16000 ]; then "
    "ffmpeg -hide_banner -loglevel panic -y "
    "-i $file -ar 16000 temp.wav; "
    "mv temp.wav $file; "
    "fi; done; done"
)

os.system(command)

# Split noise into chunks of 16000 each
def load_noise_sample(path):
    sample, sampling_rate = tf.audio.decode_wav(
        tf.io.read_file(path), desired_channels=1
    )
    if sampling_rate != SAMPLING_RATE:
        # Number of slices of 16000 each that can be generated from the noise sample
        slices = int(sample.shape[0] / SAMPLING_RATE)
        sample = tf.split(sample[: slices * SAMPLING_RATE], slices)
        return sample
    else:
        print("Sampling rate for {} is correct. Ignoring it".format(path))
        return None


noises = []
for path in noise_paths:
    sample = load_noise_sample(path)
    if sample:
        noises.extend(sample)
noises = tf.stack(noises)

print(
    "{} noise files were split into {} noise samples where each is {} sec. long".format(
        len(noise_paths), noises.shape[0], noises.shape[1] // SAMPLING_RATE
    )
)

6 noise files were split into 510 noise samples where each is 1 sec. long


In [31]:
sample

[<tf.Tensor: shape=(16000, 1), dtype=float32, numpy=
 array([[-3.0517578e-05],
        [-9.1552734e-05],
        [ 6.1035156e-05],
        ...,
        [ 3.0517578e-05],
        [ 9.1552734e-05],
        [ 3.0517578e-05]], dtype=float32)>,
 <tf.Tensor: shape=(16000, 1), dtype=float32, numpy=
 array([[ 2.7465820e-04],
        [ 3.0517578e-05],
        [ 3.0517578e-04],
        ...,
        [-3.0517578e-05],
        [ 0.0000000e+00],
        [-6.1035156e-05]], dtype=float32)>,
 <tf.Tensor: shape=(16000, 1), dtype=float32, numpy=
 array([[-9.1552734e-05],
        [-6.1035156e-05],
        [-9.1552734e-05],
        ...,
        [-9.1552734e-05],
        [ 6.1035156e-05],
        [ 0.0000000e+00]], dtype=float32)>,
 <tf.Tensor: shape=(16000, 1), dtype=float32, numpy=
 array([[-6.1035156e-05],
        [ 9.1552734e-05],
        [-9.1552734e-05],
        ...,
        [ 9.1552734e-05],
        [-1.2207031e-04],
        [ 6.1035156e-05]], dtype=float32)>,
 <tf.Tensor: shape=(16000, 1), dtype=flo

## Dataset generation

In [5]:
# 연설 오디오 파일및 폴더명(label)을 텐서로 만들어준다.
def paths_and_labels_to_dataset(audio_paths, labels):
    """Constructs a dataset of audios and labels."""
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths) # 음성을 tensor dataset으로 만들어준다
    audio_ds = path_ds.map(lambda x: path_to_audio(x)) # 16비트 PCM WAV 파일을 실수 텐서로 디코딩
    label_ds = tf.data.Dataset.from_tensor_slices(labels) # label을 tensor dataset으로 만들어준다
    return tf.data.Dataset.zip((audio_ds, label_ds))

In [6]:
# paths_and_labels_to_dataset 함수에서 사용
# 16비트 PCM WAV 파일을 실수 텐서로 디코딩 함수
def path_to_audio(path): 
    """Reads and decodes an audio file."""
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1, SAMPLING_RATE)
    return audio

In [7]:
def add_noise(audio, noises=None, scale=0.5):
    if noises is not None:
        # 오디오와 같은 크기의 랜덤 텐서를 생성
        # 0은 noise stream sample의 수
        tf_rnd = tf.random.uniform(                                      # Outputs random values from a uniform distribution.
            (tf.shape(audio)[0],), 0, noises.shape[0], dtype=tf.int32    # uniform(shape, mainval, maxval)
        )
        noise = tf.gather(noises, tf_rnd, axis=0) # tf.gather(params, indices, validate_indices=None, axis=None)
                                                     # Gather slices from params axis axisaccording to indices. (deprecated arguments)

        # 오디오와 노이즈 사이에 진폭 비율 얻기
        prop = tf.math.reduce_max(audio, axis=1) / tf.math.reduce_max(noise, axis=1)
        prop = tf.repeat(tf.expand_dims(prop, axis=1), tf.shape(audio)[1], axis=1)

        # 크기가 조정된 노이즈를 오디오에 추가
        audio = audio + noise * prop * scale

    return audio

### tf.gather() - Gather slices from params axis `axis` according to indices
`params = tf.constant(['p0', 'p1', 'p2', 'p3', 'p4', 'p5'])
indices = [2, 0, 2, 5]
tf.gather(params, indices).numpy()`

![img](https://www.tensorflow.org/images/Gather.png)

In [9]:
def audio_to_fft(audio):
    # tf.signal.fft는 가장 안쪽 차원에 FFT를 적용하기 때문에 
    # 차원을 압축한 다음 
    # FFT 후에 다시 확장해야 합니다.
    audio = tf.squeeze(audio, axis=-1)
    fft = tf.signal.fft(
        tf.cast(tf.complex(real=audio, imag=tf.zeros_like(audio)), tf.complex64)
    )
    fft = tf.expand_dims(fft, axis=-1)

    # Return the absolute value of the first half of the FFT
    # which represents the positive frequencies
    return tf.math.abs(fft[:, : (audio.shape[1] // 2), :])

In [10]:
# Get the list of audio file paths along with their corresponding labels

class_names = os.listdir(DATASET_AUDIO_PATH)
print("Our class names: {}".format(class_names,))

audio_paths = []
labels = []
for label, name in enumerate(class_names):
    print("Processing speaker {}".format(name,))
    dir_path = Path(DATASET_AUDIO_PATH) / name
    speaker_sample_paths = [
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.endswith(".wav")
    ]
    audio_paths += speaker_sample_paths
    labels += [label] * len(speaker_sample_paths)

print(
    "Found {} files belonging to {} classes.".format(len(audio_paths), len(class_names))
)

# Shuffle
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Split into training and validation
num_val_samples = int(VALID_SPLIT * len(audio_paths))
print("Using {} files for training.".format(len(audio_paths) - num_val_samples))
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]

print("Using {} files for validation.".format(num_val_samples))
valid_audio_paths = audio_paths[-num_val_samples:]
valid_labels = labels[-num_val_samples:]

# Create 2 datasets, one for training and the other for validation
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)


# Add noise to the training set
train_ds = train_ds.map(
    lambda x, y: (add_noise(x, noises, scale=SCALE), y),
    num_parallel_calls=tf.data.AUTOTUNE,
)

# Transform audio wave to the frequency domain using `audio_to_fft`
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
train_ds = train_ds.prefetch(tf.data.AUTOTUNE)

valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
valid_ds = valid_ds.prefetch(tf.data.AUTOTUNE)

Our class names: ['Benjamin_Netanyau', 'Magaret_Tarcher', 'Nelson_Mandela', 'Julia_Gillard', 'Jens_Stoltenberg']
Processing speaker Benjamin_Netanyau
Processing speaker Magaret_Tarcher
Processing speaker Nelson_Mandela
Processing speaker Julia_Gillard
Processing speaker Jens_Stoltenberg
Found 7501 files belonging to 5 classes.
Using 6751 files for training.
Using 750 files for validation.


## Model Definition

In [11]:
def residual_block(x, filters, conv_num=3, activation="relu"):
    # Shortcut
    s = keras.layers.Conv1D(filters, 1, padding="same")(x)
    for i in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding="same")(x)
        x = keras.layers.Activation(activation)(x)
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    x = keras.layers.Add()([x, s])
    x = keras.layers.Activation(activation)(x)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes):
    inputs = keras.layers.Input(shape=input_shape, name="input")

    x = residual_block(inputs, 16, 2)
    x = residual_block(x, 32, 2)
    x = residual_block(x, 64, 3)
    x = residual_block(x, 128, 3)
    x = residual_block(x, 128, 3)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dense(128, activation="relu")(x)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)

    return keras.models.Model(inputs=inputs, outputs=outputs)


model = build_model((SAMPLING_RATE // 2, 1), len(class_names))

model.summary()

# Compile the model using Adam's default learning rate
model.compile(
    optimizer="Adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)

# Add callbacks:
# 'EarlyStopping' to stop training when the model is not enhancing anymore
# 'ModelCheckPoint' to always keep the model that has the best val_accuracy
model_save_filename = "model.h5"

earlystopping_cb = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename, monitor="val_accuracy", save_best_only=True
)

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input (InputLayer)              [(None, 8000, 1)]    0                                            
__________________________________________________________________________________________________
conv1d_1 (Conv1D)               (None, 8000, 16)     64          input[0][0]                      
__________________________________________________________________________________________________
activation (Activation)         (None, 8000, 16)     0           conv1d_1[0][0]                   
__________________________________________________________________________________________________
conv1d_2 (Conv1D)               (None, 8000, 16)     784         activation[0][0]                 
______________________________________________________________________________________________

## Training

In [12]:
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=valid_ds,
    callbacks=[earlystopping_cb, mdlcheckpoint_cb],
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100


## Evaluation

In [14]:
print(model.evaluate(valid_ds))

[0.013559524901211262, 0.9946666955947876]


We get ~ 99% validation accuracy.

## Demonstration

몇 가지 샘플을 살펴보고 다음을 수행해보겠습니다.

- 화자를 예측합니다.
- 실제 label과 예측 label을 비교해봅니다.
- 샘플에 노이즈를 추가한 파일에 오디오를 들어봅니다.

In [17]:
SAMPLES_TO_DISPLAY = 10

test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

test_ds = test_ds.map(lambda x, y: (add_noise(x, noises, scale=SCALE), y))

for audios, labels in test_ds.take(1):
    # Get the signal FFT
    ffts = audio_to_fft(audios)
    # Predict
    y_pred = model.predict(ffts)
    # Take random samples
    rnd = np.random.randint(0, BATCH_SIZE, SAMPLES_TO_DISPLAY)
    audios = audios.numpy()[rnd, :, :]
    labels = labels.numpy()[rnd]
    y_pred = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(SAMPLES_TO_DISPLAY):
        # For every sample, print the true and predicted label
        # as well as run the voice with the noise
        print(
            "Speaker:\33{} {}\33[0m\tPredicted:\33{} {}\33[0m".format(
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names[labels[index]],
                "[92m" if labels[index] == y_pred[index] else "[91m",
                class_names[y_pred[index]],
            )
        )
        display(Audio(audios[index, :, :].squeeze(), rate=SAMPLING_RATE))

Speaker:[92m Magaret_Tarcher[0m	Predicted:[92m Magaret_Tarcher[0m


Speaker:[92m Julia_Gillard[0m	Predicted:[92m Julia_Gillard[0m


Speaker:[91m Benjamin_Netanyau[0m	Predicted:[91m Jens_Stoltenberg[0m


Speaker:[92m Magaret_Tarcher[0m	Predicted:[92m Magaret_Tarcher[0m


Speaker:[92m Magaret_Tarcher[0m	Predicted:[92m Magaret_Tarcher[0m


Speaker:[92m Magaret_Tarcher[0m	Predicted:[92m Magaret_Tarcher[0m


Speaker:[92m Benjamin_Netanyau[0m	Predicted:[92m Benjamin_Netanyau[0m


Speaker:[92m Nelson_Mandela[0m	Predicted:[92m Nelson_Mandela[0m


Speaker:[92m Julia_Gillard[0m	Predicted:[92m Julia_Gillard[0m


Speaker:[92m Benjamin_Netanyau[0m	Predicted:[92m Benjamin_Netanyau[0m
