# Распознавание русской речи с помощью Whisper

Этот notebook использует модель `whisper-small` от OpenAI для распознавания русскоязычных аудиозаписей.

## Задача:
1. Взять датасет с https://disk.yandex.ru/d/v2Hipv7XG4fEDQ
2. Применить модель whisper-small из HuggingFace 
3. Вывести результат для 10 случайных аудио из датасета


In [None]:
# Установка зависимостей
!pip install torch transformers soundfile datasets requests tqdm ffmpeg-python torchaudio


In [None]:
# Импорт библиотек
import os
import random
import logging
import zipfile
import requests
from pathlib import Path
from typing import List, Dict
import torch
import torchaudio
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import numpy as np
from IPython.display import Audio, display
import warnings
warnings.filterwarnings('ignore')

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("✅ Все библиотеки успешно импортированы!")


In [None]:
# Константы и настройки
MODEL_NAME = "openai/whisper-small"
SAMPLE_RATE = 16000
MAX_AUDIO_LENGTH = 30  # секунд
RANDOM_SEED = 42
NUM_SAMPLES = 10
DATASET_DIR = "dataset"

print(f"🔧 Настройки:")
print(f"   Модель: {MODEL_NAME}")
print(f"   Частота дискретизации: {SAMPLE_RATE} Hz")
print(f"   Максимальная длительность: {MAX_AUDIO_LENGTH} сек")
print(f"   Количество образцов: {NUM_SAMPLES}")
print(f"   Директория датасета: {DATASET_DIR}")

# Функция для загрузки аудио с torchaudio (только загрузка)
def load_audio_with_torchaudio(audio_path: str):
    """
    Загрузка аудио с использованием torchaudio для пересэмплирования
    """
    print(f"🎵 Загружаем аудио с torchaudio: {os.path.basename(audio_path)}")
    
    # Проверяем, что это действительно аудиофайл
    audio_extensions = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac'}
    file_ext = Path(audio_path).suffix.lower()
    
    if file_ext not in audio_extensions:
        print(f"❌ Неподдерживаемый формат: {file_ext}")
        return None, None
    
    # Проверяем, что файл не содержит подозрительные слова
    filename_lower = os.path.basename(audio_path).lower()
    if any(skip_word in filename_lower for skip_word in ['tsv', 'csv', 'txt', 'json', 'xml']):
        print(f"❌ Файл похож на текстовый: {os.path.basename(audio_path)}")
        return None, None
    
    try:
        # Загружаем аудио с помощью torchaudio
        wav, sr = torchaudio.load(audio_path)
        print(f"✅ Аудио загружено: {wav.shape}, частота: {sr} Hz")
        
        # Берем только первый канал если аудио стерео
        if wav.shape[0] > 1:
            wav = wav[0:1, :]  # Берем первый канал
            print(f"📊 Взят первый канал: {wav.shape}")
        
        # пересэмплируйте имеющиеся аудио в 16 kHz - пример кода ниже, можете его менять
        if sr != 16000:
            print(f"🔄 Пересэмплируем с {sr} Hz на 16000 Hz")
            wav = torchaudio.functional.resample(wav, orig_freq=sr, new_freq=16000)
            sr = 16000
            print(f"✅ Пересэмплирование завершено: {wav.shape}")
        
        # Ограничиваем длину аудио
        max_samples = MAX_AUDIO_LENGTH * sr
        if wav.shape[1] > max_samples:
            wav = wav[:, :max_samples]
            print(f"⚠️  Аудио обрезано до {MAX_AUDIO_LENGTH} сек")
        
        return wav, sr
        
    except Exception as e:
        print(f"❌ Ошибка при загрузке {audio_path}: {e}")
        return None, None

print("✅ Функция загрузки аудио с torchaudio готова!")


In [None]:
# Создание директории для датасета
os.makedirs(DATASET_DIR, exist_ok=True)

print(f"📁 Директория '{DATASET_DIR}' создана")
print(f"📥 Пожалуйста, скачайте датасет с https://disk.yandex.ru/d/v2Hipv7XG4fEDQ")
print(f"📂 И поместите аудиофайлы в папку '{DATASET_DIR}/'")
print(f"🎵 Поддерживаемые форматы: .wav, .mp3, .flac, .ogg, .m4a, .aac")


In [None]:
# Проверка содержимого директории dataset
print("🔍 Проверяем содержимое директории dataset:")
print("=" * 50)

