In [None]:
# необходимые пакеты

# ! pip install ipywidgets
# ! pip install IPython
# ! pip install ipyfilechooser
# ! pip install wave
# ! pip install vosk
# ! pip install pydub
# ! pip install torch
# ! pip install transformers
# ! pip install pyaudio

In [None]:
# UI
import ipywidgets as widgets
from IPython.display import display

# выбор рабочей директории
from ipyfilechooser import FileChooser

# работа с wav-файлами
import wave

# запись звука
import pyaudio

# работа с потоками
import threading

# библиотека транскрибации Vosk и необходимые ей пакеты
from vosk import Model
from vosk import KaldiRecognizer
from pydub import AudioSegment

# закачка файлов и работа с архивами
import urllib
import zipfile

# вспомогательные библиотеки
from enum import Enum
from datetime import datetime
from pathlib import Path
import json
import os


In [None]:
# общий класс для логирования
class logger:


    class log_level(Enum):
        INFO    = 'INFO'
        WARNING = 'WARNING'
        ERROR   = 'ERROR'


    def __init__(self, work_folder, print_output = False):

        # помимо записи в файл выводить print
        self.print_output = print_output

        self.file = str(Path(work_folder, 'logs').with_suffix('.txt'))

        if not os.path.exists(work_folder):
            return

        dt = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        with open(self.file, 'a', encoding = 'utf-8') as f:
            f.write(f'\n ============[{dt}]============')


    # общий метод логирования
    def log(self, message, level):

        if not os.path.exists(self.file):
            return

        dt = datetime.now().strftime("%d/%m/%Y %H:%M:%S")

        msg = f'\n [{dt}][{level}]: {message}'
        with open(self.file, 'a', encoding = 'utf-8') as f:
            f.write(msg)

        if self.print_output == True:
            print(msg)


    # INFO
    def info(self, message):
        self.log(message, self.log_level.INFO.value)


    # WARN
    def warn(self, message):
        self.log(message, self.log_level.WARNING.value)


    # ERROR
    def error(self, message):
        self.log(message, self.log_level.ERROR.value)

In [None]:
# работа с файлами
class file_io:

    def __init__(self, log, root_work_folder):
        self.logger = log if log is not None else logger(print_output = True)
        self.root_work_folder = root_work_folder


    def save(self, file, text):

        file_name = Path(file).stem
        save_path = str(Path(self.root_work_folder, file_name).with_suffix('.txt'))

        self.logger.info(f"Сохранение распознанного файла: {save_path}")

        with open(save_path, 'w', encoding = 'utf-8') as f:
            f.write(text)

In [None]:
# загрузка модели для распознавания русской речи
class model_loader:

     @staticmethod
     def load_language_model(log, root_work_folder, language_model_url = "https://alphacephei.com/vosk/models/vosk-model-ru-0.22.zip"):

        try:

            language_model_path = str(Path(root_work_folder, 'vosk-model-ru-0.22'))

            if not os.path.exists(language_model_path):

                log.info(f"Скачивание языковой модели: {language_model_url}...")

                zip_path, _ = urllib.request.urlretrieve(language_model_url)
                with zipfile.ZipFile(zip_path, "r") as f:
                    f.extractall(root_work_folder)

                log.info(f"Языковая модель сохранена в: {root_work_folder}")

            return language_model_path

        except Exception as e:

            log.error(f'Ошибка при скачивании и распаковке языковой модели: {str(e)}')

In [None]:
# настройки канальности и частоты звукового файла
class sound_settings:
    FRAME_RATE = 16000
    CHANNELS = 1


# состояния классов обработки звука
class worker_state(Enum):
    STOPPED = 0
    RUNNING = 1

In [None]:
# запись звука
class speech_recorder:


    def __init__(self, log):

        self.logger = log

        self.logger.info('Инициализация класса звукозаписи')

        self.state = worker_state.STOPPED

        self.audio_stream = None
        self.recorder = None


    # запись аудио с помощью pyaudio в открытый поток
    def open_stream(self):

        self.logger.info('Аудиопоток открыт')

        self.state = worker_state.RUNNING

        self.recorder = pyaudio.PyAudio()

        self.audio_stream = self.recorder.open(
            format = pyaudio.paInt16,
            channels = sound_settings.CHANNELS,
            rate = sound_settings.FRAME_RATE,
            input = True,
            frames_per_buffer = sound_settings.FRAME_RATE
        )

        self.audio_stream.start_stream()

        return self.audio_stream


    # закрыть аудиопоток
    def close_stream(self):

        self.state = worker_state.STOPPED

        if self.audio_stream is not None:

            self.logger.info('Остановка потока аудиозаписи')

            self.audio_stream.stop_stream()
            self.audio_stream.close()
            self.audio_stream = None


    # очистка ресурсов pyaudio
    def dispose_pyaudio(self):

        if self.recorder is not None:

            self.logger.info('Очистка ресурсов аудиозаписи')
            self.recorder.terminate()
            self.recorder = None


