In [None]:
import itertools
import numpy as np
import os
import pandas as pd
import re
import tensorflow as tf
from tensorflow import keras
from keras.models import load_model
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score, confusion_matrix

## Seção comum para utilização tanto com HTK quanto com FFT

## Inicialização de parâmetros globais

In [None]:
# Root folder of the audio dataset, resolved inside the current user's home directory.
DATASET_ROOT = os.path.join(
    os.path.expanduser("~"),
    'dataSet/audio/agender_distribution/',
)
# Fraction of the labelled files held out for validation.
VALID_SPLIT = 0.1
# Passed to the WAV decoder as the number of samples kept per clip.
SAMPLING_RATE = 8000
# Seed shared by every shuffle so paths and labels stay aligned.
SHUFFLE_SEED = 43
# Training batch size.
BATCH_SIZE = 128
# Upper bound on training epochs.
EPOCHS = 100

In [None]:
# Load the training manifest and pull out its 'file' and 'class' columns.
train_file_list = pd.read_csv('7_class_train.csv')
train_audio_files, train_classes = train_file_list['file'], train_file_list['class']
# Single-column DataFrame versions of the two series.
train_audio_df = train_audio_files.to_frame()
train_class_df = train_classes.to_frame()

In [None]:
# Load the test manifest and pull out its 'file' and 'class' columns.
test_file_list = pd.read_csv('7_class_test.csv')
test_audio_files, test_classes = test_file_list['file'], test_file_list['class']
# Single-column DataFrame versions of the two series.
test_audio_df = test_audio_files.to_frame()
test_class_df = test_classes.to_frame()

Get the list of audio file paths along with their corresponding labels

In [None]:
# Distinct class names in order of first appearance; a label id is the position in this list.
train_class_labels = [*train_classes.unique()]

In [None]:
# Show the class names found in the training manifest.
print("Age categories identified: {}".format(train_class_labels))

In [None]:
# TODO: mostrar tabela identificando cada categoria

In [None]:
# Parallel accumulators: audio_paths[k] is the file whose integer label is labels[k].
audio_paths, labels = [], []

In [None]:
# For each class, gather the full paths of its files and give them the class's integer id.
for label, category in enumerate(train_class_labels):
    print("Processing category {}".format(category))
    speaker_sample_paths = [
        os.path.join(DATASET_ROOT, file_name)
        for file_name, file_class in zip(train_audio_files, train_classes)
        if file_class == category
    ]
    audio_paths.extend(speaker_sample_paths)
    labels.extend([label] * len(speaker_sample_paths))

In [None]:
# Report how many files were collected and over how many classes.
print(
    "Found {} files belonging to {} classes.".format(
        len(audio_paths),
        len(train_class_labels),
    )
)

Shuffle

In [None]:
# Shuffle both lists with a freshly seeded generator each time: the same seed and the
# same length produce the same permutation, so paths and labels stay paired.
for sequence in (audio_paths, labels):
    rng = np.random.RandomState(SHUFFLE_SEED)
    rng.shuffle(sequence)

Split into training and validation

In [None]:
# Number of samples reserved for validation (truncated towards zero).
num_val_samples = int(VALID_SPLIT * len(audio_paths))
for message, count in (
    ("Using {} files for training.", len(audio_paths) - num_val_samples),
    ("Using {} files for validation.", num_val_samples),
):
    print(message.format(count))

In [None]:
# Index of the first validation sample. Slicing with an explicit non-negative index
# (instead of -num_val_samples) keeps the split correct when num_val_samples is 0:
# [:-0] would give an empty training set and [-0:] would put every sample in validation.
split_index = len(audio_paths) - num_val_samples
train_audio_paths = audio_paths[:split_index]
train_labels = labels[:split_index]
valid_audio_paths = audio_paths[split_index:]
valid_labels = labels[split_index:]

## Extração de características e preparação dos dados

## Para utilização com as features extraídas com a FFT

In [None]:
def FFT_paths_and_labels_to_dataset(audio_paths, labels):
    """Build a ``tf.data.Dataset`` of ``(audio, label)`` pairs.

    Args:
        audio_paths: sequence of audio file paths; each is decoded with ``path_to_audio``.
        labels: sequence of labels, one per path and in the same order.

    Returns:
        A dataset that zips the decoded audio with the corresponding label.
    """
    decoded_audio = tf.data.Dataset.from_tensor_slices(audio_paths).map(path_to_audio)
    targets = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((decoded_audio, targets))