if os.path.exists(DATASET_DIR):
    all_files = []
    for root, dirs, files in os.walk(DATASET_DIR):
        for file in files:
            file_path = os.path.join(root, file)
            file_size = os.path.getsize(file_path)
            file_ext = Path(file).suffix.lower()
            
            all_files.append({
                'name': file,
                'path': file_path,
                'size': file_size,
                'ext': file_ext
            })
    
    print(f"📊 Всего файлов в директории: {len(all_files)}")
    
    # Группируем по расширениям
    extensions = {}
    for file_info in all_files:
        ext = file_info['ext'] or 'без расширения'
        if ext not in extensions:
            extensions[ext] = []
        extensions[ext].append(file_info)
    
    print(f"\n📋 Файлы по типам:")
    for ext, files in extensions.items():
        print(f"   {ext}: {len(files)} файлов")
        for file_info in files[:3]:  # Показываем первые 3 файла каждого типа
            size_mb = file_info['size'] / (1024 * 1024)
            print(f"      • {file_info['name']} ({size_mb:.2f} MB)")
        if len(files) > 3:
            print(f"      ... и еще {len(files) - 3} файлов")
    
    # Показываем подозрительные файлы
    suspicious_files = [f for f in all_files if any(word in f['name'].lower() for word in ['tsv', 'csv', 'txt', 'json', 'xml'])]
    if suspicious_files:
        print(f"\n⚠️  Найдены подозрительные (не-аудио) файлы:")
        for file_info in suspicious_files:
            print(f"   • {file_info['name']} ({file_info['ext']})")
    
else:
    print(f"❌ Директория {DATASET_DIR} не существует")


In [None]:
# Простая загрузка TSV файла
import pandas as pd

def load_tsv_file(tsv_path: str):
    """
    Простая загрузка TSV файла
    """
    
    try:
        df = pd.read_csv(tsv_path, sep='\t', encoding='utf-8')
        
        print(f"✅ TSV файл загружен успешно!")
        print(f"📊 Размер: {df.shape[0]} строк, {df.shape[1]} столбцов")
        print(f"📋 Столбцы: {list(df.columns)}")
        
        return df
        
    except Exception as e:
        print(f"❌ Ошибка при загрузке TSV: {e}")
        
        return None


In [None]:
# Простая загрузка TSV файла
tsv_files = []
df = None

# Ищем TSV файлы в директории dataset
if os.path.exists(DATASET_DIR):
    for root, dirs, files in os.walk(DATASET_DIR):
        for file in files:
            if file.lower().endswith('.tsv'):
                tsv_path = os.path.join(root, file)
                tsv_files.append(tsv_path)
                print(f"📋 Найден TSV файл: {file}")

if tsv_files:
    print(f"\n🔍 Загружаем TSV файл: {os.path.basename(tsv_files[0])}")
    df = load_tsv_file(tsv_files[0])

else:
    print("❌ TSV файлы не найдены в директории dataset")
    print("💡 Убедитесь, что файл urls_normalized.tsv находится в папке dataset/")


In [None]:
# Скачивание файла с Яндекс.Диска
import requests
import re

def download_from_yandex_disk(public_url: str, output_filename: str = "urls_normalized.tsv"):
    """
    Скачивание файла с Яндекс.Диска по публичной ссылке
    """
    print(f"📥 Пытаемся скачать файл с Яндекс.Диска...")
    
    # API Яндекс.Диска для получения прямой ссылки
    api_url = "https://cloud-api.yandex.net/v1/disk/public/resources/download"
    
    try:
        # Получаем прямую ссылку на скачивание
        response = requests.get(api_url, params={'public_key': public_url})
        
        if response.status_code == 200:
            download_info = response.json()
            download_url = download_info['href']
            
            print(f"✅ Получена прямая ссылка для скачивания")
            
            # Скачиваем файл
            file_response = requests.get(download_url)
            
            if file_response.status_code == 200:
                output_path = os.path.join(DATASET_DIR, output_filename)
                
                with open(output_path, 'wb') as f:
                    f.write(file_response.content)
                
                print(f"✅ Файл успешно скачан: {output_path}")
                print(f"📊 Размер файла: {len(file_response.content) / 1024:.2f} KB")
                
                return output_path
            else:
                print(f"❌ Ошибка скачивания файла: {file_response.status_code}")
                return None
        else:
            print(f"❌ Ошибка получения ссылки: {response.status_code}")
            print(f"Ответ: {response.text}")
            return None
            
    except Exception as e:
        print(f"❌ Ошибка: {e}")
        return None

# Альтернативный способ - ручные инструкции
def show_manual_download_instructions():
    """
    Показывает инструкции для ручного скачивания
    """
    print("📋 ИНСТРУКЦИИ ДЛЯ РУЧНОГО СКАЧИВАНИЯ:")
    print("=" * 50)
    print("1. Откройте ссылку в браузере: https://disk.yandex.ru/d/v2Hipv7XG4fEDQ")
    print("2. Нажмите кнопку 'Скачать' на странице")
    print("3. Сохраните файл как 'urls_normalized.tsv'")
    print("4. Поместите файл в папку 'dataset/' в этом проекте")
    print("5. Перезапустите ячейки notebook")
    print("=" * 50)

print("✅ Функции для скачивания готовы!")


