# Agentic RAG Implementation - Project Overview

The project "Agentic RAG Implementation" aims to create a sophisticated Retrieval Augmented Generation (RAG) system by leveraging an agentic framework that incorporates self-reflection, continuous learning, and human-in-the-loop interaction.

**Purpose and Main Features:**
The primary purpose is to build an intelligent research agent capable of:
1.  **Information Retrieval**: Searching for relevant research papers on arXiv, performing general web searches, and extracting content from specified websites or PDF URLs.
2.  **Knowledge Management**: Populating and retrieving information from a vector database (`Chroma`) to maintain a persistent knowledge base.
3.  **Self-Refinement and Learning**: The agent (`SelfRefineAgent`) can reflect on its performance, identify mistakes, and learn from past experiences stored in `LearningMemory` to improve future task executions.
4.  **Human-in-the-Loop**: Incorporating mechanisms for user interaction, such as reviewing and modifying agent plans (`review_after_plan`) and evaluating final answers (`evaluate_final_answer`) to guide the agent's learning process.
5.  **Task Orchestration**: A `router_agent` refines initial user queries before delegating them to the `research_agent`.

**Problem Solved:**
Traditional RAG systems often lack the adaptability and self-correction mechanisms to handle complex, multi-step research tasks or to improve over time. This project addresses these limitations by:
*   Enabling the agent to dynamically choose and use appropriate tools based on the task.
*   Allowing it to refine queries and content for better database interaction.
*   Providing a feedback loop where the agent learns from its successes and failures, reducing repetitive errors and improving the quality of its responses.
*   Integrating human oversight to ensure alignment with user intent and ethical considerations.

**Technologies Used:**
*   **Smolagents**: The core framework for building and orchestrating the intelligent agents.
*   **Mistral API**: Utilized for various LLM functionalities, including chat completion, agent creation (`mistral_client.beta.agents.create`), and OCR (`mistral_client.ocr.process`).
*   **ChromaDB**: A vector database used for storing and retrieving documents based on semantic similarity. `langchain-chroma` and `MistralAIEmbeddings` are used for integration and embedding generation.
*   **Tavily**: For robust web search capabilities (`TavilyWebSearch`) and website content extraction (`ExtractWebsiteContent`).
*   **Opik**: An observability tool used for tracing and tracking agent execution and learning processes.
*   **Arxiv API**: For programmatically searching and fetching research papers (`FetchingArxivPapers`).

## Explain the Workflow

### End-to-End Workflow of the Agentic RAG System

The `main()` function orchestrates the entire agentic RAG system, from initialization to task execution, evaluation, and continuous learning. Below is a detailed breakdown of the workflow:

1.  **Initialization and Configuration**: Before any task begins, the `main()` function initializes essential components:
    *   **Mistral Client**: `mistral_client` is created to interact with various Mistral AI services (chat, agents, OCR).
    *   **ChromaDB**: The `vector_store` (a ChromaDB instance) is set up with `MistralAIEmbeddings` for semantic search and storage.
    *   **Learning Memory**: `learning_memory` is instantiated to store and retrieve past experiences for self-refinement.
    *   **External Agents**: Specific Mistral AI agents (`mistral_websearch_agent`, `query_refiner_agent`, `db_verifier_agent`) are created for specialized tasks like web search verification, query refinement, and database content verification, respectively.
    *   **Tools**: All necessary tools for the `SelfRefineAgent` (e.g., `FetchingArxivPapers`, `OCRAgent`, `TavilyWebSearch`, `ExtractWebsiteContent`, `FillingDatabase`, `RetrieverTool`) are instantiated.
    *   **Main Agents**: The `SelfRefineAgent` (named `agent`) and the `router_agent` are instantiated with their respective tools, models, and configurations.

2.  **Initial Query and Refinement**:
    *   A `user_query` is received (e.g., `task`).
    *   The `router_agent` (an instance of `ToolCallingAgent`) is the first point of contact. Its primary role is to refine the user's initial query.
    *   It uses the `ReQueryTool`, which in turn leverages the external `query_refiner_agent` (a Mistral AI agent), to make the query more precise and detailed. This step ensures that the subsequent research is focused and effective.

3.  **Task Delegation and Learning Enhancement**:
    *   Once the query is refined, the `router_agent` delegates it to the `SelfRefineAgent` (named `agent`).
    *   Before the `SelfRefineAgent` starts executing, it consults its `LearningMemory`. The `enhance_task_with_learnings` method retrieves relevant past experiences, reflections, and previous answers related to the current task. These learnings are incorporated into the task description, providing the agent with valuable context and suggestions for improvement from prior runs.

4.  **Planning and Tool Usage**:
    *   The `SelfRefineAgent` then proceeds to plan its actions based on the enhanced task. It determines which tools are most appropriate to address the query.
    *   It dynamically calls various tools:
        *   `FetchingArxivPapers`: To search for research papers on arXiv.
        *   `TavilyWebSearch` and `ExtractWebsiteContent`: For general web searches and extracting content from websites.
        *   `OCRAgent`: To process and extract content from PDF URLs.
        *   `RetrieverTool`: To retrieve semantically similar documents from the `vector_store`.
        *   `FillingDatabase`: To populate the `vector_store` with newly acquired information, ensuring content is chunked and verified by the `db_verifier_agent`.
    *   *(Note: While the provided notebook comments out `planning_interval` and `step_callbacks`, if active, a human-in-the-loop review process would occur here. The `review_after_plan` callback would display the agent's plan to the user, allowing them to approve, modify, or cancel it, guiding the agent's strategy before execution.)*

5.  **Answer Synthesis**: As the agent gathers information, it synthesizes it to construct a comprehensive final answer to the user's original query.

6.  **Final Answer Evaluation and Self-Correction**:
    *   After producing a final answer, the `evaluate_final_answer` callback is triggered.
    *   This function assesses the quality of the final answer based on criteria like relevancy, depth, structure, and certainty. It might use the `mistral_websearch_agent` to double-check information if needed to gauge certainty.
    *   If the evaluation score is satisfactory (e.g., `avg_score > 7.0`), the answer is approved.
    *   If the answer is unsatisfactory, feedback is generated and added as a new `TaskStep` to the agent's memory. This feedback prompts the agent to self-correct and refine its answer in a subsequent iteration.

7.  **Self-Reflection and Learning**:
    *   Regardless of success or failure, the `extract_learnings` method is called at the end of each run.
    *   It invokes `generate_reflections` to analyze the agent's performance during the run, identifying `mistakes` and `correct_actions` from the step trace. This process involves a Mistral AI model (`mistral-small-2506`) to parse the run history and provide structured reflections.
    *   It also calls `generate_improvement_suggestions` to generate actionable advice for future runs based on the identified mistakes and correct actions.
    *   The `reward_output` method interacts with the user to get feedback on the final answer, which contributes to a numerical reward score. This score, along with reflections and suggestions, is then stored as a `LearningExperience` in the `LearningMemory`, ensuring the agent continuously improves over time.
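
The control flow described in steps 2 to 7 can be sketched in a few lines. This is a minimal illustration, not the notebook's `main()`: `run_research_task` and every callable passed into it are hypothetical stand-ins for the router agent, `LearningMemory`, `SelfRefineAgent`, `evaluate_final_answer`, and `extract_learnings`, and the `max_attempts` cap is an assumption.

```python
from typing import Callable, List, Tuple


def run_research_task(
    user_query: str,
    refine: Callable[[str], str],
    recall: Callable[[str], str],
    research: Callable[[str], str],
    evaluate: Callable[[str, str], Tuple[bool, str]],
    learn: Callable[[str, str], None],
    max_attempts: int = 3,
) -> str:
    """Hypothetical driver mirroring steps 2-7; every callable is a stand-in."""
    task = refine(user_query)              # step 2: query refinement
    learnings = recall(task)               # step 3: consult the learning memory
    if learnings:
        task = f"{task}\n\n--- Past learnings ---\n{learnings}"
    answer = ""
    for _ in range(max_attempts):          # steps 4-6: research, then evaluate
        answer = research(task)
        approved, feedback = evaluate(task, answer)
        if approved:
            break
        # Feed the evaluator's feedback back into the next attempt
        task = f"{task}\n\nFeedback on previous answer: {feedback}"
    learn(task, answer)                    # step 7: runs on success or failure
    return answer


# Demo with stub callables: the evaluator rejects the first answer only.
calls: List[str] = []


def demo_evaluate(task: str, answer: str) -> Tuple[bool, str]:
    calls.append(answer)
    return (len(calls) >= 2, "add more depth")


result = run_research_task(
    "what is RAG?",
    refine=lambda q: q.strip(),
    recall=lambda t: "",
    research=lambda t: f"answer #{t.count('Feedback') + 1}",
    evaluate=demo_evaluate,
    learn=lambda t, a: calls.append("learned"),
)
print(result)
```

Passing the stages in as callables keeps the sketch independent of any API client, so the loop itself can be exercised without network access.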

In [None]:
%%capture
!pip install huggingface_hub transformers tavily-python

In [None]:
%%capture
!pip install --upgrade opik 'smolagents[telemetry,toolkit]' ddgs arxiv langchain chromadb langchain_chroma mistralai langchain_mistralai firecrawl openinference-instrumentation-smolagents opentelemetry-sdk opentelemetry-exporter-otlp-proto-http requests==2.32.5 transformers

In [None]:
import torch
import os
import re
import arxiv
import chromadb
from uuid import uuid4
import dataclasses
from typing import Dict, List, Optional, Any, Callable, Tuple
from datetime import datetime
from tavily import TavilyClient

from langchain_core.documents import Document
from langchain_chroma import Chroma
from langchain_mistralai import MistralAIEmbeddings

from smolagents import (
    InferenceClientModel,
    ToolCallingAgent,
    Tool,
    DuckDuckGoSearchTool,
    TaskStep,
    PlanningStep,
    ActionStep,
    ChatMessage
)
from mistralai import Mistral
from google.colab import userdata
from pydantic import BaseModel, BaseConfig
from firecrawl import Firecrawl
import opik
import json
from sentence_transformers import SentenceTransformer, util

In [None]:
class Config(BaseConfig):
  HF_INF_TOKEN = "HF_INF_TOKEN"
  MISTRAL_MODEL = "mistral-ocr-2505"
  MISTRAL_API_KEY = "MISTRAL_TEST_KEY"
  OPIK_API_KEY = "OPIK_API_KEY"
  CHROMA_CLOUD_KEY = "CHROMA_CLOUD_KEY"
  CHROMA_TENANT = "CHROMA_TENANT_ID"
  CHROMA_DATABASE = "AI_Research_Papers"
  TAVILY_API_KEY = "TAVILY_API_KEY"

config = Config()

# Set HF_TOKEN environment variable for smolagents
os.environ["HF_TOKEN"] = config.HF_INF_TOKEN

# Setup Opik tracing immediately after config is defined
os.environ['OPIK_PROJECT_NAME'] = "smolagents_rag"
opik.configure(api_key=config.OPIK_API_KEY, workspace='Your-Workspace-Name')

device = 'cuda' if torch.cuda.is_available() else 'cpu'

model_emb = SentenceTransformer("all-MiniLM-L6-v2")

Step 1: Creating tools for the agent -> Workflow -> Memory + DB -> Tracing + Eval

In [None]:
# Fetching arxiv papers
class FetchingArxivPapers(Tool):
    name = "research_paper_finder"
    description = "Based on user query, search for the relevant documents/research papers on the arxiv website. The final answer should be in a JSON format. Use this when the query is related to research papers."
    inputs = {
        "title": {
            "type": "string",
            "description": "The title the user wants to search for and find related research papers on the arxiv website. This can be multiple keywords to maximize the result.",
        },
        "papers_count": {
            "type": "integer",
            "description": "The number of papers the search result should return for that title.",
        }
    }
    output_type = "object"

    @opik.track(name="retrieving_arxiv_paper")
    def forward(self, title: str, papers_count: int) -> list[dict]:
        assert isinstance(title, str), "Your search query must be a string"

        # arXiv field prefixes are written unquoted, e.g. all:<terms>
        search_query = f"all:{title}"
        search = arxiv.Search(
            query=search_query,
            max_results=papers_count,
            sort_by=arxiv.SortCriterion.SubmittedDate,
            sort_order=arxiv.SortOrder.Descending,
        )

        papers = []

        client = arxiv.Client()

        search = client.results(search)

        for result in search:
            paper_info = {
                    'title': result.title,
                    'authors': [author.name for author in result.authors],
                    'summary': result.summary,
                    'published': result.published,
                    'pdf_url': result.pdf_url,
                    'arxiv_url': result.entry_id
                }
            papers.append(paper_info)

        return papers
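
The tool description asks for JSON output, but the `published` field returned by the `arxiv` package is a `datetime`, which `json.dumps` cannot serialize directly. The sketch below shows one way to handle this, together with an all-fields query string. Both `build_arxiv_query` and `to_json_safe` are hypothetical helpers that are not part of the notebook, and the sample record is made up for illustration.

```python
import json
from datetime import datetime, timezone


def build_arxiv_query(title: str) -> str:
    """Build an all-fields arXiv query string (hypothetical helper)."""
    return f"all:{title.strip()}"


def to_json_safe(paper: dict) -> dict:
    """Return a copy with datetime values converted to ISO 8601 strings."""
    return {
        key: value.isoformat() if isinstance(value, datetime) else value
        for key, value in paper.items()
    }


# Made-up record shaped like the dictionaries built in forward()
sample = {
    "title": "Example paper",
    "authors": ["A. Author"],
    "published": datetime(2024, 1, 15, tzinfo=timezone.utc),
}
serialized = json.dumps(to_json_safe(sample))
print(build_arxiv_query("  agentic RAG "))
print(serialized)
```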

In [None]:
class FillingDatabase(Tool):
    name = "database_populater"
    description = "Adding text content (e.g., research papers, blog snippets) to the vector database. Each entry in 'content' should be a substantial, coherent passage that makes sense independently when retrieved, avoiding very short or fragmented sentences. Ensure that the 'metadata_descr' accurately captures the core idea of each content entry to provide sufficient context for retrieval and understanding. Calling this tool must be in JSON format. Only those contents should be added to the DB, which provide some sort of answer to the user query."
    inputs = {
        "content": {
            "type": "array",
            "items": {"type": "string"},
            "description": "A list of document contents to populate the VectorDB with. Each item should be a coherent and self-contained passage.",
        },
        "title": {
            "type": "string",
            "description": "The title of the document that is added to the database in a couple of words. This will be linked to the metadata to provide the origin of the content, for better referencing.",
        },
        "url": {
            "type": "string",
            "description": "The URL link of the source where the information was retrieved from. Only one link can be used for one action to populate the DB.",
        }
    }
    output_type = "string"

    def __init__(self, vectordb: Chroma, client: Mistral, db_verifier_id: str, **kwargs):
        super().__init__(**kwargs)
        self.vectordb = vectordb
        self.client = client
        self.db_verifier_id = db_verifier_id

    @opik.track(name="populating_the_DB")
    def forward(self, content: list[str], title: str, url: str) -> str:
        assert isinstance(content, list), "The input must be a list."

        doc_verified = self.client.beta.conversations.start(
            agent_id=self.db_verifier_id,
            inputs=json.dumps(content)
        )

        verified_agent_response = doc_verified.outputs[0].content
        print(verified_agent_response)
        refined_content = json.loads(verified_agent_response)

        documents = [Document(page_content=doc_content, metadata={'description': title, 'url': url}) for doc_content in refined_content]

        uuids = [str(uuid4()) for _ in range(len(documents))]

        self.vectordb.add_documents(documents=documents, ids=uuids)

        return "Successfully added all documents to DB"

class RetrieverTool(Tool):
    name = "data_retriever"
    description = "Using semantic similarity, retrieves some documents from the knowledge base that have the closest embeddings to the input query. Calling this tool must be in JSON format."
    inputs = {
        "query": {
            "type": "string",
            "description": "The query to perform. This should be semantically close to your target documents. Use the affirmative form rather than a question.",
        },
        "k": {
            "type": "integer",
            "description": "The number of similar documents to retrieve to answer the user query.",
        }
    }
    output_type = "string"

    def __init__(self, vectordb: Chroma, **kwargs):
        super().__init__(**kwargs)
        self.vectordb = vectordb

    @opik.track(name="retrieving_from_db")
    def forward(self, query: str, k: int) -> str:
        assert isinstance(query, str), "Your search query must be a string"

        docs = self.vectordb.similarity_search_with_score(
            query,
            k=k,
        )

        # An empty knowledge base returns no documents; guard before calling min()
        if not docs:
            return "No relevant document found, try different tools!"

        # For ChromaDB Cloud hosting, a smaller score means the document is closer to the query
        min_score = min(score for _, score in docs)
        print(min_score)

        if min_score <= 0.3:
            # Join into a single string to match the declared output_type
            return "\n".join(f"* {doc.page_content} [{doc.metadata['description']}]" for doc, _ in docs)
        else:
            return "No relevant document found, try different tools!"
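
The retriever above accepts the whole batch as soon as the best distance is at most 0.3. A stricter variant filters each document individually. The sketch below shows that variant on plain tuples, so it runs without a vector store; `format_hits` and the `(text, title, distance)` tuple shape are assumptions made for illustration, not the notebook's API.

```python
from typing import List, Tuple

NO_MATCH = "No relevant document found, try different tools!"


def format_hits(hits: List[Tuple[str, str, float]], max_distance: float = 0.3) -> str:
    """Format (text, source_title, distance) triples; lower distance means closer."""
    kept = [(text, title) for text, title, distance in hits if distance <= max_distance]
    if not kept:
        return NO_MATCH
    return "\n".join(f"* {text} [{title}]" for text, title in kept)


hits = [
    ("RAG combines retrieval and generation.", "RAG survey", 0.21),
    ("Unrelated passage.", "Other", 0.74),
]
print(format_hits(hits))
```

Per-document filtering keeps weak matches out of the agent's context even when one strong match is present, at the cost of sometimes returning fewer than `k` passages.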

In [None]:
# Use an OCR Agent as a tool
class OCRAgent(Tool):
  name = "ocr_agent"
  description = "Using an OCR AI model to retrieve the content of PDF files while keeping its structured format. Calling this tool must be in JSON format. Best use case: PDF URLs"
  inputs = {
      "pdf_url": {
          "type": "string",
          "description": "The link where the PDF file is located on the web",
      }
  }
  output_type = "array"

  def __init__(self, client: Mistral, model: str, **kwargs):
        super().__init__(**kwargs)
        self.client = client
        self.model = model

  @opik.track(name="read_with_ocr")
  def forward(self, pdf_url: str) -> list[str]:

    pages_content = []

    ocr_response = self.client.ocr.process(
      model=self.model,
      document={
          "type": "document_url",
          "document_url": pdf_url
      },
      table_format="markdown",
      include_image_base64=False
    )

    for page in ocr_response.pages:
      pages_content.append(page.markdown)

    return pages_content

In [None]:
class ReQueryTool(Tool):
  name = "requery_agent"
  description = "Refine and make the user query more precise by using an internal agent."
  inputs = {
      "initial_query": {
          "type": "string",
          "description": "The initial query from the user that needs to be refined.",
      }
  }
  output_type = "string"

  def __init__(self, client: Mistral, requery_agent_id: str, **kwargs):
        super().__init__(**kwargs)
        self.client = client
        self.requery_agent_id = requery_agent_id

  @opik.track(name="improve_user_query")
  def forward(self, initial_query: str) -> str:
    response = self.client.beta.conversations.start(
        agent_id=self.requery_agent_id,
        inputs=initial_query
    )
    try:
      upgraded_query = response.outputs[0].content
      return upgraded_query
    except Exception as e:
      print(e)
      return initial_query

In [None]:
class ExtractWebsiteContent(Tool):
  name = "extract_website_content"
  description = "Using the website URL, it scrapes the website content and returns its content. Best use case: general website urls."
  inputs = {
      "website_url": {
          "type": "string",
          "description": "The link of the website that should be scraped to retrieve its content.",
      }
  }
  output_type = "string"

  def __init__(self, client: TavilyClient, **kwargs):
      super().__init__(**kwargs)
      self.client = client

  @opik.track(name="get_site_content")
  def forward(self, website_url: str) -> str:
    try:
      response = self.client.extract(urls=website_url)

      result = response['results'][0]
      # Return one string (content plus source URL) to match the declared output_type
      return f"{result['raw_content']}\n\nSource: {result['url']}"
    except Exception as e:
      return f'Request failed, following error occurred during execution: {e}'

In [None]:
class TavilyWebSearch(Tool):
  name = "websearch_tool"
  description = "Using a websearch agent to find information on websites."
  inputs = {
      "query": {
          "type": "string",
          "description": "The query from the user for which the answer is needed.",
      }
  }
  output_type = "string"

  def __init__(self, client: TavilyClient, *args, **kwargs):
        super().__init__(**kwargs)
        self.client = client

  @opik.track(name="using_websearch_tool")
  def forward(self, query: str) -> str:
    try:
      response = self.client.search(query=query, search_depth='fast', max_results=10)

      # Only the top-ranked result is passed on to the agent
      top_result = response['results'][0]
      # Return one string (content plus source URL) to match the declared output_type
      return f"{top_result['content']}\n\nSource: {top_result['url']}"
    except Exception as e:
      return f'Request failed, following error occurred during execution: {e}'
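
The search tool requests ten results but forwards only the first one. If more context is useful, several results can be folded into one string. The sketch below assumes the response is a dictionary with a `results` list whose items carry `content` and `url` keys, which is the shape the tool above already relies on; `summarize_results` and the sample response are hypothetical.

```python
from typing import Any, Dict


def summarize_results(response: Dict[str, Any], limit: int = 3) -> str:
    """Join the first `limit` results into one string with their source URLs."""
    results = response.get("results", [])[:limit]
    if not results:
        return "No results found."
    return "\n\n".join(
        f"{item.get('content', '')}\nSource: {item.get('url', '')}" for item in results
    )


# Made-up response in the assumed shape
fake_response = {
    "results": [
        {"content": "First snippet.", "url": "https://example.com/a"},
        {"content": "Second snippet.", "url": "https://example.com/b"},
    ]
}
summary = summarize_results(fake_response, limit=2)
print(summary)
```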

Step 2: Editing the agent's memory with a human in the loop

In [None]:
def display_plan(plan_content):
    """Display the plan in a formatted way"""
    print("\n" + "=" * 60)
    print("RESEARCH AGENT PLAN")
    print("=" * 60)
    print(plan_content)
    print("=" * 60)


def get_user_choice():
    """Get user's choice for plan approval"""
    while True:
        choice = input("\nChoose an option:\n1. Approve plan\n2. Modify plan\n3. Cancel\nYour choice (1-3): ").strip()
        if choice in ["1", "2", "3"]:
            return int(choice)
        print("Invalid choice. Please enter 1, 2, or 3.")


def get_modified_plan(original_plan):
    """Allow user to modify the plan"""
    print("\n" + "-" * 40)
    print("MODIFY PLAN")
    print("-" * 40)
    print("Current plan:")
    print(original_plan)
    print("-" * 40)

    print("What would you like to modify?")
    update_request = input().strip()

    modified_plan = mistral_client.chat.complete(
        model = "ministral-8b-2512",
        messages = [
            {
                "role": "user",
                "content": f"Based on user request: {update_request}, update {original_plan} to fit into the needs of the user. Return the updated plan in the same format as the original one.\
                Return only the modified plan and nothing else. What was not modified, should remain the same. Avoid adding comments explaining what was changed.",
            },
        ]
    )
    print(modified_plan.choices[0].message.content)
    return modified_plan.choices[0].message.content if modified_plan else original_plan

@opik.track(name="review_after_plan")
def review_after_plan(memory_step, agent):
    """
    Step callback that interrupts the agent after a planning step is created for human review.
    This allows for user interaction to review and potentially modify the plan.
    """
    if isinstance(memory_step, PlanningStep):

        # Display the created plan
        display_plan(memory_step.plan)

        # Get user choice
        choice = get_user_choice()

        if choice == 1:  # Approve plan
            print("✅ Plan approved! Continuing execution...")
            return

        elif choice == 2:  # Modify plan
            while choice == 2:
              modified_plan = get_modified_plan(memory_step.plan)

              # Show the modified plan (not the original) before asking again
              display_plan(modified_plan)

              choice = get_user_choice()

            if choice == 3:
              agent.interrupt()
              return

            memory_step.plan = modified_plan

            print("\nPlan updated!")
            print("✅ Continuing with modified plan...")
            return

        elif choice == 3: # Cancel plan
            print("❌ Execution cancelled by user.")
            agent.interrupt()
            return
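
The approve / modify / cancel decision in `review_after_plan` can be isolated from console input, which makes it easier to test. The sketch below is one possible shape under that assumption: `review_plan`, `ask_choice`, and `modify` are hypothetical names, and unlike the callback above, each modification here is applied to the latest version of the plan rather than to the original.

```python
from typing import Callable, Optional


def review_plan(
    plan: str,
    ask_choice: Callable[[], int],
    modify: Callable[[str], str],
) -> Optional[str]:
    """Return the approved plan, or None if the user cancels.

    Choices follow the menu above: 1 = approve, 2 = modify, 3 = cancel.
    """
    current = plan
    while True:
        choice = ask_choice()
        if choice == 1:
            return current
        if choice == 3:
            return None
        # choice == 2: apply the requested change to the latest version
        current = modify(current)


# Scripted choices stand in for input(): modify twice, then approve.
choices = iter([2, 2, 1])
approved = review_plan("step A", lambda: next(choices), lambda p: p + " + edit")

# Modify once, then cancel.
cancel_choices = iter([2, 3])
cancelled = review_plan("step A", lambda: next(cancel_choices), lambda p: p + " + edit")
print(approved, cancelled)
```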

Step 4: Setting up the Workflow

In [None]:
# Initiating the database
embeddings = MistralAIEmbeddings(
    model="mistral-embed",
    api_key=config.MISTRAL_API_KEY,
)

vector_store = Chroma(
    collection_name="ai_papers",
    embedding_function=embeddings,
    chroma_cloud_api_key=config.CHROMA_CLOUD_KEY,
    tenant=config.CHROMA_TENANT,
    database=config.CHROMA_DATABASE,
)

In [None]:
import functools
from smolagents.agents import AgentMemory, ToolCallingAgent
from pydantic import BaseModel
import traceback

class AnswerEval(BaseModel):
    relevancy: float
    depth: float
    structure: float
    certainty: float
    avg_score: float
    feedback: str

@opik.track(name="eval_final_answer")
def evaluate_final_answer(final_answer: str, agent_memory: AgentMemory, agent: ToolCallingAgent, user_query: str) -> bool:
  """Evaluates the final answer and returns True if satisfactory, False otherwise.
  If not satisfactory, adds the feedback to agent's memory for the agent to use.
  """
  try:
    for _ in range(3):  # arbitrary cap so a persistently low certainty score cannot loop forever
      # let the model call tools to investigate what the user wanted: websearch
      response = mistral_client.beta.conversations.start(
          agent_id=mistral_websearch_agent.id,
          inputs=user_query
      )
      user_query += response.outputs[0].content

      chat_response = mistral_client.chat.parse(
        model='magistral-small-latest',
        messages=[
            {
                "role": "system",
                "content": f"""
                  Act as a world-class evaluator on a wide range of topics. Your task is to evaluate the final answer based on the criteria
                  outlined below. Each category should receive a float between 0-10, while the avg_score element should be the average score of
                  the other categories. After giving out the scores for each category, provide a brief constructive feedback on what should be improved.
                  Be very critical since your goal is to only let those answers pass, which can truly give an extensive answer to be used by AI engineers (unless the user specifies they do not want one).
                  In your feedback, provide solely what needs to be improved, either in a form of listing or another way.

                  Evaluation Criteria:
                  - relevancy: how relevant the final answer is to the task
                  - depth: to what extent does the answer go into the depth the task requires
                  - structure: whether the answer is structured in an easily digestible way to learn the content
                  - certainty: how certain you are that the final answer is relevant to the user query. Low score could mean that you do not know for sure what the user really wants/what their query is truly about.
                  - avg_score: average score of the scores of all the other categories
                  - feedback: your feedback on what to improve. This feedback will be given to the agent for self-correction.
                  Your evaluation should be based on the provided Task and Final Answer.
                """
            },
            {
                "role": "user",
                "content": f"Task: {user_query}; Final Answer: {final_answer}"
            },
        ],
        response_format=AnswerEval,
        temperature=0.2
      )

      evaluation_result = chat_response.choices[0].message.parsed
      if evaluation_result.certainty >= 6.0:
        break

    if evaluation_result.avg_score > 7.0:
      print(f"✅ Final answer approved with average score: {evaluation_result.avg_score}")
      return True
    else:
      print(f"❌ Final answer needs improvement. Average score: {evaluation_result.avg_score}. Feedback: {evaluation_result.feedback}")
      # Add feedback to the agent's memory as a new TaskStep
      feedback_step = TaskStep(
          task=f"Previous final answer was unsatisfactory. Feedback: {evaluation_result.feedback}. Average score: {evaluation_result.avg_score}. Please expand and improve your answer based on the feedback. Use tools if needed.",
      )
      agent_memory.steps.append(feedback_step)
      return False
  except Exception as e:
    print(f"Error in evaluate_final_answer: {e}")
    traceback.print_exc()
    return False
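
The approval gate relies on the `avg_score` value reported by the model. Since a language model can miscalculate an average, one option is to recompute it locally from the four category scores. The sketch below illustrates that idea with a plain dataclass; `Scores` and `is_approved` are hypothetical and are not a replacement for the `AnswerEval` schema above.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class Scores:
    """Hypothetical stand-in for the four category scores in AnswerEval."""
    relevancy: float
    depth: float
    structure: float
    certainty: float

    @property
    def avg_score(self) -> float:
        # Computed locally instead of trusting a model-reported average
        return mean([self.relevancy, self.depth, self.structure, self.certainty])


def is_approved(scores: Scores, threshold: float = 7.0) -> bool:
    """Same rule as the callback: approve only when the average exceeds the threshold."""
    return scores.avg_score > threshold


good = Scores(relevancy=8.0, depth=7.0, structure=9.0, certainty=6.0)
borderline = Scores(relevancy=7.0, depth=7.0, structure=7.0, certainty=7.0)
print(good.avg_score, is_approved(good), is_approved(borderline))
```

Note that the comparison is strict, so an average of exactly 7.0 is rejected, matching `avg_score > 7.0` in the callback.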

Implement self-refining agent

In [None]:
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Callable, Tuple
from datetime import datetime

@dataclass
class LearningExperience:
    """Store learning experiences from agent runs"""
    timestamp: datetime
    task: str
    correct_actions: List[str]
    mistakes: List[str]
    final_answer: str
    final_answer_reward: float
    reflections: str
    improvement_suggestions: List[str]
    metadata: Dict[str, Any] = field(default_factory=dict)
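
Because `timestamp` is a `datetime`, an experience has to be converted before it can be written to JSON and converted back when it is read. The sketch below shows that round trip on a trimmed-down, hypothetical `Experience` dataclass; `to_record` and `from_record` are illustrative helpers, not methods from the notebook.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class Experience:
    """Hypothetical, trimmed-down stand-in for LearningExperience."""
    timestamp: datetime
    task: str
    mistakes: List[str]
    final_answer_reward: float
    metadata: Dict[str, Any] = field(default_factory=dict)


def to_record(exp: Experience) -> dict:
    """Convert to a JSON-serializable dict (datetime becomes an ISO 8601 string)."""
    record = asdict(exp)
    record["timestamp"] = exp.timestamp.isoformat()
    return record


def from_record(record: dict) -> Experience:
    """Rebuild an Experience from a dict produced by to_record."""
    data = dict(record)
    data["timestamp"] = datetime.fromisoformat(data["timestamp"])
    data.setdefault("metadata", {})  # tolerate older records without metadata
    return Experience(**data)


original = Experience(datetime(2025, 1, 2, 3, 4, 5), "Summarize RAG", ["skipped retrieval"], 0.8)
restored = from_record(json.loads(json.dumps(to_record(original))))
print(restored == original)
```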


In [None]:
class LearningMemory:
    """Persistent learning memory for self-refinement"""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.experiences: List[LearningExperience] = []
        self.load_memory()

    @opik.track(name="load_memory")
    def load_memory(self):
      """Load the experiences to the memory from the json file"""
      try:
        with open(self.storage_path, "r") as f:
          data = json.load(f)

        self.experiences = [
            LearningExperience(
              timestamp=datetime.fromisoformat(experience['timestamp']),
              task=experience['task'],
              mistakes=experience['mistakes'],
              correct_actions=experience['correct_actions'],
              final_answer=experience['final_answer'],
              final_answer_reward=experience['final_answer_reward'],
              reflections=experience['reflections'],
              improvement_suggestions=experience['improvement_suggestions'],
              metadata=experience.get('metadata', {})
          ) for experience in data
            ]

      except FileNotFoundError:
        print(f"Learning memory file '{self.storage_path}' not found. Creating an empty one.")
        os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True)
        with open(self.storage_path, "w") as f:
          json.dump([], f)
        self.experiences = []
      except json.JSONDecodeError as e:
        print(f"Error decoding JSON from '{self.storage_path}': {e}. Initializing with empty experiences.")
        self.experiences = []
      except Exception as e:
        print(f"An unexpected error occurred loading learning memory: {e}. Initializing with empty experiences.")
        self.experiences = []

    @opik.track(name="save_memory")
    def save_memory(self):
      """Save the memory to the json file"""
      try:
            data = [
                {
                    'timestamp': experience.timestamp.isoformat(),
                    'task': experience.task,
                    'mistakes': experience.mistakes,
                    'correct_actions': experience.correct_actions,
                    'final_answer': experience.final_answer,
                    'final_answer_reward': experience.final_answer_reward,
                    'reflections': experience.reflections,
                    'improvement_suggestions': experience.improvement_suggestions,
                    'metadata': experience.metadata
                } for experience in self.experiences
            ]
            os.makedirs(os.path.dirname(self.storage_path) or '.', exist_ok=True)
            with open(self.storage_path, 'w') as f:
                json.dump(data, f, indent=2)
      except Exception as e:
          print(f"Error saving learning memory: {e}")

    @opik.track(name="add_experience")
    def add_experience(self, experience: LearningExperience):
      """Add further experiences to the memory"""
      self.experiences.append(experience)
      self.save_memory()

    @opik.track(name="top_reflections")
    def top_reflections(self, task: str, top_k=3) -> Tuple[str, str]:
      """Retrieve the most relevant reflections and final answers from past experiences"""
      if not self.experiences:
          return "", ""
      task_trimmed = re.search(r'---\s*Task:\s*\n(.*?)\s*---', task, re.DOTALL)
      if task_trimmed:
          task_trimmed = task_trimmed.group(1).strip()
      else:
        task_trimmed = task
      task_emb = model_emb.encode(task_trimmed, convert_to_tensor=True)
      task_instr = [exp.task for exp in self.experiences]
      embs = model_emb.encode(task_instr, convert_to_tensor=True)
      scores = util.cos_sim(task_emb, embs)[0]
      best = scores.topk(min(top_k, len(self.experiences)))
      out_refl = []
      out_answer = []
      for score, idx in zip(best.values, best.indices):
          # topk is sorted in descending order, so stop at the first weak match
          # while keeping the stronger matches collected so far
          if score <= 0.5:
            break
          experience = self.experiences[int(idx)]
          out_refl.append(f"- {experience.reflections} (score={score:.2f})")
          out_answer.append(f"- {experience.final_answer} (score={score:.2f})")
      final_refl = "\n".join(out_refl)
      final_output = "\n".join(out_answer)
      return final_refl, final_output
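
The selection logic in `top_reflections` is a top-k search over cosine similarities with a minimum-score cutoff. The sketch below reproduces that logic with plain Python lists in place of SentenceTransformer embeddings, so it runs without downloading a model; `cosine`, `top_matches`, and the toy vectors are illustrative assumptions.

```python
import math
from typing import List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_matches(
    query_vec: Sequence[float],
    candidate_vecs: Sequence[Sequence[float]],
    top_k: int = 3,
    min_score: float = 0.5,
) -> List[Tuple[int, float]]:
    """Return (index, score) pairs for the top_k candidates scoring above min_score."""
    scored = sorted(
        ((cosine(query_vec, vec), i) for i, vec in enumerate(candidate_vecs)),
        reverse=True,
    )
    return [(i, score) for score, i in scored[:top_k] if score > min_score]


# Toy 2-D "embeddings": identical, orthogonal, and 45 degrees apart
matches = top_matches([1.0, 0.0], [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
print(matches)
```

The orthogonal vector scores 0.0 and is dropped by the cutoff, while the two closer vectors are returned in descending order of similarity.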

In [None]:
# JSON validator function
class ReflectionOutput(BaseModel):
    reflection: str
    mistakes: List[str]
    correct_actions: List[str]

In [None]:
class SelfRefineAgent(ToolCallingAgent):
    def __init__(self, *args, learning_memory : LearningMemory, mistral_client: Mistral, **kwargs):
        super().__init__(*args, **kwargs)
        self.learning_memory  = learning_memory
        self.mistral_client = mistral_client

    @opik.track(name="enhance_task_with_learnings")
    def enhance_task_with_learnings(self, task: str, relevant_experiences: str, previous_answers: str) -> str:
        """Enhance the task with insights from past experiences"""
        if not relevant_experiences:
          return task
        # Add relevant experiences as context to the task.
        # Alternative: instead of pasting previous results into the prompt, store them in the DB
        # and let the model retrieve them when needed, to avoid overpopulating the context window.
        task_match = re.search(r'---\s*Task:\s*\n(.*?)\s*---', task, re.DOTALL)
        # Fall back to the raw task when the '--- Task: ... ---' wrapper is missing
        task_upd = task_match.group(1).strip() if task_match else task
        enhanced_task = f"{task_upd}\n\n--- Here is feedback from previous similar queries that you should take into account: ---\n{relevant_experiences}"
        if previous_answers:
          enhanced_task += f"\n\n--- Here is the result of previous similar queries, use it if needed as a starting point: ---\n{previous_answers}"
        return enhanced_task

    @opik.track(name="extract_learnings")
    def extract_learnings(self, task: str, result: str):
        """Extract learnings from this run and store them"""

        refl_calls = self.generate_reflections(task)

        mistakes = refl_calls.mistakes
        correct_actions = refl_calls.correct_actions
        final_answer = result
        final_answer_reward = self.reward_output(final_answer)
        final_answer_reward = list(final_answer_reward.values())
        reflections = refl_calls.reflection
        improvement_suggestions = self.generate_improvement_suggestions(task, correct_actions, mistakes)

        def extract_query(query: str) -> str:
          """Get the improved query from the instructions of the manager agent"""
          task_upd = re.search(r'---\s*Task:\s*\n(.*?)\s*---', query, re.DOTALL)
          if task_upd:
              return task_upd.group(1).strip()
          return query

        task_upd = extract_query(task)

        experience = LearningExperience(
            timestamp=datetime.now(),
            task=task_upd,
            correct_actions=correct_actions,
            mistakes=mistakes,
            final_answer=str(result),
            final_answer_reward=final_answer_reward,
            reflections=reflections,
            improvement_suggestions=improvement_suggestions,
            metadata={
                'duration': (datetime.now() - self.current_run_start_time).total_seconds(),
                'total_steps': len(self.memory.steps)
            }
        )
        self.learning_memory.add_experience(experience)

    @opik.track(name="generate_reflections")
    def generate_reflections(self, task: str) -> ReflectionOutput:
        """Generate reflections about this run"""
        task_trimmed = re.search(r'---\s*Task:\s*\n(.*?)\s*---', task, re.DOTALL)
        if task_trimmed:
            task_trimmed = task_trimmed.group(1).strip()
        else:
          task_trimmed = task
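        steps = self.memory.get_succinct_steps()
        # Steps are dicts: use role/content when present, otherwise the raw step
        history_steps = "\n".join(
            f"{s['role']}: {s['content']}" if isinstance(s, dict) and 'role' in s and 'content' in s else str(s)
            for s in steps
        )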
        prompt = f"""
          Act as an expert at introspectively analysing content. Given the task and the step trace below,
          write a concise reflection (<=3 bullet points) that will help a future agent solve
          similar tasks better. Also point out the mistakes the agent made during the run.
          List the mistakes as bullet points and phrase each one as a clear, actionable step.
          Example: if the agent called the tools in a less effective order, the entry could be:
          "Retrieve any relevant info from the DB before doing a web search or other actions."
          Correct actions (what the agent did right) should be reinforced as well,
          so make sure to list them too.

          Task: {task_trimmed}

          Trace:
          {history_steps}

          Answer with a JSON object whose keys are reflection, mistakes and correct_actions.
          Return ONLY the JSON object and nothing else.
        """

        reflection_response = self.mistral_client.chat.parse(
            model='mistral-small-2506',
            messages=[
                {"role": "user", "content": prompt}
            ],
            response_format=ReflectionOutput,
        )
        return reflection_response.choices[0].message.parsed

    @opik.track(name="generate_improvement_suggestions")
    def generate_improvement_suggestions(self, task: str, correct_actions: List[str], mistakes: List[str]) -> List[str]:
        """Generate specific improvement suggestions"""
        suggestions = []
        task_trimmed = re.search(r'---\s*Task:\s*\n(.*?)\s*---', task, re.DOTALL)
        if task_trimmed:
            task_trimmed = task_trimmed.group(1).strip()
        else:
          task_trimmed = task
        prompt = f"""
          Act as an expert at providing clear actions to improve how the user query on the topic of {task_trimmed} is completed.
          Provide a list of suggestions for improvement, phrased as clear actions that should be taken
          to ensure better future execution.
          Return only the list of suggestions!
          In the next run, the agent should be able to work with those suggestions as a sole researcher
          who has no access to experts or any group of people. The aim of your suggestions is to improve the research result or the answer to the user query.

          Bad suggestion: 'Update the information regularly to reflect any changes or updates in the model architecture'.
          The agent cannot do this, because the process ends once it has provided the final answer to the user query.

          Mistakes made during the task: {mistakes}
          Correct actions taken during the task: {correct_actions}
        """
        suggestion = self.model([ChatMessage(role= "user", content= prompt)])
        suggestions.append(suggestion.content)

        return suggestions

    @opik.track(name="reward_output")
    def reward_output(self, final_answer: str) -> dict:
      """Ask the user for feedback and turn it into a 0-10 reward score"""

      user_feedback = input("Did you get the desired response? (Y/N) ").strip()
      if user_feedback.upper() == "Y":
        prompt_features = f"""
          The user liked the following final answer: '{final_answer}'.
          Analyze the answer and extract its key desirable features. Return these features as a JSON string.
          Example: {{\"clarity\": \"excellent\", \"conciseness\": \"good\", \"detail_level\": \"high\"}}
          Feel free to add more criteria than the ones in the example.
          """

      elif user_feedback.upper() == "N":
        prompt_features = f"""
          The user did NOT like the following final answer: '{final_answer}'.
          Analyze the answer and identify its key undesirable features. Return these features as a JSON string.
          Example: {{\"clarity\": \"poor\", \"conciseness\": \"too verbose\", \"detail_level\": \"low\"}}
          Feel free to add more criteria than the ones in the example.
          """

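      else:
        print("Unrecognised response, defaulting reward to 0.")
        # Return the same dict shape as the normal path, since the caller reads .values()
        return {"The reward score of your final answer is": 0.0}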

      review = self.model([ChatMessage(role= "user", content= prompt_features)])
      review_content = review.content

      prompt_score = f"""
        Score the analysis of the result based on the user's judgment of it: {review_content}.
        Return ONLY a float score between 0.0-10.0 and nothing else! Avoid explaining the reason for it or adding anything else!
        0.0 indicating that the final result was completely terrible and irrelevant to the user.
        10.0 indicating that the final result could completely satisfy the needs of the user.
      """
      result_reward_msg = self.model([ChatMessage(role= "user", content= prompt_score)])
      try:
          result_reward = float(result_reward_msg.content.strip())
      except ValueError:
          print(f"Warning: Could not parse reward score from model output: {result_reward_msg.content}")
          result_reward = 0.0 # Default to 0 if parsing fails

      return {"The reward score of your final answer is": result_reward}

    def run(self, task: str, *args, **kwargs):
        self.current_run_start_time = datetime.now()
        relevant_experiences, previous_answers = self.learning_memory.top_reflections(task)
        enhanced_task = self.enhance_task_with_learnings(task, relevant_experiences, previous_answers)
        try:
            # Run the enhanced task
            result = super().run(enhanced_task, *args, **kwargs)

            # Analyze the run and extract learnings
            self.extract_learnings(task, result)

            return result

        except Exception as e:
            # Extract learnings from failure
            self.extract_learnings(task, str(e))
            raise
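
The `--- Task: ... ---` extraction appears in several methods above, and the `float(...)` call in `reward_output` fails whenever the model wraps the score in extra words. Below is a minimal sketch of two helpers that would cover both cases, assuming the incoming prompt keeps the delimiter format matched by the regex used above. `extract_task_text` and `parse_reward` are hypothetical names, and clamping the score to the 0-10 range is an added assumption rather than something the notebook does.

```python
import re

# Same pattern the methods above use to pull the task out of the incoming prompt.
TASK_PATTERN = re.compile(r"---\s*Task:\s*\n(.*?)\s*---", re.DOTALL)


def extract_task_text(prompt: str) -> str:
    """Return the text between '--- Task:' and the next '---', or the prompt itself."""
    match = TASK_PATTERN.search(prompt)
    return match.group(1).strip() if match else prompt


def parse_reward(text: str, default: float = 0.0) -> float:
    """Pull the first number out of a model reply and clamp it to [0, 10]."""
    match = re.search(r"-?\d+(?:\.\d+)?", text)
    if not match:
        return default
    return max(0.0, min(10.0, float(match.group())))


wrapped = "You are a helpful agent.\n---\nTask:\nSummarise scaling laws\n---\nExtra notes"
print(extract_task_text(wrapped))
print(extract_task_text("plain question"))
print(parse_reward("Score: 8.5"))
```

Centralising the pattern in one helper means that the fallback to the raw prompt is applied consistently, instead of being re-implemented (or forgotten) in each method.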

In [None]:
mistral_client = Mistral(api_key=config.MISTRAL_API_KEY)
# Create Mistral agents that assist during execution
# This agent double-checks answers with a web search; it is mainly useful when the user gives a link and asks about its content
mistral_websearch_agent = mistral_client.beta.agents.create(
    model="mistral-small-2506",
    description="Agent able to search information over the web to double check the responses of the agent.",
    name="Websearch Agent",
    instructions="""
      Using your `web_search` tool, double check the work of the agent based on the provided `user_query` and `final_answer`.
      First, make sure you understand what the user wants, making assumptions if needed, then check whether the final answer is missing something that could be relevant to the user.
      Return an improved and updated `user_query` that would clarify the user's intention and aim of what they want to know.
    """,
    tools=[{"type": "web_search"}],
)

query_refiner_agent = mistral_client.beta.agents.create(
    model="mistral-small-2506",
    description="Agent designed to improve and make user queries more precise after performing a web search to understand user intent.",
    name="Query Refiner Agent",
    instructions="""
      Act as an expert at refining and improving existing queries. Your goal is to upgrade the user's initial query
      so that it can be used by another agent to conduct deeper research on the topic.
      If needed, use the `web_search` tool to understand the context and what the user might be looking for.
      Based on the search results and your understanding, strictly only rewrite the original `user_query` to be more precise, detailed, and clear.
      Return *only* the refined query as a string, without any additional conversational text.
      Vital: avoid answering the query!

      Example:
      initial_query: Teach me what this paper is about: {URL of the Qwen3-TTS Technical Report}
      steps taken: go to the URL -> find out that the user is interested in Qwen3-TTS -> refine the query, so the next agent can give a more extensive answer
      refined_query (output): What is the Qwen3-TTS Technical Report about?
    """,
    # The instructions above rely on the `web_search` tool, so it has to be enabled here
    tools=[{"type": "web_search"}],
)

db_verifier_agent = mistral_client.beta.agents.create(
    model="mistral-small-latest",
    name="db-verifier-agent",
    description="Agent to improve and restructure the provided content into coherent and independently meaningful chunks, optimizing them for retrieval and storage in a vector database.",
    instructions="""
      You are an expert content optimizer and chunker for a vector database. Your primary role is to take incoming text content, provided as a JSON string representing a list of strings, and process it into discrete, self-contained, and highly retrievable chunks.

      Carefully process the provided `content` based on these criteria:
      1.  **Coherence and Independence:** Each output chunk must be a coherent and meaningful passage that can stand alone without needing additional context. Avoid very short, fragmented sentences, or content that relies heavily on its neighbors for understanding.
      2.  **Optimal for Retrieval:** Restructure the content to maximize its utility when retrieved. This means each chunk should ideally convey a single, complete idea or a closely related set of ideas.
      3.  **Structured Output:** Your output must be a list of strings, where each string is an optimized content chunk. The number of output chunks does not need to match the number of input items; you can combine or split content as necessary to meet the above criteria.

      Your output should *only* be the JSON representation of the list of strings. Do not include any conversational text, explanations, or other formatting outside the JSON array.
      Give no intro at the beginning of your answer, like 'Here is ...', only return the updated text.

      Note: if necessary, use your "web_search_premium" tool to investigate the inputs further, to make sure you store independent chunks.

      Example:
      Input: ['This is a partial sentence. It needs more context to make sense. Here is another idea. It is about AI.', 'And this is a third idea. It is very detailed.']
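      Output: ['This is a partial sentence. It needs more context to make sense.', 'Here is another idea about AI.', 'This is a third idea, which is very detailed.']
    """,
    # The instructions mention the "web_search_premium" tool, so it is enabled here
    tools=[{"type": "web_search_premium"}],
)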
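
The verifier agent is asked to return only a JSON array of strings, but the example in its instructions uses Python-style single quotes, and chat models sometimes wrap replies in a Markdown code fence. The following is a defensive parsing sketch for such a reply, assuming it arrives as a single string. `parse_chunks` is a hypothetical helper, and the fallback to `ast.literal_eval` is an assumption about what the model may emit, not something the `FillingDatabase` tool is shown to do.

```python
import ast
import json
from typing import List

FENCE = "`" * 3  # Markdown code-fence marker


def parse_chunks(raw: str) -> List[str]:
    """Turn the verifier's reply into a list of non-empty string chunks."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (with optional language tag) and the closing fence.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0].strip()
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Python-style lists with single quotes are not valid JSON.
        try:
            data = ast.literal_eval(text)
        except (ValueError, SyntaxError) as exc:
            raise ValueError("reply is neither JSON nor a Python list literal") from exc
    if not isinstance(data, list) or not all(isinstance(c, str) for c in data):
        raise ValueError("expected a list of strings")
    return [c.strip() for c in data if c.strip()]


print(parse_chunks('["Scaling laws relate loss to compute. ", ""]'))
print(parse_chunks("['Chunk one.', 'Chunk two.']"))
```

Rejecting anything that is not a list of strings before it reaches the vector store keeps malformed replies from being embedded as if they were content.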


In [None]:
task = """What are the most popular scaling laws in LLM training? Make sure to save the contents to the DB."""

In [None]:
learning_memory = LearningMemory("research_agent_memory.json")

@opik.track(name="main_run")
def main():
    """Run the complete plan customization example"""
    print("Starting the Agentic RAG Process")
    print("=" * 60)

    # Bind the user query to the final-answer check. functools.partial objects have no
    # __name__, so set one on the partial itself in case the framework reports it on failure.
    eval_final_answer = functools.partial(evaluate_final_answer, user_query=task)
    eval_final_answer.__name__ = "eval_final_answer"

    # Initialise the model shared by both agents
    model = InferenceClientModel(
        model_id="MiniMaxAI/MiniMax-M2",
        provider='auto',
        token=config.HF_INF_TOKEN,
      )

    agent = SelfRefineAgent(
        learning_memory = learning_memory,
        tools=[
            FetchingArxivPapers(),
            OCRAgent(client=mistral_client, model=config.MISTRAL_MODEL),
            TavilyWebSearch(client=TavilyClient(api_key=config.TAVILY_API_KEY)),
            ExtractWebsiteContent(client=TavilyClient(api_key=config.TAVILY_API_KEY)),
            FillingDatabase(vectordb=vector_store, client=mistral_client, db_verifier_id=db_verifier_agent.id),
            RetrieverTool(vectordb=vector_store),
          ],
        model=model,
        mistral_client=mistral_client,
        final_answer_checks=[eval_final_answer],
        verbosity_level=2,
        planning_interval=6,
        step_callbacks={PlanningStep: review_after_plan},
        name="research_agent",
        description="Agent that can conduct research to find research papers, retrieve content from the web and has access to the vector db."
      )

    router_agent = ToolCallingAgent(
        tools=[ReQueryTool(client=mistral_client, requery_agent_id=query_refiner_agent.id)],
        model=model,
        instructions="Your sole purpose is to refine the user's query using the `requery_agent` tool and then, without exception, immediately **delegate** the refined query to your managed agent. You **must not** attempt to answer the user's original task yourself.",
        managed_agents=[agent],
    )

    try:
        print(f"\n📋 Task: {task}")
        print("\n🤖 Agent starting execution...")

        result = router_agent.run(task)

        print("\n✅ Task completed successfully!")
        print("\n📄 Final Result:")
        print("-" * 40)
        print(result)

    except Exception as e:
        if "interrupted" in str(e).lower():
            print("\n🛑 Agent execution was cancelled by user.")

            print(f"\n📚 Current memory contains {len(agent.memory.steps)} steps:")
            for i, step in enumerate(agent.memory.steps):
                step_type = type(step).__name__
                print(f"  {i + 1}. {step_type}")

            resume_choice = input("\nWould you like to resume execution? (y/n): ").strip().lower()
            if resume_choice == "y":
                print("\n🔄 Resuming execution...")
                try:
                    # reset=False keeps the steps already stored in the agent's memory
                    result = agent.run(task, reset=False)
                    print("\n✅ Task completed after resume!")
                    print("\n📄 Final Result:")
                    print("-" * 40)
                    print(result)
                except Exception as resume_error:
                    print(f"\n❌ Error during resume: {resume_error}")
        else:
            # Any error other than a user interruption
            print(f"\n❌ An error occurred: {e}")

In [None]:
if __name__ == "__main__":
    main()

In [None]:
print("Learnings:")
for i, experience in enumerate(learning_memory.experiences):
    print(f"--- Learning Experience {i+1} ---")
    for field_name, field_value in dataclasses.asdict(experience).items():
        # Handle lists and other potentially long outputs for better formatting
        if isinstance(field_value, list):
            print(f"  {field_name}:")
            for item in field_value:
                print(f"    - {item}")
        else:
            print(f"  {field_name}: {field_value}")
    print("-----------------------------------\n")