In [None]:
# распознавание звукового файла
class speech_transcriber:


    def __init__(self, log, speech_recorder, language_model_path, root_work_folder):

        self.logger = log
        self.speech_recorder = speech_recorder

        self.logger.info('Инициализация класса распознавания речи')

        # пока только .wav файлы
        self.supported_files = ['wav']

        self.state = worker_state.STOPPED

        self.transcribed_text = None

        self.root_work_folder = root_work_folder

        # инициализация модели распознавания
        self.model = Model(language_model_path)
        self.recognizer = KaldiRecognizer(self.model, sound_settings.FRAME_RATE)
        self.recognizer.SetWords(True)


    # распознавание аудиофайла
    def transcribe_file(self, file_path):

        if(self.state != worker_state.STOPPED):
            self.logger.warn('Идёт распознавание')
            return

        self.state = worker_state.RUNNING

        is_valid, ext = self.__is_valid_file(file_path)

        file_name = Path(file_path).name

        if not is_valid:
            self.logger.error(f'Недопустимый формат файла: {file_name}')
            return

        self.logger.info(f'Распознавание загруженного аудиофайла: {file_path}')

        self.recognizer = KaldiRecognizer(self.model, sound_settings.FRAME_RATE)
        self.recognizer.SetWords(True)
        self.transcribed_text = ' '
        self.transcribed_text = None

        data = None

        # wav
        if ext == self.supported_files[0]:
            data = AudioSegment.from_wav(file_path)
        else:
            return

        data = data.set_channels(sound_settings.CHANNELS)
        data = data.set_frame_rate(sound_settings.FRAME_RATE)

        # текстовый результат
        self.recognizer.AcceptWaveform(data.raw_data)
        result = self.recognizer.Result()
        self.transcribed_text = json.loads(result)["text"]

        self.logger.info(f'Аудиофайл {file_name} распознан')

        self.state = worker_state.STOPPED

        return self.transcribed_text


    # распознавание речи в реальном времени
    def transcribe_stream(self, partial_result_callback = None):

        self.logger.info('Запуск распознавания речи в реальном времени')

        self.transcribed_text = None

        self.state = worker_state.RUNNING

        self.recognizer = KaldiRecognizer(self.model, sound_settings.FRAME_RATE)

        stream = self.speech_recorder.open_stream()

        result = ''
        while self.state == worker_state.RUNNING:
            data = stream.read(int(sound_settings.FRAME_RATE))

            chunk = self.recognizer.Result() if self.recognizer.AcceptWaveform(data) else self.recognizer.PartialResult()

            partial = ''
            text = ''
            # если есть только частичные результаты - выведем их на экран в реальном времени
            # как только появится полностью распознанная часть текста - добавим её к финальному результату
            json_ = json.loads(chunk)
            if 'text' in json_:
                if len(json_['text']) > 0:
                    text = json_['text']
                    result += f' {text}'
            if 'partial' in json_:
                partial = json_['partial']

            if partial_result_callback is not None:
                if len(partial) > 0:
                    partial_result_callback(result + f' {partial}')
                if len(text) > 0:
                    partial_result_callback(result)

        result += json.loads(self.recognizer.FinalResult())['text']

        self.transcribed_text = result


    # остановить распознавание в реальном времени
    def stop_transcribing_stream(self):

        self.logger.info('Распознавание речи остановлено')

        self.state = worker_state.STOPPED
        self.speech_recorder.close_stream()
        self.speech_recorder.dispose_pyaudio()


    # проверка расширения файла
    def __is_valid_file(self, filepath):

        is_valid = False

        ext = Path(filepath).suffix
        ext = ext.replace('.','')
        if ext in self.supported_files:
            is_valid = True

        return is_valid, ext