In [None]:
def path_to_audio(path):
    """Read a WAV file and decode it into a waveform tensor.

    Args:
        path: path of the WAV file to read.

    Returns:
        The decoded audio, requested as 1 channel and ``SAMPLING_RATE`` samples.
        The sample rate reported by the decoder is discarded.
    """
    raw_bytes = tf.io.read_file(path)
    waveform, _sample_rate = tf.audio.decode_wav(
        raw_bytes,
        desired_channels=1,
        desired_samples=SAMPLING_RATE,
    )
    return waveform

In [None]:
def audio_to_fft(audio):
    """Convert a batch of waveforms into FFT magnitude features.

    Args:
        audio: rank-3 tensor indexed as (batch, samples, channel); the last axis
            is squeezed away, so it must have size 1.

    Returns:
        Magnitudes of the first half of the FFT (the positive frequencies),
        with the trailing axis restored.
    """
    # tf.signal.fft works on the innermost dimension, so drop the channel axis
    # first and put it back once the transform is done.
    waveform = tf.squeeze(audio, axis=-1)
    complex_wave = tf.cast(
        tf.complex(real=waveform, imag=tf.zeros_like(waveform)),
        tf.complex64,
    )
    spectrum = tf.expand_dims(tf.signal.fft(complex_wave), axis=-1)
    half_length = waveform.shape[1] // 2
    return tf.math.abs(spectrum[:, :half_length, :])

In [None]:
# Build the raw-audio datasets for the training and validation splits.
train_ds = FFT_paths_and_labels_to_dataset(audio_paths=train_audio_paths, labels=train_labels)
valid_ds = FFT_paths_and_labels_to_dataset(audio_paths=valid_audio_paths, labels=valid_labels)

In [None]:
# Shuffle with a buffer of eight batches, then batch: BATCH_SIZE for training, 32 for validation.
train_ds = (
    train_ds
    .shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED)
    .batch(BATCH_SIZE)
)
valid_ds = (
    valid_ds
    .shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED)
    .batch(32)
)

Transform audio wave to the frequency domain

In [None]:
# Replace every waveform batch with its FFT magnitude features; labels pass through unchanged.
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)

In [None]:
# Let tf.data pick the prefetch buffer size for both pipelines.
train_ds = train_ds.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
valid_ds = valid_ds.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

## Para utilização com as features extraídas com o HTK

In [None]:
# Normalization parameters used with the HTK-extracted features
# (train value first, test value second).
NORM_TRAIN, NORM_TEST = 200, 239

In [None]:
def path_to_mfc(path, dim):
    """Read the raw contents of an HTK feature CSV file.

    Args:
        path: path of the ``.mfc.csv`` file to read.
        dim: accepted for interface compatibility; not used by this function.

    Returns:
        A scalar string tensor holding the file's bytes.
    """
    # The second positional parameter of tf.io.read_file is the op `name`, not a
    # size, so `dim` must not be forwarded to it.
    # NOTE(review): the contents are returned unparsed; turning them into a numeric
    # feature tensor (e.g. the first `dim` rows) still has to be implemented - confirm intent.
    return tf.io.read_file(path)

In [None]:
def ReadCSV(file, dim):
    """Load the first ``dim`` rows of a header-less CSV as float32 values.

    Args:
        file: path or file-like object accepted by ``pandas.read_csv``.
        dim: maximum number of rows to read.

    Returns:
        A ``pandas.DataFrame`` of dtype float32 with default integer column names.
    """
    return pd.read_csv(file, header=None, nrows=dim, dtype=np.float32)

In [None]:
def HTK_paths_and_labels_to_dataset(audio_paths, labels, dim):
    """Build a ``tf.data.Dataset`` of ``(features, label)`` pairs from HTK CSV files.

    Each ``.wav`` path is translated to its ``.mfc.csv`` counterpart and loaded
    with ``path_to_mfc``.

    Args:
        audio_paths: sequence of ``.wav`` file paths. The sequence is not modified.
        labels: sequence of labels, one per path and in the same order.
        dim: forwarded to ``path_to_mfc`` for every file.

    Returns:
        A dataset that zips the loaded features with the corresponding label.
    """
    # Build a new list instead of rewriting the caller's list in place, and escape
    # and anchor the pattern so only a literal trailing ".wav" extension is replaced.
    mfc_paths = [re.sub(r'\.wav$', '.mfc.csv', wav_path) for wav_path in audio_paths]
    path_ds = tf.data.Dataset.from_tensor_slices(mfc_paths)
    audio_ds = path_ds.map(lambda x: path_to_mfc(x, dim))
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))

In [None]:
# Paths of the HTK feature files that correspond to the training audio files.
train_audio_paths_mfc = [
    re.sub(r'\.wav', '.mfc.csv', wav_path)
    for wav_path in train_audio_paths
]

In [None]:
# Preview only the first few converted paths instead of dumping the whole list.
train_audio_paths_mfc[:5]

