In [332]:
import torch
import torch.nn as nn
import torch.optim as optim
import os
import pandas as pd
import librosa
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import time

In [309]:
# Đường dẫn đến thư mục chứa dữ liệu training và validation
root_path = 'D:\\Study\\Deep_Learning\\DRL\\TESS\\'

In [310]:
def extract_mfcc(file_path, n_mfcc=13, n_fft=2048, hop_length=512, sample_rate=22050):
    # Load file âm thanh
    signal, sr = librosa.load(path=file_path, sr=sample_rate)
    
    # Trích xuất MFCC
    mfccs = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)
    
    # Chuyển đổi sang dạng decibel
    mfccs_db = librosa.power_to_db(S=mfccs, ref=np.max)
    
    return mfccs_db


In [311]:
# Danh sách các nhãn
labels = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']


In [312]:
# Tạo một DataFrame để lưu trữ đặc trưng MFCC và nhãn tương ứng
data = {'mfcc_features': [], 'labels': []}
# Sử dụng 'root_path' để duyệt qua các thư mục
for folder in os.listdir(root_path):
    folder_path = os.path.join(root_path, folder)
    if os.path.isdir(folder_path):  # Kiểm tra xem có phải là thư mục không
        for file_name in os.listdir(folder_path):
            if file_name.endswith('.wav'):
                audio_file_path = os.path.join(folder_path, file_name)

                # Preprocess and extract features from the audio signal
                features = extract_mfcc(audio_file_path)
                features = np.mean(features.T, axis=0)
                
                # Lưu đặc trưng MFCC và nhãn vào DataFrame
                data['mfcc_features'].append(features)
                data['labels'].append(folder)

# Tạo DataFrame từ dữ liệu
df = pd.DataFrame(data)

In [313]:
# In ra để kiểm tra
print(df.head())

class CustomRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(CustomRNN, self).__init__()
        self.hidden_size = hidden_size
        
        # Khai báo các layer của mạng RNN
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden):
        # Đưa đầu vào x qua lớp RNN
        out, hidden = self.rnn(x, hidden)
        
        # Lấy output tại thời điểm cuối cùng
        out = self.fc(out[:, -1, :])  # Lấy output tại thời điểm cuối cùng của mỗi batch
        
        return out, hidden
    
    def init_hidden(self, batch_size):
        # Khởi tạo hidden state ban đầu
        return torch.zeros(1, batch_size, self.hidden_size)

                                       mfcc_features     labels
