# Подключение библиотеки

In [1]:
import os
from datasets import load_dataset, load_from_disk

In [2]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
from torchaudio.transforms import Resample
from torch.nn import CTCLoss
from torch.optim import Adam
import torch.nn as nn

## Загрузка датасет

In [3]:
# Проверка, существует ли уже сохраненный датасет
if os.path.exists('/home/redalexdad/recognition_speech/common_voice_11/'):
    # Загружаем сохраненный датасет, если он уже существует
    cv_11_train = load_from_disk('/home/redalexdad/recognition_speech/common_voice_11/train/')
    cv_11_test = load_from_disk('/home/redalexdad/recognition_speech/common_voice_11/test/')
else:
    # Иначе, скачиваем и сохраняем датасет
    cv_11_train = load_dataset("mozilla-foundation/common_voice_11_0", "ru", split="train")
    cv_11_train.save_to_disk('/home/redalexdad/recognition_speech')

## Содержимое датасет

In [4]:
cv_11_train

In [5]:
cv_11_test

In [6]:
cv_11_train[0]

## Создание класса датасета и обучение

In [7]:
class SpeechDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        audio_data = self.data[idx]['audio']['array']
        target_text = self.data[idx]['sentence']
        return audio_data, target_text

In [8]:
# Преобразование данных в тензоры и загрузчик данных
def collate_fn(batch):
    audios, texts = zip(*batch)
    audio_lengths = [len(audio) for audio in audios]
    texts = [text.lower() for text in texts]

    # Применяем трансформации, такие как ресемплирование
    # resample = Resample(orig_freq=48000, new_freq=16000)
    # Уменьшаем частоту дискретизации
    resample = Resample(orig_freq=48000, new_freq=8000)
    audios = [resample(torch.FloatTensor(audio)) for audio in audios]

    # Приводим все аудио к одной длине, например, путем дополнения нулями
    audios = torch.nn.utils.rnn.pad_sequence(audios, batch_first=True)

    # Добавляем третье измерение
    audios = audios.unsqueeze(1)

    # Преобразуем текст в числовое представление
    char_map = {char: idx for idx, char in enumerate(set(''.join(texts)))}
    target_lengths = [len(text) for text in texts]
    targets = [torch.LongTensor([char_map[char] for char in text]) for text in texts]
    targets = torch.nn.utils.rnn.pad_sequence(targets, batch_first=True)

    return audios, audio_lengths, targets, target_lengths

In [9]:
%%time
# Создаем char_map
char_map = {char: idx for idx, char in enumerate(set(''.join([data_point['sentence'] for data_point in cv_11_train])))}

In [10]:
# Инициализация параметров
input_size = 1  # Один канал для моноаудио
hidden_size = 256  # Размер скрытого слоя
num_classes = len(char_map)  # Количество классов, равное числу уникальных символов в текстах

In [11]:
print(f'Количество классов, равное числу уникальных символов в текстах: {num_classes}')

In [12]:
# Создаем модель
class SpeechRecognitionModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(SpeechRecognitionModel, self).__init__()
        self.rnn = nn.LSTM(input_size, hidden_size, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        out, _ = self.rnn(x)
        out = self.fc(out)
        return out

In [13]:
# Создаем модель, оптимизатор и функцию потерь
model = SpeechRecognitionModel(input_size, hidden_size, num_classes)
optimizer = Adam(model.parameters(), lr=0.001)
criterion = CTCLoss(blank=num_classes - 1)

In [14]:
# Переводим модель и данные на GPU, если доступен
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device('cpu')
model.to(device)

In [15]:
print(f'Used device: {device}')

In [16]:
# Создаем DataLoader и обучаем модель
dataset = SpeechDataset(cv_11_train)
dataloader = DataLoader(dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)

In [None]:
%%time
for batch in dataloader:
    # Параметры батча
    audios, audio_lengths, targets, target_lengths = batch
    
    # Преобразование в тензоры
    audio_lengths = torch.LongTensor(audio_lengths)
    target_lengths = torch.LongTensor(target_lengths)
    
    # Проверка размерности
    assert audio_lengths.size(0) == audios.size(0), "Размерность audio_lengths не соответствует размерности батча"

In [None]:
# Пример обучения на нескольких эпохах
num_epochs = 5

In [None]:
for epoch in range(num_epochs):
    for batch in dataloader:
        audios, audio_lengths, targets, target_lengths = batch
        audios, targets = audios.to(device), targets.to(device)
        audios = audios.permute(0, 2, 1).to(device)

        # Обнуляем градиенты
        optimizer.zero_grad()

        # Прямой проход и вычисление потерь
        outputs = model(audios)
        outputs = nn.functional.log_softmax(outputs, dim=2)

        # Проверка размерности input_lengths
        assert outputs.size(0) == audio_lengths.size(0), "Размерность audio_lengths не соответствует размерности батча"

        loss = criterion(outputs, targets, audio_lengths, target_lengths)

        # Обратное распространение и оптимизация
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}")

In [None]:
# Сохранение обученной модели
torch.save(model.state_dict(), 'speech_recognition_model.pth')
print("Обучение завершено. Модель сохранена.")

In [None]:
# Проверка на тестовом наборе данных
model.eval()  # Установка модели в режим оценки

In [None]:
with torch.no_grad():
    test_loss = 0.0
    total = 0
    correct = 0

    for batch in test_dataloader:  # Предполагается, что у вас есть загрузчик данных для тестирования
        audios, audio_lengths, targets, target_lengths = batch
        audios, targets = audios.to(device), targets.to(device)
        audios = audios.permute(0, 2, 1).to(device)

        outputs = model(audios)
        outputs = nn.functional.log_softmax(outputs, dim=2)
        loss = criterion(outputs, targets, audio_lengths, target_lengths)
        test_loss += loss.item()

        _, predicted = outputs.max(2)
        total += targets.size(1) * targets.size(0)
        correct += predicted.eq(targets).sum().item()

    accuracy = correct / total
    average_test_loss = test_loss / len(test_dataloader)

print(f"Точность на тестовом наборе: {accuracy * 100:.2f}%")
print(f"Средний loss на тестовом наборе: {average_test_loss}")