In [None]:
# Попытка скачать файл с Яндекс.Диска
yandex_url = "https://disk.yandex.ru/d/v2Hipv7XG4fEDQ"

print("🚀 Пытаемся автоматически скачать файл...")

# Сначала проверяем, есть ли уже файл
tsv_path = os.path.join(DATASET_DIR, "urls_normalized.tsv")
if os.path.exists(tsv_path):
    print(f"✅ Файл уже существует: {tsv_path}")
    file_size = os.path.getsize(tsv_path) / 1024
    print(f"📊 Размер: {file_size:.2f} KB")
else:
    # Пытаемся скачать
    downloaded_path = download_from_yandex_disk(yandex_url)
    
    if downloaded_path is None:
        print("\n" + "="*60)
        print("❌ Автоматическое скачивание не удалось")
        print("💡 Попробуйте ручное скачивание:")
        print()
        show_manual_download_instructions()
        print("\n🔧 Альтернативные способы:")
        print("1. Используйте wget с прямой ссылкой (если есть)")
        print("2. Используйте curl")
        print("3. Скачайте через браузер")
        print("="*60)


In [None]:
# Альтернативные способы скачивания
print("🔧 АЛЬТЕРНАТИВНЫЕ КОМАНДЫ ДЛЯ СКАЧИВАНИЯ:")
print("=" * 60)

print("\n1️⃣ Если у вас есть прямая ссылка на файл:")
print("!wget 'ПРЯМАЯ_ССЫЛКА' -O dataset/urls_normalized.tsv")

print("\n2️⃣ Используя curl:")
print("!curl -L 'ПРЯМАЯ_ССЫЛКА' -o dataset/urls_normalized.tsv")

print("\n3️⃣ Для получения прямой ссылки на Яндекс.Диске:")
print("- Откройте https://disk.yandex.ru/d/v2Hipv7XG4fEDQ")
print("- Нажмите правой кнопкой на файл → 'Скопировать прямую ссылку'")
print("- Или нажмите 'Скачать' и скопируйте ссылку из адресной строки")

print("\n4️⃣ Ручное скачивание:")
print("- Откройте ссылку в браузере")
print("- Нажмите 'Скачать'")
print("- Сохраните файл в папку dataset/ как urls_normalized.tsv")

print("\n💡 После скачивания запустите следующие ячейки для загрузки TSV")
print("=" * 60)


In [None]:
# Функции для работы с аудио URL из TSV
import urllib.request
import tempfile
from urllib.parse import urlparse

def download_audio_from_url(url: str, timeout: int = 30):
    """
    Скачивает аудиофайл по URL во временный файл
    """
    try:
        print(f"📥 Скачиваем аудио: {url[:50]}...")
        
        # Создаем временный файл
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav')
        temp_path = temp_file.name
        temp_file.close()
        
        # Скачиваем файл
        urllib.request.urlretrieve(url, temp_path)
        
        # Проверяем размер файла
        file_size = os.path.getsize(temp_path)
        print(f"✅ Скачано: {file_size / 1024:.2f} KB")
        
        return temp_path
        
    except Exception as e:
        print(f"❌ Ошибка скачивания: {e}")
        return None

def process_audio_url_with_whisper(url: str, metadata_row=None):
    """
    Скачивает аудио по URL и обрабатывает через Whisper
    """
    print(f"\n🎵 Обрабатываем URL: {url[:60]}...")
    
    # Скачиваем аудио
    temp_audio_path = download_audio_from_url(url)
    
    if temp_audio_path is None:
        return {
            "url": url,
            "text": "ОШИБКА: Не удалось скачать аудио",
            "error": True
        }
    
    try:
        # Загружаем аудио с torchaudio
        wav, sr = load_audio_with_torchaudio(temp_audio_path)
        
        if wav is None or sr is None:
            return {
                "url": url,
                "text": "ОШИБКА: Не удалось загрузить аудио",
                "error": True
            }
        
        # Загружаем модель Whisper (если еще не загружена)
        if 'processor' not in globals():
            global processor, model, device
            print("🤖 Загружаем модель Whisper...")
            processor = WhisperProcessor.from_pretrained(MODEL_NAME)
            model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME)
            
            # Устанавливаем русский язык
            model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(
                language="russian", 
                task="transcribe"
            )
            
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
            print(f"✅ Модель загружена на {device}")
        
        # Конвертируем в numpy array
        audio_array = wav.squeeze().numpy()
        
        # Подготавливаем входные данные
        input_features = processor(
            audio_array, 
            sampling_rate=sr, 
            return_tensors="pt"
        ).input_features.to(device)
        
        # Генерируем транскрипцию
        with torch.no_grad():
            predicted_ids = model.generate(input_features)
        
        # Декодируем результат
        transcription = processor.batch_decode(
            predicted_ids, 
            skip_special_tokens=True
        )[0]
        
        result = {
            "url": url,
            "text": transcription.strip(),
            "error": False,
            "duration": wav.shape[1] / sr,
            "sample_rate": sr
        }
        
        # Добавляем метаданные если есть
        if metadata_row is not None:
            for col, value in metadata_row.items():
                if pd.notna(value) and col.lower() not in ['url', 'path', 'file']:
                    result[f'metadata_{col.lower()}'] = value
        
        print(f"✅ Распознано: {transcription[:100]}...")
        
        return result
        
    except Exception as e:
        print(f"❌ Ошибка обработки: {e}")
        return {
            "url": url,
            "text": f"ОШИБКА: {str(e)}",
            "error": True
        }
    
    finally:
        # Удаляем временный файл
        try:
            if temp_audio_path and os.path.exists(temp_audio_path):
                os.unlink(temp_audio_path)
        except:
            pass