In [None]:
# графический интерфейс для класса звукозаписи
class speech_recorder_ui:


    def __init__(self, log, root_work_folder):

        self.logger = log

        self.logger.info('Инициализация графического интерфейса класса звукозаписи')

        self.root_work_folder = root_work_folder
        self.speech_recorder = speech_recorder(log)

        self.recorder_thread = None
        self.frames = []

        # кнопка записи
        self.__record_button_widget = widgets.Button(
            disabled = False,
            tooltip = 'Запись звука',
            icon = 'microphone',
            button_style = "primary"
        )
        self.__record_button_widget.on_click(self.__record_onclick)

        # выбор рабочей директории для сохранения записи
        self.__file_chooser_widget = FileChooser(
                                path = self.root_work_folder,
                                select_default = True,
                                select_desc = 'Выбрать',
                                change_desc = 'Изменить',
                                title = 'Директория для сохранения аудиофайла:',
                                show_only_dirs = True
                            )
        self.__file_chooser_widget.layout.width = '100%'
        self.__file_chooser_widget.register_callback(self.__file_chooser_callback)


    # получить виджеты для рендера
    def get_widgets(self):
        return widgets.VBox([self.__file_chooser_widget, self.__record_button_widget])


    # изменить внешний вид кнопки в зависимости от состояния класса
    def __toggle_button_state(self, button, state):
        button.button_style = state[0]
        button.tooltip = state[1]
        button.icon = state[2]


    # нажатие кнопки записи
    def __record_onclick(self, button):

        if(self.__file_chooser_widget.selected_path is None):
            self.logger.warn('Выберите директорию для сохранения аудиофайла')
            return

        if(self.speech_recorder.state == worker_state.STOPPED):

            self.recorder_thread = threading.Thread(target = self.__recorder_worker_thread)
            self.recorder_thread.start()

            self.__toggle_button_state(self.__record_button_widget, ['success', 'Остановить', 'microphone-slash'])

        else:

            self.speech_recorder.close_stream()
            self.recorder_thread.join()
            self.recorder_thread = None

            self.__save_recording()
            self.__toggle_button_state(self.__record_button_widget, ['primary', 'Запись звука', 'microphone'])
            self.speech_recorder.dispose_pyaudio()


    def __file_chooser_callback(self, chooser):
        self.logger.info(f'Выбрана рабочая директория для сохранения аудиозаписи: {chooser.selected_path}')
        self.root_work_folder = chooser.selected_path


    # запуск звукозаписи
    def __recorder_worker_thread(self):

        stream = self.speech_recorder.open_stream()

        while self.speech_recorder.state == worker_state.RUNNING:
            data = stream.read(sound_settings.FRAME_RATE)
            self.frames.append(data)


    # сохранение записи
    def __save_recording(self):

        if len(self.frames) <= 0:
            self.logger.warn('Буфер аудиозаписи пуст')
            return

        filename = f'recording_{datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}'
        path = str(Path(self.root_work_folder, filename).with_suffix('.wav'))

        self.logger.info(f'Сохранение аудиозаписи: {path}')

        with wave.open(path, 'w') as file:

            file.setnchannels(sound_settings.CHANNELS)
            file.setsampwidth(self.speech_recorder.recorder.get_sample_size(pyaudio.paInt16))
            file.setframerate(sound_settings.FRAME_RATE)

            file.writeframes(b''.join(self.frames))

        self.frames = []