In [None]:
# Create 2 datasets, one for training and the other for validation.
# Both are built here so the batching cell below never operates on a validation
# dataset left over from the FFT section. Copies of the path lists are passed so
# the original .wav path lists stay untouched whatever the helper does with them.
train_ds = HTK_paths_and_labels_to_dataset(list(train_audio_paths), train_labels, NORM_TRAIN)
valid_ds = HTK_paths_and_labels_to_dataset(list(valid_audio_paths), valid_labels, NORM_TRAIN)

In [None]:
# Shuffle with a buffer of eight batches, then batch: BATCH_SIZE for training, 32 for validation.
# NOTE(review): valid_ds must be an unbatched HTK dataset at this point; if it still holds
# the FFT pipeline from the earlier section it gets batched a second time - confirm.
train_ds = (
    train_ds
    .shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED)
    .batch(BATCH_SIZE)
)
valid_ds = (
    valid_ds
    .shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED)
    .batch(32)
)

# MODEL DEFINITION

In [None]:
def residual_block(x, filters, conv_num=3, activation="relu"):
    """Apply a 1D convolutional residual block followed by max pooling.

    Args:
        x: input tensor of the block.
        filters: number of filters used by every convolution in the block.
        conv_num: total number of kernel-size-3 convolutions on the main branch.
        activation: activation applied after each convolution except the last,
            and again after the residual addition.

    Returns:
        The block output after a max pooling of size 2 and stride 2.
    """
    # 1x1 convolution so the shortcut has the same number of channels as the main branch.
    shortcut = keras.layers.Conv1D(filters, 1, padding="same")(x)
    branch = x
    for _ in range(conv_num - 1):
        branch = keras.layers.Conv1D(filters, 3, padding="same")(branch)
        branch = keras.layers.Activation(activation)(branch)
    # The last convolution is activated only after the shortcut has been added.
    branch = keras.layers.Conv1D(filters, 3, padding="same")(branch)
    merged = keras.layers.Add()([branch, shortcut])
    merged = keras.layers.Activation(activation)(merged)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(merged)

In [None]:
def build_model(input_shape, num_classes):
    """Assemble the residual 1D CNN classifier.

    Args:
        input_shape: shape of one input sample, without the batch dimension.
        num_classes: number of units of the softmax output layer.

    Returns:
        An uncompiled ``keras.models.Model`` mapping the input to class probabilities.
    """
    inputs = keras.layers.Input(shape=input_shape, name="input")

    # Stack of residual blocks given as (filters, number of convolutions).
    features = inputs
    for filters, conv_num in ((16, 2), (32, 2), (64, 3), (128, 3), (128, 3)):
        features = residual_block(features, filters, conv_num)

    features = keras.layers.AveragePooling1D(pool_size=3, strides=3)(features)
    features = keras.layers.Flatten()(features)
    for units in (256, 128):
        features = keras.layers.Dense(units, activation="relu")(features)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(features)

    return keras.models.Model(inputs=inputs, outputs=outputs)

In [None]:
# One sample holds SAMPLING_RATE // 2 values with a single channel; one output unit per class.
model = build_model(
    input_shape=(SAMPLING_RATE // 2, 1),
    num_classes=len(train_class_labels),
)

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=True)

In [None]:
# Compile with the Adam optimizer (referenced by name, so its default settings apply),
# integer-label cross-entropy as the loss, and accuracy as the reported metric.
model.compile(
    optimizer="Adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

In [None]:
# Training callbacks:
# - EarlyStopping stops training after 10 epochs without improvement and restores the
#   best weights. No `monitor` is passed, so the Keras default applies
#   (val_loss per the Keras docs - confirm this is the intended quantity).
# - ModelCheckpoint writes the model to disk only when val_accuracy improves.
model_save_filename = "CNN_model.h5"
earlystopping_cb = keras.callbacks.EarlyStopping(
    patience=10,
    restore_best_weights=True,
)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename,
    monitor="val_accuracy",
    save_best_only=True,
)

# TRAINING

In [None]:
# Train for at most EPOCHS epochs, validating on valid_ds after each one.
history = model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=valid_ds,
    callbacks=[earlystopping_cb, mdlcheckpoint_cb],
)

In [None]:
# Evaluate on the validation set and print the returned values
# (loss followed by the metrics passed to compile).
print(model.evaluate(valid_ds))

# DEMONSTRATION

In [None]:
# Load a previously trained model from disk; this replaces the `model` trained above.
# NOTE(review): the file name differs from the checkpoint written during training
# ("CNN_model.h5") - confirm that 'CNN_model_7_class.h5' is the intended file.
model = load_model('CNN_model_7_class.h5')

