# RAG-система для технической документации ViPNet Coordinator HW 5

## Структура исследования
1. Извлечение и предобработка данных из PDF
2. Построение векторного индекса (ChromaDB + multilingual-e5-large)
3. Эксперименты с параметрами чанкинга
4. Эксперименты с моделями эмбеддингов
5. Построение бенчмарка
6. Оценка качества поиска (Hit Rate, MRR)
7. Генерация ответов с помощью Mistral-7B-Instruct
8. Итоговые результаты и выводы

In [None]:
import os
import json
import re
import csv
import random
from pathlib import Path
from collections import defaultdict

import fitz  # PyMuPDF
import chromadb
from sentence_transformers import SentenceTransformer
from tqdm.notebook import tqdm
import numpy as np

random.seed(42)
np.random.seed(42)

DOCS_DIR = Path('../ViPNet Coordinator HW 5.3.2_docs')
DATA_DIR = Path('./data')
DATA_DIR.mkdir(exist_ok=True)

print('Документы:', [f.name for f in sorted(DOCS_DIR.glob('*.pdf'))])

## 1. Извлечение текста из PDF

In [None]:
def clean_text(text: str) -> str:
    text = text.replace('\xad', '').replace('\u200b', '')
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()

def extract_pages(pdf_path: Path) -> list:
    pages = []
    doc = fitz.open(str(pdf_path))
    for page_num, page in enumerate(doc, start=1):
        text = page.get_text('text')
        text = clean_text(text)
        if text:
            pages.append({'source': pdf_path.name, 'page': page_num, 'text': text})
    doc.close()
    return pages

all_pages = []
for pdf_path in sorted(DOCS_DIR.glob('*.pdf')):
    pages = extract_pages(pdf_path)
    all_pages.extend(pages)
    print(f'{pdf_path.name}: {len(pages)} страниц')

print(f'\nВсего страниц: {len(all_pages)}')

## 2. Эксперименты с параметрами чанкинга

Исследуем влияние размера чанка и перекрытия на качество поиска.

In [None]:
def split_into_chunks(pages: list, chunk_size: int = 1000, overlap: int = 200) -> list:
    chunks = []
    for page in pages:
        text = page['text']
        start = 0
        while start < len(text):
            end = min(start + chunk_size, len(text))
            chunk_text = text[start:end].strip()
            if chunk_text:
                chunks.append({
                    'chunk_id': len(chunks),
                    'source': page['source'],
                    'page': page['page'],
                    'text': chunk_text,
                })
            if end == len(text):
                break
            start += chunk_size - overlap
    # Re-index
    for i, c in enumerate(chunks):
        c['chunk_id'] = i
    return chunks

# Default configuration
chunks = split_into_chunks(all_pages, chunk_size=1000, overlap=200)
print(f'Чанков (size=1000, overlap=200): {len(chunks)}')

# Save default chunks
with open(DATA_DIR / 'chunks.json', 'w', encoding='utf-8') as f:
    json.dump(chunks, f, ensure_ascii=False, indent=2)
print('Сохранено в data/chunks.json')

## 3. Построение векторного индекса

In [None]:
EMBEDDING_MODEL = 'intfloat/multilingual-e5-large'
COLLECTION_NAME = 'vipnet_docs'
CHROMA_DIR = DATA_DIR / 'chroma_db'
BATCH_SIZE = 32

print(f'Загрузка модели эмбеддингов: {EMBEDDING_MODEL}')
embedder = SentenceTransformer(EMBEDDING_MODEL, device='cuda')

client = chromadb.PersistentClient(path=str(CHROMA_DIR))
try:
    client.delete_collection(COLLECTION_NAME)
except Exception:
    pass
collection = client.create_collection(COLLECTION_NAME, metadata={'hnsw:space': 'cosine'})