In [None]:
# графический интерфейс для класса распознавания звука
class speech_transcriber_ui:


    def __init__(self, log, file_io, speech_recorder, language_model_path, root_work_folder):

        self.logger = log

        self.logger.info('Инициализация графического интерфейса класса распознавания речи')

        self.file_io = file_io

        self.root_work_folder = root_work_folder

        self.__tanscribe_on_fly_thread = None

        self.speech_transcriber = speech_transcriber(log, speech_recorder, language_model_path, root_work_folder)

         # выбор аудиофайла
        self.__file_chooser_widget = FileChooser(
                                path = self.root_work_folder,
                                select_desc = 'Выбрать',
                                change_desc = 'Изменить',
                                title = 'Выбрать аудиофайл для распознавания:',
                                filter_pattern = ['*.wav']
                            )
        self.__file_chooser_widget.layout.width = '100%'
        self.__file_chooser_widget.register_callback(self.__file_chooser_callback)

         # кнопка распознавания загруженного аудиофайла
        self.__button_transcribe_widget = widgets.Button(
            disabled = False,
            tooltip = 'Распознать загруженный аудиофайл',
            icon = 'book',
            button_style = "primary",
            filter_pattern = []
        )

        # кнопка распознавания аудиофайла на лету
        self.__button_transcribe_on_the_fly_widget = widgets.Button(
            disabled = False,
            tooltip = 'Распознать на лету',
            icon = 'microphone',
            button_style = "primary",
            filter_pattern = []
        )

        self.__button_transcribe_widget.on_click(self.__transcribe_file_onclick)
        self.__button_transcribe_on_the_fly_widget.on_click(self.__transcribe_on_the_fly_click)


        # поле для отображения распознанного текста
        self.__transcribe_text_widget = widgets.Textarea(
            value = 'Распознанный текст:',
            placeholder = '',
            description = '',
            disabled = False,
            layout = widgets.Layout(width = 'auto', height = '300px')
        )
        self.__transcribe_text_widget.style.font_size = '20px'


     # получить виджеты для рендера
    def get_widgets(self):
        return widgets.VBox([self.__file_chooser_widget,
                             self.__button_transcribe_widget,
                             self.__button_transcribe_on_the_fly_widget,
                             self.__transcribe_text_widget,
                            ])


    # изменить внешний вид кнопки в зависимости от состояния класса
    def __toggle_button_state(self, button, state):
        button.button_style = state[0]
        button.tooltip = state[1]
        button.icon = state[2]


    # обработка промежуточных результатов транскрибации
    def __transcribe_on_the_fly_callback(self, text):
        self.__transcribe_text_widget.value = text


    # нажатие кнопки распознавания загруженного файла
    def __transcribe_file_onclick(self, button):

        self.__transcribe_text_widget.value = ' '

        if(self.__file_chooser_widget.selected_path is None):
            self.logger.warn('Выберите .wav аудиофайл')
            return

        self.__toggle_button_state(self.__button_transcribe_widget, ['success', 'Идёт распознование', 'spinner'])

        text = self.speech_transcriber.transcribe_file(self.__file_chooser_widget.selected)

        self.__toggle_button_state(self.__button_transcribe_widget, ['primary', 'Распознать аудиофайл', 'book'])

        self.__transcribe_text_widget.value = text

        self.file_io.save(self.__file_chooser_widget.selected, text)


    # нажатие кнопки распознавания аудиопотока в реальном времени
    def __transcribe_on_the_fly_click(self, button):

        self.__transcribe_text_widget.value = ' '

        if self.speech_transcriber.state == worker_state.STOPPED:

            self.__toggle_button_state(self.__button_transcribe_on_the_fly_widget, ['success', 'Идёт распознование', 'microphone-slash'])

            self.__tanscribe_on_fly_thread = threading.Thread(target = self.__transcribe_on_fly_worker_thread)
            self.__tanscribe_on_fly_thread.start()

        else:

            self.__toggle_button_state(self.__button_transcribe_on_the_fly_widget, ['primary', 'Распознать на лету', 'microphone'])

            self.speech_transcriber.stop_transcribing_stream()
            self.__tanscribe_on_fly_thread.join()
            self.__tanscribe_on_fly_thread = None

            self.__transcribe_text_widget.value = self.speech_transcriber.transcribed_text

            filename = f'transcribe_on_the_fly_{datetime.now().strftime("%d_%m_%Y_%H_%M_%S")}'
            path = str(Path(self.root_work_folder, filename).with_suffix('.txt'))

            self.file_io.save(path, self.speech_transcriber.transcribed_text)


    # запуск распознавания на лету
    def __transcribe_on_fly_worker_thread(self):
        self.speech_transcriber.transcribe_stream(partial_result_callback = self.__transcribe_on_the_fly_callback)


    def __file_chooser_callback(self, chooser):
        self.logger.info(f'Выбран файл для распознавания: {chooser.selected}')


In [None]:
# точка входа программы
class runner:


    def __init__(self, root_work_folder = None):

        folder = root_work_folder

        if folder == None:
            folder = self.__init_root_folder()

        log = logger(folder)
        language_model_path = model_loader.load_language_model(log, folder)
        f = file_io(log, folder)
        self.recorder_ui = speech_recorder_ui(log, folder)
        self.transcriber_ui = speech_transcriber_ui(log, f, self.recorder_ui.speech_recorder, language_model_path, folder)

        log.print_output = True


    # рендер виджетов
    def render(self):

        header_record = widgets.HTML(value = 'Запись:')
        header_transcribe = widgets.HTML(value = 'Распознавание:')

        box_record = self.recorder_ui.get_widgets()
        box_transcribe = self.transcriber_ui.get_widgets()

        header_record.style.font_size, header_transcribe.style.font_size = '20px', '20px'
        header_record.layout.padding, header_transcribe.layout.padding = '25px 0 0 0','50px 0 0 0'
        box_record.layout.padding, box_transcribe.layout.padding = '10px 0 10px 0','10px 0 10px 0'

        controls = widgets.GridBox(children = [header_record, box_record, header_transcribe, box_transcribe],
                layout=widgets.Layout(
                    grid_template_rows = 'auto auto auto auto',
                    grid_template_areas='''
                    "header_record"
                    "box_record"
                    "header_transcribe"
                    "box_transcribe"
                    ''')
            )

        display(controls)


    # инициализация рабочей директории
    def __init_root_folder(self):
        root_folder = str(Path(os.getcwd(), 'speech_recognition'))

        if not os.path.exists(root_folder):

            Path(root_folder).mkdir(parents = True, exist_ok = True)

        return root_folder


In [None]:
# main

# инициализация длится несколько минут (в зависимости от железа и интернета)

# директория для хранения языковой модели, аудиозаписей и логов
# по умолчанию '[текущая папка]\speech_recognition'
# можно задать явно и передать в конструктор класса runner, к примеру:
# root_work_folder = r'd:/test/record_sound'
main = runner()
main.render()