In [1]:
!pip install jiwer -q
!pip install torchaudio -q
!pip install git+https://github.com/huggingface/transformers.git -q
!pip install librosa==0.8.1 -q
!pip install git+https://github.com/huggingface/datasets.git -q
!pip install transformers==4.21.1 -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/3.2 MB[0m [31m4.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━[0m [32m2.1/3.2 MB[0m [31m30.3 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m30.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m302.0/302.0 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.8/3.8 MB[0m [31m73.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0

In [2]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from transformers import AutoConfig, Wav2Vec2Processor

import librosa
import IPython.display as ipd
import numpy as np
import pandas as pd

In [3]:
# load model
!mkdir -p /content/data
file_id = "1CeUHCmCKCnjlibvP6MPn4WqyOFAaMrfv"
!gdown --id $file_id -O /content/data/model_1.zip
!unzip -q /content/data/model_1.zip -d /content/data/

Downloading...
From: https://drive.google.com/uc?id=1CeUHCmCKCnjlibvP6MPn4WqyOFAaMrfv
To: /content/data/model_1.zip
100% 1.17G/1.17G [00:20<00:00, 56.0MB/s]


In [4]:
!mkdir -p /content/test_data
file_id = "1C-XEnwmfvxGjDDvrsI4M8jXuBPE_AOr_"
!gdown --id $file_id -O /content/test_data/test_emotion_dataset.zip
!unzip -q /content/test_data/test_emotion_dataset.zip -d /content/test_data/

Downloading...
From: https://drive.google.com/uc?id=1C-XEnwmfvxGjDDvrsI4M8jXuBPE_AOr_
To: /content/test_data/test_emotion_dataset.zip
100% 124M/124M [00:02<00:00, 43.6MB/s]


In [5]:
folder_path = '/content/test_data/original'

# Parallel lists: full path and bare name of every file found under folder_path
file_paths = []
file_names = []

# Walk the folder recursively, recording each directory's files in order
for current_dir, _subdirs, names in os.walk(folder_path):
    file_names.extend(names)
    file_paths.extend(os.path.join(current_dir, name) for name in names)

# One row per discovered file
data = {'File Path': file_paths, 'File Name': file_names}
test_data = pd.DataFrame(data)

# Show the resulting DataFrame
test_data

Unnamed: 0,File Path,File Name
0,/content/test_data/original/2_44_4.wav,2_44_4.wav
1,/content/test_data/original/7_20_0.wav,7_20_0.wav
2,/content/test_data/original/6_26_4.wav,6_26_4.wav
3,/content/test_data/original/10_21_2.wav,10_21_2.wav
4,/content/test_data/original/3_46_4.wav,3_46_4.wav
...,...,...
1545,/content/test_data/original/6_37_4.wav,6_37_4.wav
1546,/content/test_data/original/2_40_3.wav,2_40_3.wav
1547,/content/test_data/original/2_21_2.wav,2_21_2.wav
1548,/content/test_data/original/2_29_0.wav,2_29_0.wav


In [6]:
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
from transformers.file_utils import ModelOutput


@dataclass
class SpeechClassifierOutput(ModelOutput):
    """Container for the values returned by ``Wav2Vec2ForSpeechClassification.forward``.

    All fields default to ``None``; the forward pass fills in whichever values
    it has.
    """

    # Loss value; forward() leaves it None when no labels are passed.
    loss: Optional[torch.FloatTensor] = None
    # Output of the classification head.
    logits: torch.FloatTensor = None
    # Copied from the wav2vec2 backbone output's `hidden_states`.
    hidden_states: Optional[Tuple[torch.FloatTensor]] = None
    # Copied from the wav2vec2 backbone output's `attentions`.
    attentions: Optional[Tuple[torch.FloatTensor]] = None

In [7]:
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss

from transformers.models.wav2vec2.modeling_wav2vec2 import (
    Wav2Vec2PreTrainedModel,
    Wav2Vec2Model
)


class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head: dropout -> linear -> tanh -> dropout -> output projection.

    Layer sizes and the dropout probability come from ``config.hidden_size``,
    ``config.num_labels`` and ``config.final_dropout``.
    """

    def __init__(self, config):
        super().__init__()
        hidden_size = config.hidden_size
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(config.final_dropout)
        self.out_proj = nn.Linear(hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        """Map ``features`` to ``config.num_labels`` logits; extra kwargs are ignored."""
        projected = self.dense(self.dropout(features))
        activated = self.dropout(torch.tanh(projected))
        return self.out_proj(activated)


class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 backbone followed by pooling and a classification head.

    The backbone output is reduced along dim 1 according to
    ``config.pooling_mode`` (``"mean"``, ``"sum"`` or ``"max"``) and the pooled
    vector is passed to ``Wav2Vec2ClassificationHead`` to produce the logits.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.pooling_mode = config.pooling_mode
        self.config = config

        self.wav2vec2 = Wav2Vec2Model(config)
        self.classifier = Wav2Vec2ClassificationHead(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Call ``_freeze_parameters()`` on the backbone's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(
            self,
            hidden_states,
            mode="mean"
    ):
        """Pool ``hidden_states`` along dim 1.

        Args:
            hidden_states: backbone output tensor; presumably shaped
                (batch, frames, hidden) — confirm against the backbone.
            mode: ``"mean"``, ``"sum"`` or ``"max"``.

        Returns:
            The tensor with dim 1 reduced by the chosen operation.

        Raises:
            ValueError: if ``mode`` is not one of the supported pooling modes.
        """
        if mode == "mean":
            outputs = torch.mean(hidden_states, dim=1)
        elif mode == "sum":
            outputs = torch.sum(hidden_states, dim=1)
        elif mode == "max":
            # torch.max along a dim returns (values, indices); keep the values.
            outputs = torch.max(hidden_states, dim=1)[0]
        else:
            # ValueError is still caught by callers handling `Exception`.
            raise ValueError(
                "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

        return outputs

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        """Run the backbone, pool its output and classify it.

        Args:
            input_values: input passed straight to the wav2vec2 backbone.
            attention_mask: optional mask forwarded to the backbone.
            output_attentions: forwarded to the backbone.
            output_hidden_states: forwarded to the backbone.
            return_dict: when ``None``, ``config.use_return_dict`` is used.
            labels: optional targets; when given, a loss is computed according
                to ``config.problem_type`` (inferred and stored on the config
                if it is ``None``).

        Returns:
            ``SpeechClassifierOutput`` when ``return_dict`` is truthy, otherwise
            a tuple ``(loss?, logits, *outputs[2:])`` where the loss is present
            only if it was computed.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_states = outputs[0]
        hidden_states = self.merged_strategy(hidden_states, mode=self.pooling_mode)
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                # Infer the task type from the label count and the label dtype.
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                # Reshape the labels like the logits: without this, (batch,)
                # labels would broadcast against (batch, 1) logits inside
                # MSELoss and yield a wrong loss value.
                loss = loss_fct(
                    logits.view(-1, self.num_labels),
                    labels.reshape(-1, self.num_labels),
                )
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            # Tuple output: logits replace the first two backbone entries.
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SpeechClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


In [8]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Directory the downloaded model archive was unzipped into.
model_name_or_path = "/content/data"
# Load the config, the processor and the classifier weights from that directory.
config = AutoConfig.from_pretrained(model_name_or_path)
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)
# Sampling rate the processor's feature extractor is configured with.
sampling_rate = processor.feature_extractor.sampling_rate
model = Wav2Vec2ForSpeechClassification.from_pretrained(model_name_or_path).to(device)

In [9]:
def speech_file_to_array_fn(path, sampling_rate=16000):
    """Load an audio file as a 1-D numpy waveform at ``sampling_rate`` Hz.

    Args:
        path: path of the audio file, passed to ``torchaudio.load``.
        sampling_rate: target sampling rate of the returned waveform.

    Returns:
        The resampled waveform as a numpy array; multi-channel audio is
        averaged down to a single channel first.
    """
    speech_array, _sampling_rate = torchaudio.load(path)

    # Down-mix multi-channel audio: squeeze() alone would leave a 2-D
    # (channels, time) array instead of the 1-D signal callers expect.
    if speech_array.dim() > 1 and speech_array.size(0) > 1:
        speech_array = speech_array.mean(dim=0, keepdim=True)

    # Pass the target rate explicitly; Resample(orig_freq) on its own always
    # resamples to its default new_freq and ignores the `sampling_rate` argument.
    resampler = torchaudio.transforms.Resample(_sampling_rate, sampling_rate)
    speech = resampler(speech_array).squeeze().numpy()
    return speech


def predict(path, sampling_rate=16000):
    """Classify one audio file and return a score for every label.

    The file is loaded with ``speech_file_to_array_fn``, encoded by the global
    ``processor`` and run through the global ``model`` on ``device``.

    Returns:
        A list of ``{"Emotion": <label>, "Score": "<percent>%"}`` dicts, one per
        entry of the softmax output, labelled via ``config.id2label``.
    """
    waveform = speech_file_to_array_fn(path, sampling_rate)
    encoded = processor(waveform, sampling_rate=sampling_rate, return_tensors="pt", padding=True)

    model_inputs = encoded.input_values.to(device)
    model_mask = encoded.attention_mask.to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        result = model(model_inputs, attention_mask=model_mask)
    logits = result.logits

    # Probabilities of the first (and only) item in the batch.
    probabilities = F.softmax(logits, dim=1).detach().cpu().numpy()[0]

    outputs = []
    for label_id, score in enumerate(probabilities):
        outputs.append({
            "Emotion": config.id2label[label_id],
            "Score": f"{round(score * 100, 3):.1f}%",
        })
    return outputs

In [10]:
# Sanity check: classify a single test clip and display its per-label scores.
predict('/content/test_data/original/10_20_4.wav')

[{'Emotion': 'angry', 'Score': '7.3%'},
 {'Emotion': 'calm', 'Score': '0.8%'},
 {'Emotion': 'disgust', 'Score': '76.6%'},
 {'Emotion': 'fearful', 'Score': '0.3%'},
 {'Emotion': 'happy', 'Score': '0.6%'},
 {'Emotion': 'neutral', 'Score': '0.2%'},
 {'Emotion': 'sad', 'Score': '0.7%'},
 {'Emotion': 'surprised', 'Score': '13.5%'}]

In [11]:
import os

# Folder holding the input clips and the path of the results file to produce
input_folder = "/content/test_data/original"
output_file = "results.txt"

# Names of the .wav files in the folder, in sorted order
files = sorted(name for name in os.listdir(input_folder) if name.endswith(".wav"))

# Write the results table: a header row, then one row per file
with open(output_file, "w") as result_file:
    result_file.write("file_id |01 |02 |03 |04 |05 |06 |07 |08 |\n")

    for wav_name in files:
        # The row starts with the file name ...
        result_file.write(f"{wav_name} |")

        # ... followed by one "<score>|" cell per entry returned by predict() ...
        result_file.writelines(
            f"{entry['Score']}|" for entry in predict(os.path.join(input_folder, wav_name))
        )

        # ... and is closed with a newline
        result_file.write("\n")

print("Результаты записаны в results.txt")


Результаты записаны в results.txt


In [14]:
# Offer results.txt as a browser download from the Colab runtime.
# Note: this import rebinds the name `files`, which an earlier cell used for
# the list of .wav file names.
from google.colab import files
files.download('results.txt')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>