In [14]:
! pip install "numpy<2.0"



In [15]:
! pip install --upgrade librosa
! pip install --upgrade soxr
! python.exe -m pip install --upgrade pip

Collecting pip
  Downloading pip-24.1.2-py3-none-any.whl.metadata (3.6 kB)
Downloading pip-24.1.2-py3-none-any.whl (1.8 MB)
   ---------------------------------------- 0.0/1.8 MB ? eta -:--:--
    --------------------------------------- 0.0/1.8 MB 660.6 kB/s eta 0:00:03
    --------------------------------------- 0.0/1.8 MB 660.6 kB/s eta 0:00:03
    --------------------------------------- 0.0/1.8 MB 245.8 kB/s eta 0:00:08
   -- ------------------------------------- 0.1/1.8 MB 476.3 kB/s eta 0:00:04
   -- ------------------------------------- 0.1/1.8 MB 476.3 kB/s eta 0:00:04
   -- ------------------------------------- 0.1/1.8 MB 400.9 kB/s eta 0:00:05
   --- ------------------------------------ 0.2/1.8 MB 482.7 kB/s eta 0:00:04
   --- ------------------------------------ 0.2/1.8 MB 482.7 kB/s eta 0:00:04
   ---- ----------------------------------- 0.2/1.8 MB 461.0 kB/s eta 0:00:04
   ---- ----------------------------------- 0.2/1.8 MB 492.1 kB/s eta 0:00:04
   ----- ------------------

In [3]:
import os
import numpy as np
import librosa
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

In [17]:
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'  # Синхронизация CUDA

In [4]:
def collect_file_paths_and_labels(dataset_path):
    file_paths = []
    labels = []
    emotion_folders = os.listdir(dataset_path)
    
    for emotion in emotion_folders:
        emotion_path = os.path.join(dataset_path, emotion)
        if os.path.isdir(emotion_path):
            for file_name in os.listdir(emotion_path):
                if file_name.endswith('.wav'):
                    file_paths.append(os.path.join(emotion_path, file_name))
                    labels.append(emotion)  # Название папки используется как метка

    return file_paths, labels

dataset_path = 'audios'
file_paths, labels = collect_file_paths_and_labels(dataset_path)

In [19]:
def find_max_len(file_paths):
    max_len = 0
    for file_path in file_paths:
        y, sr = librosa.load(file_path, sr=None)
        spect = librosa.feature.melspectrogram(y=y, sr=sr)
        if spect.shape[1] > max_len:
            max_len = spect.shape[1]
    return max_len

max_len = find_max_len(file_paths)
max_len

284

In [20]:
class AudioDataset(Dataset):
    def __init__(self, file_paths, labels, max_len, transform=None):
        self.file_paths = file_paths
        self.max_len = max_len
        self.transform = transform
        
        self.labels = [str(label) for label in labels]
        unique_labels = sorted(set(self.labels))
        self.label_idx = {label: idx for idx, label in enumerate(unique_labels)}
        self.indexed_labels = [self.label_idx[label] for label in self.labels]
        
        print("Original Labels:", self.labels)
        print("Label Index Map:", self.label_idx)
        print("Indexed Labels:", self.indexed_labels)
        
        print(len(self.file_paths), len(self.labels))
        assert len(self.file_paths) == len(self.labels)

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        file_path = self.file_paths[idx]
        label = self.labels[idx]

        # Загрузка аудио файла и вычисление спектрограммы
        y, sr = librosa.load(file_path, sr=None)
        spect = librosa.feature.melspectrogram(y=y, sr=sr)
        spect = librosa.power_to_db(spect, ref=np.max)
        spect = np.expand_dims(spect, axis=0)  # Добавление канала для CNN

        if self.max_len is not None:
            if spect.shape[-1] < self.max_len:
                pad_width = self.max_len - spect.shape[-1]
                spect = np.pad(spect, ((0, 0), (0, 0), (0, pad_width)), mode='constant')
            elif spect.shape[-1] > self.max_len:
                spect = spect[:, :, :self.max_len]

        if self.transform:
            spect = self.transform(spect)

        label_idx = self.label_idx[label]
        return torch.tensor(spect, dtype=torch.float32), torch.tensor(label_idx, dtype=torch.long)


def create_dataloader(file_paths, labels, batch_size, max_len=None,  transform=None):
    dataset = AudioDataset(file_paths, labels, max_len, transform)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    return dataloader

batch_size = 32
dataloader = create_dataloader(file_paths, labels, batch_size, max_len)

Original Labels: ['angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', '

In [5]:
print(labels)

['angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 'angry', 

In [21]:
for batch in dataloader:
    spectrograms, batch_labels = batch
    print(spectrograms.shape, batch_labels)
    break

torch.Size([32, 1, 128, 284]) tensor([0, 5, 3, 5, 4, 6, 2, 1, 5, 6, 4, 6, 6, 6, 3, 6, 2, 1, 6, 3, 2, 6, 6, 2,
        3, 4, 4, 0, 0, 5, 6, 5])


In [22]:
class EmotionRecognitionModel(nn.Module):
    def __init__(self, num_classes):
        super(EmotionRecognitionModel, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        
        # cnn_output_size = 64 * 32
        self.cnn_output_size = 64 * max_len // 4
        
        self.rnn = nn.LSTM(input_size=self.cnn_output_size, hidden_size=128, num_layers=2, batch_first=True, bidirectional=True)

        self.fc = nn.Sequential(
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.cnn(x)
        x = x.permute(0, 2, 3, 1)
        x = x.reshape(x.size(0), x.size(1), -1)
        x, _ = self.rnn(x)
        x = x[:, -1, :]
        x = self.fc(x)
        return x

num_classes = 7
model = EmotionRecognitionModel(num_classes)

In [23]:
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# # device = torch.device("cpu")
# model.to(device)

# criterion = nn.CrossEntropyLoss()
# # optimizer = optim.Adam(model.parameters(), lr=0.001)
# optimizer = optim.SGD(model.parameters(), lr=0.001)

# num_epochs = 100

# running_loss = 0.0
# correct_predictions = 0
# total_predictions = 0

# with torch.no_grad():
#     for spects, labels in dataloader:
#         spects = spects.to(device)
#         labels = labels.to(device)
        
#         outputs = model(spects)
#         loss = criterion(outputs, labels)
#         running_loss += loss.item() * spects.size(0)
        
#         _, predicted = torch.max(outputs, 1)
#         correct_predictions += (predicted == labels).sum().item()
#         total_predictions += labels.size(0)
    
#     epoch_loss = running_loss / len(dataloader.dataset)
#     accuracy = correct_predictions / total_predictions
    
#     print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.4f}, Val Loss: {val_epoch_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

        

# for epoch in range(num_epochs):
#     model.train()
#     running_loss = 0.0
#     for spects, labels in dataloader:
#         spects = spects.to(device)
#         labels = labels.to(device)
        
#         # print(f"Batch spects: {spects.shape}, labels: {labels.shape}")
#         # print(f"Labels min: {labels.min()}, labels max: {labels.max()}")
        
#         optimizer.zero_grad()
        
#         outputs = model(spects)
#         # print(f"Outputs: {outputs.shape}")
#         # print(outputs)
#         # print(f"Labels: {labels}")
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
        
#         running_loss += loss.item() * spects.size(0)
    
#     epoch_loss = running_loss / len(dataloader.dataset)
#     print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')


In [24]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

num_epochs = 100  # Максимальное количество эпох
patience = 10     # Количество эпох без улучшений для раннего завершения обучения

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

best_val_loss = float('inf')
patience_counter = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for spects, labels in dataloader:
        spects = spects.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(spects)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * spects.size(0)
        
        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)

    epoch_loss = running_loss / len(dataloader.dataset)
    accuracy = correct_predictions / total_predictions

    # # Validation step (assuming you have a validation dataloader)
    # model.eval()
    # val_running_loss = 0.0
    # val_correct_predictions = 0
    # val_total_predictions = 0

    # with torch.no_grad():
    #     for val_spects, val_labels in val_dataloader:
    #         val_spects = val_spects.to(device)
    #         val_labels = val_labels.to(device)

    #         val_outputs = model(val_spects)
    #         val_loss = criterion(val_outputs, val_labels)
    #         val_running_loss += val_loss.item() * val_spects.size(0)

    #         _, val_predicted = torch.max(val_outputs, 1)
    #         val_correct_predictions += (val_predicted == val_labels).sum().item()
    #         val_total_predictions += val_labels.size(0)

    # val_epoch_loss = val_running_loss / len(val_dataloader.dataset)
    # val_accuracy = val_correct_predictions / val_total_predictions
    
    val_epoch_loss = 0.0
    val_accuracy = 0.0

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.4f}, Val Loss: {val_epoch_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    # # Early stopping
    # if val_epoch_loss < best_val_loss:
    #     best_val_loss = val_epoch_loss
    #     patience_counter = 0
    #     # Save the best model if needed
    #     torch.save(model.state_dict(), 'best_model.pth')
    # else:
    #     patience_counter += 1

    # if patience_counter >= patience:
    #     print("Early stopping triggered")
    #     break


Epoch 1/100, Loss: 1.9506, Accuracy: 0.1268, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 2/100, Loss: 1.9467, Accuracy: 0.1454, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 3/100, Loss: 1.9432, Accuracy: 0.1629, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 4/100, Loss: 1.9408, Accuracy: 0.1843, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 5/100, Loss: 1.9391, Accuracy: 0.1911, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 6/100, Loss: 1.9362, Accuracy: 0.2032, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 7/100, Loss: 1.9350, Accuracy: 0.2068, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 8/100, Loss: 1.9322, Accuracy: 0.2261, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 9/100, Loss: 1.9297, Accuracy: 0.2364, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 10/100, Loss: 1.9274, Accuracy: 0.2471, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 11/100, Loss: 1.9246, Accuracy: 0.2779, Val Loss: 0.0000, Val Accuracy: 0.0000
Epoch 12/100, Loss: 1.9219, Accuracy: 0.2864, Val Loss: 0.0000, Val Accura

In [None]:
torch.save(model.state_dict(), 'best_model.pth')