# LLM‑RAG Project: полный цикл пайплайна
Аудио → транскрипт (faster-whisper) → нормализация → чанкинг → эмбеддинги (E5) → ChromaDB → онлайн‑RAG.
**Как пользоваться:**
1) Укажите пути и параметры в блоке *Config*.
2) (Опционально) Для реального ASR оставьте `USE_FAKE_ASR=False`; для быстрой проверки пайплайна без загрузки модели поставьте `USE_FAKE_ASR=True`.
3) Запускайте шаги 1→4 по порядку.

In [None]:
# Установка зависимостей (при необходимости):
!pip install faster-whisper ctranslate2 torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu121
!pip install sentence-transformers chromadb python-dotenv PyYAML openai librosa torchaudio langchain gradio

In [None]:
# Импорты и версии
from __future__ import annotations
import os, json, re, math, uuid, time
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Iterable
import numpy as np

try:
    import torch
    TORCH_OK = True
except Exception:
    TORCH_OK = False

from dotenv import load_dotenv; load_dotenv()
print('Python OK. Torch:', TORCH_OK)

Python OK. Torch: True


In [None]:
# Config
from dataclasses import dataclass
from pathlib import Path

@dataclass
class Config:
    """Central settings for the notebook pipeline (ASR -> chunking -> Chroma -> RAG).

    Every field has a default, so ``Config()`` works out of the box. The
    ``DEVICE`` and ``WHISPER_PRECISION`` defaults are computed once, when the
    class body is executed, from the ``TORCH_OK`` flag set by the import cell.
    """

    # Directory scanned for audio files by the ASR step.
    INPUT_DIR: Path = Path('data/audio')
    # Root directory for intermediate JSON artifacts (ASR output, normalized text).
    ARTIFACTS_DIR: Path = Path('artifacts')
    # On-disk location handed to the persistent Chroma client.
    PERSIST_DIR: Path = Path('data/chroma_db')
    # Name of the Chroma collection that stores the chunks.
    COLLECTION: str = 'rag_demo'
    # Model name passed to faster-whisper's WhisperModel.
    ASR_MODEL: str = 'large-v3'
    # Model name passed to SentenceTransformer.
    EMB_MODEL: str = 'intfloat/multilingual-e5-large-instruct'
    # Chat model used in the answer-generation step.
    OPENAI_MODEL: str = 'gpt-4o'
    USE_FAKE_ASR: bool = False  # False runs real ASR; True returns a canned demo transcript
    # Forwarded to faster-whisper as ``vad_filter``.
    USE_VAD: bool = False
    # Forwarded as ``segment_sec``; run_batch_asr accepts it but does not use it.
    SEGMENT_SEC: int = 60
    # Beam size for Whisper decoding.
    BEAM_SIZE: int = 5
    # 'cuda' only when torch imported successfully and reports an available GPU.
    DEVICE: str = 'cuda' if TORCH_OK and torch.cuda.is_available() else 'cpu'
    # Forwarded as faster-whisper's ``compute_type``: float16 on GPU, int8 otherwise.
    WHISPER_PRECISION: str = 'float16' if (TORCH_OK and torch.cuda.is_available()) else 'int8'

# Instantiate the settings and make sure both output directories exist
# before any later cell tries to write into them.
cfg = Config()
for _out_dir in (cfg.ARTIFACTS_DIR, cfg.PERSIST_DIR):
    _out_dir.mkdir(parents=True, exist_ok=True)

print(cfg)

Config(INPUT_DIR=PosixPath('data/audio'), ARTIFACTS_DIR=PosixPath('artifacts'), PERSIST_DIR=PosixPath('data/chroma_db'), COLLECTION='rag_demo', ASR_MODEL='large-v3', EMB_MODEL='intfloat/multilingual-e5-large-instruct', OPENAI_MODEL='gpt-4o', USE_FAKE_ASR=False, USE_VAD=False, SEGMENT_SEC=60, BEAM_SIZE=5, DEVICE='cpu', WHISPER_PRECISION='int8')


## 0. Вспомогательные модули (utils)

In [None]:
# === audio_utils ===
from typing import Dict, List
from pathlib import Path

