<a href="https://colab.research.google.com/github/21092004Goda/data_anal/blob/main/RAG_system_lab_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Установка зависимостей**

In [8]:
%%capture
!pip install --quiet langchain_huggingface
!pip install --quiet sentence-transformers
!pip install --quiet faiss-cpu
!pip install --quiet arxiv

# **Классы**

## **Извлечение статей**

In [2]:
import arxiv
import time
from typing import List, Dict, Any

class ArxivTopicFetcher:
    """Download article metadata from arXiv for a category or a raw query.

    Results are returned as plain dictionaries with the keys ``arxiv_id``,
    ``title``, ``authors``, ``abstract``, ``published``, ``categories`` and
    ``pdf_url``.
    """

    def __init__(self, max_results_per_request: int = 100, delay_seconds: float = 3.0):
        """Create the underlying ``arxiv.Client``.

        Args:
            max_results_per_request: Page size handed to ``arxiv.Client``.
            delay_seconds: Delay between requests handed to ``arxiv.Client``.
        """
        self.client = arxiv.Client(
            page_size=max_results_per_request,
            delay_seconds=delay_seconds,
            num_retries=3
        )
        # Friendly aliases mapped to arXiv category codes; see build_query().
        self.common_categories = {
            'machine_learning': 'cs.LG',
            'artificial_intelligence': 'cs.AI',
            'computer_vision': 'cs.CV',
            'nlp': 'cs.CL',
            'robotics': 'cs.RO',
            'databases': 'cs.DB',
            'security': 'cs.CR',
            'networks': 'cs.NI',
            'algorithms': 'cs.DS',
            'hci': 'cs.HC'
        }

    def _get_sort_criterion(self, sort_by: str) -> "arxiv.SortCriterion":
        """Map a sort name to ``arxiv.SortCriterion``.

        Unknown names fall back to ``SubmittedDate``.
        """
        sort_map = {
            'relevance': arxiv.SortCriterion.Relevance,
            'lastUpdatedDate': arxiv.SortCriterion.LastUpdatedDate,
            'submittedDate': arxiv.SortCriterion.SubmittedDate
        }
        return sort_map.get(sort_by, arxiv.SortCriterion.SubmittedDate)

    def build_query(self, category: str) -> str:
        """Return a ``cat:<code>`` query.

        ``category`` may be one of the aliases in ``common_categories`` or an
        arXiv category code, which is used verbatim.
        """
        code = self.common_categories.get(category, category)
        return f"cat:{code}"

    @staticmethod
    def _result_to_dict(result) -> Dict[str, Any]:
        """Convert one arxiv result object into the dictionary used downstream."""
        return {
            'arxiv_id': result.entry_id.split('/')[-1],
            'title': result.title,
            'authors': [a.name for a in result.authors],
            'abstract': result.summary.replace('\n', ' '),
            'published': result.published.date() if result.published else None,
            'categories': result.categories,
            'pdf_url': result.pdf_url
        }

    def fetch_articles_paged(self, query: str, total_results: int = 2000,
                            sort_by: str = 'submittedDate',
                            sort_order: str = 'descending',
                            batch_size: int = 500) -> List[Dict[str, Any]]:
        """Fetch up to ``total_results`` articles in batches.

        Args:
            query: arXiv query string, e.g. the output of ``build_query``.
            total_results: Upper bound on the number of articles returned.
                Values ``<= 0`` yield an empty list without any request.
            sort_by: ``'relevance'``, ``'lastUpdatedDate'`` or
                ``'submittedDate'``; anything else means ``'submittedDate'``.
            sort_order: ``'descending'``; any other value means ascending.
            batch_size: Articles per batch, capped at 2000.

        Returns:
            The articles collected so far. A failing batch is reported on
            stdout and ends the download early instead of raising, so the
            list may be shorter than requested.

        Raises:
            ValueError: If ``batch_size`` is not positive while
                ``total_results`` is.
        """
        all_articles: List[Dict[str, Any]] = []

        if total_results <= 0:
            print(f"\n✅ Загрузка завершена. Всего получено статей: {len(all_articles)}")
            return all_articles
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        batch_size = min(batch_size, 2000)

        # Loop-invariant: resolve the sort settings once.
        sort_criterion = self._get_sort_criterion(sort_by)
        sort_order_obj = (arxiv.SortOrder.Descending if sort_order == 'descending'
                          else arxiv.SortOrder.Ascending)

        for start_index in range(0, total_results, batch_size):
            # Never request past total_results, even when batch_size does not
            # divide it evenly.
            end_index = min(start_index + batch_size, total_results)
            expected = end_index - start_index
            print(f"Загружаю статьи с {start_index} по {end_index - 1}...")

            # arxiv.Search has no "start" argument. Paging is done through the
            # client's ``offset``, and the client treats ``max_results`` as an
            # absolute cap that includes the skipped records, hence end_index.
            search = arxiv.Search(
                query=query,
                max_results=end_index,
                sort_by=sort_criterion,
                sort_order=sort_order_obj
            )

            try:
                batch_articles = [
                    self._result_to_dict(result)
                    for result in self.client.results(search, offset=start_index)
                ]
            except Exception as e:
                # Best effort: keep what was downloaded before the failure.
                print(f"Ошибка при загрузке пачки (start={start_index}): {e}")
                break

            all_articles.extend(batch_articles)
            print(f"Получено статей в пачке: {len(batch_articles)}. Всего: {len(all_articles)}")

            if len(batch_articles) < expected:
                print("Достигнут конец списка результатов.")
                break

            # Pause between batches only; no need to wait after the last one.
            if end_index < total_results:
                time.sleep(1.0)

        print(f"\n✅ Загрузка завершена. Всего получено статей: {len(all_articles)}")
        return all_articles

    def fetch_articles(self, query: str, max_results: int = 50,
                      sort_by: str = 'submittedDate',
                      sort_order: str = 'descending') -> List[Dict[str, Any]]:
        """Fetch up to ``max_results`` articles for ``query``.

        Thin wrapper over ``fetch_articles_paged`` with a batch size of
        ``min(max_results, 1000)``.
        """
        return self.fetch_articles_paged(
            query=query,
            total_results=max_results,
            sort_by=sort_by,
            sort_order=sort_order,
            batch_size=min(max_results, 1000)
        )

    def fetch_by_category(self, category: str, max_results: int = 50, **kwargs):
        """Fetch up to ``max_results`` articles from one category.

        Args:
            category: Alias from ``common_categories`` or an arXiv code.
            max_results: Upper bound on the number of articles.
            **kwargs: Forwarded to ``fetch_articles_paged`` (``sort_by``,
                ``sort_order``, ``batch_size``).
        """
        # setdefault lets a caller override batch_size without triggering a
        # "multiple values for keyword argument" TypeError.
        kwargs.setdefault('batch_size', min(max_results, 1000))
        return self.fetch_articles_paged(
            query=self.build_query(category),
            total_results=max_results,
            **kwargs
        )

    def print_summary(self, articles: List[Dict[str, Any]], n: int = 5):
        """Print a short overview of the first ``n`` articles to stdout."""
        if not articles:
            print("Пусто.")
            return

        print("\n=== Короткий обзор ===")
        for idx, a in enumerate(articles[:n]):
            print(f"\n{idx+1}. {a['title']}")
            # Show at most three authors, marking the rest with a suffix.
            print("   Авторы:", ", ".join(a['authors'][:3]) + (" и др." if len(a['authors']) > 3 else ""))
            print("   Дата:", a['published'])
            print("   ID:", a['arxiv_id'])
            print("   Категории:", ", ".join(a['categories']))
            print("   Абстракт:", a['abstract'][:200], "...")