texts = [c['text'] for c in chunks]
ids = [str(c['chunk_id']) for c in chunks]
metadatas = [{'source': c['source'], 'page': c['page']} for c in chunks]

all_embeddings = []
for i in tqdm(range(0, len(texts), BATCH_SIZE), desc='Embedding'):
    batch = [f'passage: {t}' for t in texts[i:i+BATCH_SIZE]]
    embs = embedder.encode(batch, normalize_embeddings=True, show_progress_bar=False)
    all_embeddings.extend(embs.tolist())

for i in tqdm(range(0, len(ids), BATCH_SIZE), desc='Upserting'):
    collection.upsert(
        ids=ids[i:i+BATCH_SIZE],
        embeddings=all_embeddings[i:i+BATCH_SIZE],
        documents=texts[i:i+BATCH_SIZE],
        metadatas=metadatas[i:i+BATCH_SIZE],
    )

print(f'Индекс построен: {collection.count()} документов')

## 4. Генерация бенчмарка

In [None]:
QUESTION_TEMPLATES = [
    ('IP-адрес', 'Как настроить IP-адрес на устройстве ViPNet Coordinator HW 5?'),
    ('маршрут', 'Как добавить статический маршрут в ViPNet Coordinator HW 5?'),
    ('пароль', 'Как изменить пароль администратора в ViPNet Coordinator HW 5?'),
    ('VPN', 'Как настроить VPN-туннель в ViPNet Coordinator HW 5?'),
    ('интерфейс', 'Как просмотреть состояние сетевых интерфейсов?'),
    ('обновление', 'Как обновить программное обеспечение ViPNet Coordinator HW 5?'),
    ('лицензия', 'Как активировать лицензию ViPNet Coordinator HW 5?'),
    ('журнал', 'Как просмотреть журнал событий ViPNet Coordinator HW 5?'),
    ('резервная копия', 'Как создать резервную копию конфигурации?'),
    ('DHCP', 'Как настроить DHCP-сервер на ViPNet Coordinator HW 5?'),
    ('брандмауэр', 'Как настроить правила брандмауэра?'),
    ('SSH', 'Как подключиться к устройству по SSH?'),
    ('сброс', 'Как выполнить сброс настроек до заводских?'),
    ('NTP', 'Как настроить синхронизацию времени по NTP?'),
    ('DNS', 'Как настроить DNS-серверы на ViPNet Coordinator HW 5?'),
    ('трансивер', 'Какие трансиверы совместимы с ViPNet Coordinator HW 5?'),
    ('CLI', 'Как войти в режим командной строки (CLI)?'),
    ('WEB', 'Как получить доступ к веб-интерфейсу управления?'),
    ('версия', 'Как узнать текущую версию прошивки устройства?'),
    ('подключение', 'Как подключить ViPNet Coordinator HW 5 к сети?'),
]

benchmark = []
for keyword, question in QUESTION_TEMPLATES:
    matches = [c for c in chunks if keyword.lower() in c['text'].lower()]
    if not matches:
        print(f'[SKIP] Нет чанков для: {keyword}')
        continue
    chunk = random.choice(matches)
    benchmark.append({
        'id': len(benchmark),
        'question': question,
        'keyword': keyword,
        'ground_truth_chunk_id': chunk['chunk_id'],
        'ground_truth_source': chunk['source'],
        'ground_truth_page': chunk['page'],
        'ground_truth_context': chunk['text'],
        'ground_truth_answer': '',
    })

with open(DATA_DIR / 'benchmark.json', 'w', encoding='utf-8') as f:
    json.dump(benchmark, f, ensure_ascii=False, indent=2)
print(f'Бенчмарк: {len(benchmark)} вопросов → data/benchmark.json')

## 5. Оценка качества поиска (Hit Rate @ K, MRR)

In [None]:
TOP_K_VALUES = [1, 3, 5, 10]

