## Основной пайплайн транскрибации

### Транскрибация и расчет метрик

In [None]:
import torch
import whisper
import time
import os
import re
import warnings
from pydub import AudioSegment
from IPython.display import display
from ipywidgets import widgets, Button, VBox, Label
from pyannote.audio import Pipeline
import traceback
from jiwer import wer, cer, mer, wil
from pymystem3 import Mystem
import numpy as np
import json
import wave
from vosk import Model, KaldiRecognizer
from transformers import AutoProcessor, SeamlessM4Tv2Model, Wav2Vec2ForCTC, Wav2Vec2Processor
import torchaudio
import gc
from scipy.io import wavfile
import noisereduce as nr
import librosa
import gigaam


# Hugging Face access token (free at https://huggingface.co/settings/tokens).
# By default it is taken from an already exported HF_TOKEN variable; to
# hard-code it instead, replace the right-hand side with the token string.
AUTH_TOKEN = os.environ.get("HF_TOKEN", "")
if AUTH_TOKEN:
    # Re-export so that a hard-coded value is also visible through HF_TOKEN.
    os.environ["HF_TOKEN"] = AUTH_TOKEN


# Pipeline identifier passed to Pipeline.from_pretrained in diarize_audio.
DIARIZATION_MODEL = "pyannote/speaker-diarization-3.1"
# Number of speakers passed to the diarization pipeline call.
NUM_SPEAKERS = 2

# Shared Mystem instance used by lemmatize_text.
mystem = Mystem()

# PyTorch CUDA allocator setting and cuDNN autotuner flag, set once at import.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
torch.backends.cudnn.benchmark = True

def clear_memory():
    """Release cached CUDA memory (when CUDA is available) and run the GC."""
    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.empty_cache()
    gc.collect()

def normalize_text(text):
    """Normalize a transcript for metric computation.

    Lower-cases the text, maps 'ё' to 'е', turns hyphens/en dashes into
    spaces, drops every character that is neither a word character nor
    whitespace, and collapses whitespace runs. Speaker labels are left as-is.
    """
    cleaned = text.lower().replace('ё', 'е')
    substitutions = (
        (r'[-–]', ' '),     # hyphen / en dash separate words
        (r'[^\w\s]', ''),   # remaining punctuation is removed
        (r'\s+', ' '),      # collapse whitespace runs
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

def lemmatize_text(text):
    """Return `text` lemmatized with the shared Mystem instance.

    On any lemmatization error the error is printed and the input text is
    returned unchanged.
    """
    try:
        lemmas = mystem.lemmatize(text)
        joined = ''.join(lemmas)
        return joined.strip()
    except Exception as err:
        print(f"Ошибка лемматизации: {str(err)}")
        return text

def get_audio_duration(file_path):
    """Return the duration of the audio file in seconds."""
    # len() of a pydub AudioSegment is its length in milliseconds.
    duration_ms = len(AudioSegment.from_file(file_path))
    return duration_ms / 1000.0

def calculate_metrics(reference, hypothesis):
    """Compute transcription error metrics on normalized and lemmatized text.

    Args:
        reference: Ground-truth transcript; a falsy value is treated as "".
        hypothesis: Model transcript; a falsy value is treated as "".

    Returns:
        A dict with keys 'Base' (normalized text) and 'Lemmatized'
        (normalized, then lemmatized text). Each maps 'WER', 'CER', 'MER',
        'WIL' and 'PIWER' to a float. When either text of a level is empty,
        every metric of that level is 1.0.
    """
    def safe_lemmatize(text):
        # lemmatize_text already reports its own failures; this is a last
        # line of defence. Catch Exception rather than using a bare except so
        # KeyboardInterrupt / SystemExit still propagate.
        try:
            return lemmatize_text(text) if text else ""
        except Exception:
            return text

    def calculate_level_metrics(ref, hyp):
        if not ref or not hyp:
            return {'WER': 1.0, 'CER': 1.0, 'MER': 1.0, 'WIL': 1.0, 'PIWER': 1.0}

        # PIWER as computed here: share of distinct reference words that do
        # not occur anywhere in the hypothesis (word order is ignored).
        ref_vocab = set(ref.split())
        hyp_vocab = set(hyp.split())
        return {
            'WER': wer(ref, hyp),
            'CER': cer(ref, hyp),
            'MER': mer(ref, hyp),
            'WIL': wil(ref, hyp),
            'PIWER': 1 - len(ref_vocab & hyp_vocab) / max(1, len(ref_vocab))
        }

    ref_norm = normalize_text(reference or "")
    hyp_norm = normalize_text(hypothesis or "")
    ref_lemma = safe_lemmatize(ref_norm)
    hyp_lemma = safe_lemmatize(hyp_norm)

    return {
        'Base': calculate_level_metrics(ref_norm, hyp_norm),
        'Lemmatized': calculate_level_metrics(ref_lemma, hyp_lemma)
    }

def convert_to_wav(file_path):
    """Convert an audio file to a mono 16 kHz WAV stored next to the source.

    Returns:
        Path of the written file: the input path with its extension
        replaced by ".wav".

    Raises:
        RuntimeError: if loading or exporting the audio fails.
    """
    try:
        source_audio = AudioSegment.from_file(file_path)
        stem, _extension = os.path.splitext(file_path)
        target_path = stem + ".wav"
        # "-ac 1" -> single channel, "-ar 16000" -> 16 kHz sample rate.
        ffmpeg_args = ["-ac", "1", "-ar", "16000"]
        source_audio.export(target_path, format="wav", parameters=ffmpeg_args)
        return target_path
    except Exception as err:
        raise RuntimeError(f"Ошибка конвертации: {str(err)}")

def apply_audio_preprocessing(file_path):
    """Denoise and trim a WAV file, writing the result to a new file.

    Steps: down-mix to mono, peak-normalize, stationary noise reduction,
    trim leading/trailing quiet parts (top_db=20), save as 16-bit PCM.

    Args:
        file_path: Path to the input WAV file.

    Returns:
        Path of the written file, "preprocessed_<basename>" in the current
        working directory.

    Raises:
        RuntimeError: if any step fails (the original error is chained).
    """
    try:
        rate, data = wavfile.read(file_path)

        if len(data.shape) > 1:
            data = data.mean(axis=1)

        data = data.astype(np.float32)
        # Guard against an all-zero (silent) recording: dividing by a zero
        # peak would fill the signal with NaN.
        peak = np.max(np.abs(data))
        if peak > 0:
            data /= peak

        reduced_noise = nr.reduce_noise(
            y=data,
            sr=rate,
            stationary=True,
            prop_decrease=0.75
        )

        trimmed, _ = librosa.effects.trim(reduced_noise, top_db=20)

        # Write 16-bit PCM instead of float32: a float WAV cannot be opened by
        # the standard `wave` module, which transcribe_vosk uses to read it.
        pcm16 = (np.clip(trimmed, -1.0, 1.0) * 32767).astype(np.int16)

        temp_path = f"preprocessed_{os.path.basename(file_path)}"
        wavfile.write(temp_path, rate, pcm16)

        return temp_path

    except Exception as e:
        raise RuntimeError(f"Ошибка предобработки: {str(e)}") from e

def transcribe_whisper(file_path, model_size):
    """Transcribe an audio file with a Whisper model.

    Args:
        file_path: Path to the audio file.
        model_size: Model name passed to whisper.load_model.

    Returns:
        dict with 'text' (full transcript), 'segments' (Whisper segments),
        'time' (elapsed seconds, including model loading) and 'device'
        ('CUDA' or 'CPU').

    Raises:
        RuntimeError: if loading the model or transcribing fails.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Bind the name up front so the `finally` clause cannot fail with
    # UnboundLocalError (and hide the real error) when loading fails.
    model = None
    try:
        clear_memory()
        start_time = time.time()
        model = whisper.load_model(model_size, device=device)

        audio = whisper.load_audio(file_path)
        result = model.transcribe(
            audio,
            word_timestamps=True,
            language="ru",
            initial_prompt="Запиши все числительные словами, а не цифрами."  # prompt asking for numerals spelled out as words
        )

        return {
            'text': result['text'],
            'segments': result['segments'],
            'time': time.time() - start_time,
            'device': device.upper()
        }
    except Exception as e:
        raise RuntimeError(f"Whisper: ошибка {str(e)}") from e
    finally:
        del model
        clear_memory()

def transcribe_whisper_medium(file_path):
    """Transcribe `file_path` with the Whisper "medium" model."""
    return transcribe_whisper(file_path, model_size="medium")

def transcribe_whisper_large(file_path):
    """Transcribe `file_path` with the Whisper "large-v3-turbo" model."""
    return transcribe_whisper(file_path, model_size="large-v3-turbo")

def transcribe_vosk(file_path):
    """Transcribe a WAV file with the Vosk model in ./vosk-model-ru-0.42.

    Args:
        file_path: Path to a WAV file readable by the `wave` module.

    Returns:
        dict with 'text', 'segments' (one entry per recognized word with
        'start', 'end', 'text'; a whole-file entry when a result carries only
        text), 'time' (elapsed seconds) and 'device' (always 'CPU').

    Raises:
        RuntimeError: if the model folder is missing or recognition fails.
    """
    device = 'cpu'
    # Bind the name up front: `finally` deletes it, and without this a missing
    # model folder would surface as UnboundLocalError instead of the message
    # raised below.
    model = None
    try:
        clear_memory()
        start_time = time.time()
        model_path = "vosk-model-ru-0.42"
        if not os.path.exists(model_path):
            raise RuntimeError(f"Модель Vosk не найдена в корне папки: {model_path}")

        model = Model(model_path)

        results = []
        # The context manager closes the file even if recognition raises.
        with wave.open(file_path, "rb") as wf:
            rec = KaldiRecognizer(model, wf.getframerate())
            rec.SetWords(True)

            while True:
                data = wf.readframes(4000)
                if not data:
                    break
                if rec.AcceptWaveform(data):
                    results.append(json.loads(rec.Result()))

            results.append(json.loads(rec.FinalResult()))

        segments = []
        full_text = []
        for res in results:
            if 'result' in res:
                # Word-level output: one segment per word.
                for word in res['result']:
                    segments.append({
                        'start': word['start'],
                        'end': word['end'],
                        'text': word['word']
                    })
                    full_text.append(word['word'])
            elif 'text' in res and res['text']:
                # No word timings: attribute the text to the whole file.
                segments.append({
                    'start': 0,
                    'end': get_audio_duration(file_path),
                    'text': res['text']
                })
                full_text.append(res['text'])

        return {
            'text': ' '.join(full_text),
            'segments': segments,
            'time': time.time() - start_time,
            'device': device.upper()
        }
    except Exception as e:
        raise RuntimeError(f"Vosk: ошибка {str(e)}") from e
    finally:
        del model
        clear_memory()


def clear_memory():
    """Release cached CUDA memory (when CUDA is available) and run the GC.

    This later definition replaces the earlier one at import time, so it is
    kept equivalent to it: the CUDA call is guarded and gc.collect() is run.
    """
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()

def transcribe_seamless(file_path):
    """Transcribe an audio file with facebook/seamless-m4t-v2-large.

    The audio is down-mixed to mono and processed in 30-second chunks; every
    chunk after the first starts 0.5 s early, so consecutive chunks overlap.

    Args:
        file_path: Path to an audio file readable by torchaudio.load.

    Returns:
        dict with 'text' (chunk texts joined by spaces), 'segments' (one
        entry per chunk with 'start'/'end' in seconds and 'text'), 'time'
        (elapsed seconds, including model loading) and 'device'.

    Raises:
        RuntimeError: on any failure; the message includes the traceback.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Bind the name up front so `finally` cannot raise UnboundLocalError (and
    # hide the real error) when loading the processor or model fails.
    model = None

    try:
        clear_memory()
        start_time = time.time()

        processor = AutoProcessor.from_pretrained("facebook/seamless-m4t-v2-large")
        model = SeamlessM4Tv2Model.from_pretrained(
            "facebook/seamless-m4t-v2-large",
            device_map=device
        ).eval()

        audio_data, sample_rate = torchaudio.load(file_path)
        audio_data = audio_data.mean(dim=0)  # average channels into one

        chunk_duration = 30
        chunk_size = int(chunk_duration * sample_rate)
        overlap = int(0.5 * sample_rate)

        segments = []
        full_text = []
        current_pos = 0

        while current_pos < len(audio_data):
            # For the first chunk max() clamps the start to 0.
            chunk_start = max(0, current_pos - overlap)
            chunk_end = min(current_pos + chunk_size, len(audio_data))

            chunk = audio_data[chunk_start:chunk_end]

            # Peak-normalize; skip silent chunks, where a zero peak would
            # turn every sample into NaN.
            peak = torch.max(torch.abs(chunk))
            if peak > 0:
                chunk = chunk / peak

            inputs = processor(
                audios=chunk.numpy(),
                return_tensors="pt",
                sampling_rate=sample_rate
            ).to(device)

            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    tgt_lang="rus",
                    generate_speech=False
                )

                if hasattr(outputs, 'text'):
                    text = outputs.text[0]
                else:
                    text = processor.batch_decode(outputs.sequences, skip_special_tokens=True)[0]

            segment_start = chunk_start / sample_rate
            segment_end = chunk_end / sample_rate

            segments.append({
                'start': segment_start,
                'end': segment_end,
                'text': text
            })

            full_text.append(text)
            current_pos += chunk_size

            # Free per-chunk tensors before the next iteration.
            del inputs, outputs
            clear_memory()

        return {
            'text': ' '.join(full_text),
            'segments': segments,
            'time': time.time() - start_time,
            'device': device.upper()
        }

    except Exception as e:
        raise RuntimeError(f"Seamless-M4T: ошибка {str(e)}\n{traceback.format_exc()}") from e
    finally:
        del model
        clear_memory()


def transcribe_wav2vec(file_path):
    """Transcribe an audio file with jonatasgrosman/wav2vec2-large-xlsr-53-russian.

    The audio is down-mixed to mono and decoded greedily (argmax over the
    logits) in chunks of up to 30 seconds that overlap their neighbours.

    Args:
        file_path: Path to an audio file readable by torchaudio.load.

    Returns:
        dict with 'text' (chunk texts joined by spaces), 'segments' (one
        entry per chunk with 'start'/'end' in seconds and 'text'), 'time'
        (elapsed seconds, including model loading), 'device' and
        'total_duration' (audio length in seconds).

    Raises:
        RuntimeError: on any failure; the message includes the traceback.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Bind the name up front so `finally` cannot raise UnboundLocalError (and
    # hide the real error) when loading the processor or model fails.
    model = None

    try:
        clear_memory()
        start_time = time.time()

        processor = Wav2Vec2Processor.from_pretrained("jonatasgrosman/wav2vec2-large-xlsr-53-russian")
        model = Wav2Vec2ForCTC.from_pretrained(
            "jonatasgrosman/wav2vec2-large-xlsr-53-russian"
        ).to(device).eval()

        audio_data, sample_rate = torchaudio.load(file_path)
        audio_data = audio_data.mean(dim=0)  # average channels into one

        total_duration = audio_data.shape[0] / sample_rate

        chunk_duration = 30
        chunk_size = int(chunk_duration * sample_rate)
        overlap = int(0.5 * sample_rate)

        segments = []
        full_text = []
        current_pos = 0

        while current_pos < len(audio_data):
            # For the first chunk max() clamps the start to 0.
            chunk_start = max(0, current_pos - overlap)
            chunk_end = min(current_pos + chunk_size, len(audio_data))

            chunk = audio_data[chunk_start:chunk_end]

            # Peak-normalize; skip silent chunks, where a zero peak would
            # turn every sample into NaN.
            peak = torch.max(torch.abs(chunk))
            if peak > 0:
                chunk = chunk / peak

            input_values = processor(
                chunk.numpy(),
                sampling_rate=sample_rate,
                return_tensors="pt"
            ).input_values.to(device)

            with torch.no_grad():
                logits = model(input_values).logits
                predicted_ids = torch.argmax(logits, dim=-1)
                text = processor.batch_decode(predicted_ids)[0]

            segment_start = chunk_start / sample_rate
            segment_end = chunk_end / sample_rate

            segments.append({
                'start': segment_start,
                'end': segment_end,
                'text': text
            })

            full_text.append(text)
            current_pos += chunk_size - overlap

            # Free per-chunk tensors before the next iteration.
            del input_values, logits, predicted_ids
            torch.cuda.empty_cache()

        return {
            'text': ' '.join(full_text),
            'segments': segments,
            'time': time.time() - start_time,
            'device': device.upper(),
            'total_duration': total_duration
        }

    except Exception as e:
        raise RuntimeError(f"Wav2Vec2 ошибка: {str(e)}\n{traceback.format_exc()}") from e
    finally:
        del model
        clear_memory()

def transcribe_gigaam(file_path):
    """Transcribe an audio file with the GigaAM "v2_rnnt" model.

    Returns:
        dict with 'text' (segment transcriptions joined by spaces),
        'segments' ('start', 'end', 'text' per recognized segment), 'time'
        (elapsed seconds, including model loading) and 'device'.

    Raises:
        RuntimeError: on any failure; the message includes the traceback.
    """
    # Only reported in the result. Author's note: gigaam selects CUDA on its
    # own, and forcing it manually leads to a library version conflict.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    try:
        clear_memory()
        started_at = time.time()

        # Other variants: "v2_ctc" or "ctc", "v2_rnnt" or "rnnt", "v1_ctc",
        # "v1_rnnt". Author's note: v2_rnnt is the best of them.
        model = gigaam.load_model("v2_rnnt")

        utterances = model.transcribe_longform(file_path)

        full_text = ' '.join([item['transcription'] for item in utterances])
        segments = [
            {
                'start': item['boundaries'][0],
                'end': item['boundaries'][1],
                'text': item['transcription']
            }
            for item in utterances
        ]

        return {
            'text': full_text,
            'segments': segments,
            'time': time.time() - started_at,
            'device': device.upper()
        }
    except Exception as err:
        raise RuntimeError(f"GigaAM error: {str(err)}\n{traceback.format_exc()}")
    finally:
        # The model may be unbound if loading failed.
        if 'model' in locals():
            del model
        clear_memory()

def diarize_audio(file_path):
    """Run speaker diarization on an audio file.

    Uses DIARIZATION_MODEL with AUTH_TOKEN and asks for NUM_SPEAKERS speakers.

    Returns:
        dict with 'diarization' (the pipeline output), 'time' (elapsed
        seconds, including pipeline loading) and 'device' ('CUDA' or 'CPU').

    Raises:
        RuntimeError: if loading the pipeline or running it fails.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    # Bind the name up front so `finally` cannot raise UnboundLocalError (and
    # hide the real error, e.g. a bad token) when loading fails.
    pipeline = None
    try:
        clear_memory()
        start_time = time.time()
        pipeline = Pipeline.from_pretrained(
            DIARIZATION_MODEL,
            use_auth_token=AUTH_TOKEN
        ).to(torch.device(device))

        diarization = pipeline(file_path, num_speakers=NUM_SPEAKERS)

        return {
            'diarization': diarization,
            'time': time.time() - start_time,
            'device': device.upper()
        }
    except Exception as e:
        raise RuntimeError(f"Ошибка диаризации: {str(e)}") from e
    finally:
        del pipeline
        clear_memory()

def match_speakers(transcript_segments, diarization):
    """Label every transcript segment with the speaker it overlaps most.

    Args:
        transcript_segments: Iterable of dicts with 'start', 'end', 'text'.
        diarization: Object whose itertracks(yield_label=True) yields
            (turn, track, speaker) with turn.start / turn.end.

    Returns:
        List of dicts with 'start', 'end', 'speaker' and stripped 'text'.
        Segments that overlap no turn get the speaker "UNKNOWN"; on equal
        overlap the earliest yielded turn wins.
    """
    labelled = []
    for segment in transcript_segments:
        seg_start, seg_end = segment['start'], segment['end']
        chosen, longest = "UNKNOWN", 0
        for turn, _track, label in diarization.itertracks(yield_label=True):
            lo = max(seg_start, turn.start)
            hi = min(seg_end, turn.end)
            if not lo < hi:
                continue  # no overlap with this turn
            shared = hi - lo
            if shared > longest:
                chosen, longest = label, shared
        labelled.append({
            'start': seg_start,
            'end': seg_end,
            'speaker': chosen,
            'text': segment['text'].strip()
        })
    return labelled

def save_combined_results(file_name, all_results, diarize_time, audio_duration, preprocessing_enabled=False):
    """Write the comparison report for all models to a UTF-8 text file.

    Args:
        file_name: Name of the processed audio file; the report is written to
            "<file_name without extension>_combined_report.txt".
        all_results: dict keyed by model name. The 'diarization' entry is only
            read for its 'device'. Other entries hold either 'error' or
            'segments' / 'time' / 'metrics' / 'device'.
        diarize_time: Diarization time in seconds.
        audio_duration: Audio length in seconds, used for the RTF column.
        preprocessing_enabled: Whether audio preprocessing was applied.

    Returns:
        Path of the written report file.
    """
    output_file = f"{os.path.splitext(file_name)[0]}_combined_report.txt"
    metric_names = ('WER', 'CER', 'MER', 'WIL', 'PIWER')
    # Nine columns in every table row: model, five metrics, RTF, time, device.
    text_row = "{:<25} {:<10} {:<10} {:<10} {:<10} {:<10} {:<15} {:<15} {:<15}\n"
    metric_row = "{:<25} {:<10.2%} {:<10.2%} {:<10.2%} {:<10.2%} {:<10.2%} {:<15} {:<15} {:<15}\n"

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("СРАВНИТЕЛЬНЫЙ ОТЧЕТ\n")
        f.write("###############################################\n\n")
        f.write(f"Файл: {file_name}\n")
        f.write(f"Предобработка аудио: {'включена' if preprocessing_enabled else 'выключена'}\n")
        f.write(f"Длительность записи: {audio_duration:.1f} сек\n")
        f.write(f"Время диаризации: {diarize_time:.1f} сек\n")
        f.write(f"Устройство диаризации: {all_results.get('diarization', {}).get('device', 'N/A')}\n\n")

        f.write("МЕТРИКИ КАЧЕСТВА:\n")
        f.write(text_row.format(
            "Модель", "WER", "CER", "MER", "WIL", "PIWER", "RTF", "Время (сек)", "Устройство"))

        # Summary table: one row (or a Base/Lemmatized pair) per model.
        for model_name, results in all_results.items():
            if model_name == 'diarization' or 'error' in results:
                continue

            elapsed = results.get('time')
            if isinstance(elapsed, (int, float)):
                time_str = f"{elapsed:.1f}"
                # Real-time factor from the unrounded time; a zero-length
                # recording has no meaningful RTF.
                rtf = round(elapsed / audio_duration, 2) if audio_duration > 0 else 'N/A'
            else:
                time_str = rtf = 'N/A'
            metrics = results.get('metrics')
            device = results.get('device', 'N/A')

            if metrics:
                f.write(metric_row.format(
                    f"\n {model_name} \n  → Base                 ",
                    *(metrics['Base'][name] for name in metric_names),
                    rtf,
                    time_str,
                    device
                ))
                f.write(metric_row.format(
                    "  → Lemmatized",
                    *(metrics['Lemmatized'][name] for name in metric_names),
                    "",
                    "",
                    ""
                ))
            else:
                # No reference text: metrics are unknown, but RTF, time and
                # device are still reported in their own columns.
                f.write(text_row.format(
                    model_name, "N/A", "N/A", "N/A", "N/A", "N/A", rtf, time_str, device))

        # Per-model details: error text or the speaker-labelled transcript.
        for model_name, results in all_results.items():
            if model_name == 'diarization':
                continue

            f.write("\n\n###############################################\n")
            if 'error' in results:
                f.write(f"МОДЕЛЬ: {model_name} - ОШИБКА\n{results['error']}\n")
                continue

            f.write(f"МОДЕЛЬ: {model_name}\n")
            f.write(f"Устройство обработки: {results.get('device', 'N/A')}\n")
            if 'time' in results:
                f.write(f"Время обработки: {results['time']:.1f} сек\n")

            if 'segments' in results and results['segments']:
                f.write("\nТРАНСКРИПТ:\n")
                current_speaker = None
                for entry in results['segments']:
                    # Start a new labelled paragraph whenever the speaker changes.
                    if entry['speaker'] != current_speaker:
                        f.write(f"\n[{entry['speaker']}]:\n")
                        current_speaker = entry['speaker']
                    f.write(f"{entry['text']} ")
            else:
                f.write("\nТранскрипт недоступен\n")

    return output_file

# Input widgets of the main pipeline UI. They are created once: the original
# cell built the same five widgets twice and discarded the first set.
audio_uploader = widgets.FileUpload(accept='audio/*', multiple=False)
reference_uploader = widgets.FileUpload(accept='text/*', multiple=False)
process_btn = Button(description="Начать обработку", button_style='success')
status_label = Label(value='')
preprocessing_checkbox = widgets.Checkbox(
    value=False,
    description='Предобработка аудио',
    disabled=False
)

# Display name -> transcription function. The checkbox dict below is derived
# from these names, so both dicts share the same keys in the same order.
models_mapping = {
    'Whisper Medium': transcribe_whisper_medium,
    'Whisper Large-v3-turbo': transcribe_whisper_large,
    'Vosk 0.42 RU': transcribe_vosk,
    'Seamless-M4T-v2-large': transcribe_seamless,
    'Wav2Vec-finetuned': transcribe_wav2vec,
    'GigaAM': transcribe_gigaam
}

# One pre-checked checkbox per model, labelled with the model's display name.
model_checkboxes = {
    name: widgets.Checkbox(value=True, description=name)
    for name in models_mapping
}

selection_children = [Label('Выберите модели для транскрибации:')]
selection_children.extend(model_checkboxes.values())
model_selection = VBox(selection_children)

# Render the whole form top to bottom.
main_form = VBox([
    Label('Загрузите аудио файл:'),
    audio_uploader,
    Label('Загрузите эталонный текст (опционально):'),
    reference_uploader,
    model_selection,
    preprocessing_checkbox,
    process_btn,
    status_label
])
display(main_form)

def on_process_click(btn):
    """Button handler: run the whole pipeline for the uploaded audio file.

    Steps: save the upload to disk, convert it to WAV when needed, optionally
    preprocess it, diarize, run every selected model, compute metrics when a
    reference text was uploaded, and write the combined report. Progress and
    errors are shown in `status_label`. A failure of a single model is stored
    in the report; any other failure aborts the run.

    Args:
        btn: The clicked button (unused).
    """
    status_label.value = ''
    # Track every file this handler creates so `finally` can remove them even
    # when a step fails half-way (e.g. the raw upload if conversion fails).
    temp_path = None
    file_path = None

    try:
        if not audio_uploader.value:
            raise ValueError("Сначала загрузите аудио файл!")

        selected_models = {name: models_mapping[name] for name, cb in model_checkboxes.items() if cb.value}
        if not selected_models:
            raise ValueError("Выберите хотя бы одну модель для транскрибации!")

        audio_file = audio_uploader.value[0]
        file_name = audio_file['name']
        temp_path = f"./{file_name}"

        with open(temp_path, 'wb') as f:
            f.write(audio_file['content'])

        if not file_name.lower().endswith('.wav'):
            status_label.value = 'Конвертация в WAV...'
            file_path = convert_to_wav(temp_path)
            os.remove(temp_path)
        else:
            # NOTE(review): an uploaded .wav is used as-is and is not
            # resampled to mono 16 kHz like converted files - confirm that
            # every model accepts it.
            file_path = temp_path

        if preprocessing_checkbox.value:
            status_label.value = 'Предобработка аудио...'
            processed_path = apply_audio_preprocessing(file_path)
            os.remove(file_path)
            file_path = processed_path

        audio_duration = get_audio_duration(file_path)

        status_label.value = 'Диаризация...'
        diarization_result = diarize_audio(file_path)
        all_results = {'diarization': diarization_result}

        reference_text = None
        if reference_uploader.value:
            ref_file = reference_uploader.value[0]
            reference_text = bytes(ref_file['content']).decode('utf-8')

        for model_name, transcribe_fn in selected_models.items():
            try:
                status_label.value = f'Обработка {model_name}...'
                clear_memory()

                transcription = transcribe_fn(file_path)
                matched_segments = match_speakers(
                    transcription['segments'],
                    diarization_result['diarization']
                )

                metrics = None
                if reference_text:
                    metrics = calculate_metrics(reference_text, transcription['text'])

                all_results[model_name] = {
                    'segments': matched_segments,
                    'time': transcription.get('time', 0),
                    'metrics': metrics,
                    'device': transcription.get('device', 'N/A')
                }

                clear_memory()

            except Exception as e:
                # One failing model must not abort the others: record the
                # error and carry on with the next model.
                clear_memory()
                all_results[model_name] = {
                    'error': f"{str(e)}\n{traceback.format_exc()}",
                    'time': 0,
                    'device': 'N/A'
                }

        status_label.value = 'Генерация отчета...'
        output_file = save_combined_results(
            file_name,
            all_results,
            diarization_result['time'],
            audio_duration,
            preprocessing_checkbox.value
        )

        status_label.value = f'Готово! Результат: {output_file}'

    except Exception as e:
        status_label.value = f'Критическая ошибка: {str(e)}'
        traceback.print_exc()

    finally:
        # A set removes the duplicate when the upload itself was the working file.
        for leftover in {temp_path, file_path}:
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
        clear_memory()

process_btn.on_click(on_process_click)

### Уведомление об окончании работы в телеграм (опционально)

In [None]:
import requests

def send_telegram_message(token, chat_id, text, timeout=10):
    """Send a text message through the Telegram Bot API and print the outcome.

    Args:
        token: Telegram bot token.
        chat_id: ID of the chat to send the message to.
        text: Message text; sent with parse_mode "HTML".
        timeout: Seconds to wait for the HTTP request. Without a timeout an
            unreachable API would block the cell indefinitely.

    Returns:
        None. Success or failure is reported with print(); no exception is
        propagated to the caller.
    """
    url = f"https://api.telegram.org/bot{token}/sendMessage"
    params = {
        "chat_id": chat_id,
        "text": text,
        "parse_mode": "HTML"
    }

    try:
        response = requests.post(url, data=params, timeout=timeout)
        result = response.json()
        if result.get("ok"):
            print("Сообщение успешно отправлено!")
        else:
            print("Ошибка отправки:", result.get("description"))
    except Exception as e:
        # Best-effort notification: report the problem, never raise.
        print("Критическая ошибка:", e)

# Fill in both values to enable the notification. Empty placeholders keep the
# cell valid Python when it is run before being configured.
TOKEN = ""  # Telegram bot token
CHAT_ID = ""  # ID of the Telegram chat to send the message to
MESSAGE_TEXT = "Транскрибация завершена!!!"

if TOKEN and CHAT_ID:
    send_telegram_message(TOKEN, CHAT_ID, MESSAGE_TEXT)
else:
    print("Укажите TOKEN и CHAT_ID, чтобы отправить уведомление.")

## Дополнительные инструменты работы (опционально)

### Объединение нескольких аудиофайлов в один

In [None]:
from tkinter import Tk, filedialog, Listbox, Button, Frame, END
from pydub import AudioSegment
import os

class AudioMerger:
    """Small Tk window for ordering audio files and merging them into one.

    Files are listed in a Listbox; the selected entry can be moved up or down,
    and "Объединить" concatenates the files in list order and asks where to
    save the result.
    """

    # pydub export settings keyed by lower-case file extension. Extensions
    # that are not listed fall back to AudioSegment.export's own defaults.
    _EXPORT_PARAMS = {
        'mp4': {'format': 'mp4', 'codec': 'aac', 'bitrate': '192k'},
        'm4a': {'format': 'mp4', 'codec': 'aac', 'bitrate': '192k'},
        'mp3': {'format': 'mp3', 'bitrate': '192k'},
        # Without an explicit format pydub exports MP3 data, so a ".wav"
        # (or .flac/.ogg) file would carry the wrong encoding.
        'wav': {'format': 'wav'},
        'flac': {'format': 'flac'},
        'ogg': {'format': 'ogg'},
    }

    def __init__(self):
        self.root = Tk()
        self.root.title("Audio Merger")
        self.file_paths = []

        self.listbox = Listbox(self.root, width=80, height=15)
        self.frame = Frame(self.root)

        Button(self.frame, text="Добавить файлы", command=self.add_files).pack(side="left")
        Button(self.frame, text="Вверх", command=self.move_up).pack(side="left")
        Button(self.frame, text="Вниз", command=self.move_down).pack(side="left")
        Button(self.frame, text="Объединить", command=self.merge_files).pack(side="left")

        self.frame.pack(pady=10)
        self.listbox.pack(pady=5, padx=10)

    @staticmethod
    def _export_params(output_path):
        """Return the AudioSegment.export keyword arguments for `output_path`.

        The choice is made by file extension (case-insensitive); an unknown
        or missing extension yields an empty dict.
        """
        extension = os.path.splitext(output_path)[1].lstrip('.').lower()
        return dict(AudioMerger._EXPORT_PARAMS.get(extension, {}))

    def add_files(self):
        """Ask for audio files and append them to the end of the list."""
        new_files = filedialog.askopenfilenames(
            title="Выберите аудиофайлы",
            filetypes=[("Audio files", "*.wav *.mp3 *.ogg *.flac *.m4a *.mp4"), ("All files", "*.*")]
        )
        for f in new_files:
            self.listbox.insert(END, f)

    def move_up(self):
        """Move the selected entry one position up, keeping it selected."""
        pos = self.listbox.curselection()
        if pos and pos[0] > 0:
            index = pos[0]
            text = self.listbox.get(index)
            self.listbox.delete(index)
            self.listbox.insert(index-1, text)
            self.listbox.select_set(index-1)

    def move_down(self):
        """Move the selected entry one position down, keeping it selected."""
        pos = self.listbox.curselection()
        if pos and pos[0] < self.listbox.size()-1:
            index = pos[0]
            text = self.listbox.get(index)
            self.listbox.delete(index)
            self.listbox.insert(index+1, text)
            self.listbox.select_set(index+1)

    def merge_files(self):
        """Concatenate the listed files in order and save the result.

        Progress and errors are printed; nothing is raised to the caller.
        """
        self.file_paths = list(self.listbox.get(0, END))
        if not self.file_paths:
            print("Нет файлов для объединения!")
            return

        combined = AudioSegment.empty()

        try:
            for file_path in self.file_paths:
                audio = AudioSegment.from_file(file_path)
                combined += audio
                print(f"Обработан: {os.path.basename(file_path)}")

            output_path = filedialog.asksaveasfilename(
                title="Сохранить объединенный файл",
                defaultextension=".m4a",
                filetypes=[
                    ("MPEG-4 Audio (AAC)", "*.m4a"),
                    ("MP4", "*.mp4"),
                    ("MP3", "*.mp3"),
                    ("WAV", "*.wav"),
                    ("Все файлы", "*.*")
                ]
            )

            # An empty path means the save dialog was cancelled.
            if output_path:
                combined.export(output_path, **self._export_params(output_path))
                print(f"Файл сохранен: {output_path}")
                print("Объединение завершено успешно!")

        except Exception as e:
            print(f"Ошибка: {str(e)}")

    def run(self):
        """Enter the Tk main loop; returns when the window is closed."""
        self.root.mainloop()

# Open the merger window when this cell/script is executed directly.
if __name__ == "__main__":
    merger = AudioMerger()
    merger.run()  # enters the Tk main loop via AudioMerger.run

### Расчет метрик качества транскрибации (без основного пайплайна)

In [None]:
import re
import traceback
from ipywidgets import widgets, Button, VBox, HBox, Label
from pymystem3 import Mystem
from jiwer import wer, cer, mer, wil


# Shared Mystem instance used by lemmatize_text in this cell.
mystem = Mystem()


def normalize_text(text):
    """Normalize a transcript for metric computation.

    Replaces the speaker labels 'SPEAKER_00', 'SPEAKER_01' and 'UNKNOWN' with
    spaces (case-sensitive, before lower-casing), lower-cases the text, maps
    'ё' to 'е', turns hyphens/en dashes into spaces, drops every character
    that is neither a word character nor whitespace, and collapses
    whitespace runs.
    """
    for speaker_label in ('SPEAKER_00', 'SPEAKER_01', 'UNKNOWN'):
        text = text.replace(speaker_label, ' ')

    cleaned = text.lower().replace('ё', 'е')
    substitutions = (
        (r'[-–]', ' '),     # hyphen / en dash separate words
        (r'[^\w\s]', ''),   # remaining punctuation is removed
        (r'\s+', ' '),      # collapse whitespace runs
    )
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

def lemmatize_text(text):
    """Return `text` lemmatized with the shared Mystem instance.

    On any lemmatization error the error is printed and the input text is
    returned unchanged.
    """
    try:
        lemmas = mystem.lemmatize(text)
        joined = ''.join(lemmas)
        return joined.strip()
    except Exception as err:
        print(f"Ошибка лемматизации: {str(err)}")
        return text


def calculate_metrics(reference, hypothesis):
    """Compute transcription error metrics on normalized and lemmatized text.

    Args:
        reference: Ground-truth transcript; a falsy value is treated as "".
        hypothesis: Model transcript; a falsy value is treated as "".

    Returns:
        A dict with keys 'Base' (normalized text) and 'Lemmatized'
        (normalized, then lemmatized text). Each maps 'WER', 'CER', 'MER',
        'WIL' and 'PIWER' to a float. When either text of a level is empty,
        every metric of that level is 1.0.
    """
    def safe_lemmatize(text):
        # lemmatize_text already reports its own failures; this is a last
        # line of defence. Catch Exception rather than using a bare except so
        # KeyboardInterrupt / SystemExit still propagate.
        try:
            return lemmatize_text(text) if text else ""
        except Exception:
            return text

    def calculate_level_metrics(ref, hyp):
        if not ref or not hyp:
            return {'WER': 1.0, 'CER': 1.0, 'MER': 1.0, 'WIL': 1.0, 'PIWER': 1.0}

        # PIWER as computed here: share of distinct reference words that do
        # not occur anywhere in the hypothesis (word order is ignored).
        ref_vocab = set(ref.split())
        hyp_vocab = set(hyp.split())
        return {
            'WER': wer(ref, hyp),
            'CER': cer(ref, hyp),
            'MER': mer(ref, hyp),
            'WIL': wil(ref, hyp),
            'PIWER': 1 - len(ref_vocab & hyp_vocab) / max(1, len(ref_vocab))
        }

    ref_norm = normalize_text(reference or "")
    hyp_norm = normalize_text(hypothesis or "")
    ref_lemma = safe_lemmatize(ref_norm)
    hyp_lemma = safe_lemmatize(hyp_norm)

    return {
        'Base': calculate_level_metrics(ref_norm, hyp_norm),
        'Lemmatized': calculate_level_metrics(ref_lemma, hyp_lemma)
    }


# Upload widget for the reference (ground-truth) text file.
metric_ref_uploader = widgets.FileUpload(
    accept='text/*', 
    multiple=False, 
    description='Оригинал'
)
# Upload widget for the model-produced transcript.
metric_hyp_uploader = widgets.FileUpload(
    accept='text/*', 
    multiple=False, 
    description='ML-Транскрипт'
)
# Button that triggers on_calc_metrics_click.
calc_metrics_btn = Button(
    description="Вычислить метрики", 
    button_style='info'
)
# Label that shows the metrics table or an error message.
metrics_result_label = Label(value='')


def display_metrics(metrics):
    """Format the metrics of both levels as a small text table.

    Args:
        metrics: dict with keys 'Base' and 'Lemmatized', each mapping 'WER',
            'CER', 'MER', 'WIL' and 'PIWER' to a float fraction.

    Returns:
        A string of four lines separated by newlines: title, column header,
        'Base' row and 'Lemmatized' row. Values are shown as percentages
        with two decimals.
    """
    metric_names = ('WER', 'CER', 'MER', 'WIL', 'PIWER')
    header_fmt = "{:<15} {:<10} {:<10} {:<10} {:<10} {:<10}"
    row_fmt = "{:<15} {:<10.2%} {:<10.2%} {:<10.2%} {:<10.2%} {:<10.2%}"

    # Every part is its own line; previously the title ran into the header
    # and the two data rows ran into each other.
    lines = ["МЕТРИКИ КАЧЕСТВА:", header_fmt.format("Type", *metric_names)]
    for level in ('Base', 'Lemmatized'):
        values = (metrics[level][name] for name in metric_names)
        lines.append(row_fmt.format(level, *values))
    return "\n".join(lines)


def on_calc_metrics_click(btn):
    """Button handler: compare the two uploaded texts and show the metrics.

    Requires both uploads. The first file of each upload is decoded as UTF-8,
    and the formatted metrics (or an error message) are written to
    `metrics_result_label`. Errors are also printed with their traceback.

    Args:
        btn: The clicked button (unused).
    """
    try:
        uploads = (metric_ref_uploader.value, metric_hyp_uploader.value)
        if not all(uploads):
            raise ValueError("Загрузите оба текстовых файла!")

        # Only the first file of each upload is used.
        reference_text, hypothesis_text = (
            bytes(upload[0]['content']).decode('utf-8') for upload in uploads
        )

        computed = calculate_metrics(reference_text, hypothesis_text)
        metrics_result_label.value = display_metrics(computed)

    except Exception as err:
        metrics_result_label.value = f"Ошибка: {str(err)}"
        traceback.print_exc()

calc_metrics_btn.on_click(on_calc_metrics_click)


# Render the metrics-only UI: both uploaders side by side, then the button
# and the result label.
display(VBox([
    Label('Сравнение двух текстовых файлов:'),
    HBox([metric_ref_uploader, metric_hyp_uploader]),
    calc_metrics_btn,
    metrics_result_label
]))