## **Разбиение на чанки**

In [3]:
import pandas as pd
import matplotlib.pyplot as plt

class TextChunker:
    """Split texts into overlapping chunks measured in whitespace-separated words."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """Store and validate the window configuration.

        Args:
            chunk_size: Maximum number of words per chunk.
            overlap: Number of words shared by two consecutive chunks.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                outside ``[0, chunk_size)``.
        """
        self._validate(chunk_size, overlap)
        self.chunk_size = chunk_size
        self.overlap = overlap

    @staticmethod
    def _validate(chunk_size: int, overlap: int) -> None:
        """Reject configurations whose window would not move forward."""
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    def chunk_text(self, text: str):
        """Return the list of chunks for ``text``.

        Empty, ``None`` or non-string input gives an empty list. The window
        advances by ``chunk_size - overlap`` words and stops as soon as a
        chunk reaches the end of the text, so no chunk is a pure repetition
        of the previous chunk's tail.

        Raises:
            ValueError: If the attributes were changed to an invalid
                combination after construction.
        """
        if not text or not isinstance(text, str):
            return []

        # Re-check here: the attributes are public and may have been edited.
        self._validate(self.chunk_size, self.overlap)
        step = self.chunk_size - self.overlap

        words = text.split()
        chunks = []
        for start in range(0, len(words), step):
            end = start + self.chunk_size
            chunks.append(" ".join(words[start:end]))
            if end >= len(words):
                # The text is fully covered; another window would only
                # repeat the overlap region.
                break

        return chunks

    def chunk_many(self, texts):
        """Apply ``chunk_text`` to every element and return a list of lists."""
        return [self.chunk_text(t) for t in texts]

    def to_dataframe(self, articles):
        """Build a DataFrame with one row per article.

        Each row carries the article metadata plus a ``chunks`` column holding
        the chunked abstract. Every article must provide the keys
        ``arxiv_id``, ``title``, ``authors``, ``published``, ``categories``,
        ``pdf_url`` and ``abstract``.
        """
        rows = []
        for a in articles:
            chunks = self.chunk_text(a["abstract"])
            rows.append({
                "id": a["arxiv_id"],
                "title": a["title"],
                "authors": ", ".join(a["authors"]),
                "published": a["published"],
                "categories": ", ".join(a["categories"]),
                "pdf_url": a["pdf_url"],
                "abstract": a["abstract"],
                "chunks": chunks
            })
        return pd.DataFrame(rows)

    def chunk_statistics(self, df: pd.DataFrame, plot: bool = False):
        """Print and return chunk-count statistics for ``df``.

        Args:
            df: Frame produced by ``to_dataframe``.
            plot: When true and the frame is non-empty, also show a histogram
                of chunks per article.

        Returns:
            A dict with totals and min/max/mean/median chunks per article.
            An empty frame yields all-zero statistics.
        """
        # to_dataframe([]) produces a frame without any columns, so check
        # for the column as well as for rows.
        has_rows = "chunks" in df.columns and len(df) > 0

        if has_rows:
            chunk_counts = df["chunks"].apply(len)
            stats = {
                "Total articles": len(df),
                "Total chunks": int(chunk_counts.sum()),
                "Min chunks per article": int(chunk_counts.min()),
                "Max chunks per article": int(chunk_counts.max()),
                "Mean chunks per article": float(chunk_counts.mean()),
                "Median chunks per article": float(chunk_counts.median())
            }
        else:
            chunk_counts = None
            stats = {
                "Total articles": 0,
                "Total chunks": 0,
                "Min chunks per article": 0,
                "Max chunks per article": 0,
                "Mean chunks per article": 0.0,
                "Median chunks per article": 0.0
            }

        print("\n=== Chunking Statistics ===")
        for k, v in stats.items():
            print(f"{k}: {v}")

        if plot and has_rows:
            # Start at 0 when some article has no chunks so it is not
            # silently left out of the histogram.
            first_bin = min(1, stats["Min chunks per article"])
            bin_edges = range(first_bin, stats["Max chunks per article"] + 2)
            plt.figure(figsize=(8,4))
            plt.hist(chunk_counts, bins=bin_edges, alpha=0.7, color='skyblue', edgecolor='black')
            plt.title("Distribution of Chunks per Article")
            plt.xlabel("Number of Chunks")
            plt.ylabel("Number of Articles")
            plt.xticks(bin_edges)
            plt.show()

        return stats


## **Векторизация текста**

In [9]:
import numpy as np
import pandas as pd
import faiss
from langchain_huggingface import HuggingFaceEmbeddings

class ArxivVectorPipeline:
    """Embed article chunks and serve nearest-neighbour search over them.

    ``build`` flattens a per-article frame into one row per chunk, embeds the
    chunk texts and stores them in a FAISS ``IndexFlatL2``. ``search`` embeds
    a query and returns the closest chunks with their article metadata.
    """

    def __init__(
        self,
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        device="cpu",
        normalize=False
    ):
        """Load the embedding model.

        Args:
            model_name: Model identifier passed to ``HuggingFaceEmbeddings``.
            device: Device string passed through ``model_kwargs``.
            normalize: Value of ``normalize_embeddings`` in ``encode_kwargs``.
        """
        self.embeddings_model = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={"device": device},
            encode_kwargs={"normalize_embeddings": normalize}
        )

        # All of these are populated by build().
        self.index = None
        self.embedding_dim = None
        self.chunks_df = None
        self.embeddings = None

    def _flatten_chunks(self, df: pd.DataFrame):
        """Expand the ``chunks`` list column into one row per chunk.

        The result has a fresh 0..n-1 index, so row positions coincide with
        the order in which the chunk texts are later added to the index.
        """
        rows = []
        for _, row in df.iterrows():
            base = {
                "id": row["id"],
                "title": row["title"],
                "authors": row["authors"],
                "published": row["published"],
                "categories": row["categories"],
                "pdf_url": row["pdf_url"],
                "abstract": row["abstract"]
            }

            for idx, ch in enumerate(row["chunks"]):
                rows.append({
                    **base,
                    "chunk_id": idx,
                    "text_chunk": ch
                })

        return pd.DataFrame(rows)

    def _embed(self, texts):
        """Embed ``texts`` and return a float32 array; empty input gives an empty array."""
        if not texts:
            return np.array([])

        emb = self.embeddings_model.embed_documents(texts)
        return np.array(emb, dtype=np.float32)

    def build(self, df: pd.DataFrame):
        """Embed every chunk of ``df`` and build the L2 index.

        Args:
            df: Frame with the metadata columns and a ``chunks`` list column.

        Returns:
            ``self``, to allow chaining.

        Raises:
            ValueError: If ``df`` contains no chunks at all.
        """
        chunks_df = self._flatten_chunks(df)
        if chunks_df.empty:
            # Fail with a clear message instead of a KeyError/IndexError
            # further down.
            raise ValueError("No text chunks to index: the input DataFrame contains no chunks.")

        embeddings = self._embed(chunks_df["text_chunk"].tolist())

        # Assign state only after embedding succeeded, so a failure does not
        # leave the pipeline half-built.
        self.chunks_df = chunks_df
        self.embeddings = embeddings
        self.embedding_dim = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(self.embedding_dim)
        self.index.add(embeddings)
        return self

    def search(self, query: str, top_k: int = 5):
        """Return up to ``top_k`` nearest chunks for ``query``.

        Each hit is a dict with ``distance``, ``chunk_id``, ``text_chunk``
        and an ``article`` dict of metadata. Fewer than ``top_k`` hits are
        returned when the index holds fewer vectors; ``top_k <= 0`` gives an
        empty list.

        Raises:
            ValueError: If ``build`` has not been called.
        """
        if self.index is None:
            raise ValueError("Индекс пуст. Сначала вызови build().")

        # Asking for more neighbours than stored vectors makes FAISS pad the
        # result with index -1, which iloc would read as "the last row".
        k = min(top_k, int(self.index.ntotal))
        if k <= 0:
            return []

        q_emb = self._embed([query])
        distances, indices = self.index.search(q_emb, k)

        results = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx < 0:
                # Defensive: skip any padding entry that still slips through.
                continue
            row = self.chunks_df.iloc[int(idx)]
            results.append({
                "distance": float(dist),
                "chunk_id": int(row["chunk_id"]),
                "text_chunk": row["text_chunk"],
                "article": {
                    "id": row["id"],
                    "title": row["title"],
                    "authors": row["authors"],
                    "categories": row["categories"],
                    "published": row["published"],
                    "abstract": row["abstract"],
                    "pdf_url": row["pdf_url"]
                }
            })

        return results


## **Интеграция с LLM**

In [10]:
from google import genai


class LLMClient:
    """Minimal wrapper around a ``genai.Client`` for single-turn text prompts."""

    def __init__(self, api_key: str, model: str = "gemini-2.5-flash"):
        """Create the client.

        Args:
            api_key: API key passed to ``genai.Client``.
            model: Model name used for every ``ask`` call.
        """
        self.client = genai.Client(api_key=api_key)
        self.model = model

    def ask(self, prompt: str) -> str:
        """Send ``prompt`` to the model and return the generated text.

        The prompt is passed as-is. Wrapping it in a hand-written
        ``"role": ..., "content": ...`` string would only add that wrapper
        to the text the model reads.

        Returns:
            The response text, or an empty string when the response carries
            no text (``response.text`` is ``None``).
        """
        response = self.client.models.generate_content(
            model=self.model,
            contents=prompt
        )
        # Keep the declared ``str`` return type even for text-less responses.
        return response.text or ""


## **RAG-QA по корпусу статей**

In [11]:
class ArxivQA:
    """Answer questions by retrieving chunks and passing them to an LLM."""

    def __init__(self, vector_pipeline, llm_client, top_k=5):
        """Keep the retriever, the LLM client and the number of chunks to retrieve."""
        self.vec, self.llm, self.top_k = vector_pipeline, llm_client, top_k

    def answer(self, query: str) -> str:
        """Retrieve ``top_k`` chunks for ``query`` and return the LLM's reply.

        The retrieved chunk texts are joined with blank lines and placed in
        the prompt ahead of the question.
        """
        retrieved = self.vec.search(query, top_k=self.top_k)
        passages = [hit["text_chunk"] for hit in retrieved]
        context = "\n\n".join(passages)

        # Assembled piecewise; the final fragment keeps the trailing
        # indentation that is part of the prompt text.
        prompt = (
            "\nYou are a smart assistant. Here is the context from scientific articles:\n"
            "\n"
            f"{context}\n"
            "\n"
            "Now answer the user's question:\n"
            f"{query}\n"
            "\n"
            "Answer clearly and concisely.\n"
            "        "
        )

        return self.llm.ask(prompt)


## **Поиск и RAG**

In [12]:
import numpy as np


class SearchEngine:
    """Vector search plus RAG and plain LLM question answering.

    A vocabulary collected from the indexed chunks is used to repair likely
    typos in the user's query before it is placed into the RAG prompt.
    """

    def __init__(self, vector_pipeline, llm_client):
        """Keep the retriever and LLM client and build the spelling vocabulary."""
        self.vec = vector_pipeline
        self.llm = llm_client
        self._build_spell_corpus()

    def _build_spell_corpus(self):
        """Collect the lowercase alphabetic words of all indexed chunks.

        Stores them sorted in ``self.corpus_words``. A pipeline that has not
        been built yet yields an empty vocabulary.
        """
        chunks_df = self.vec.chunks_df
        if chunks_df is None or "text_chunk" not in chunks_df:
            texts = ()
        else:
            texts = chunks_df["text_chunk"]

        words = {
            w
            for txt in texts
            for w in txt.lower().split()
            if w.isalpha()
        }
        # Sorted so that ties between equally close candidates resolve the
        # same way on every run.
        self.corpus_words = sorted(words)
        self._corpus_vocab = words

    def _correct_query(self, query: str, cutoff: float = 0.8) -> str:
        """Replace likely misspellings in ``query`` with corpus words.

        Purely alphabetic tokens that are missing from the vocabulary are
        swapped for their closest vocabulary word when its similarity is at
        least ``cutoff``. All other tokens are kept unchanged.
        """
        import difflib

        if not self._corpus_vocab:
            return query

        fixed = []
        for token in query.split():
            lowered = token.lower()
            if not token.isalpha() or lowered in self._corpus_vocab:
                fixed.append(token)
                continue
            candidates = difflib.get_close_matches(
                lowered, self.corpus_words, n=1, cutoff=cutoff
            )
            fixed.append(candidates[0] if candidates else token)
        return " ".join(fixed)

    def vector_search(self, query: str, top_k: int = 5):
        """Return the ``top_k`` nearest chunks from the vector pipeline."""
        return self.vec.search(query, top_k)

    def build_prompt(self, query: str, context: str, template=None):
        """Fill ``template`` with ``query`` and ``context``.

        ``template`` is a ``str.format`` string with ``{query}`` and
        ``{context}`` placeholders; a default one is used when it is ``None``.
        """
        if template is None:
            template = """