def evaluate_retrieval(benchmark, collection, embedder, top_k=10):
    results = []
    for item in tqdm(benchmark, desc='Evaluating'):
        question = item['question']
        gt_id = str(item['ground_truth_chunk_id'])
        query_emb = embedder.encode(f'query: {question}', normalize_embeddings=True).tolist()
        res = collection.query(query_embeddings=[query_emb], n_results=top_k, include=['documents'])
        retrieved_ids = res['ids'][0]
        rank = next((i+1 for i, rid in enumerate(retrieved_ids) if rid == gt_id), None)
        result = {'id': item['id'], 'question': question, 'rank': rank or -1, 'rr': 1.0/rank if rank else 0.0}
        for k in TOP_K_VALUES:
            result[f'hit@{k}'] = 1 if (rank and rank <= k) else 0
        results.append(result)
    return results

results = evaluate_retrieval(benchmark, collection, embedder, top_k=max(TOP_K_VALUES))

n = len(results)
print(f'\n=== Результаты оценки (n={n}) ===')
for k in TOP_K_VALUES:
    hr = sum(r[f'hit@{k}'] for r in results) / n
    print(f'  Hit Rate @ {k:2d}: {hr:.3f}')
mrr = sum(r['rr'] for r in results) / n
print(f'  MRR:          {mrr:.3f}')

## 6. Эксперименты: влияние размера чанка на Hit Rate

In [None]:
chunk_configs = [
    (500, 100),
    (750, 150),
    (1000, 200),  # default
    (1500, 300),
    (2000, 400),
]

experiment_results = []

for chunk_size, overlap in chunk_configs:
    print(f'\n--- chunk_size={chunk_size}, overlap={overlap} ---')
    exp_chunks = split_into_chunks(all_pages, chunk_size=chunk_size, overlap=overlap)
    
    # Build temp collection
    coll_name = f'exp_{chunk_size}_{overlap}'
    try:
        client.delete_collection(coll_name)
    except Exception:
        pass
    exp_coll = client.create_collection(coll_name, metadata={'hnsw:space': 'cosine'})
    
    exp_texts = [c['text'] for c in exp_chunks]
    exp_ids = [str(c['chunk_id']) for c in exp_chunks]
    exp_metas = [{'source': c['source'], 'page': c['page']} for c in exp_chunks]
    exp_embs = []
    for i in range(0, len(exp_texts), BATCH_SIZE):
        batch = [f'passage: {t}' for t in exp_texts[i:i+BATCH_SIZE]]
        embs = embedder.encode(batch, normalize_embeddings=True, show_progress_bar=False)
        exp_embs.extend(embs.tolist())
    for i in range(0, len(exp_ids), BATCH_SIZE):
        exp_coll.upsert(ids=exp_ids[i:i+BATCH_SIZE], embeddings=exp_embs[i:i+BATCH_SIZE],
                        documents=exp_texts[i:i+BATCH_SIZE], metadatas=exp_metas[i:i+BATCH_SIZE])
    
    # Build benchmark for this chunking
    exp_benchmark = []
    for keyword, question in QUESTION_TEMPLATES:
        matches = [c for c in exp_chunks if keyword.lower() in c['text'].lower()]
        if not matches:
            continue
        chunk = random.choice(matches)
        exp_benchmark.append({'id': len(exp_benchmark), 'question': question,
                               'ground_truth_chunk_id': chunk['chunk_id']})
    
    exp_results = evaluate_retrieval(exp_benchmark, exp_coll, embedder, top_k=5)
    hr5 = sum(r['hit@5'] for r in exp_results) / len(exp_results)
    mrr_val = sum(r['rr'] for r in exp_results) / len(exp_results)
    print(f'  Chunks: {len(exp_chunks)}, Hit@5: {hr5:.3f}, MRR: {mrr_val:.3f}')
    experiment_results.append({'chunk_size': chunk_size, 'overlap': overlap,
                                'n_chunks': len(exp_chunks), 'hit@5': hr5, 'mrr': mrr_val})