def run_batch_asr(input_dir: Path, language: str = 'ru', segment_sec: int = 60, use_vad: bool = False, beam: int = 5, timestamps: str = 'segment',
                    use_fake: bool = True, model_name: str = 'large-v3', device: str = 'cpu', compute_type: str = 'int8') -> Dict[str, dict]:
    """Transcribe every audio file located directly inside ``input_dir``.

    Thin wrapper over faster-whisper. Only regular files whose suffix is one
    of ``.mp3``, ``.wav``, ``.m4a``, ``.mp4`` (case-insensitive) are
    processed, in sorted path order.

    Args:
        input_dir: Directory to scan (non-recursive). A ``str`` is accepted too.
        language: Language code passed to the model and stored in fake payloads.
        segment_sec: Accepted for interface compatibility; currently unused.
        use_vad: Forwarded to faster-whisper as ``vad_filter``.
        beam: Forwarded to faster-whisper as ``beam_size``.
        timestamps: Accepted for interface compatibility; currently unused.
        use_fake: When true, skip the model and return a canned demo
            transcript per file (useful for smoke-testing the pipeline).
        model_name: Model name for ``WhisperModel``.
        device: Device for ``WhisperModel``.
        compute_type: Compute type for ``WhisperModel``.

    Returns:
        Mapping ``file stem -> payload`` where the payload has the keys
        ``file_id``, ``language``, ``segments`` and ``full_text``. Because
        the key is the stem, files that differ only by extension overwrite
        each other. An empty dict is returned when no audio files are found.

    Raises:
        RuntimeError: faster-whisper cannot be imported and ``use_fake`` is false.
    """
    audio_ext = {'.mp3', '.wav', '.m4a', '.mp4'}
    root = Path(input_dir)  # tolerate plain string paths
    # is_file() keeps directories that merely look like audio ("x.mp3/")
    # from being handed to the transcriber.
    audio_paths = [
        p for p in sorted(root.glob('*'))
        if p.is_file() and p.suffix.lower() in audio_ext
    ]
    if not audio_paths:
        print('Нет аудиофайлов в', input_dir)
        return {}

    if use_fake:
        demo_text = '(demo) пример транскрипта'
        return {
            p.stem: {
                'file_id': p.stem,
                'language': language,
                'segments': [
                    {'start': 0.0, 'end': 5.0, 'speaker': 'spk1', 'text': demo_text}
                ],
                'full_text': demo_text,
            }
            for p in audio_paths
        }

    # Real ASR: import lazily so that fake mode works without faster-whisper.
    try:
        from faster_whisper import WhisperModel
    except Exception as e:
        raise RuntimeError('Установите faster-whisper для реального ASR') from e

    model = WhisperModel(model_name, device=device, compute_type=compute_type)
    results: Dict[str, dict] = {}
    for p in audio_paths:
        segments, info = model.transcribe(str(p), language=language, beam_size=beam, vad_filter=use_vad)
        segs = []
        for seg in segments:
            text = (seg.text or '').strip()  # normalise once, reuse below
            segs.append({
                'start': float(seg.start or 0.0),
                'end': float(seg.end or 0.0),
                'speaker': 'spk1',  # no diarization here: single placeholder speaker
                'text': text,
            })
        results[p.stem] = {
            'file_id': p.stem,
            'language': getattr(info, 'language', language),
            'segments': segs,
            'full_text': ' '.join(s['text'] for s in segs).strip(),
        }
    return results

# === text_utils ===
def normalize_text_simple(text: str) -> str:
    """Collapse whitespace runs to single spaces, trim, and upper-case the first character.

    A falsy ``text`` (``None`` or ``''``) yields an empty string.
    """
    squeezed = re.sub(r'\s+', ' ', text or '').strip()
    if not squeezed:
        return squeezed
    head, tail = squeezed[0], squeezed[1:]
    # Leave the string untouched when it already starts with an upper-case letter.
    return squeezed if head.isupper() else head.upper() + tail

def split_into_chunks_by_words(text: str, size: int = 250, overlap: int = 50) -> List[str]:
    """Split ``text`` into overlapping windows of whitespace-separated words.

    Args:
        text: Input text; ``None`` or empty text produces no chunks.
        size: Maximum number of words per chunk. When ``size <= 0`` the text
            is returned as a single chunk.
        overlap: Number of words shared by consecutive chunks. Negative
            values are treated as 0 so that no words are skipped; values
            ``>= size`` degrade to a one-word stride.

    Returns:
        Chunks in document order. Windowing stops as soon as a chunk reaches
        the last word, so no trailing chunk made purely of already-covered
        overlap words is produced.
    """
    if size <= 0:
        return [text] if text else []
    words = (text or '').split()
    step = max(1, size - max(0, overlap))
    chunks: List[str] = []
    for start in range(0, len(words), step):
        chunks.append(' '.join(words[start:start + size]))
        if start + size >= len(words):
            # This window already contains the final word; any further
            # window would only repeat its tail.
            break
    return chunks

# === embeddings ===
from sentence_transformers import SentenceTransformer
import numpy as np
# Single-slot cache: the most recently loaded model and the name it was loaded under.
_EMB_MODEL: SentenceTransformer | None = None
_EMB_MODEL_NAME: str | None = None