User query: "{query}"

Use ONLY this context:
-----------------
{context}
-----------------

Answer clearly and factually.
"""
        return template.format(query=query, context=context)

    def ask_rag(self, query: str, template=None, top_k=5):
        """Answer ``query`` with retrieved context.

        Retrieval uses the query as typed; the spell-corrected form is what
        goes into the prompt the LLM sees.
        """
        hits = self.vector_search(query, top_k)
        context = "\n\n".join(h["text_chunk"] for h in hits)
        corrected = self._correct_query(query)
        prompt = self.build_prompt(corrected, context, template)
        return self.llm.ask(prompt)

    def ask_vanilla(self, query: str):
        """Send ``query`` straight to the LLM without any retrieved context."""
        return self.llm.ask(query)


# **Проверка выполнения**

In [13]:
# Download up to 2000 articles from the "machine_learning" category alias
# and print a short overview of the first three.
fetcher = ArxivTopicFetcher()
articles = fetcher.fetch_by_category("machine_learning", max_results=2000)
fetcher.print_summary(articles, n=3)

Запрос: cat:cs.LG
Получено статей: 2000

=== Короткий обзор ===

1. ThetaEvolve: Test-time Learning on Open Problems
   Авторы: Yiping Wang, Shao-Rong Su, Zhiyuan Zeng и др.
   Дата: 2025-11-28
   ID: 2511.23473v1
   Категории: cs.LG, cs.CL
   Абстракт: Recent advances in large language models (LLMs) have enabled breakthroughs in mathematical discovery, exemplified by AlphaEvolve, a closed-source system that evolves programs to improve bounds on open ...

2. SmallWorlds: Assessing Dynamics Understanding of World Models in Isolated Environments
   Авторы: Xinyi Li, Zaishuo Xia, Weyl Lu и др.
   Дата: 2025-11-28
   ID: 2511.23465v1
   Категории: cs.LG
   Абстракт: Current world models lack a unified and controlled setting for systematic evaluation, making it difficult to assess whether they truly capture the underlying rules that govern environment dynamics. In ...

3. The Price of Progress: Algorithmic Efficiency and the Falling Cost of AI Inference
   Авторы: Hans Gundlach, Jayson Lync

In [None]:
# Split every abstract into 150-word chunks with a 15-word overlap.
chunker = TextChunker(chunk_size=150, overlap=15)

# One row per article; the "chunks" column holds the list of chunk strings.
df = chunker.to_dataframe(articles)

print(df.head())
# Peek at the first two chunks of the first article.
print(df["chunks"].iloc[0][:2])

In [None]:
# chunk_statistics() prints the figures itself and also returns them as a dict.
stats = chunker.chunk_statistics(df)
print(stats)

In [None]:
# Full pipeline: embed all chunks on CPU, build the index, then run a sample query.
pipeline = ArxivVectorPipeline(device="cpu")
pipeline.build(df)

results = pipeline.search("reinforcement learning for robots", top_k=5)

# Show each hit: its distance, the first 200 characters of the chunk and the source article.
for r in results:
    print("\n---")
    print("Distance:", r["distance"])
    print("Chunk:", r["text_chunk"][:200], "…")
    print("Article title:", r["article"]["title"])
    print("ID:", r["article"]["id"])

In [None]:
import os

# Read the Gemini API key from the environment instead of embedding it in the
# notebook: a key stored in a shared notebook is visible to every reader and
# must be treated as compromised (revoke it and issue a new one).
gemini_api_key = os.environ.get("GEMINI_API_KEY")
if not gemini_api_key:
    raise RuntimeError("Set the GEMINI_API_KEY environment variable before running this cell.")

llm = LLMClient(api_key=gemini_api_key)

# Ask the model for a short, dense summary of the first fetched abstract.
summary = llm.ask(
    f"Сделай короткий хардкорный конспект статьи: {articles[0]['abstract']}"
)

print(summary)


In [None]:
import os

# Read the Gemini API key from the environment instead of embedding it in the
# notebook: a key stored in a shared notebook is visible to every reader and
# must be treated as compromised (revoke it and issue a new one).
gemini_api_key = os.environ.get("GEMINI_API_KEY")
if not gemini_api_key:
    raise RuntimeError("Set the GEMINI_API_KEY environment variable before running this cell.")

llm_model = LLMClient(api_key=gemini_api_key)

# Retrieval-augmented QA over the indexed chunks.
qa = ArxivQA(pipeline, llm_model)

ans = qa.answer("How do data augmentation techniques improve the generalization of machine learning models?")
print(ans)


In [None]:
# Build the search engine on top of the vector pipeline and the LLM client.
search = SearchEngine(pipeline, llm_model)

# The query is misspelled on purpose.
resp = search.ask_rag("reinforment learnig for robtos")
print(resp)

In [None]:
# Compare the plain LLM answer with the retrieval-augmented one for the same question.
q = "What are advancements in model-based RL?"

print("=== Vanilla LLM ===")
print(search.ask_vanilla(q))

print("\n=== RAG ===")
print(search.ask_rag(q))


In [None]:
# Template variant 1: corpus-grounded scientific explanation.
# {query} and {context} are filled in by SearchEngine.build_prompt().
prompt_1 = """
Provide a scientific explanation grounded strictly in the supplied corpus.