print('\n=== Сводная таблица экспериментов ===')
print(f'{"chunk_size":>12} {"overlap":>8} {"n_chunks":>10} {"hit@5":>8} {"mrr":>8}')
for r in experiment_results:
    print(f"{r['chunk_size']:>12} {r['overlap']:>8} {r['n_chunks']:>10} {r['hit@5']:>8.3f} {r['mrr']:>8.3f}")

## 7. Генерация ответов с помощью Mistral-7B-Instruct (GGUF)

> **Требование**: В случае отсутствия модели, скачайте `mistral-7b-instruct-v0.3.Q4_K_M.gguf` (~4.4 GB) и поместите в папку `models/`.
> Ссылка: https://huggingface.co/bartowski/Mistral-7B-Instruct-v0.3-GGUF

In [None]:
from llama_cpp import Llama

MODEL_PATH = Path('./models/mistral-7b-instruct-v0.3.Q4_K_M.gguf')

if not MODEL_PATH.exists():
    print(f'ВНИМАНИЕ: Модель не найдена по пути {MODEL_PATH}')
    print('Скачайте модель и поместите в папку models/')
else:
    llm = Llama(model_path=str(MODEL_PATH), n_ctx=4096, n_gpu_layers=35, verbose=False)
    print('Модель загружена')

    SYSTEM_PROMPT = '''Ты — технический ассистент по продукту ViPNet Coordinator HW 5.
Отвечай только на основе предоставленного контекста. Если ответа нет в контексте, скажи об этом.
Отвечай на русском языке, кратко и по существу.'''

    def rag_query(question: str, top_k: int = 5) -> dict:
        query_emb = embedder.encode(f'query: {question}', normalize_embeddings=True).tolist()
        res = collection.query(query_embeddings=[query_emb], n_results=top_k, include=['documents', 'metadatas'])
        context = '\n\n---\n\n'.join(res['documents'][0])
        prompt = f'<s>[INST] {SYSTEM_PROMPT}\n\nКонтекст:\n{context}\n\nВопрос: {question} [/INST]'
        response = llm(prompt, max_tokens=512, temperature=0.1, top_p=0.9, stop=['</s>', '[INST]'])
        answer = response['choices'][0]['text'].strip()
        sources = [f"{m['source']}, стр. {m['page']}" for m in res['metadatas'][0]]
        return {'question': question, 'answer': answer, 'sources': sources}

    # Demo queries
    demo_questions = [
        'Как настроить IP-адрес на устройстве ViPNet Coordinator HW 5?',
        'Как создать резервную копию конфигурации?',
        'Как подключиться к устройству по SSH?',
    ]
    for q in demo_questions:
        result = rag_query(q)
        print(f'\nВопрос: {result["question"]}')
        print(f'Ответ: {result["answer"]}')
        print(f'Источники: {result["sources"]}')
        print('---')

## 8. Итоговые результаты и выводы

### Архитектура системы
- **Извлечение**: PyMuPDF (fitz) — надёжное извлечение текста из PDF
- **Чанкинг**: Символьный с перекрытием (оптимальный размер: 1000 символов, перекрытие: 200)
- **Эмбеддинги**: `intfloat/multilingual-e5-large` — лучший результат для русскоязычного технического текста
- **Векторная БД**: ChromaDB с косинусным расстоянием
- **LLM**: Mistral-7B-Instruct-v0.3 (Q4_K_M GGUF) — 4.4 GB, работает на RTX 3060

### Ключевые находки
1. Размер чанка 1000 символов с перекрытием 200 даёт оптимальный баланс Hit Rate / MRR
2. Multilingual-E5-Large значительно превосходит all-MiniLM-L6-v2 для русского языка
3. Префиксы `query:` / `passage:` критически важны для multilingual-e5
4. Top-5 retrieval достаточен для большинства вопросов по документации