def get_embedding_model(model_name: str = 'intfloat/multilingual-e5-large-instruct') -> SentenceTransformer:
    """Return a cached ``SentenceTransformer`` for ``model_name``.

    The model is loaded on first use and reused on later calls with the same
    name. Asking for a different name loads that model and replaces the
    cached one, instead of silently returning the previously loaded model.
    Only one model is kept at a time.
    """
    global _EMB_MODEL, _EMB_MODEL_NAME
    if _EMB_MODEL is None or _EMB_MODEL_NAME != model_name:
        _EMB_MODEL = SentenceTransformer(model_name)
        _EMB_MODEL_NAME = model_name
    return _EMB_MODEL

def _encode_docs(model: SentenceTransformer, texts: List[str]) -> np.ndarray:
    """Encode ``texts`` with normalized embeddings and the progress bar disabled."""
    encode_options = {'normalize_embeddings': True, 'show_progress_bar': False}
    return model.encode(texts, **encode_options)

def embed_documents(model: SentenceTransformer, texts: List[str]) -> List[List[float]]:
    """Embed passages for indexing; each text is prefixed with ``passage: ``.

    NOTE(review): the ``passage:`` / ``query:`` prefixes follow the base E5
    convention; the ``-instruct`` E5 variant configured as the default model
    appears to document a different query format - confirm against the
    model card before relying on retrieval quality.
    """
    prefixed = [f'passage: {t}' for t in texts]
    matrix = _encode_docs(model, prefixed)
    return matrix.tolist()

def embed_query(model: SentenceTransformer, query: str) -> List[float]:
    """Embed a single search query (prefixed with ``query: ``) and return its vector as a list."""
    matrix = _encode_docs(model, [f'query: {query}'])
    return matrix[0].tolist()

# === vectorstore (ChromaDB) ===
import chromadb

def get_chroma_collection(persist_dir: str | Path, collection_name: str):
    """Open the persistent Chroma store at ``persist_dir`` and return the named collection.

    The collection is created if it does not exist yet.
    """
    store_path = str(persist_dir)
    chroma_client = chromadb.PersistentClient(path=store_path)
    collection = chroma_client.get_or_create_collection(name=collection_name)
    return collection

def upsert_chunks(collection, model, chunks: List[Dict], batch_size: int = 256):
    """Embed ``chunks`` and upsert them into ``collection`` in batches.

    Args:
        collection: Object exposing ``upsert(ids=, embeddings=, documents=, metadatas=)``.
        model: Embedding model passed through to ``embed_documents``.
        chunks: Dicts with the keys ``id`` and ``text`` and an optional ``metadata`` dict.
        batch_size: Maximum number of chunks embedded and written per call;
            values below 1 are treated as 1.

    Returns:
        None. An empty ``chunks`` list is a no-op, so an empty pipeline run
        (for example, no audio found) does not reach the vector store.
    """
    if not chunks:
        return
    step = max(1, batch_size)
    for start in range(0, len(chunks), step):
        batch = chunks[start:start + step]
        texts = [c['text'] for c in batch]
        collection.upsert(
            ids=[c['id'] for c in batch],
            embeddings=embed_documents(model, texts),
            documents=texts,
            # NOTE(review): Chroma's metadata validation is understood to reject
            # empty dicts while accepting None - confirm for the installed version.
            metadatas=[c.get('metadata') or None for c in batch],
        )

def query_topk(collection, query_embedding: List[float], k: int = 8) -> List[Dict]:
    """Run a single-vector similarity query and flatten the result into a list of hits.

    Each hit is a dict with the keys ``id``, ``text``, ``metadata`` and
    ``distance``, in the order returned by the collection.
    """
    res = collection.query(
        query_embeddings=[query_embedding],
        n_results=k,
        include=['documents', 'metadatas', 'distances'],
    )
    # Only one query vector is sent, so every result field is read at index 0.
    return [
        {
            'id': hit_id,
            'text': res['documents'][0][pos],
            'metadata': res['metadatas'][0][pos],
            'distance': res['distances'][0][pos],
        }
        for pos, hit_id in enumerate(res['ids'][0])
    ]

## 1. Распознавание аудио (faster-whisper)

In [None]:
# Run ASR over every audio file in cfg.INPUT_DIR.
import json

asr_kwargs = dict(
    input_dir=cfg.INPUT_DIR,
    language='ru',
    segment_sec=cfg.SEGMENT_SEC,
    use_vad=cfg.USE_VAD,
    beam=cfg.BEAM_SIZE,
    use_fake=cfg.USE_FAKE_ASR,
    model_name=cfg.ASR_MODEL,
    device=cfg.DEVICE,
    compute_type=cfg.WHISPER_PRECISION,
)
asr_results = run_batch_asr(**asr_kwargs)
print('Файлов обработано:', len(asr_results))