Query: "{query}"

Base your answer ONLY on the information in:
{context}

If details are absent in the corpus, respond that no relevant evidence is present.
"""
print(search.ask_rag("how transformer-based models compress high-dimensional embeddings", template=prompt_1))


In [None]:
# Template variant 2: extract facts from the context and answer from those facts only.
# {query} and {context} are filled in by SearchEngine.build_prompt().
prompt_2 = """
Your task is to extract factual statements from the given context
and use only those facts to answer the user’s question.

Question: "{query}"

Relevant extracted facts must come solely from:
{context}

Do not infer or extend beyond what is explicitly stated.
"""
print(search.ask_rag("why normalization of embeddings affects similarity search accuracy", template=prompt_2))


In [None]:
# Template variant 3: newcomer-friendly explanation with analogies, still limited to the context.
# {query} and {context} are filled in by SearchEngine.build_prompt().
prompt_3 = """
Explain the answer with simple analogies suitable for a newcomer,
but rely strictly on data from the context.

Question: "{query}"

Context to use:
{context}

If the context lacks information needed for the answer,
state that the corpus does not cover this topic.
"""
print(search.ask_rag("trade-offs between large pretrained models and lightweight fine-tuned models", template=prompt_3))


In [None]:
# Template variant 4: brief analytical report of 3–4 sentences based only on the context.
# {query} and {context} are filled in by SearchEngine.build_prompt().
prompt_4 = """
Compose a brief analytical report (3–4 sentences)
based exclusively on the provided material.

Question: "{query}"

Use ONLY the content below:
{context}

Avoid adding external knowledge or assumptions.
"""
print(search.ask_rag("how synthetic data generation impacts model generalization", template=prompt_4))
