# Memory-RAG и Chunking стратегии с LangChain

Этот ноутбук демонстрирует реализацию и сравнение различных стратегий разбиения текста (chunking) для Retrieval-Augmented Generation (RAG) и интеграцию ContextualCompressionRetriever и Memory-RAG пайплайна.

Outline:
1. Импорт и установка необходимых библиотек
2. Создание тестового документа
3. Fixed-Size Chunking
4. Semantic Chunking
5. Recursive Code Chunking
6. Adaptive Chunking
7. Context-Enriched Chunking
8. AI-Driven Chunking (mock)
9. Сравнительная оценка стратегий
10. ContextualCompressionRetriever
11. Memory-RAG пайплайн
12. Визуализация метрик


In [None]:
# 1. Импорт и установка необходимых библиотек
import sys, subprocess, importlib

required = [
    'langchain', 'langchain-text-splitters', 'sentence-transformers', 'nltk', 'pydantic', 'numpy', 'pandas', 'matplotlib'
]

for pkg in required:
    try:
        importlib.import_module(pkg.split('-')[0])
    except ImportError:
        print(f'Installing {pkg}...')
        subprocess.run([sys.executable, '-m', 'pip', 'install', pkg])

import re, math, json, time, random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import nltk

try:
    nltk.data.find('tokenizers/punkt')
except LookupError:
    nltk.download('punkt')

from sentence_transformers import SentenceTransformer
from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain_core.documents import Document

print('Environment ready.')

In [None]:
# 2. Создание тестового документа

def create_dummy_document(paragraphs: int = 40) -> str:
    base_paras = []
    topics = [
        'Machine Learning overview and its branches including supervised learning.',
        'Neural networks fundamentals: activation functions, backpropagation, optimization.',
        'Large Language Models and transformer attention scaling strategies.',
        'Vector databases, hybrid retrieval, reranking pipelines and HNSW indexing.',
        'Context compression methods: MMR, clustering, summarization with LLMs.',
        'Reinforcement Learning from Human Feedback and policy optimization.',
        'Multimodal learning: text + image fusion mechanisms and embeddings.',
        'Federated learning privacy constraints and differential privacy.',
        'Time series forecasting and anomaly detection approaches.',
        'Graph algorithms including PageRank and Personalized PageRank for KG.',
    ]
    for i in range(paragraphs):
        topic = topics[i % len(topics)]
        base_paras.append(f"## Section {i+1}\n" + topic + ' ' + ' '.join([
            'This paragraph elaborates on ' + random.choice(topics).lower(),
            'It provides additional clarifications and comparisons.',
            'Important for retrieval quality and semantic integrity.'
        ]))
    return '\n\n'.join(base_paras)

document = create_dummy_document()
print('Document length (chars):', len(document))
print(document[:500] + '...')  # preview

In [None]:
# 3. Fixed-Size Chunking

fixed_splitter = CharacterTextSplitter(
    separator='\n\n',
    chunk_size=800,
    chunk_overlap=160,
    length_function=len
)
fixed_chunks = fixed_splitter.split_text(document)
fixed_docs = [Document(page_content=c, metadata={'chunk_id': i, 'chunk_type': 'fixed', 'size': len(c)}) for i, c in enumerate(fixed_chunks)]
print('Fixed chunks count:', len(fixed_docs))
print('Example chunk preview:\n', fixed_docs[len(fixed_docs)//2].page_content[:300])

In [None]:
# 4. Semantic Chunking

semantic_splitter = RecursiveCharacterTextSplitter(
    separators=['\n## ', '\n\n', '\n', '. ', ' ', ''],
    chunk_size=600,
    chunk_overlap=120,
    length_function=len
)
semantic_chunks = semantic_splitter.split_text(document)
section_pattern = re.compile(r'^##\s+Section\s+(\d+)', re.MULTILINE)
semantic_docs = []
current_section = None
for i, ch in enumerate(semantic_chunks):
    m = section_pattern.search(ch)
    if m:
        current_section = m.group(1)
    semantic_docs.append(Document(page_content=ch, metadata={
        'chunk_id': i,
        'chunk_type': 'semantic',
        'section': current_section,
        'size': len(ch)
    }))
print('Semantic chunks count:', len(semantic_docs))
print('Example semantic chunk metadata:', semantic_docs[0].metadata)

In [None]:
# 5. Recursive Code Chunking (mock Python code)
code_sample = """
import math

class VectorOps:
    def norm(self, v):
        return math.sqrt(sum(x*x for x in v))

    def cosine(self, a, b):
        na = self.norm(a); nb = self.norm(b)
        return sum(x*y for x,y in zip(a,b)) / (na*nb)

def embed(text):
    # mock embedding
    return [float(len(text)) % 10, len(text)%7, len(text)%5]

"""
code_splitter = RecursiveCharacterTextSplitter.from_language(
    language='python',
    chunk_size=120,
    chunk_overlap=20
)
code_chunks = code_splitter.split_text(code_sample)
code_docs = [Document(page_content=c, metadata={'chunk_type': 'code', 'chunk_id': i, 'size': len(c)}) for i,c in enumerate(code_chunks)]
print('Code chunks:', len(code_docs))
for d in code_docs:
    print(d.metadata, '=>', d.page_content.replace('\n',' ')[:80])

In [None]:
# 6. Adaptive Chunking
from langchain_text_splitters import TextSplitter

class AdaptiveTextSplitter(TextSplitter):
    def __init__(self, min_chunk_size=300, max_chunk_size=900, min_overlap=40, max_overlap=140, **kwargs):
        super().__init__(**kwargs)
        self.min_chunk_size=min_chunk_size
        self.max_chunk_size=max_chunk_size
        self.min_overlap=min_overlap
        self.max_overlap=max_overlap

    def complexity(self, text: str) -> float:
        words = re.findall(r'\b\w+\b', text.lower())
        if not words:
            return 0.0
        unique = len(set(words))
        density = unique/len(words)
        avg_sentence_len = sum(len(s) for s in re.split(r'[.!?]', text) if s.strip()) / max(1, len(re.split(r'[.!?]', text)))
        # normalize approx
        return min(1.0, 0.5*density + 0.5*min(1.0, avg_sentence_len/180))

    def split_text(self, text: str) -> list[str]:
        sentences = nltk.sent_tokenize(text)
        chunks=[]; current=[]; current_size=0; current_complexity=0.5
        for sent in sentences:
            c = self.complexity(sent)
            current_complexity = (current_complexity + c)/2 if current else c
            target = self.max_chunk_size - current_complexity*(self.max_chunk_size-self.min_chunk_size)
            if current_size + len(sent) > target and current:
                chunks.append(' '.join(current))
                # overlap
                overlap_target = self.min_overlap + current_complexity*(self.max_overlap-self.min_overlap)
                overlap=[]; o_size=0
                for s in reversed(current):
                    if o_size + len(s) <= overlap_target:
                        overlap.insert(0,s); o_size += len(s)
                    else:
                        break
                current = overlap + [sent]
                current_size = sum(len(x) for x in current)
            else:
                current.append(sent); current_size += len(sent)
        if current:
            chunks.append(' '.join(current))
        return chunks

adaptive_splitter = AdaptiveTextSplitter()
adaptive_chunks = adaptive_splitter.split_text(document)
adaptive_docs = [Document(page_content=c, metadata={'chunk_type':'adaptive','chunk_id':i,'size':len(c)}) for i,c in enumerate(adaptive_chunks)]
print('Adaptive chunks:', len(adaptive_docs))
print('Example adaptive chunk size:', adaptive_docs[0].metadata['size'])

In [None]:
# 7. Context-Enriched Chunking (mock summaries)

base_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=['\n\n','\n','. ',' ','']
)
base_chunks = base_splitter.split_text(document)

def mock_summary(text: str) -> str:
    first_sentence = text.split('.')[0]
    return 'Summary: ' + first_sentence[:120] + '...'

enriched_docs = []
window_size = 1
for i, ch in enumerate(base_chunks):
    window_start = max(0, i-window_size)
    window_end = min(len(base_chunks), i+window_size+1)
    context_parts = [base_chunks[j] for j in range(window_start, window_end) if j!=i]
    context_text = ' '.join(context_parts)
    summary = mock_summary(context_text) if context_text else ''
    enriched_docs.append(Document(page_content=f'Context: {summary}\n\nContent: {ch}', metadata={
        'chunk_id': i,
        'chunk_type': 'context_enriched',
        'has_context': bool(context_text)
    }))
print('Context-enriched chunks:', len(enriched_docs))
print('Example enriched doc:\n', enriched_docs[0].page_content[:300])

In [None]:
# 8. AI-Driven Chunking (mock)

def ai_driven_chunking_mock(text: str, max_chunks: int = 15) -> list[str]:
    paragraphs = text.split('\n\n')
    chunks=[]; current=''
    for p in paragraphs:
        if len(current)+len(p) < 700:
            current += p + '\n\n'
        else:
            if current:
                chunks.append(current.strip())
            current = p + '\n\n'
    if current:
        chunks.append(current.strip())
    # reduce if too many
    if len(chunks) > max_chunks:
        grouped=[]; group_size = math.ceil(len(chunks)/max_chunks)
        for i in range(0,len(chunks),group_size):
            grouped.append('\n\n'.join(chunks[i:i+group_size]))
        chunks = grouped
    return chunks

ai_chunks = ai_driven_chunking_mock(document, max_chunks=12)
ai_docs = [Document(page_content=c, metadata={'chunk_id':i,'chunk_type':'ai_mock','size':len(c)}) for i,c in enumerate(ai_chunks)]
print('AI-driven mock chunks:', len(ai_docs))
print('Example AI chunk size:', ai_docs[0].metadata['size'])

In [None]:
# 9. Сравнительная оценка стратегий

strategies = {
    'fixed': fixed_docs,
    'semantic': semantic_docs,
    'adaptive': adaptive_docs,
    'context_enriched': enriched_docs,
    'ai_mock': ai_docs
}

keywords = ['machine', 'learning', 'vector', 'retrieval', 'compression', 'graph']
phrases = ['Machine Learning overview', 'Large Language Models', 'Personalized PageRank']

def keyword_coverage(docs):
    texts = [d.page_content.lower() for d in docs]
    found=0
    for kw in keywords:
        if any(kw in t for t in texts):
            found += 1
    return found / len(keywords)

def average_chunk_size(docs):
    return sum(len(d.page_content) for d in docs)/len(docs)

def size_std(docs):
    avg = average_chunk_size(docs)
    return (sum((len(d.page_content)-avg)**2 for d in docs)/len(docs))**0.5

def coherence(docs):
    # crude: penalize chunks starting lowercase or ending without punctuation
    penalty=0
    for d in docs:
        txt=d.page_content.strip()
        if txt and (txt[0].islower() or txt[0] in ',;:)]}'):
            penalty+=1
        if txt and not re.search(r'[.!?]$', txt):
            penalty+=1
    max_pen = 2*len(docs)
    return 1 - penalty/max_pen

results=[]
for name, ds in strategies.items():
    results.append({
        'strategy': name,
        'chunks': len(ds),
        'keyword_coverage': round(keyword_coverage(ds),2),
        'avg_size': round(average_chunk_size(ds),1),
        'size_std': round(size_std(ds),1),
        'coherence': round(coherence(ds),2)
    })

import pandas as pd
results_df = pd.DataFrame(results)
print(results_df)

plt.figure(figsize=(10,5))
plt.bar(results_df['strategy'], results_df['keyword_coverage'])
plt.title('Keyword Coverage')
plt.show()

plt.figure(figsize=(10,5))
plt.bar(results_df['strategy'], results_df['coherence'])
plt.title('Coherence Score')
plt.show()

In [None]:
# 10. ContextualCompressionRetriever (mock/simple pipeline)
from langchain.retrievers import ContextualCompressionRetriever
from langchain.schema import BaseDocumentTransformer

class TopKCompressor(BaseDocumentTransformer):
    def __init__(self, max_chars: int = 400):
        self.max_chars = max_chars
    def transform_documents(self, documents, **kwargs):
        out=[]
        for d in documents:
            content = d.page_content
            if len(content) > self.max_chars:
                content = content[:self.max_chars] + '...'
            out.append(Document(page_content=content, metadata={**d.metadata, 'compressed': True}))
        return out

# base retriever mock: just returns first N semantic docs
class MockRetriever:
    def invoke(self, query: str):
        # pretend ranking by simple keyword matching
        ranked = []
        for d in semantic_docs:
            score = sum(query.lower().count(kw) for kw in ['learning','vector','graph'])
            ranked.append((score, d))
        ranked.sort(key=lambda x: x[0], reverse=True)
        return [d for _,d in ranked[:8]]

base_retriever = MockRetriever()
compressor = TopKCompressor(max_chars=300)
compressed = compressor.transform_documents(base_retriever.invoke('graph learning vector'))
print('Compressed docs returned:', len(compressed))
print('Sample compressed content length:', len(compressed[0].page_content))

In [None]:
# 11. Memory-RAG пайплайн (упрощенный)
# Steps: ingest -> chunk -> embed -> store -> retrieve -> compress -> generate

# 1. Ingest (already have 'document')
source_id = 'dummy-doc-001'

# 2. Chunk (choose adaptive for demo)
ingested_chunks = adaptive_docs

# 3. Embeddings (sentence-transformers)
embed_model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = embed_model.encode([d.page_content for d in ingested_chunks])

# 4. Store (in-memory vector index)
index = [(emb, d) for emb, d in zip(embeddings, ingested_chunks)]

# simple cosine
def cosine(a,b):
    return float(np.dot(a,b)/(np.linalg.norm(a)*np.linalg.norm(b)))

# 5. Retrieve
query = 'Explain transformer attention scaling strategies and vector databases'
q_emb = embed_model.encode([query])[0]
scored = [(cosine(q_emb, emb), d) for emb,d in index]
scored.sort(key=lambda x: x[0], reverse=True)
retrieved = [d for _,d in scored[:5]]

# 6. Compress (reuse compressor)
compressed_retrieved = compressor.transform_documents(retrieved)

# 7. Generate (mock generation combining compressed text)
answer_context = '\n\n'.join(d.page_content for d in compressed_retrieved)
mock_answer = 'Answer:\n' + answer_context[:1000] + '\n...[truncated]'
print(mock_answer)

In [None]:
# 12. Визуализация дополнительных метрик
plt.figure(figsize=(10,5))
plt.bar(results_df['strategy'], results_df['avg_size'])
plt.title('Average Chunk Size')
plt.show()

plt.figure(figsize=(10,5))
plt.bar(results_df['strategy'], results_df['size_std'])
plt.title('Chunk Size StdDev')
plt.show()

print('Finished all sections.')

# 13. Chat Integration с автосжатием контекста (RAG + LangChain)

Этот раздел демонстрирует:
- Интеграцию LangChain в чат-пайплайн
- Автоматическое сжатие контекста при каждом запросе
- Функции для управления порядком сообщений в JSON

In [None]:
# 13.1 ChatRAGCompressor — интеграция LangChain с автосжатием контекста

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import copy

@dataclass
class ChatMessage:
    """Структура сообщения чата"""
    role: str  # 'user', 'assistant', 'system'
    content: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    metadata: Dict[str, Any] = field(default_factory=dict)
    compressed: bool = False
    original_length: int = 0
    
    def to_dict(self) -> Dict:
        return {
            'role': self.role,
            'content': self.content,
            'timestamp': self.timestamp,
            'metadata': self.metadata,
            'compressed': self.compressed,
            'original_length': self.original_length
        }
    
    @classmethod
    def from_dict(cls, data: Dict) -> 'ChatMessage':
        return cls(**data)


class ChatRAGCompressor:
    """
    Интеграция LangChain + RAG с автоматическим сжатием контекста.
    При каждом запросе:
    1. Сжимает старые сообщения
    2. Извлекает релевантный контекст через RAG
    3. Формирует оптимальный промпт
    """
    
    def __init__(
        self,
        embed_model_name: str = 'all-MiniLM-L6-v2',
        max_context_tokens: int = 4000,
        compression_threshold: int = 500,
        keep_recent_messages: int = 3
    ):
        self.embed_model = SentenceTransformer(embed_model_name)
        self.max_context_tokens = max_context_tokens
        self.compression_threshold = compression_threshold
        self.keep_recent_messages = keep_recent_messages
        
        # История чата
        self.messages: List[ChatMessage] = []
        # Индекс для RAG (embeddings + messages)
        self.message_index: List[tuple] = []
        
    def _estimate_tokens(self, text: str) -> int:
        """Примерная оценка токенов (4 символа ~ 1 токен)"""
        return len(text) // 4
    
    def _compress_message(self, message: ChatMessage) -> ChatMessage:
        """Сжатие одного сообщения"""
        content = message.content
        original_length = len(content)
        
        if len(content) <= self.compression_threshold:
            return message
        
        # Стратегии сжатия:
        # 1. Удаление приветствий и филлеров
        fillers = [
            r'\b(um|uh|well|so|basically|actually|you know|i mean)\b',
            r'^(hi|hello|hey|greetings)[,!.\s]*',
            r'(thanks|thank you)[,!.\s]*$',
        ]
        compressed = content
        for pattern in fillers:
            compressed = re.sub(pattern, '', compressed, flags=re.IGNORECASE)
        
        # 2. Удаление повторяющихся пробелов
        compressed = re.sub(r'\s+', ' ', compressed).strip()
        
        # 3. Если всё ещё большое — обрезаем до ключевых предложений
        if len(compressed) > self.compression_threshold:
            sentences = nltk.sent_tokenize(compressed)
            # Берём первое и последнее предложения + ключевые
            if len(sentences) > 3:
                compressed = f"{sentences[0]} [...] {sentences[-1]}"
        
        new_msg = ChatMessage(
            role=message.role,
            content=compressed,
            timestamp=message.timestamp,
            metadata={**message.metadata, 'compression_ratio': len(compressed)/original_length},
            compressed=True,
            original_length=original_length
        )
        return new_msg
    
    def _embed_message(self, message: ChatMessage) -> np.ndarray:
        """Создание эмбеддинга для сообщения"""
        return self.embed_model.encode([message.content])[0]
    
    def add_message(self, role: str, content: str, metadata: Dict = None):
        """Добавление нового сообщения в чат"""
        msg = ChatMessage(
            role=role,
            content=content,
            metadata=metadata or {},
            original_length=len(content)
        )
        self.messages.append(msg)
        
        # Добавляем в RAG индекс
        emb = self._embed_message(msg)
        self.message_index.append((emb, msg, len(self.messages) - 1))
        
    def _retrieve_relevant_context(self, query: str, top_k: int = 5) -> List[ChatMessage]:
        """RAG: извлечение релевантных сообщений по запросу"""
        if not self.message_index:
            return []
        
        q_emb = self.embed_model.encode([query])[0]
        scored = []
        for emb, msg, idx in self.message_index:
            score = cosine(q_emb, emb)
            scored.append((score, msg, idx))
        
        scored.sort(key=lambda x: x[0], reverse=True)
        return [msg for _, msg, _ in scored[:top_k]]
    
    def build_context(self, current_query: str) -> Dict[str, Any]:
        """
        Построение оптимального контекста для LLM:
        1. Последние N сообщений (без сжатия)
        2. Релевантные старые сообщения (сжатые через RAG)
        3. System prompt
        """
        result = {
            'system': '',
            'context_messages': [],
            'recent_messages': [],
            'current_query': current_query,
            'stats': {}
        }
        
        total_tokens = 0
        
        # 1. Недавние сообщения (без сжатия)
        recent = self.messages[-self.keep_recent_messages:] if len(self.messages) > self.keep_recent_messages else self.messages
        for msg in recent:
            result['recent_messages'].append(msg.to_dict())
            total_tokens += self._estimate_tokens(msg.content)
        
        # 2. RAG: релевантные старые сообщения (сжатые)
        if len(self.messages) > self.keep_recent_messages:
            relevant = self._retrieve_relevant_context(current_query, top_k=5)
            for msg in relevant:
                if msg not in recent:
                    compressed_msg = self._compress_message(msg)
                    tokens = self._estimate_tokens(compressed_msg.content)
                    if total_tokens + tokens < self.max_context_tokens:
                        result['context_messages'].append(compressed_msg.to_dict())
                        total_tokens += tokens
        
        result['stats'] = {
            'total_tokens_estimate': total_tokens,
            'recent_count': len(result['recent_messages']),
            'context_count': len(result['context_messages']),
            'compression_applied': any(m.get('compressed') for m in result['context_messages'])
        }
        
        return result
    
    def get_messages_json(self) -> str:
        """Экспорт всех сообщений в JSON"""
        return json.dumps([m.to_dict() for m in self.messages], ensure_ascii=False, indent=2)
    
    def load_messages_json(self, json_str: str):
        """Импорт сообщений из JSON"""
        data = json.loads(json_str)
        self.messages = [ChatMessage.from_dict(d) for d in data]
        # Перестроить индекс
        self.message_index = []
        for i, msg in enumerate(self.messages):
            emb = self._embed_message(msg)
            self.message_index.append((emb, msg, i))


# Демонстрация
chat = ChatRAGCompressor(max_context_tokens=2000, keep_recent_messages=3)

# Добавляем тестовые сообщения
chat.add_message('user', 'Привет! Расскажи мне про машинное обучение и нейронные сети.')
chat.add_message('assistant', 'Машинное обучение — это раздел искусственного интеллекта, который позволяет компьютерам обучаться на данных без явного программирования. Нейронные сети — это архитектура, вдохновлённая работой мозга.')
chat.add_message('user', 'А что такое трансформеры и attention механизм?')
chat.add_message('assistant', 'Трансформеры — это архитектура нейронных сетей, использующая механизм внимания (attention) для обработки последовательностей. Self-attention позволяет модели взвешивать важность разных частей входа.')
chat.add_message('user', 'Как работают векторные базы данных?')
chat.add_message('assistant', 'Векторные БД хранят эмбеддинги и выполняют быстрый поиск по сходству через алгоритмы типа HNSW или IVF.')

print('Всего сообщений:', len(chat.messages))
print('\nПостроение контекста для нового запроса:')
context = chat.build_context('Объясни связь между attention и векторным поиском')
print(json.dumps(context['stats'], indent=2))

In [None]:
# 13.2 Функции для управления порядком сообщений в JSON

class ChatMessageManager:
    """
    Утилиты для работы с историей чата:
    - Перестановка сообщений
    - Перемещение вверх/вниз
    - Swap между позициями
    - Pin важных сообщений
    """
    
    def __init__(self, messages: List[Dict] = None):
        self.messages = messages or []
    
    def load_json(self, json_str: str):
        """Загрузка из JSON строки"""
        self.messages = json.loads(json_str)
        return self
    
    def load_list(self, messages: List[Dict]):
        """Загрузка из списка"""
        self.messages = copy.deepcopy(messages)
        return self
    
    def to_json(self, indent: int = 2) -> str:
        """Экспорт в JSON"""
        return json.dumps(self.messages, ensure_ascii=False, indent=indent)
    
    def to_list(self) -> List[Dict]:
        """Получить список сообщений"""
        return copy.deepcopy(self.messages)
    
    # === ПЕРЕСТАНОВКИ ===
    
    def swap(self, index1: int, index2: int) -> 'ChatMessageManager':
        """Поменять местами два сообщения по индексам"""
        if 0 <= index1 < len(self.messages) and 0 <= index2 < len(self.messages):
            self.messages[index1], self.messages[index2] = self.messages[index2], self.messages[index1]
        return self
    
    def move_up(self, index: int) -> 'ChatMessageManager':
        """Переместить сообщение на одну позицию вверх"""
        if 0 < index < len(self.messages):
            self.swap(index, index - 1)
        return self
    
    def move_down(self, index: int) -> 'ChatMessageManager':
        """Переместить сообщение на одну позицию вниз"""
        if 0 <= index < len(self.messages) - 1:
            self.swap(index, index + 1)
        return self
    
    def move_to_top(self, index: int) -> 'ChatMessageManager':
        """Переместить сообщение в начало"""
        if 0 < index < len(self.messages):
            msg = self.messages.pop(index)
            self.messages.insert(0, msg)
        return self
    
    def move_to_bottom(self, index: int) -> 'ChatMessageManager':
        """Переместить сообщение в конец"""
        if 0 <= index < len(self.messages) - 1:
            msg = self.messages.pop(index)
            self.messages.append(msg)
        return self
    
    def move_to_position(self, from_index: int, to_index: int) -> 'ChatMessageManager':
        """Переместить сообщение на конкретную позицию"""
        if 0 <= from_index < len(self.messages) and 0 <= to_index < len(self.messages):
            msg = self.messages.pop(from_index)
            self.messages.insert(to_index, msg)
        return self
    
    def reverse(self) -> 'ChatMessageManager':
        """Развернуть порядок всех сообщений"""
        self.messages.reverse()
        return self
    
    def sort_by_timestamp(self, ascending: bool = True) -> 'ChatMessageManager':
        """Сортировка по времени"""
        self.messages.sort(
            key=lambda m: m.get('timestamp', ''),
            reverse=not ascending
        )
        return self
    
    def sort_by_role(self, order: List[str] = None) -> 'ChatMessageManager':
        """Сортировка по роли (system -> user -> assistant)"""
        order = order or ['system', 'user', 'assistant']
        role_priority = {r: i for i, r in enumerate(order)}
        self.messages.sort(key=lambda m: role_priority.get(m.get('role', ''), 999))
        return self
    
    # === ФИЛЬТРАЦИЯ ===
    
    def filter_by_role(self, role: str) -> List[Dict]:
        """Получить сообщения только определённой роли"""
        return [m for m in self.messages if m.get('role') == role]
    
    def remove_by_index(self, index: int) -> 'ChatMessageManager':
        """Удалить сообщение по индексу"""
        if 0 <= index < len(self.messages):
            self.messages.pop(index)
        return self
    
    def duplicate(self, index: int) -> 'ChatMessageManager':
        """Дублировать сообщение"""
        if 0 <= index < len(self.messages):
            msg_copy = copy.deepcopy(self.messages[index])
            msg_copy['timestamp'] = datetime.now().isoformat()
            msg_copy['metadata'] = msg_copy.get('metadata', {})
            msg_copy['metadata']['duplicated_from'] = index
            self.messages.insert(index + 1, msg_copy)
        return self
    
    # === ГРУППОВЫЕ ОПЕРАЦИИ ===
    
    def group_by_role(self) -> Dict[str, List[Dict]]:
        """Группировка сообщений по ролям"""
        groups = {}
        for msg in self.messages:
            role = msg.get('role', 'unknown')
            if role not in groups:
                groups[role] = []
            groups[role].append(msg)
        return groups
    
    def interleave_user_assistant(self) -> 'ChatMessageManager':
        """
        Перестроить: чередовать user/assistant.
        Полезно для восстановления диалоговой структуры.
        """
        users = [m for m in self.messages if m.get('role') == 'user']
        assistants = [m for m in self.messages if m.get('role') == 'assistant']
        others = [m for m in self.messages if m.get('role') not in ('user', 'assistant')]
        
        result = others[:]  # system messages first
        for u, a in zip(users, assistants):
            result.append(u)
            result.append(a)
        # Добавить оставшиеся
        result.extend(users[len(assistants):])
        result.extend(assistants[len(users):])
        
        self.messages = result
        return self
    
    def preview(self, max_content_len: int = 50) -> None:
        """Предпросмотр сообщений"""
        print(f"{'#':<3} {'Role':<12} {'Content':<{max_content_len+3}} Compressed")
        print('-' * (max_content_len + 25))
        for i, m in enumerate(self.messages):
            content = m.get('content', '')[:max_content_len]
            if len(m.get('content', '')) > max_content_len:
                content += '...'
            compressed = '✓' if m.get('compressed') else ''
            print(f"{i:<3} {m.get('role', '?'):<12} {content:<{max_content_len+3}} {compressed}")


# === ДЕМОНСТРАЦИЯ ===

# Создаём менеджер из существующего чата
manager = ChatMessageManager()
manager.load_json(chat.get_messages_json())

print("=== Исходный порядок ===")
manager.preview()

print("\n=== После swap(0, 2) ===")
manager.swap(0, 2).preview()

print("\n=== После move_to_bottom(0) ===")
manager.move_to_bottom(0).preview()

print("\n=== После reverse() ===")
manager.reverse().preview()

print("\n=== Экспорт в JSON (первые 500 символов) ===")
print(manager.to_json()[:500] + '...')

In [None]:
# 13.3 Полный пример: Chat API с автосжатием и управлением порядком

class ChatAPI:
    """
    API-подобный интерфейс для чата с:
    - Автосжатием контекста через RAG
    - Управлением порядком сообщений
    - Экспортом/импортом JSON
    """
    
    def __init__(self, **kwargs):
        self.compressor = ChatRAGCompressor(**kwargs)
        self.manager = ChatMessageManager()
    
    def send_message(self, content: str, role: str = 'user') -> Dict:
        """Отправить сообщение и получить сжатый контекст"""
        self.compressor.add_message(role, content)
        context = self.compressor.build_context(content)
        return {
            'status': 'ok',
            'message_count': len(self.compressor.messages),
            'context': context
        }
    
    def get_history(self) -> List[Dict]:
        """Получить всю историю"""
        return [m.to_dict() for m in self.compressor.messages]
    
    def export_json(self) -> str:
        """Экспорт в JSON"""
        return self.compressor.get_messages_json()
    
    def import_json(self, json_str: str):
        """Импорт из JSON"""
        self.compressor.load_messages_json(json_str)
    
    # === Управление порядком ===
    
    def reorder(self, operation: str, **kwargs) -> Dict:
        """
        Универсальный метод для изменения порядка.
        
        Operations:
        - 'swap': swap index1 и index2
        - 'move_up': поднять index на 1
        - 'move_down': опустить index на 1
        - 'move_to': переместить from_index на to_index
        - 'reverse': развернуть весь список
        - 'sort_time': сортировка по времени
        - 'interleave': чередование user/assistant
        """
        self.manager.load_list([m.to_dict() for m in self.compressor.messages])
        
        if operation == 'swap':
            self.manager.swap(kwargs.get('index1', 0), kwargs.get('index2', 1))
        elif operation == 'move_up':
            self.manager.move_up(kwargs.get('index', 0))
        elif operation == 'move_down':
            self.manager.move_down(kwargs.get('index', 0))
        elif operation == 'move_to':
            self.manager.move_to_position(kwargs.get('from_index', 0), kwargs.get('to_index', 0))
        elif operation == 'reverse':
            self.manager.reverse()
        elif operation == 'sort_time':
            self.manager.sort_by_timestamp(kwargs.get('ascending', True))
        elif operation == 'interleave':
            self.manager.interleave_user_assistant()
        else:
            return {'status': 'error', 'message': f'Unknown operation: {operation}'}
        
        # Применяем изменения обратно
        new_json = self.manager.to_json()
        self.compressor.load_messages_json(new_json)
        
        return {
            'status': 'ok',
            'operation': operation,
            'new_order': [{'index': i, 'role': m.role, 'preview': m.content[:40]} 
                          for i, m in enumerate(self.compressor.messages)]
        }
    
    def preview(self):
        """Предпросмотр"""
        self.manager.load_list([m.to_dict() for m in self.compressor.messages])
        self.manager.preview()


# === ДЕМОНСТРАЦИЯ API ===

api = ChatAPI(max_context_tokens=2000, keep_recent_messages=2)

# Симулируем диалог
api.send_message("Расскажи про RAG системы")
api.send_message("RAG (Retrieval-Augmented Generation) — это метод, объединяющий поиск релевантных документов с генерацией ответа.", role='assistant')
api.send_message("А как работает сжатие контекста?")
api.send_message("Сжатие контекста уменьшает количество токенов, сохраняя семантику через различные методы: суммаризацию, удаление redundancy, выбор ключевых фраз.", role='assistant')
api.send_message("Можно ли комбинировать RAG и compression?")

print("=== Текущая история ===")
api.preview()

print("\n=== Меняем местами 0 и 2 ===")
result = api.reorder('swap', index1=0, index2=2)
print(json.dumps(result, ensure_ascii=False, indent=2))

print("\n=== После swap ===")
api.preview()

print("\n=== Контекст для нового запроса ===")
context = api.send_message("Как это применить на практике?")
print(f"Статистика: {context['context']['stats']}")

In [None]:
# 13.4 HTTP-ready функции (для интеграции в FastAPI/Flask)

def create_chat_endpoints():
    """
    Пример структуры endpoints для REST API.
    Можно скопировать в FastAPI роутер.
    """
    
    endpoints = {
        'POST /chat/message': {
            'description': 'Отправить сообщение',
            'body': {'role': 'user', 'content': 'текст сообщения'},
            'response': {'status': 'ok', 'context': {...}}
        },
        'GET /chat/history': {
            'description': 'Получить всю историю',
            'response': [{'role': '...', 'content': '...', 'timestamp': '...'}]
        },
        'POST /chat/reorder': {
            'description': 'Изменить порядок сообщений',
            'body': {'operation': 'swap|move_up|move_down|move_to|reverse', 'params': {...}},
            'response': {'status': 'ok', 'new_order': [...]}
        },
        'GET /chat/export': {
            'description': 'Экспорт в JSON',
            'response': '[...]'
        },
        'POST /chat/import': {
            'description': 'Импорт из JSON',
            'body': {'messages': [...]},
            'response': {'status': 'ok', 'count': N}
        },
        'POST /chat/compress': {
            'description': 'Принудительное сжатие всех сообщений',
            'response': {'status': 'ok', 'tokens_saved': N}
        }
    }
    
    return endpoints


# Пример кода для FastAPI (можно скопировать в отдельный файл)
fastapi_example = '''
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict, Any, Optional

app = FastAPI()

# Инициализация глобального чата (в production использовать DI)
chat_api = ChatAPI(max_context_tokens=4000, keep_recent_messages=3)

class MessageRequest(BaseModel):
    role: str = "user"
    content: str

class ReorderRequest(BaseModel):
    operation: str
    index: Optional[int] = None
    index1: Optional[int] = None
    index2: Optional[int] = None
    from_index: Optional[int] = None
    to_index: Optional[int] = None

@app.post("/chat/message")
async def send_message(req: MessageRequest):
    result = chat_api.send_message(req.content, req.role)
    return result

@app.get("/chat/history")
async def get_history():
    return chat_api.get_history()

@app.post("/chat/reorder")
async def reorder_messages(req: ReorderRequest):
    params = req.dict(exclude={"operation"}, exclude_none=True)
    result = chat_api.reorder(req.operation, **params)
    return result

@app.get("/chat/export")
async def export_chat():
    return {"json": chat_api.export_json()}

@app.post("/chat/import")
async def import_chat(messages: List[Dict]):
    chat_api.import_json(json.dumps(messages))
    return {"status": "ok", "count": len(messages)}
'''

print("=== Доступные API endpoints ===")
for endpoint, info in create_chat_endpoints().items():
    print(f"\n{endpoint}")
    print(f"  → {info['description']}")

print("\n=== FastAPI код сохранён в переменную `fastapi_example` ===")
print("Скопируйте в отдельный файл (например, chat_api.py) для использования.")