def extract_audio_urls_from_tsv(df):
    """
    Извлекает URL-ы аудиофайлов из TSV
    """
    if df is None:
        return []
    
    audio_urls = []
    
    # Ищем столбцы с URL-ами
    for col in df.columns:
        if any(keyword in col.lower() for keyword in ['url', 'path', 'link', 'audio']):
            for idx, url in df[col].dropna().items():
                if isinstance(url, str) and ('http' in url or 'storage.mds.yandex.net' in url):
                    audio_urls.append({
                        'url': url,
                        'row_index': idx,
                        'metadata': df.iloc[idx].to_dict()
                    })
    
    print(f"🎵 Найдено {len(audio_urls)} аудио URL-ов")
    return audio_urls

print("✅ Функции для работы с аудио URL готовы!")


In [None]:
# Обработка аудио URL из TSV файла
if 'df' in globals() and df is not None:
    print("🚀 Извлекаем аудио URL из TSV...")
    
    # Извлекаем URL-ы
    audio_urls = extract_audio_urls_from_tsv(df)
    
    if audio_urls:
        print(f"📊 Найдено {len(audio_urls)} аудио URL-ов")
        print(f"🎯 Будем обрабатывать первые {min(NUM_SAMPLES, len(audio_urls))} образцов")
        
        # Показываем примеры URL-ов
        print(f"\n📋 Примеры URL-ов:")
        for i, audio_info in enumerate(audio_urls[:5], 1):
            url = audio_info['url']
            print(f"   {i}. {url[:80]}...")
        
        # Выбираем случайные URL-ы для обработки
        import random
        random.seed(RANDOM_SEED)
        selected_urls = random.sample(audio_urls, min(NUM_SAMPLES, len(audio_urls)))
        
        print(f"\n🎲 Выбрано {len(selected_urls)} случайных URL-ов для обработки")
        
        # Обрабатываем выбранные URL-ы
        print("\n" + "="*70)
        print("🚀 НАЧИНАЕМ ОБРАБОТКУ АУДИО ИЗ TSV")
        print("="*70)
        
        url_results = []
        
        for i, audio_info in enumerate(selected_urls, 1):
            print(f"\n📊 Обрабатываем аудио {i}/{len(selected_urls)}")
            print(f"🔗 URL: {audio_info['url'][:80]}...")
            
            # Обрабатываем аудио
            result = process_audio_url_with_whisper(
                audio_info['url'], 
                audio_info['metadata']
            )
            
            url_results.append(result)
            
            # Показываем результат
            if not result.get('error', False):
                print(f"✅ Результат: \"{result['text'][:100]}...\"")
                print(f"⏱️  Длительность: {result.get('duration', 0):.2f} сек")
                
                # Показываем метаданные если есть
                metadata_keys = [k for k in result.keys() if k.startswith('metadata_')]
                if metadata_keys:
                    print(f"📋 Метаданные: {len(metadata_keys)} полей")
            else:
                print(f"❌ {result['text']}")
            
            print("-" * 50)
        
        # Итоговая статистика
        successful_results = [r for r in url_results if not r.get('error', False)]
        
        print(f"\n🎉 ОБРАБОТКА ЗАВЕРШЕНА!")
        print(f"📊 Статистика:")
        print(f"   Всего обработано: {len(url_results)}")
        print(f"   Успешно: {len(successful_results)}")
        print(f"   Ошибок: {len(url_results) - len(successful_results)}")
        
        if successful_results:
            avg_duration = sum(r.get('duration', 0) for r in successful_results) / len(successful_results)
            total_duration = sum(r.get('duration', 0) for r in successful_results)
            print(f"   Средняя длительность: {avg_duration:.2f} сек")
            print(f"   Общая длительность: {total_duration:.2f} сек")
        
        # Сохраняем результаты в глобальную переменную
        globals()['url_results'] = url_results
        
    else:
        print("❌ Аудио URL-ы не найдены в TSV файле")
        print("💡 Проверьте, что TSV содержит столбцы с URL-ами")