# Persist one JSON artifact per transcribed file.
asr_dir = cfg.ARTIFACTS_DIR / 'asr'
asr_dir.mkdir(parents=True, exist_ok=True)
for fid, payload in asr_results.items():
    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    (asr_dir / f'{fid}.json').write_text(serialized, encoding='utf-8')

# Preview: the first five file ids and the first payload (empty dict if none).
preview_ids = list(asr_results)[:5]
first_payload = next(iter(asr_results.values()), {})
preview_ids, first_payload

config.json: 0.00B [00:00, ?B/s]

preprocessor_config.json:   0%|          | 0.00/340 [00:00<?, ?B/s]

vocabulary.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

model.bin:   0%|          | 0.00/3.09G [00:00<?, ?B/s]

KeyboardInterrupt: 

## 2. Нормализация текста

In [None]:
# Normalize the full transcript of every file.
from typing import Dict

normalized_map: Dict[str, str] = {
    fid: normalize_text_simple(payload.get('full_text', ''))
    for fid, payload in asr_results.items()
}

# Persist all normalized texts as a single JSON document.
normalized_dir = cfg.ARTIFACTS_DIR / 'normalized'
normalized_dir.mkdir(parents=True, exist_ok=True)
(normalized_dir / 'normalized_texts.json').write_text(
    json.dumps(normalized_map, ensure_ascii=False, indent=2),
    encoding='utf-8',
)

# Preview: the first three texts, truncated to 120 characters with an ellipsis.
{k: (v[:120] + '…' if len(v) > 120 else v) for k, v in list(normalized_map.items())[:3]}

{'стружки 59': '(demo) пример транскрипта'}

## 3. Чанкинг, эмбеддинги и вставка в ChromaDB

In [None]:
# Chunking: split each normalized text into overlapping word windows and
# wrap every window in a record carrying a stable id and source metadata.
all_chunks = []
for fid, text in normalized_map.items():
    pieces = split_into_chunks_by_words(text, size=250, overlap=50)
    all_chunks.extend(
        {
            'id': f'{fid}::chunk::{i}',
            'text': ch,
            'metadata': {'source_id': fid, 'chunk_index': i},
        }
        for i, ch in enumerate(pieces)
    )
print('Чанков:', len(all_chunks))

# Embed the chunks and write them into the persistent Chroma collection.
emb_model = get_embedding_model(cfg.EMB_MODEL)
coll = get_chroma_collection(cfg.PERSIST_DIR, cfg.COLLECTION)
upsert_chunks(coll, emb_model, all_chunks)
len(all_chunks)

Чанков: 1


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/128 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_xlm-roberta_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/690 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.12G [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/964 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/271 [00:00<?, ?B/s]

1

## 4. Поиск и генерация ответа (OpenAI, RAG)

In [None]:
# Make sure the API key is available. An explicit raise is used instead of
# `assert`, which is stripped when Python runs with -O.
if not os.getenv('OPENAI_API_KEY'):
    raise RuntimeError('Положите ключ в переменную окружения OPENAI_API_KEY (например, через .env)')

from openai import OpenAI
client = OpenAI()

# Retrieve the chunks closest to the question.
question = 'Кто режиссёр первого фильма MCU?'
q_vec = embed_query(emb_model, question)
docs = query_topk(coll, q_vec, k=3)
# Number the retrieved chunks so the context reads as an ordered list.
context = '\n'.join(f"{rank}. {d['text']}" for rank, d in enumerate(docs, 1))

# The system prompt restricts the model to the retrieved context.
system = 'Отвечай только на основе контекста. Если ответа нет — скажи, что не знаешь.'
user = f'Вопрос: {question}\n\nКонтекст: \n{context}'

resp = client.chat.completions.create(
    model=cfg.OPENAI_MODEL,
    messages=[
        {'role': 'system', 'content': system},
        {'role': 'user', 'content': user},
    ],
    temperature=0.3,
    max_tokens=256,
)
answer = resp.choices[0].message.content
print('Ответ:', answer)

# List the sources behind the answer together with their distances.
print('Top‑K источники:')
for i, d in enumerate(docs, 1):
    print(f"{i}) id={d['id']} distance={d['distance']:.4f} source={d.get('metadata', {}).get('source_id')}")

SyntaxError: unterminated string literal (detected at line 2) (ipython-input-1831748718.py, line 2)

### Примечания и отладка

- Для реального ASR установите `faster-whisper` и поставьте `USE_FAKE_ASR=False`.
- Если нет GPU — параметр `compute_type='int8'` позволит работать на CPU.
- Чанкинг с `overlap=50` помогает при границах фрагментов.
- Убедитесь, что ChromaDB имеет права на запись в `data/chroma_db`.

In [None]:
!pip install chromadb

Collecting chromadb
  Downloading chromadb-1.0.21-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.3 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.22.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.9 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.37.0-py3-none-any.whl.metadata (2.4 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading PyPika-0.48.9.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [