# Robotics Experiment Intelligence Agent

This notebook implements an enterprise-style Experiment Intelligence Agent for robotics research.
The agent ingests synthetic TurtleBot3 experiment logs, computes localization and resource-usage metrics, compares navigation configurations,
and answers research questions without manual spreadsheet analysis.

Instead of manually parsing ROS data, aligning trajectories, and calculating RMSE or CPU usage across runs, 
the system uses a multi-agent architecture:

- **DataAnalysisAgent** — deterministic metrics computation from CSV logs  
- **ExplanationAgent** — converts numeric results into human-readable insights  
- **MemoryStore** — preserves context across queries for iterative analysis

All quantitative evaluation (RMSE, MAE, peak error, CPU/memory) is computed transparently using Python and remains reproducible.  
The agent layer adds reasoning and interactivity, similar to a research assistant, allowing the user to ask questions such as:

> “Which configuration achieved the lowest localization error across all runs?”

or

> “How does Gmapping affect performance for 10-minute missions?”

This notebook demonstrates the prototype using synthetic TurtleBot3 runs at 5 Hz across four configurations and three durations.
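
To make the metric definitions concrete, here is a minimal worked sketch on a toy two-sample log. The values are made up for illustration and are not drawn from the dataset; the column names (`x`, `y`, `x_gt`, `y_gt`) are the ones the notebook's loader expects.

```python
import numpy as np
import pandas as pd

# Toy log (illustrative values only, not taken from the dataset).
toy = pd.DataFrame({
    "x":    [3.0, 1.0],
    "y":    [4.0, 1.0],
    "x_gt": [0.0, 1.0],
    "y_gt": [0.0, 1.0],
})

# Per-sample Euclidean position error in metres.
err = np.sqrt((toy["x"] - toy["x_gt"]) ** 2 + (toy["y"] - toy["y_gt"]) ** 2)

rmse = float(np.sqrt(np.mean(err ** 2)))  # penalizes large deviations more
mae = float(np.mean(err))                 # average error magnitude
max_err = float(np.max(err))              # worst-case (peak) error

print(f"RMSE={rmse:.4f}  MAE={mae:.4f}  max={max_err:.4f}")
```

Because RMSE squares each error before averaging, the single large deviation pulls it above MAE (about 3.54 m versus 2.5 m in this toy case).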


In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
!ls /kaggle/input


agents-intensive-capstone-project  synthetic-tb3-experiments-for-agent-analysis


In [2]:
from pathlib import Path

DATA_DIR = Path("/kaggle/input/synthetic-tb3-experiments-for-agent-analysis")

!ls "$DATA_DIR"


algorithma_gmappingoff_10min.csv  algorithmb_gmappingoff_10min.csv
algorithma_gmappingoff_2min.csv   algorithmb_gmappingoff_2min.csv
algorithma_gmappingoff_5min.csv   algorithmb_gmappingoff_5min.csv
algorithma_gmappingon_10min.csv   algorithmb_gmappingon_10min.csv
algorithma_gmappingon_2min.csv	  algorithmb_gmappingon_2min.csv
algorithma_gmappingon_5min.csv	  algorithmb_gmappingon_5min.csv


In [3]:
import pandas as pd
import numpy as np
from pathlib import Path

def load_run(file_path: Path) -> pd.DataFrame:
    """Load a single experiment CSV and validate columns."""
    df = pd.read_csv(file_path)
    required_cols = ["timestamp", "x", "y", "x_gt", "y_gt", "v_linear", "v_angular", "cpu", "memory"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns in {file_path}: {missing}")
    return df

def parse_config_from_name(filename: str) -> dict:
    """
    Parse config info from filename pattern:
    algorithma_gmappingon_5min.csv
    """
    base = Path(filename).stem.lower()  
    parts = base.split("_")

    if len(parts) < 3:
        raise ValueError(f"Unexpected filename pattern: {filename}")

    algo_raw = parts[0]   # 'algorithma'
    gmap_raw = parts[1]   # 'gmappingon'
    dur_raw  = parts[2]   # '5min'

    algo_letter = algo_raw.replace("algorithm", "").upper()  # 'A' or 'B'
    gmapping    = "ON" if gmap_raw.endswith("on") else "OFF"  # suffix check; a bare substring test for "on" is fragile
    duration    = int(dur_raw.replace("min", "").strip())

    return {
        "algorithm": f"Algorithm{algo_letter}",
        "gmapping": gmapping,
        "duration_min": duration,
    }

def compute_localization_metrics(df: pd.DataFrame) -> dict:
    """Compute localization + resource metrics for one run."""
    dx = df["x"] - df["x_gt"]
    dy = df["y"] - df["y_gt"]
    err = np.sqrt(dx**2 + dy**2)

    rmse = float(np.sqrt(np.mean(err**2)))
    mae  = float(np.mean(np.abs(err)))
    max_err = float(np.max(err))

    cpu_mean = float(df["cpu"].mean())
    mem_mean = float(df["memory"].mean())

    return {
        "rmse_error": rmse,
        "mae_error": mae,
        "max_error": max_err,
        "cpu_mean": cpu_mean,
        "memory_mean": mem_mean,
    }

def summarize_all_runs(data_dir: Path) -> pd.DataFrame:
    """Scan CSVs in data_dir and build a summary table with config + metrics."""
    rows = []
    for csv_path in data_dir.glob("*.csv"):
        name_lower = csv_path.name.lower()
        # Only process files that look like our experiment logs
        if not name_lower.startswith("algorithma_") and not name_lower.startswith("algorithmb_"):
            continue

        df = load_run(csv_path)
        cfg = parse_config_from_name(csv_path.name)
        metrics = compute_localization_metrics(df)

        row = {
            "file": csv_path.name,
            **cfg,
            **metrics,
        }
        rows.append(row)

    summary_df = pd.DataFrame(rows)
    if summary_df.empty:
        raise RuntimeError(f"No valid experiment CSVs found in {data_dir}")

    return summary_df.sort_values(["algorithm", "gmapping", "duration_min"])


In [4]:
summary_df = summarize_all_runs(DATA_DIR)
summary_df


| | file | algorithm | gmapping | duration_min | rmse_error | mae_error | max_error | cpu_mean | memory_mean |
|---|---|---|---|---|---|---|---|---|---|
| 7 | algorithma_gmappingoff_2min.csv | AlgorithmA | OFF | 2 | 0.093685 | 0.083453 | 0.207659 | 28.058656 | 390.366364 |
| 1 | algorithma_gmappingoff_5min.csv | AlgorithmA | OFF | 5 | 0.218408 | 0.191426 | 0.42964 | 28.523656 | 384.248801 |
| 0 | algorithma_gmappingoff_10min.csv | AlgorithmA | OFF | 10 | 0.428548 | 0.373248 | 0.773025 | 28.143886 | 380.305109 |
| 6 | algorithma_gmappingon_2min.csv | AlgorithmA | ON | 2 | 0.049956 | 0.045292 | 0.10295 | 35.009435 | 430.102479 |
| 5 | algorithma_gmappingon_5min.csv | AlgorithmA | ON | 5 | 0.111942 | 0.098839 | 0.210905 | 35.565945 | 424.34217 |
| 8 | algorithma_gmappingon_10min.csv | AlgorithmA | ON | 10 | 0.217239 | 0.190038 | 0.389361 | 35.123617 | 420.364987 |
| 2 | algorithmb_gmappingoff_2min.csv | AlgorithmB | OFF | 2 | 0.148677 | 0.133288 | 0.313018 | 32.005446 | 402.789335 |
| 10 | algorithmb_gmappingoff_5min.csv | AlgorithmB | OFF | 5 | 0.332668 | 0.292338 | 0.6264 | 32.744006 | 395.336306 |
| 11 | algorithmb_gmappingoff_10min.csv | AlgorithmB | OFF | 10 | 0.648087 | 0.565424 | 1.175991 | 32.121463 | 390.364294 |
| 3 | algorithmb_gmappingon_2min.csv | AlgorithmB | ON | 2 | 0.083354 | 0.07587 | 0.158725 | 40.070506 | 463.065771 |
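
One way to read the summary side by side is to pivot it so that Gmapping ON and OFF become columns. The sketch below rebuilds the six AlgorithmA rows from the output above by hand so that it runs on its own; in the notebook you would pass `summary_df` instead. The `rmse_ratio` column is a hypothetical helper added for illustration.

```python
import pandas as pd

# AlgorithmA rows copied from the summary output above (RMSE in metres).
rows = [
    ("AlgorithmA", "OFF", 2, 0.093685),
    ("AlgorithmA", "OFF", 5, 0.218408),
    ("AlgorithmA", "OFF", 10, 0.428548),
    ("AlgorithmA", "ON", 2, 0.049956),
    ("AlgorithmA", "ON", 5, 0.111942),
    ("AlgorithmA", "ON", 10, 0.217239),
]
subset = pd.DataFrame(rows, columns=["algorithm", "gmapping", "duration_min", "rmse_error"])

# One row per (algorithm, duration); one column per Gmapping state.
pivot = subset.pivot_table(
    index=["algorithm", "duration_min"],
    columns="gmapping",
    values="rmse_error",
)

# Hypothetical helper column: ON error as a fraction of OFF error.
pivot["rmse_ratio"] = pivot["ON"] / pivot["OFF"]
print(pivot.round(3))
```

For these rows the ratio stays between roughly 0.51 and 0.53, so enabling Gmapping about halves the RMSE of AlgorithmA at each duration.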


In [5]:
class MemoryStore:
    """Very simple in-notebook memory for past queries & results."""
    def __init__(self):
        self.history = []

    def add_entry(self, question: str, result: dict):
        self.history.append({"question": question, "result": result})

    def get_history(self):
        return self.history


class DataAnalysisAgent:
    def __init__(self, summary_df: pd.DataFrame):
        self.summary_df = summary_df

    def get_min_rmse(self):
        idx = self.summary_df["rmse_error"].idxmin()
        return self.summary_df.loc[idx].to_dict()

    def filter_by_gmapping(self, state: str):
        return self.summary_df[self.summary_df["gmapping"] == state].copy()

    def filter_by_duration(self, minutes: int):
        return self.summary_df[self.summary_df["duration_min"] == minutes].copy()


class ExplanationAgent:
    def explain_best_config(self, best_row: dict) -> str:
        return (
            f"The best configuration by RMSE is {best_row['algorithm']} with "
            f"Gmapping {best_row['gmapping']} for {best_row['duration_min']} minutes.\n"
            f"RMSE = {best_row['rmse_error']:.4f} m, "
            f"MAE = {best_row['mae_error']:.4f} m, "
            f"CPU ≈ {best_row['cpu_mean']:.1f}%, "
            f"Memory ≈ {best_row['memory_mean']:.1f} MB."
        )

    def explain_gmapping_effect(self, df_on: pd.DataFrame, df_off: pd.DataFrame, duration: int) -> str:
        mean_on = df_on["rmse_error"].mean()
        mean_off = df_off["rmse_error"].mean()
        diff = mean_off - mean_on
        direction = "lower" if diff > 0 else "higher"
        return (
            f"For {duration}-minute runs, Gmapping ON yields average RMSE of {mean_on:.4f} m, "
            f"while Gmapping OFF yields {mean_off:.4f} m. "
            f"That means Gmapping ON has {abs(diff):.4f} m {direction} error on average."
        )


class OrchestratorAgent:
    def __init__(self, summary_df: pd.DataFrame):
        self.memory = MemoryStore()
        self.analysis_agent = DataAnalysisAgent(summary_df)
        self.explainer = ExplanationAgent()

    def answer(self, question: str) -> str:
        question_lower = question.lower()

        if "minimum localization error" in question_lower or "lowest rmse" in question_lower:
            best = self.analysis_agent.get_min_rmse()
            text = self.explainer.explain_best_config(best)
            self.memory.add_entry(question, {"best_config": best})
            return text

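        # Parentheses are required: `and` binds tighter than `or`, so without them
        # any question containing "10-minute" would match, even without "gmapping".
        if "gmapping" in question_lower and ("10 min" in question_lower or "10-minute" in question_lower):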
            df_on = self.analysis_agent.filter_by_gmapping("ON")
            df_on = df_on[df_on["duration_min"] == 10]
            df_off = self.analysis_agent.filter_by_gmapping("OFF")
            df_off = df_off[df_off["duration_min"] == 10]
            text = self.explainer.explain_gmapping_effect(df_on, df_off, duration=10)
            self.memory.add_entry(question, {"gmapping_on": df_on.to_dict(), "gmapping_off": df_off.to_dict()})
            return text

        return "I understand your question, but this prototype currently supports only a small set of query types (best configuration, Gmapping vs non-Gmapping)."


In [6]:
agent = OrchestratorAgent(summary_df)

print(agent.answer("Which configuration has the minimum localization error?"))
print()
print(agent.answer("How does Gmapping affect localization for 10-minute runs?"))


The best configuration by RMSE is AlgorithmA with Gmapping ON for 2 minutes.
RMSE = 0.0500 m, MAE = 0.0453 m, CPU ≈ 35.0%, Memory ≈ 430.1 MB.

For 10-minute runs, Gmapping ON yields average RMSE of 0.2842 m, while Gmapping OFF yields 0.5383 m. That means Gmapping ON has 0.2542 m lower error on average.


## Multi-Agent Architecture

The analysis above is fully deterministic and reproducible. 
On top of this, we build a simple multi-agent layer that separates:

- **DataAnalysisAgent** — queries the summary table, filters configurations, finds best runs.  
- **ExplanationAgent** — turns numeric results into human-readable text.  
- **MemoryStore** — retains past questions and results to support iterative analysis.  
- **OrchestratorAgent** — interprets high-level questions and delegates to the other agents.

This pattern mirrors how a human researcher might work with an assistant: 
the assistant looks up numbers, compares experiments, and then explains the outcome, 
while the researcher keeps asking follow-up questions.
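
The routing step could avoid hard-coding a single duration by extracting it from the question text. The following is a minimal sketch under that assumption; `extract_duration_min` is a hypothetical helper and is not part of the notebook's classes.

```python
import re
from typing import Optional

# Matches "10 min", "10-minute", "10 minutes", "10min" (case-insensitive).
_DURATION_RE = re.compile(r"\b(\d+)\s*-?\s*min(?:ute)?s?\b", re.IGNORECASE)

def extract_duration_min(question: str) -> Optional[int]:
    """Return the mission duration in minutes mentioned in the question, if any."""
    match = _DURATION_RE.search(question)
    return int(match.group(1)) if match else None

print(extract_duration_min("How does Gmapping affect localization for 10-minute runs?"))
print(extract_duration_min("Compare Gmapping for 5 min missions"))
print(extract_duration_min("Which of the 100 samples is worst?"))
```

The extracted value could then be passed to a duration filter on the summary table, so the same code path would serve 2-, 5-, and 10-minute questions.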



In [7]:
class MemoryStore:
    """Simple in-notebook memory for past queries and results."""
    def __init__(self):
        self.history = []

    def add_entry(self, question: str, result: dict):
        self.history.append({"question": question, "result": result})

    def get_history(self):
        return self.history


class DataAnalysisAgent:
    def __init__(self, summary_df: pd.DataFrame):
        self.summary_df = summary_df

    def get_min_rmse(self) -> dict:
        idx = self.summary_df["rmse_error"].idxmin()
        return self.summary_df.loc[idx].to_dict()

    def filter_by_gmapping_and_duration(self, gmapping_state: str, duration_min: int) -> pd.DataFrame:
        return self.summary_df[
            (self.summary_df["gmapping"] == gmapping_state) &
            (self.summary_df["duration_min"] == duration_min)
        ].copy()


class ExplanationAgent:
    def explain_best_config(self, best_row: dict) -> str:
        return (
            f"The best configuration by RMSE is {best_row['algorithm']} "
            f"with Gmapping {best_row['gmapping']} for {int(best_row['duration_min'])} minutes.\n"
            f"RMSE = {best_row['rmse_error']:.4f} m, "
            f"MAE = {best_row['mae_error']:.4f} m, "
            f"max error = {best_row['max_error']:.4f} m,\n"
            f"CPU ≈ {best_row['cpu_mean']:.1f}%, "
            f"Memory ≈ {best_row['memory_mean']:.1f} MB."
        )

    def explain_gmapping_effect(self, df_on: pd.DataFrame, df_off: pd.DataFrame, duration: int) -> str:
        if df_on.empty or df_off.empty:
            return f"No matching runs found for {duration}-minute duration."

        mean_on = df_on["rmse_error"].mean()
        mean_off = df_off["rmse_error"].mean()
        diff = mean_off - mean_on
        if diff > 0:
            direction = "lower"
        elif diff < 0:
            direction = "higher"
        else:
            direction = "the same"

        return (
            f"For {duration}-minute runs:\n"
            f"- Gmapping ON:  average RMSE = {mean_on:.4f} m\n"
            f"- Gmapping OFF: average RMSE = {mean_off:.4f} m\n\n"
            f"On average, Gmapping ON shows {abs(diff):.4f} m {direction} error compared to Gmapping OFF."
        )


class OrchestratorAgent:
    """
    Very simple natural-language router.
    In a production deployment, this could be powered by Gemini to interpret
    arbitrary questions and decide which tools/agents to call.
    """
    def __init__(self, summary_df: pd.DataFrame):
        self.memory = MemoryStore()
        self.analysis_agent = DataAnalysisAgent(summary_df)
        self.explainer = ExplanationAgent()

    def answer(self, question: str) -> str:
        q = question.lower()

        # Query 1: best / minimum localization error
        if "minimum localization error" in q or "lowest rmse" in q or "best configuration" in q:
            best = self.analysis_agent.get_min_rmse()
            text = self.explainer.explain_best_config(best)
            self.memory.add_entry(question, {"best_config": best})
            return text

        # Query 2: gmapping effect for 10-minute runs
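        # Match "10 min" / "10-minute" rather than a bare "10", which would also hit e.g. "100"
        if "gmapping" in q and ("10 min" in q or "10-min" in q):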
            df_on = self.analysis_agent.filter_by_gmapping_and_duration("ON", 10)
            df_off = self.analysis_agent.filter_by_gmapping_and_duration("OFF", 10)
            text = self.explainer.explain_gmapping_effect(df_on, df_off, duration=10)
            self.memory.add_entry(question, {"gmapping_on": df_on.to_dict(), "gmapping_off": df_off.to_dict()})
            return text

        # Fallback
        return (
            "I understand your question, but this prototype currently supports only a small set of "
            "query types, such as:\n"
            "- 'Which configuration has the minimum localization error?'\n"
            "- 'How does Gmapping affect localization for 10-minute runs?'"
        )


In [8]:
agent = OrchestratorAgent(summary_df)

print("Q1: Which configuration has the minimum localization error?\n")
print(agent.answer("Which configuration has the minimum localization error?"))

print("\n" + "="*80 + "\n")

print("Q2: How does Gmapping affect localization for 10-minute runs?\n")
print(agent.answer("How does Gmapping affect localization for 10-minute runs?"))


Q1: Which configuration has the minimum localization error?

The best configuration by RMSE is AlgorithmA with Gmapping ON for 2 minutes.
RMSE = 0.0500 m, MAE = 0.0453 m, max error = 0.1030 m,
CPU ≈ 35.0%, Memory ≈ 430.1 MB.


Q2: How does Gmapping affect localization for 10-minute runs?

For 10-minute runs:
- Gmapping ON:  average RMSE = 0.2842 m
- Gmapping OFF: average RMSE = 0.5383 m

On average, Gmapping ON shows 0.2542 m lower error compared to Gmapping OFF.


## Demo: Conversational Analysis over Synthetic TB3 Experiments

The demo cells above query the `OrchestratorAgent` with two natural-language questions:

1. *“Which configuration has the minimum localization error?”*  
2. *“How does Gmapping affect localization for 10-minute runs?”*

The Orchestrator delegates numeric lookups to the `DataAnalysisAgent`,
which queries the summary table of localization metrics computed from the CSV logs.
The `ExplanationAgent` translates the results into concise insights,
and each question and its result are stored in the `MemoryStore` to support follow-up queries.
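
As a rough illustration of how stored history could serve a follow-up question, the sketch below adds a lookup helper to a copy of the memory class. `last_result` is a hypothetical method, not something the notebook defines, and the stored values are copied from the summary row of the best run.

```python
from typing import Optional

class MemoryStore:
    """Same interface as the notebook's MemoryStore, plus a hypothetical lookup helper."""
    def __init__(self):
        self.history = []

    def add_entry(self, question: str, result: dict):
        self.history.append({"question": question, "result": result})

    def last_result(self, key: str) -> Optional[dict]:
        """Return the most recent stored result that contains `key`, or None."""
        for entry in reversed(self.history):
            if key in entry["result"]:
                return entry["result"][key]
        return None

memory = MemoryStore()
# Values copied from the summary row of the best run (AlgorithmA, Gmapping ON, 2 min).
memory.add_entry(
    "Which configuration has the minimum localization error?",
    {"best_config": {"algorithm": "AlgorithmA", "gmapping": "ON", "cpu_mean": 35.009435}},
)

# Follow-up: "How much CPU did that configuration use?"
best = memory.last_result("best_config")
if best is not None:
    print(f"{best['algorithm']} (Gmapping {best['gmapping']}) used about {best['cpu_mean']:.1f}% CPU.")
```

The follow-up is answered from memory alone, without recomputing the minimum over the summary table.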

The current prototype uses deterministic Python tools to compute metrics such as RMSE, MAE, peak error,
and CPU/memory averages. These computations remain transparent and reproducible, which is a requirement in robotics research.

In a production deployment, the orchestration and conversational layers would be powered by 
Gemini Agents, enabling:
- multi-step reasoning over large experiment sets,
- richer natural language interaction,
- and persistent memory across research sessions.

This hybrid design ensures that scientific evaluation remains grounded in deterministic computation, 
while leveraging LLM agents for interpretation, question decomposition, and experiment guidance.
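
The split described above can be sketched as a router whose only pluggable part is intent detection. Everything here is an assumption for illustration: `HybridRouter`, `keyword_classifier`, and the placeholder tools are hypothetical names, and no LLM API is called. Swapping in a model would mean replacing `keyword_classifier` with a function that returns one of the same intent labels.

```python
from typing import Callable, Dict

Tool = Callable[[], str]
IntentClassifier = Callable[[str], str]

def keyword_classifier(question: str) -> str:
    """Stand-in for an LLM: maps a question to an intent label by keyword."""
    q = question.lower()
    if "gmapping" in q:
        return "gmapping_effect"
    if "minimum" in q or "lowest" in q or "best" in q:
        return "best_config"
    return "unknown"

class HybridRouter:
    """Routes questions to deterministic tools; only intent detection is pluggable."""
    def __init__(self, classify: IntentClassifier, tools: Dict[str, Tool]):
        self.classify = classify
        self.tools = tools

    def answer(self, question: str) -> str:
        intent = self.classify(question)
        tool = self.tools.get(intent)
        if tool is None:
            return f"Unsupported question (intent: {intent})."
        return tool()  # numbers always come from deterministic code

# Placeholder tools; in the notebook these would wrap DataAnalysisAgent methods.
router = HybridRouter(
    classify=keyword_classifier,
    tools={
        "best_config": lambda: "best_config tool called",
        "gmapping_effect": lambda: "gmapping_effect tool called",
    },
)
print(router.answer("Which configuration has the lowest RMSE?"))
print(router.answer("How does Gmapping affect 10-minute runs?"))
print(router.answer("What is the battery level?"))
```

Keeping the tools free of any model call means a misclassified question can route to the wrong tool, but it cannot change the numbers that tool reports.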