0  [-80.0, -13.448291, -56.086445, -50.102047, -4...  OAF_angry
1  [-80.0, -15.355418, -47.254032, -53.20676, -56...  OAF_angry
2  [-80.0, -27.663326, -47.71378, -40.9348, -42.1...  OAF_angry
3  [-80.0, -17.30403, -45.760113, -55.309486, -46...  OAF_angry
4  [-80.0, -15.00253, -41.794884, -33.471375, -44...  OAF_angry


In [314]:
# Định nghĩa kích thước đầu vào, hidden state và đầu ra của mạng
input_size = len(df['mfcc_features'].values[0])  # Số chiều của đầu vào
hidden_size = 128  # Kích thước của hidden state
output_size = len(labels)  # Số lớp đầu ra


In [315]:
class CustomRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(CustomRNN, self).__init__()
        self.hidden_size = hidden_size
        
        # Khai báo các layer của mạng RNN
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x, hidden):
        # Đưa đầu vào x qua lớp RNN
        out, hidden = self.rnn(x, hidden)
        
        # Lấy output tại thời điểm cuối cùng
        out = self.fc(out[:, -1, :])  # Lấy output tại thời điểm cuối cùng của mỗi batch
        
        return out, hidden
    
    def init_hidden(self, batch_size):
        # Khởi tạo hidden state ban đầu
        return torch.zeros(1, batch_size, self.hidden_size)

In [316]:
# Khởi tạo mô hình
rnn = CustomRNN(input_size, hidden_size, output_size)


In [317]:
# Giả định có dữ liệu và nhãn
batch_size = 32
seq_length = 20
input_data = torch.randn(batch_size, seq_length, input_size)  # batch_size x seq_length x input_size
hidden = rnn.init_hidden(batch_size)  # Khởi tạo hidden state


In [318]:
# Forward pass
output, hidden = rnn(input_data, hidden)
print("Output shape:", output.shape)  # In ra kích thước của output


Output shape: torch.Size([32, 7])


In [319]:
class EarlyStopping:
    """Early stops the training if validation loss doesn't improve after a given patience."""
    def __init__(self, patience=7, verbose=False, delta=0, path='best_model.pth'):
        """
        Args:
            patience (int): How long to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement.
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                            Default: 0
            path (str): Path for the checkpoint to be saved to.
                            Default: 'best_model.pth'
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''Saves model when validation loss decrease.'''
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [320]:
# Chuyển mô hình và dữ liệu lên GPU nếu có sẵn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
rnn.to(device)

CustomRNN(
  (rnn): RNN(13, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=7, bias=True)
)

In [321]:
# Định nghĩa kích thước tập huấn luyện (ví dụ: 80% dữ liệu) và tập kiểm tra (20% dữ liệu)
train_size = 0.8
test_size = 1 - train_size

In [322]:
# Chia dữ liệu và nhãn thành tập huấn luyện và tập kiểm tra
X_train, X_test, y_train, y_test = train_test_split(df['mfcc_features'], df['labels'], train_size=train_size, test_size=test_size, random_state=42)


In [323]:
# Chuyển đổi dữ liệu từ DataFrame thành mảng NumPy
X_train_array = np.stack(X_train.values)
X_test_array = np.stack(X_test.values)


In [324]:
# Chuyển dữ liệu thành Tensor và chuyển về kiểu dữ liệu float32
X_train_tensor = torch.tensor(X_train_array, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_array, dtype=torch.float32)


In [325]:
# Chuyển nhãn sang dạng số và chuyển thành Tensor
y_train_tensor = torch.tensor(pd.Categorical(y_train, categories=labels).codes, dtype=torch.int64)
y_test_tensor = torch.tensor(pd.Categorical(y_test, categories=labels).codes, dtype=torch.int64)


In [326]:
# Tạo DataLoader từ TensorDataset
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)


In [327]:
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)


In [328]:
def init_hidden(self, batch_size):
    return torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)

In [333]:
# Khởi tạo hidden state cho toàn bộ dữ liệu huấn luyện và kiểm tra
train_hidden = rnn.init_hidden(len(X_train_tensor)).to(device)
test_hidden = rnn.init_hidden(len(X_test_tensor)).to(device)

# Ví dụ về quá trình huấn luyện
num_epochs = 10
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(rnn.parameters(), lr=0.001)
early_stopping = EarlyStopping(patience=3)  # Khởi tạo early stopping

for epoch in range(num_epochs):
    rnn.train()
    running_loss = 0.0
    
    # Sử dụng hidden state đã khởi tạo cho toàn bộ dữ liệu huấn luyện
    hidden = train_hidden.clone().detach()
    
    with tqdm(total=len(train_loader), desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as pbar:
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            
            outputs, hidden = rnn(inputs, hidden)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            pbar.update(1)
            pbar.set_postfix({'Loss': running_loss/len(train_loader)})
    
    # Đánh giá mô hình trên tập validation và kiểm tra early stopping
    rnn.eval()
    val_loss = 0.0
    with torch.no_grad():
        hidden = test_hidden.clone().detach()
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs, hidden = rnn(inputs, hidden)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
        val_loss /= len(test_loader)
        print(f'Validation Loss: {val_loss}')

        # Kiểm tra early stopping
        early_stopping(val_loss, rnn)
        if early_stopping.early_stop:
            print("Early stopping")
            break

# Load lại mô hình tốt nhất trước khi early stopping
rnn.load_state_dict(torch.load('best_model.pth'))

Epoch 1/10:   0%|          | 0/70 [00:00<?, ?batch/s]


RuntimeError: For unbatched 2-D input, hx should also be 2-D but got 3-D tensor