In [None]:
# Distinct class names of the test manifest, in order of first appearance.
# NOTE(review): label ids are positions in this list, so they only match the training
# ids if both manifests list the classes in the same order - confirm.
test_class_labels = [*test_classes.unique()]
print("Age categories identified: {}".format(test_class_labels))

In [None]:
# Collect the full path of every test file together with the integer id of its class.
test_audio_paths, test_labels = [], []
for label, category in enumerate(test_class_labels):
    print("Processing category {}".format(category))
    speaker_sample_paths = [
        os.path.join(DATASET_ROOT, file_name)
        for file_name, file_class in zip(test_audio_files, test_classes)
        if file_class == category
    ]
    test_audio_paths.extend(speaker_sample_paths)
    test_labels.extend([label] * len(speaker_sample_paths))
print(
    "Found {} files belonging to {} classes.".format(
        len(test_audio_paths),
        len(test_class_labels),
    )
)

In [None]:
# Shuffle both lists with a freshly seeded generator each time: the same seed and the
# same length produce the same permutation, so paths and labels stay paired.
for sequence in (test_audio_paths, test_labels):
    rng = np.random.RandomState(SHUFFLE_SEED)
    rng.shuffle(sequence)

In [None]:
# Create the test dataset with the FFT helper defined earlier in this notebook;
# the name `paths_and_labels_to_dataset` is not defined anywhere in it.
test_ds = FFT_paths_and_labels_to_dataset(test_audio_paths, test_labels)

In [None]:
# Shuffle with a buffer of eight batches, then group into batches of BATCH_SIZE.
test_ds = (
    test_ds
    .shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED)
    .batch(BATCH_SIZE)
)

In [None]:
# Let tf.data pick the prefetch buffer size for the test pipeline.
test_ds = test_ds.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
# Replace every waveform batch with its FFT magnitude features; labels pass through unchanged.
# From here on test_ds yields model-ready inputs, so audio_to_fft must not be applied again.
test_ds = test_ds.map(lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.experimental.AUTOTUNE)

In [None]:
# Per-batch accumulators for the model inputs, true labels and predicted class ids.
audios_lista, labels_lista, y_pred_lista = [], [], []

In [None]:
# Single pass over the test set: predict each batch and keep its inputs, true labels
# and predicted class ids together.
# - test_ds already yields FFT features (the FFT map is applied when the dataset is
#   built), so batches go straight to the model; a second FFT would halve their length.
# - Loop variables are prefixed with `batch_` so they do not overwrite the training
#   `labels` list.
for batch_ffts, batch_labels in test_ds:
    batch_probs = model.predict(batch_ffts)
    audios_lista.append(batch_ffts.numpy())
    labels_lista.append(batch_labels.numpy())
    y_pred_lista.append(np.argmax(batch_probs, axis=-1))

In [None]:
# Collect predictions and ground-truth labels in ONE pass over test_ds.
# The dataset is shuffled, and tf.data reshuffles on every new iteration by default,
# so labels gathered in a separate pass would not line up with the predictions.
# test_ds already yields FFT features, so batches are fed to the model directly.
y_pred_list = []    # per batch: class probabilities predicted by the model
audios_list = []    # per batch: the model inputs as numpy arrays
labels_list = []    # per batch: the true integer labels as numpy arrays
for batch_ffts, batch_labels in test_ds:
    y_pred_list.append(model.predict(batch_ffts))
    audios_list.append(batch_ffts.numpy())
    labels_list.append(batch_labels.numpy())

# Most probable class per sample, flattened over all batches. Iterating over each
# batch's own rows also handles a final batch smaller than the others.
y_pred_transf = [
    np.argmax(sample_probs, axis=-1)
    for batch_probs in y_pred_list
    for sample_probs in batch_probs
]
flatten_labels_list = list(itertools.chain(*labels_list))

assert len(y_pred_transf) == len(flatten_labels_list), "predictions and labels are misaligned"

In [None]:
# Aliases consumed by the metric cells: true labels and predicted class ids.
real_output, predicted_output = flatten_labels_list, y_pred_transf

Metrics

In [None]:
# Overall accuracy on the test set.
accuracy_score(real_output, predicted_output)

In [None]:
# F1 score on the test set, shown as a (macro, micro) tuple.
(
    f1_score(real_output, predicted_output, average='macro'),
    f1_score(real_output, predicted_output, average='micro'),
)

In [None]:
# Precision on the test set, shown as a (macro, micro) tuple.
(
    precision_score(real_output, predicted_output, average='macro'),
    precision_score(real_output, predicted_output, average='micro'),
)

In [None]:
# Recall on the test set, shown as a (macro, micro) tuple.
(
    recall_score(real_output, predicted_output, average='macro'),
    recall_score(real_output, predicted_output, average='micro'),
)