# team_bereza — Test Notebook

Последовательные тесты всех модулей RAG-пайплайна.  
Каждая секция независима — можно запускать по отдельности.

| Секция | Что тестируем | Нужен GPU? |
|--------|--------------|------------|
| 0 | Окружение, CUDA, зависимости | — |
| 1 | Config | — |
| 2 | OCR (EasyOCR + PyMuPDF) | желательно |
| 3 | Parser (семантический чанкер) | — |
| 4 | TextEmbedder | — |
| 5 | FaissStore | — |
| 6 | LocalGenerator | **да** |
| 7 | Полный пайплайн | **да** |
| 8 | Валидация ответа | — |

---
## 0. Окружение

In [None]:
import sys
import torch

print(f"Python : {sys.version}")
print(f"PyTorch: {torch.__version__}")
print(f"CUDA   : {torch.version.cuda}")
print(f"GPU    : {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"  Device : {torch.cuda.get_device_name(0)}")
    print(f"  VRAM   : {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

# Выбор устройства для всего ноутбука
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"\nDevice : {DEVICE}")

In [None]:
# Проверяем, что все зависимости установлены
import importlib

deps = [
    "fitz",           # PyMuPDF
    "easyocr",
    "faiss",
    "sentence_transformers",
    "transformers",
    "docx",           # python-docx
    "numpy",
    "PIL",            # Pillow
]

for dep in deps:
    try:
        importlib.import_module(dep)
        print(f"  OK  {dep}")
    except ImportError:
        print(f"  MISSING  {dep}  <-- pip install")

In [None]:
# Проверяем импорт всех модулей проекта
from src.config import Config
from src.ocr import extract_text_from_pdf
from src.parser import GuidelineParserStub
from src.embeddings import TextEmbedder
from src.faiss_store import FaissStore
from src.generator import LocalGenerator

print("Все модули src/ импортированы успешно.")

---
## 1. Config

In [None]:
from src.config import Config
from dataclasses import asdict
import json

cfg = Config(device=DEVICE)
print(json.dumps(asdict(cfg), indent=2, ensure_ascii=False))

---
## 2. OCR — EasyOCR + PyMuPDF

Тест включает:
- детекцию нативного текстового слоя
- извлечение текста (нативное или через EasyOCR)
- проверку нормализации (сохранение абзацев)

In [None]:
import fitz
from pathlib import Path

PDF_PATH = cfg.guideline_pdf_path
assert Path(PDF_PATH).is_file(), f"PDF не найден: {PDF_PATH}"

# Проверяем наличие текстового слоя
with fitz.open(PDF_PATH) as doc:
    total_pages = len(doc)
    pages_with_text = sum(1 for p in doc if p.get_text().strip())

print(f"Страниц всего : {total_pages}")
print(f"Страниц с текстом: {pages_with_text}")
print(f"Режим извлечения: {'native' if pages_with_text > 0 else 'OCR (EasyOCR)'}")

In [None]:
from src.ocr import extract_text_from_pdf
import time

t0 = time.time()
guideline_text = extract_text_from_pdf(PDF_PATH, languages=["ru"])
elapsed = time.time() - t0

paragraphs = [p for p in guideline_text.split("\n\n") if p.strip()]

print(f"Время         : {elapsed:.1f}s")
print(f"Символов      : {len(guideline_text):,}")
print(f"Абзацев (\\n\\n): {len(paragraphs)}")
print()
print("=== Первые 3 абзаца ===")
for i, p in enumerate(paragraphs[:3]):
    print(f"[{i}] {p[:200]}")
    print()

---
## 3. Parser — семантический чанкер

In [None]:
# Юнит-тест на синтетическом тексте
from src.parser import GuidelineParserStub

parser = GuidelineParserStub(chunk_size=30, overlap=5)

sample = (
    "Первый абзац: краткие сведения о заболевании.\n\n"
    "Второй абзац: диагностика и лабораторные показатели.\n\n"
    "Третий абзац: принципы лечения первой линии.\n\n"
    "Четвёртый абзац: реабилитация и наблюдение после терапии."
)

chunks = parser.parse(sample)
print(f"Входных абзацев: 4")
print(f"Выходных чанков: {len(chunks)}")
print()
for i, c in enumerate(chunks):
    print(f"[{i}] {c!r}")

In [None]:
# Тест на реальном тексте из PDF
# guideline_text должен быть получен в секции OCR выше
import time

parser_real = GuidelineParserStub(
    chunk_size=cfg.chunk_size,
    overlap=cfg.overlap,
    model_name=cfg.embedder_model,
)

t0 = time.time()
chunks = parser_real.parse(guideline_text)
elapsed = time.time() - t0

token_counts = [
    len(parser_real.tokenizer.encode(c, add_special_tokens=False))
    for c in chunks
]

print(f"Время     : {elapsed:.2f}s")
print(f"Чанков    : {len(chunks)}")
print(f"Токенов   : min={min(token_counts)}, max={max(token_counts)}, avg={sum(token_counts)//len(token_counts)}")
print()
print("=== Первые 2 чанка ===")
for i, c in enumerate(chunks[:2]):
    print(f"[{i}] ({token_counts[i]} tok) {c[:300]}")
    print()

---
## 4. TextEmbedder

In [None]:
from src.embeddings import TextEmbedder
import numpy as np

embedder = TextEmbedder(model_name=cfg.embedder_model)

# embed_text
single = embedder.embed_text("Рак лёгкого: диагностика и лечение")
print(f"embed_text  shape : {single.shape}, dtype: {single.dtype}")
print(f"L2 norm           : {np.linalg.norm(single):.6f}  (должно быть ~1.0)")

# embed_batch
batch_texts = [
    "Хирургическое лечение немелкоклеточного рака лёгкого",
    "Химиотерапия при мелкоклеточном раке",
    "Иммунотерапия ингибиторами контрольных точек",
]
batch = embedder.embed_batch(batch_texts)
print(f"\nembed_batch shape : {batch.shape}")
norms = np.linalg.norm(batch, axis=1)
print(f"L2 norms          : {norms}  (все ~1.0)")

# Косинусное сходство между парами
sims = batch @ batch.T
print(f"\nКосинусная матрица сходства (normalized L2 = cosine):")
print(np.round(sims, 3))

---
## 5. FaissStore

In [None]:
from src.faiss_store import FaissStore
import tempfile, os

# Создаём in-memory store с синтетическими данными
texts = [
    "Хирургическое лечение немелкоклеточного рака лёгкого",
    "Химиотерапия при мелкоклеточном раке",
    "Иммунотерапия ингибиторами контрольных точек",
    "Таргетная терапия при EGFR-мутации",
    "Лучевая терапия при раке лёгкого",
]
embeddings = embedder.embed_batch(texts)
metadata = [{"id": i} for i in range(len(texts))]

store = FaissStore(dimension=embeddings.shape[1])
store.add(texts, embeddings, metadata)
print(f"Добавлено векторов: {store.index.ntotal}")

# Поиск
query = "Лечение EGFR мутация таргетная"
q_emb = embedder.embed_text(query)[0]
results = store.search(q_emb, top_k=3)

print(f"\nЗапрос: '{query}'")
print("Результаты:")
for r in results:
    print(f"  score={r['score']:.4f}  {r['text']}")

# Сохранение и загрузка
with tempfile.TemporaryDirectory() as tmpdir:
    store.save(tmpdir)
    files = os.listdir(tmpdir)
    print(f"\nСохранённые файлы: {files}")

    store2 = FaissStore(dimension=embeddings.shape[1])
    store2.load(tmpdir)
    results2 = store2.search(q_emb, top_k=1)
    print(f"После load — top-1: {results2[0]['text']}")

print("\nFaissStore: OK")

### 5.1 Загрузить существующий индекс с диска

In [None]:
import faiss
from pathlib import Path

INDEX_DIR = cfg.faiss_index_path
index_exists = all(
    Path(INDEX_DIR, f).exists()
    for f in ["index.faiss", "texts.pkl", "metadata.pkl"]
)

if index_exists:
    temp = faiss.read_index(str(Path(INDEX_DIR) / "index.faiss"))
    real_store = FaissStore(dimension=temp.d)
    real_store.load(INDEX_DIR)
    del temp

    q_real = embedder.embed_text("EGFR мутация таргетная терапия первая линия")[0]
    results = real_store.search(q_real, top_k=3)

    print(f"Векторов в индексе: {real_store.index.ntotal}")
    print(f"Dimension         : {real_store.dimension}")
    print()
    print("Top-3 по запросу 'EGFR мутация таргетная терапия первая линия':")
    for r in results:
        print(f"  score={r['score']:.4f}  {r['text'][:150]}")
        print()
else:
    print("Индекс не найден — запусти секцию 7 (полный пайплайн) для его создания.")

---
## 6. LocalGenerator

> **Требования**: Qwen2-7B-Instruct ~ 14 GB VRAM (GPU) / ~28 GB RAM (CPU).  
> На CPU работает, но очень медленно. На A100 / H100 — несколько секунд.

In [None]:
from src.generator import LocalGenerator
import time

t0 = time.time()
generator = LocalGenerator(model_name=cfg.generator_model, device=DEVICE)
print(f"Модель загружена за {time.time() - t0:.1f}s")
print(f"Device: {generator.device}")

In [None]:
# Проверяем системные промпты
for mode in ("doctor", "patient"):
    prompt = generator._get_system_prompt(mode)
    print(f"[{mode}] первые 120 символов: {prompt[:120]!r}")

# Проверяем валидацию неизвестного режима
try:
    generator._get_system_prompt("unknown")
except ValueError as e:
    print(f"\nОжидаемая ошибка: {e}")

In [None]:
# Проверяем apply_chat_template (без инференса)
messages = [
    {"role": "system", "content": "Ты ассистент."},
    {"role": "user", "content": "Привет!"},
]
formatted = generator.tokenizer.apply_chat_template(
    messages, tokenize=False, add_generation_prompt=True
)
print("Chat template output:")
print(formatted)

In [None]:
# Генерация на синтетических данных
import time

SYNTHETIC_SECTIONS = [
    "При EGFR-мутации (делеция 19 экзона или замена L858R) рекомендована таргетная терапия ингибиторами тирозинкиназы первого поколения.",
    "Хирургическое лечение рекомендовано при I–II стадии НМРЛ при отсутствии противопоказаний.",
    "Химиотерапия первой линии на основе препаратов платины рекомендована при отсутствии активирующих мутаций.",
]
SYNTHETIC_PATIENT = (
    "Пациент 58 лет. Диагноз: аденокарцинома лёгкого, стадия IIIA. "
    "EGFR мутация: делеция 19 экзона. Назначен гефитиниб."
)

t0 = time.time()
response = generator.generate(
    patient_text=SYNTHETIC_PATIENT,
    retrieved_sections=SYNTHETIC_SECTIONS,
    mode="doctor",
    max_new_tokens=256,
)
elapsed = time.time() - t0

print(f"Время генерации: {elapsed:.1f}s")
print(f"Токенов в ответе: ~{len(response.split())}")
print()
print("=== Ответ (режим doctor) ===")
print(response)

In [None]:
# То же самое в режиме patient
t0 = time.time()
response_patient = generator.generate(
    patient_text=SYNTHETIC_PATIENT,
    retrieved_sections=SYNTHETIC_SECTIONS,
    mode="patient",
    max_new_tokens=256,
)
print(f"Время: {time.time() - t0:.1f}s")
print()
print("=== Ответ (режим patient) ===")
print(response_patient)

---
## 7. Полный пайплайн (end-to-end)

Запускает `main()` целиком.  
Если FAISS-индекс уже есть — OCR и чанкинг пропускаются.  
Чтобы пересобрать индекс — удали папку `data/faiss_index/`.

In [None]:
# Раскомментировать, чтобы сбросить индекс и пересобрать с нуля
# import shutil
# shutil.rmtree(cfg.faiss_index_path, ignore_errors=True)
# print("Индекс удалён.")

In [None]:
# Запуск полного пайплайна через main()
# Если выше уже загружен generator — можно переиспользовать, но main() создаёт его заново.
import importlib
import main as main_module
importlib.reload(main_module)  # подхватываем изменения без перезапуска ядра

main_module.main()

### 7.1 Посмотреть сохранённые результаты

In [None]:
import json
from pathlib import Path

response_dir = Path(cfg.response_dir)
versions = sorted(response_dir.glob("version_*.json"))

if not versions:
    print("Нет сохранённых результатов.")
else:
    latest = versions[-1]
    with latest.open(encoding="utf-8") as f:
        data = json.load(f)

    print(f"Файл    : {latest.name}")
    print(f"Версия  : {data['version']}")
    print(f"Режим   : {data['mode']}")
    print(f"Модель  : {data['model_name']}")
    print(f"Старт   : {data['timestamp_start']}")
    print(f"Конец   : {data['timestamp_end']}")
    print(f"Итого   : {data['total_duration_seconds']}s")
    print()
    print("Этапы:")
    for stage, dur in data['stage_durations'].items():
        print(f"  {stage:<20} {dur}s")
    print()
    print("=== Финальный ответ ===")
    print(data['final_response'])

---
## 8. Валидация ответа

Автоматические проверки качества сгенерированного ответа.  
Запускать после секции 7 (либо загрузить любой `version_N.json` вручную).

### 8.1 Загрузка версии для анализа

In [None]:
import json
from pathlib import Path

# Поменяй номер версии при необходимости, None = последняя
EVAL_VERSION = None

response_dir = Path(cfg.response_dir)
versions = sorted(response_dir.glob("version_*.json"))
assert versions, "Нет сохранённых результатов — запусти секцию 7."

target = versions[-1] if EVAL_VERSION is None else response_dir / f"version_{EVAL_VERSION}.json"
assert target.exists(), f"Файл не найден: {target}"

with target.open(encoding="utf-8") as f:
    data = json.load(f)

response      = data["final_response"]
retrieved     = data.get("retrieved_sections", [])
mode          = data.get("mode", "—")
model_name    = data.get("model_name", "—")
total_dur     = data.get("total_duration_seconds", 0)
stage_dur     = data.get("stage_durations", {})

print(f"Версия  : {data['version']}")
print(f"Режим   : {mode}")
print(f"Модель  : {model_name}")
print(f"Длит.   : {total_dur}s")
print(f"Чанков  : {len(retrieved)}")

### 8.2 Структурные проверки

In [None]:
checks = []

def check(name, condition, detail=""):
    status = "OK  " if condition else "FAIL"
    checks.append((status, name, detail))
    print(f"  [{status}]  {name}" + (f"  →  {detail}" if detail else ""))

print("=== Структурные проверки ===\n")

# 1. Дисклеймер
check(
    "Дисклеймер присутствует",
    "Не является медицинской рекомендацией" in response,
)

# 2. Промпт не протёк в ответ
check("Нет артефактов промпта (===)",  "===" not in response)
check("Нет 'Ваш ответ:'",              "Ваш ответ:" not in response)
check("Нет '[system]' / '[user]'",     "[system]" not in response and "[user]" not in response)

# 3. Длина ответа
word_count = len(response.split())
check("Длина > 100 слов",  word_count > 100,  f"{word_count} слов")
check("Длина < 3000 слов", word_count < 3000, f"{word_count} слов")

# 4. Язык — кириллица присутствует
cyrillic_ratio = sum(1 for c in response if '\u0400' <= c <= '\u04ff') / max(len(response), 1)
check("Кириллица в ответе (>30%)", cyrillic_ratio > 0.3, f"{cyrillic_ratio:.0%}")

# 5. Нет явных галлюцинаций-маркеров
hallucination_markers = ["по данным ВОЗ", "согласно исследованию", "в 2023 году", "клинические испытания показали"]
found_markers = [m for m in hallucination_markers if m.lower() in response.lower()]
check("Нет внешних ссылок-маркеров", len(found_markers) == 0, ", ".join(found_markers) if found_markers else "")

print(f"\nИтого: {sum(1 for s,_,_ in checks if s=='OK  ')}/{len(checks)} OK")

### 8.3 Faithfulness — ответ vs retrieved_sections

Для каждого утверждения из ответа проверяем: какой чанк его поддерживает?  
Использует эмбеддинг-сходство как прокси — не заменяет экспертную проверку, но сигнализирует о потенциальных галлюцинациях.

In [None]:
import numpy as np
import re

if not retrieved:
    print("retrieved_sections не сохранены в этой версии JSON.")
    print("Пересобери пайплайн — начиная с v0.3 они сохраняются автоматически.")
else:
    # Разбиваем ответ на предложения
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', response) if len(s.strip()) > 40]

    # Эмбеддим предложения ответа и retrieved-чанки
    # embedder должен быть инициализирован в секции 4
    sent_embs   = embedder.embed_batch(sentences)          # (S, D)
    chunk_embs  = embedder.embed_batch(retrieved)          # (K, D)

    # Косинусное сходство (векторы уже L2-нормализованы)
    sim_matrix = sent_embs @ chunk_embs.T                  # (S, K)

    print(f"Предложений в ответе : {len(sentences)}")
    print(f"Retrieved чанков     : {len(retrieved)}")
    print()

    LOW_THRESH = 0.35   # ниже — предложение плохо покрыто чанками
    flagged = []

    print(f"{'Сходство':>8}  Предложение")
    print("-" * 80)
    for i, (sent, sims) in enumerate(zip(sentences, sim_matrix)):
        best_score = float(sims.max())
        best_chunk = int(sims.argmax())
        flag = " ⚠" if best_score < LOW_THRESH else ""
        if best_score < LOW_THRESH:
            flagged.append((sent, best_score))
        print(f"  {best_score:.3f}{flag}   {sent[:90]}")

    print()
    if flagged:
        print(f"⚠  {len(flagged)} предложений плохо покрыты retrieved-чанками (score < {LOW_THRESH}):")
        for sent, score in flagged:
            print(f"   [{score:.3f}] {sent[:120]}")
    else:
        print("Все предложения хорошо покрыты retrieved-чанками.")

### 8.4 Просмотр retrieved-чанков

In [None]:
if not retrieved:
    print("retrieved_sections не сохранены в этой версии JSON.")
else:
    print(f"Передано в генератор {len(retrieved)} чанков:\n")
    for i, chunk in enumerate(retrieved):
        print(f"── Чанк {i} {'─'*60}")
        print(chunk)
        print()

### 8.5 Сравнение версий

In [None]:
import json
from pathlib import Path

response_dir = Path(cfg.response_dir)
versions = sorted(response_dir.glob("version_*.json"))

if len(versions) < 2:
    print("Нужно минимум 2 версии для сравнения.")
else:
    rows = []
    for vf in versions:
        with vf.open(encoding="utf-8") as f:
            d = json.load(f)
        gen_dur   = d.get("stage_durations", {}).get("generation", 0)
        has_ret   = "да" if d.get("retrieved_sections") else "нет"
        has_disc  = "да" if "Не является медицинской рекомендацией" in d.get("final_response", "") else "НЕТ ⚠"
        has_art   = "ДА ⚠" if "Ваш ответ:" in d.get("final_response", "") else "нет"
        wc        = len(d.get("final_response", "").split())
        rows.append((d["version"], d.get("mode","—"), gen_dur, wc, has_disc, has_art, has_ret))

    header = f"{'ver':>3}  {'mode':<8}  {'gen,s':>7}  {'слов':>5}  {'дисклейм':>9}  {'артефакт':>9}  {'sections':>8}"
    print(header)
    print("─" * len(header))
    for ver, mode, gen, wc, disc, art, ret in rows:
        print(f"  {ver:>2}  {mode:<8}  {gen:>7.1f}  {wc:>5}  {disc:>9}  {art:>9}  {ret:>8}")