else:
    print("❌ TSV файл не загружен")
    print("💡 Сначала запустите ячейки для загрузки TSV файла")


In [None]:
# Итоговые результаты распознавания речи из TSV
if 'url_results' in globals() and url_results:
    print("\n" + "="*80)
    print("🎯 ИТОГОВЫЕ РЕЗУЛЬТАТЫ РАСПОЗНАВАНИЯ РЕЧИ ИЗ TSV")
    print("="*80)
    
    successful_results = [r for r in url_results if not r.get('error', False)]
    error_results = [r for r in url_results if r.get('error', False)]
    
    # Показываем все успешные результаты
    for i, result in enumerate(successful_results, 1):
        print(f"\n{i}. 🔗 URL: {result['url'][:60]}...")
        print(f"   📝 Распознанный текст:")
        print(f"   \"{result['text']}\"")
        print(f"   ⏱️  Длительность: {result.get('duration', 0):.2f} сек")
        
        # Показываем метаданные
        metadata_keys = [k for k in result.keys() if k.startswith('metadata_')]
        if metadata_keys:
            print(f"   📋 Метаданные:")
            for key in metadata_keys[:3]:  # Показываем первые 3 метаданных
                value = result[key]
                if isinstance(value, str) and len(value) > 50:
                    value = value[:50] + "..."
                print(f"      {key.replace('metadata_', '')}: {value}")
        
        print("-" * 60)
    
    # Показываем ошибки если есть
    if error_results:
        print(f"\n❌ ОШИБКИ ({len(error_results)} URL-ов):")
        for result in error_results:
            print(f"   • {result['url'][:60]}...: {result['text']}")
    
    # Финальная статистика
    print(f"\n📊 ФИНАЛЬНАЯ СТАТИСТИКА:")
    print(f"   📈 Всего URL-ов обработано: {len(url_results)}")
    print(f"   ✅ Успешно распознано: {len(successful_results)}")
    print(f"   ❌ Ошибок: {len(error_results)}")
    print(f"   📊 Процент успеха: {len(successful_results)/len(url_results)*100:.1f}%")
    
    if successful_results:
        # Статистика по длительности
        durations = [r.get('duration', 0) for r in successful_results]
        total_duration = sum(durations)
        avg_duration = total_duration / len(durations)
        
        print(f"   ⏱️  Общая длительность аудио: {total_duration:.1f} сек ({total_duration/60:.1f} мин)")
        print(f"   ⏱️  Средняя длительность: {avg_duration:.2f} сек")
        
        # Статистика по тексту
        text_lengths = [len(r['text'].split()) for r in successful_results]
        avg_words = sum(text_lengths) / len(text_lengths)
        total_words = sum(text_lengths)
        
        print(f"   📝 Всего слов распознано: {total_words}")
        print(f"   📝 Среднее количество слов: {avg_words:.1f}")
    
    print("\n🎉 Обработка датасета завершена!")
    
    # Сохраняем результаты в файл (опционально)
    try:
        import json
        results_file = os.path.join(DATASET_DIR, "whisper_results.json")
        
        # Подготавливаем данные для сохранения
        save_data = {
            "total_processed": len(url_results),
            "successful": len(successful_results),
            "errors": len(error_results),
            "results": url_results
        }
        
        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(save_data, f, ensure_ascii=False, indent=2)
        
        print(f"💾 Результаты сохранены в файл: {results_file}")
        
    except Exception as e:
        print(f"⚠️  Не удалось сохранить результаты: {e}")

else:
    print("❌ Результаты обработки URL-ов не найдены")
    print("💡 Сначала запустите предыдущую ячейку для обработки аудио из TSV")


In [None]:
# Загрузка модели Whisper
print("🤖 Загружаем модель Whisper...")

# Определяем устройство (GPU если доступно)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device_name = "GPU (CUDA)" if torch.cuda.is_available() else "CPU"
print(f"💻 Используем устройство: {device_name}")

# Загружаем процессор и модель
processor = WhisperProcessor.from_pretrained(MODEL_NAME)
model = WhisperForConditionalGeneration.from_pretrained(MODEL_NAME)
model.to(device)

# Устанавливаем русский язык для модели
model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(
    language="russian", task="transcribe"
)

print("✅ Модель Whisper успешно загружена!")
print(f"🇷🇺 Язык установлен: русский")
print(f"📊 Размер модели: ~244MB")


In [None]:
# Поиск аудиофайлов в датасете
def find_audio_files(directory: str) -> List[str]:
    """Поиск аудиофайлов в директории"""
    audio_extensions = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac'}
    audio_files = []
    
    for root, dirs, files in os.walk(directory):
        for file in files:
            file_path = os.path.join(root, file)
            file_ext = Path(file).suffix.lower()
            
            # Проверяем расширение файла
            if file_ext in audio_extensions:
                # Дополнительная проверка - пропускаем явно не-аудио файлы
                if not any(skip_word in file.lower() for skip_word in ['tsv', 'csv', 'txt', 'json', 'xml']):
                    audio_files.append(file_path)
                    print(f"✅ Найден аудиофайл: {file}")
                else:
                    print(f"⚠️  Пропускаем файл: {file} (похож на текстовый)")
            else:
                print(f"⚠️  Пропускаем файл: {file} (неподдерживаемый формат: {file_ext})")
    
    return audio_files

# Ищем аудиофайлы
audio_files = find_audio_files(DATASET_DIR)

print(f"🔍 Поиск аудиофайлов в '{DATASET_DIR}'...")
print(f"📊 Найдено аудиофайлов: {len(audio_files)}")

if audio_files:
    print(f"📝 Примеры найденных файлов:")
    for i, file in enumerate(audio_files[:5]):
        print(f"   {i+1}. {os.path.basename(file)}")
    if len(audio_files) > 5:
        print(f"   ... и еще {len(audio_files) - 5} файлов")
else:
    print("❌ Аудиофайлы не найдены!")
    print(f"📂 Поместите аудиофайлы в папку '{DATASET_DIR}/'")
    print("🎵 Поддерживаемые форматы: .wav, .mp3, .flac, .ogg, .m4a, .aac")


In [None]:
# Выбор случайных файлов для обработки
def select_random_files(files: List[str], num_samples: int = NUM_SAMPLES) -> List[str]:
    """Выбор случайных файлов из списка"""
    random.seed(RANDOM_SEED)
    
    if len(files) < num_samples:
        print(f"⚠️  Доступно только {len(files)} файлов, используем все")
        return files
    
    selected = random.sample(files, num_samples)
    return selected

# Выбираем случайные файлы
if audio_files:
    selected_files = select_random_files(audio_files, NUM_SAMPLES)
    
    print(f"🎲 Выбрано {len(selected_files)} случайных файлов для обработки:")
    for i, file in enumerate(selected_files, 1):
        print(f"   {i}. {os.path.basename(file)}")
else:
    selected_files = []
    print("❌ Нет файлов для выбора")


In [None]:
# Функции для обработки аудио
def load_audio(audio_path: str) -> np.ndarray:
    """Загрузка и предобработка аудиофайла"""
    # Проверяем расширение файла
    audio_extensions = {'.wav', '.mp3', '.flac', '.ogg', '.m4a', '.aac'}
    file_ext = Path(audio_path).suffix.lower()
    
    if file_ext not in audio_extensions:
        print(f"❌ Неподдерживаемый формат: {file_ext}")
        return None
    
    # Проверяем имя файла на подозрительные слова
    filename_lower = os.path.basename(audio_path).lower()
    if any(skip_word in filename_lower for skip_word in ['tsv', 'csv', 'txt', 'json', 'xml']):
        print(f"❌ Файл {os.path.basename(audio_path)} похож на текстовый, пропускаем")
        return None
    
    try:
        # Загружаем аудио с нужной частотой дискретизации
        audio, sr = librosa.load(audio_path, sr=SAMPLE_RATE)
        
        # Ограничиваем длину аудио
        max_samples = MAX_AUDIO_LENGTH * SAMPLE_RATE
        if len(audio) > max_samples:
            audio = audio[:max_samples]
            print(f"⚠️  Аудио {os.path.basename(audio_path)} обрезано до {MAX_AUDIO_LENGTH} сек")
        
        return audio
        
    except Exception as e:
        print(f"❌ Ошибка при загрузке {audio_path}: {e}")
        return None

def transcribe_audio(audio_path: str) -> Dict[str, str]:
    """Распознавание речи в аудиофайле"""
    print(f"🎵 Обрабатываем: {os.path.basename(audio_path)}")
    
    # Загружаем аудио
    audio = load_audio(audio_path)
    if audio is None:
        return {"file": os.path.basename(audio_path), "text": "ОШИБКА ЗАГРУЗКИ", "error": True}
    
    try:
        # Подготавливаем входные данные
        input_features = processor(
            audio, 
            sampling_rate=SAMPLE_RATE, 
            return_tensors="pt"
        ).input_features.to(device)
        
        # Генерируем транскрипцию
        with torch.no_grad():
            predicted_ids = model.generate(input_features)
        
        # Декодируем результат
        transcription = processor.batch_decode(
            predicted_ids, 
            skip_special_tokens=True
        )[0]
        
        return {
            "file": os.path.basename(audio_path),
            "text": transcription.strip(),
            "error": False,
            "audio_path": audio_path
        }
        
    except Exception as e:
        print(f"❌ Ошибка при обработке {audio_path}: {e}")
        return {"file": os.path.basename(audio_path), "text": f"ОШИБКА: {str(e)}", "error": True}

print("✅ Функции для обработки аудио готовы!")


In [None]:
# Обработка выбранных аудиофайлов
if selected_files:
    print("🚀 Начинаем обработку аудиофайлов...")
    print("=" * 60)
    
    results = []
    
    for i, audio_file in enumerate(selected_files, 1):
        print(f"\n📊 Обрабатываем файл {i}/{len(selected_files)}")
        result = transcribe_audio(audio_file)
        results.append(result)
        
        # Показываем результат сразу
        if not result.get('error', False):
            print(f"✅ Результат: {result['text']}")
        else:
            print(f"❌ {result['text']}")
        
        print("-" * 40)
    
    print(f"\n🎉 Обработка завершена! Обработано {len(results)} файлов")
    
else:
    results = []
    print("❌ Нет файлов для обработки")


In [None]:
# Итоговые результаты и статистика
if results:
    print("\n" + "=" * 80)
    print("🎯 ИТОГОВЫЕ РЕЗУЛЬТАТЫ РАСПОЗНАВАНИЯ РЕЧИ")
    print("=" * 80)
    
    successful_results = [r for r in results if not r.get('error', False)]
    error_results = [r for r in results if r.get('error', False)]
    
    # Показываем все успешные результаты
    for i, result in enumerate(successful_results, 1):
        print(f"\n{i}. 📁 Файл: {result['file']}")
        print(f"   📝 Распознанный текст:")
        print(f"   \"{result['text']}\"")
        print("-" * 50)
    
    # Показываем ошибки если есть
    if error_results:
        print(f"\n❌ ОШИБКИ ({len(error_results)} файлов):")
        for result in error_results:
            print(f"   • {result['file']}: {result['text']}")
    
    # Статистика
    print(f"\n📊 СТАТИСТИКА:")
    print(f"   📈 Всего обработано: {len(results)}")
    print(f"   ✅ Успешно: {len(successful_results)}")
    print(f"   ❌ Ошибок: {len(error_results)}")
    print(f"   📊 Процент успеха: {len(successful_results)/len(results)*100:.1f}%")
    
    # Показываем примеры аудио (если доступны)
    print(f"\n🎵 ПРИМЕРЫ АУДИО:")
    for i, result in enumerate(successful_results[:3], 1):
        if 'audio_path' in result and os.path.exists(result['audio_path']):
            print(f"\n{i}. {result['file']}:")
            print(f"   Текст: \"{result['text']}\"")
            try:
                display(Audio(result['audio_path']))
            except:
                print("   (Аудио недоступно для воспроизведения)")
    
else:
    print("❌ Нет результатов для отображения")


## Дополнительные возможности

### Тестирование с демо-аудио
Если у вас нет датасета, можно протестировать на демо-файлах:


In [None]:
# Тестирование обработки с метаданными из TSV
if 'tsv_audio_paths' in globals() and tsv_audio_paths and metadata_df is not None:
    print("🚀 Тестируем обработку аудио с метаданными из TSV...")
    print("=" * 70)
    
    # Выбираем случайные файлы из TSV
    import random
    random.seed(RANDOM_SEED)
    test_files = random.sample(tsv_audio_paths, min(5, len(tsv_audio_paths)))
    
    metadata_results = []
    
    for i, audio_file in enumerate(test_files, 1):
        print(f"\n📊 Обрабатываем файл {i}/{len(test_files)}")
        result = process_audio_with_metadata(audio_file, metadata_df)
        metadata_results.append(result)
        
        # Показываем результат с метаданными
        if not result.get('error', False):
            print(f"✅ Whisper результат: {result['text']}")
            
            # Показываем метаданные если есть
            metadata_keys = [k for k in result.keys() if k.startswith('metadata_')]
            if metadata_keys:
                print(f"📋 Метаданные:")
                for key in metadata_keys:
                    print(f"   {key.replace('metadata_', '')}: {result[key]}")
                
                # Сравниваем с ground truth если есть транскрипция
                ground_truth_key = next((k for k in metadata_keys if 'text' in k or 'transcript' in k), None)
                if ground_truth_key:
                    ground_truth = result[ground_truth_key]
                    comparison = compare_with_ground_truth(result['text'], ground_truth)
                    
                    print(f"🔍 Сравнение с эталоном:")
                    print(f"   Jaccard similarity: {comparison['jaccard_similarity']:.3f}")
                    print(f"   Соотношение длин: {comparison['length_ratio']:.3f}")
                    print(f"   Слов Whisper: {comparison['whisper_words']}")
                    print(f"   Слов эталон: {comparison['truth_words']}")
            else:
                print("📋 Метаданные для этого файла не найдены")
        else:
            print(f"❌ {result['text']}")
        
        print("-" * 50)
    
    # Общая статистика
    successful_results = [r for r in metadata_results if not r.get('error', False)]
    if successful_results:
        print(f"\n📊 ОБЩАЯ СТАТИСТИКА:")
        print(f"   Успешно обработано: {len(successful_results)}/{len(metadata_results)}")
        
        # Средние метрики сравнения
        comparisons = []
        for result in successful_results:
            ground_truth_key = next((k for k in result.keys() if k.startswith('metadata_') and ('text' in k or 'transcript' in k)), None)
            if ground_truth_key:
                comparison = compare_with_ground_truth(result['text'], result[ground_truth_key])
                comparisons.append(comparison)
        
        if comparisons:
            avg_jaccard = sum(c['jaccard_similarity'] for c in comparisons) / len(comparisons)
            avg_length_ratio = sum(c['length_ratio'] for c in comparisons) / len(comparisons)
            
            print(f"   Средняя Jaccard similarity: {avg_jaccard:.3f}")
            print(f"   Среднее соотношение длин: {avg_length_ratio:.3f}")

elif 'selected_files' in globals() and selected_files:
    print("⚠️  TSV метаданные недоступны, используем обычную обработку")
    print("💡 Убедитесь, что файл urls_normalized.tsv находится в папке dataset/")
else:
    print("❌ Нет аудиофайлов для тестирования")
    print("💡 Сначала запустите предыдущие ячейки")


In [None]:
# Создание демо-аудио для тестирования (опционально)
import numpy as np
from scipy.io.wavfile import write

def create_demo_audio():
    """Создает простой демо-аудиофайл для тестирования"""
    # Создаем простой тон (440 Hz - нота Ля)
    duration = 3  # секунды
    sample_rate = 16000
    t = np.linspace(0, duration, int(sample_rate * duration), False)
    frequency = 440  # Hz
    audio_data = np.sin(2 * np.pi * frequency * t)
    
    # Добавляем затухание
    fade_samples = int(0.1 * sample_rate)  # 0.1 секунды затухания
    audio_data[:fade_samples] *= np.linspace(0, 1, fade_samples)
    audio_data[-fade_samples:] *= np.linspace(1, 0, fade_samples)
    
    # Нормализуем и конвертируем в int16
    audio_data = (audio_data * 32767).astype(np.int16)
    
    # Сохраняем файл
    demo_path = os.path.join(DATASET_DIR, "demo_tone.wav")
    write(demo_path, sample_rate, audio_data)
    
    return demo_path

# Создаем демо-файл если нет других аудиофайлов
if not audio_files:
    print("🎵 Создаем демо-аудиофайл для тестирования...")
    try:
        from scipy.io.wavfile import write
        demo_file = create_demo_audio()
        print(f"✅ Создан демо-файл: {demo_file}")
        print("⚠️  Примечание: это синтетический тон, не речь")
    except ImportError:
        print("❌ Для создания демо-файла нужна библиотека scipy")
        print("Установите: pip install scipy")
    except Exception as e:
        print(f"❌ Ошибка создания демо-файла: {e}")
else:
    print("✅ Датасет найден, демо-файл не нужен")


In [None]:
# Тестирование загрузки аудио с torchaudio
# Эта ячейка тестирует только загрузку и пересэмплирование

if 'selected_files' in globals() and selected_files:
    print("🚀 Тестируем загрузку аудио с torchaudio...")
    print("=" * 60)
    
    # Тестируем первые 3 файла
    test_files = selected_files[:3]
    
    for i, audio_file in enumerate(test_files, 1):
        print(f"\n📊 Тестируем файл {i}/{len(test_files)}: {os.path.basename(audio_file)}")
        
        wav, sr = load_audio_with_torchaudio(audio_file)
        
        if wav is not None and sr is not None:
            print(f"✅ Успешно загружено!")
            print(f"   Форма тензора: {wav.shape}")
            print(f"   Частота дискретизации: {sr} Hz")
            print(f"   Длительность: {wav.shape[1] / sr:.2f} сек")
            print(f"   Количество каналов: {wav.shape[0]}")
        else:
            print(f"❌ Ошибка загрузки")
        
        print("-" * 40)
    
    print(f"\n🎉 Тестирование загрузки завершено!")
    
elif 'audio_files' in globals() and audio_files:
    print("🚀 Тестируем загрузку на найденных аудиофайлах...")
    test_files = audio_files[:3]
    
    for i, audio_file in enumerate(test_files, 1):
        print(f"\n📊 Тестируем файл {i}/{len(test_files)}: {os.path.basename(audio_file)}")
        
        wav, sr = load_audio_with_torchaudio(audio_file)
        
        if wav is not None and sr is not None:
            print(f"✅ Успешно загружено!")
            print(f"   Форма тензора: {wav.shape}")
            print(f"   Частота дискретизации: {sr} Hz")
            print(f"   Длительность: {wav.shape[1] / sr:.2f} сек")
        else:
            print(f"❌ Ошибка загрузки")
        
        print("-" * 40)

else:
    print("❌ Нет аудиофайлов для тестирования")
    print("💡 Сначала запустите предыдущие ячейки или поместите аудиофайлы в папку 'dataset/'")
