In [34]:
# code_agent.py
import os
from typing import Union, Any

from smolagents.agents import ActionStep, CodeAgent, ToolCall, SystemPromptStep, TaskStep  # Import from correct place
from smolagents.utils import AgentParsingError, parse_code_blobs, AgentGenerationError, AgentExecutionError, truncate_content, LogLevel
from smolagents.local_python_executor import fix_final_answer_code
from collections import deque
from smolagents.models import ChatMessage
from rich import box
from rich.console import Group
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.text import Text
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union
import time

# Define a constant for the yellow hex color code (if you want to use it for logging)
YELLOW_HEX = "#d4b702"

class MyCodeAgent(CodeAgent):
    
    def run(
        self,
        task: str,
        stream: bool = False,
        reset: bool = True,
        single_step: bool = False,
        images: Optional[List[str]] = None,
        additional_args: Optional[Dict] = None,
    ):
        """
        Overridden so that we do NOT append the `additional_args` to the prompt.
        Everything else is the same logic as `MultiStepAgent.run`.
        """

        # 1. Store the user’s request in `self.task`
        self.task = task

        # 2. Update the internal state with additional_args BUT do *not* modify the prompt.
        if additional_args is not None:
            self.state.update(additional_args)
            # We skip the line: `self.task += f"...{str(additional_args)}"`

        # 3. Re-initialize the system prompt
        self.initialize_system_prompt()
        system_prompt_step = SystemPromptStep(system_prompt=self.system_prompt)

        # 4. Reset logs if requested
        if reset:
            self.logs = []
            self.logs.append(system_prompt_step)
            self.monitor.reset()
        else:
            if len(self.logs) > 0:
                self.logs[0] = system_prompt_step
            else:
                self.logs.append(system_prompt_step)

        # 5. Print a "New run" message
        self.logger.log(
            Panel(
                f"\n[bold]{self.task.strip()}\n",
                title="[bold]New run",
                subtitle=f"{type(self.model).__name__} - {(self.model.model_id if hasattr(self.model, 'model_id') else '')}",
                border_style=YELLOW_HEX,
                subtitle_align="left",
            ),
            level=LogLevel.INFO,
        )

        # 6. Log a new TaskStep
        self.logs.append(TaskStep(task=self.task, task_images=images))

        # 7. Handle single-step mode
        if single_step:
            step_start_time = time.time()
            step_log = ActionStep(start_time=step_start_time, observations_images=images)
            # run a single step, done
            result = self.step(step_log)
            step_log.end_time = time.time()
            step_log.duration = step_log.end_time - step_start_time
            self.logs.append(step_log)
            return result

        # 8. Handle streaming mode
        if stream:
            # returns a generator of steps
            return self._run(task=self.task, images=images)

        # 9. Otherwise, normal multi-step mode; collect all steps in a deque
        return deque(self._run(task=self.task, images=images), maxlen=1)[0]
    
    def step(self, log_entry: ActionStep) -> Union[None, Any]:
        """
        Perform one step in the ReAct framework: the agent thinks, acts, and observes the result.
        Returns None if the step is not final.
        """
        memory_messages = self.write_memory_to_messages()  # Use the correct method

        self.input_messages = memory_messages.copy()

        # Add new step in logs
        log_entry.model_input_messages = memory_messages.copy()
        try:
            additional_args = {"grammar": self.grammar} if self.grammar is not None else {}
            chat_message: ChatMessage = self.model(
                self.input_messages,
                stop_sequences=["<end_code>", "Observation:"],
                **additional_args,
            )
            log_entry.model_output_message = chat_message
            model_output = chat_message.content
            log_entry.model_output = model_output
        except Exception as e:
            raise AgentGenerationError(f"Error in generating model output:\n{e}", self.logger) from e

        self.logger.log(
            Group(
                Rule(
                    "[italic]Output message of the LLM:",
                    align="left",
                    style="orange",
                ),
                Syntax(
                    model_output,
                    lexer="markdown",
                    theme="github-dark",
                    word_wrap=True,
                ),
            ),
            level=1, # LogLevel.DEBUG,
        )

        # Parse
        try:
            code_action = fix_final_answer_code(parse_code_blobs(model_output))
        except Exception as e:
            error_msg = f"Error in code parsing:\n{e}\nMake sure to provide correct code blobs."
            raise AgentParsingError(error_msg, self.logger)

        # Modification logic:
        code_action = self.modify_code(code_action)  # Call your modification function

        log_entry.tool_calls = [
            ToolCall(
                name="python_interpreter",
                arguments=code_action,
                id=f"call_{len(self.logs)}",
            )
        ]

        # Execute
        self.logger.log(
            Panel(
                Syntax(
                    code_action,
                    lexer="python",
                    theme="monokai",
                    word_wrap=True,
                ),
                title="[bold]Executing this code:",
                title_align="left",
                box=box.HORIZONTALS,
            ),
            level=1, # LogLevel.INFO,
        )
        observation = ""
        is_final_answer = False
        try:
            output, execution_logs, is_final_answer = self.python_executor(
                code_action,
                self.state,
            )
            execution_outputs_console = []
            if len(execution_logs) > 0:
                execution_outputs_console += [
                    Text("Execution logs:", style="bold"),
                    Text(execution_logs),
                ]
            observation += "Execution logs:\n" + execution_logs
        except Exception as e:
            error_msg = str(e)
            if "Import of " in error_msg and " is not allowed" in error_msg:
                self.logger.log(
                    "[bold red]Warning to user: Code execution failed due to an unauthorized import - Consider passing said import under `additional_authorized_imports` when initializing your CodeAgent.",
                    level=1, # LogLevel.INFO,
                )
            raise AgentExecutionError(error_msg, self.logger)

        truncated_output = truncate_content(str(output))
        observation += "Last output from code snippet:\n" + truncated_output
        log_entry.observations = observation

        execution_outputs_console += [
            Text(
                f"{('Out - Final answer' if is_final_answer else 'Out')}: {truncated_output}",
                style=(f"bold {YELLOW_HEX}" if is_final_answer else ""),
            ),
        ]
        self.logger.log(Group(*execution_outputs_console), level=1, )# LogLevel.INFO)
        log_entry.action_output = output
        return output if is_final_answer else None

    def modify_code(self, code_action: str) -> str:
        """
        Modifies the generated code to replace specific patterns.

        Args:
            code_action (str): The original code generated by the model.

        Returns:
            str: The modified code with replacements applied.
        """
        # Replace fig.write_html calls to use a session-specific folder
        # (or any other modification you need)
        current_working_directory = os.getcwd()
        code_action = code_action.replace(
            'fig.write_html("',
            f'fig.write_html("{current_working_directory}/'
        )
        return code_action

In [35]:
import sys
import os

# Move up one level to access 'app' folder
sys.path.append(os.path.abspath(".."))

import pandas as pd
import plotly.express as px
import os
from string import Template
from smolagents import CodeAgent, HfApiModel, tool, OpenAIServerModel, LiteLLMModel
from app.backend.prompts_backup import SYSTEM_PROMPT


agent = MyCodeAgent(
    tools=[], # Tools are not explicitly needed as CodeAgent can use python_interpreter by default
    model=OpenAIServerModel(model_id="gemini-2.0-flash-exp", api_key="AIzaSyATdFldOK7Id7s3vjRAdNlOR5Z03Pdg9_Y" ,api_base="https://generativelanguage.googleapis.com/v1beta/openai/"),
    additional_authorized_imports=["pandas", "plotly", "numpy"],
    system_prompt= SYSTEM_PROMPT,
    verbosity_level=1
)

user_question = "How did interest expense impact the company’s pretax income in both years?" #Procede step by step and check your intermediate results. Also check wheather the graph you are going to save is readable." #Can show me which company was most successful in terms of revenue in 2023? 

In [36]:
agent.run(user_question, reset=False)

AttributeError: 'MyCodeAgent' object has no attribute 'write_memory_to_messages'

In [37]:
import os
from typing import Union, Any

from smolagents.agents import CodeAgent, ActionStep, ToolCall
from smolagents.models import ChatMessage
from smolagents.local_python_executor import fix_final_answer_code
from smolagents.utils import (
    parse_code_blobs,
    truncate_content,
    AgentExecutionError,
    AgentGenerationError,
    AgentParsingError
)


class MyCodeAgent(CodeAgent):
    """
    A custom CodeAgent that modifies code calls to save HTML plots
    inside a user-specified base folder, which is provided at runtime.
    """
    def run(
        self,
        task: str,
        stream: bool = False,
        reset: bool = True,
        single_step: bool = False,
        images: Optional[List[str]] = None,
        additional_args: Optional[Dict] = None,
    ):
        """
        Overridden so that we do NOT append the `additional_args` to the prompt.
        Everything else is the same logic as `MultiStepAgent.run`.
        """

        # 1. Store the user’s request in `self.task`
        self.task = task

        # 2. Update the internal state with additional_args BUT do *not* modify the prompt.
        if additional_args is not None:
            self.state.update(additional_args)
            # We skip the line: `self.task += f"...{str(additional_args)}"`

        # 3. Re-initialize the system prompt
        self.initialize_system_prompt()
        system_prompt_step = SystemPromptStep(system_prompt=self.system_prompt)

        # 4. Reset logs if requested
        if reset:
            self.logs = []
            self.logs.append(system_prompt_step)
            self.monitor.reset()
        else:
            if len(self.logs) > 0:
                self.logs[0] = system_prompt_step
            else:
                self.logs.append(system_prompt_step)

        # 5. Print a "New run" message
        self.logger.log(
            Panel(
                f"\n[bold]{self.task.strip()}\n",
                title="[bold]New run",
                subtitle=f"{type(self.model).__name__} - {(self.model.model_id if hasattr(self.model, 'model_id') else '')}",
                border_style=YELLOW_HEX,
                subtitle_align="left",
            ),
            level=LogLevel.INFO,
        )

        # 6. Log a new TaskStep
        self.logs.append(TaskStep(task=self.task, task_images=images))

        # 7. Handle single-step mode
        if single_step:
            step_start_time = time.time()
            step_log = ActionStep(start_time=step_start_time, observations_images=images)
            # run a single step, done
            result = self.step(step_log)
            step_log.end_time = time.time()
            step_log.duration = step_log.end_time - step_start_time
            self.logs.append(step_log)
            return result

        # 8. Handle streaming mode
        if stream:
            # returns a generator of steps
            return self._run(task=self.task, images=images)

        # 9. Otherwise, normal multi-step mode; collect all steps in a deque
        return deque(self._run(task=self.task, images=images), maxlen=1)[0]

    def modify_code(self, code_action: str, base_folder: str) -> str:
        """
        Rewrites occurrences of fig.write_html("some_path.html")
        to fig.write_html("{base_folder}/some_path.html").
        You can extend this logic to match your own pattern(s).
        """
        return code_action.replace(
            'fig.write_html("',
            f'fig.write_html("{base_folder}/'
        )

    def step(self, log_entry: ActionStep) -> Union[None, Any]:
        """
        Similar to the original CodeAgent.step, but with an added line:
            code_action = self.modify_code(code_action, base_folder)
        right after parsing the code blobs from the LLM output.
        """
        # 1. Retrieve memory from previous steps
        agent_memory = self.write_inner_memory_from_logs()
        self.input_messages = agent_memory.copy()
        log_entry.agent_memory = agent_memory.copy()

        # 2. Generate LLM output
        try:
            additional_args = {"grammar": self.grammar} if self.grammar is not None else {}
            llm_output = self.model(
                self.input_messages,
                stop_sequences=["<end_code>", "Observation:"],
                **additional_args
            ).content
            log_entry.llm_output = llm_output
        except Exception as e:
            raise AgentGenerationError(f"Error in generating model output:\n{e}", self.logger) from e

        # 3. Parse code from the LLM output
        try:
            code_action = fix_final_answer_code(parse_code_blobs(llm_output))
        except Exception as e:
            error_msg = f"Error in code parsing:\n{e}\nMake sure to provide correct code blobs."
            raise AgentParsingError(error_msg, self.logger) from e

        # 4. Retrieve the base_folder from self.state (default to '.')
        #    so each run can pass it via run(..., additional_args={"base_folder": ...}).
        base_folder = self.state.get("base_folder", ".")

        # 5. Rewrite code to place HTML in the user-specific folder
        code_action = self.modify_code(code_action, base_folder)

        log_entry.tool_calls = [
            ToolCall(
                name="python_interpreter",
                arguments=code_action,
                id=f"call_{len(self.logs)}",
            )
        ]

        # 6. Execute the code
        observation = ""
        is_final_answer = False
        try:
            output, execution_logs, is_final_answer = self.python_executor(
                code_action,
                self.state
            )
            observation += "Execution logs:\n" + execution_logs
        except Exception as e:
            error_msg = str(e)
            if "Import of " in error_msg and " is not allowed" in error_msg:
                self.logger.log(
                    "[bold red]Warning: Code execution failed due to an unauthorized import.\n"
                    "Consider passing said import under `additional_authorized_imports`.\n",
                    level=1
                )
            raise AgentExecutionError(error_msg, self.logger)

        truncated_output = truncate_content(str(output))
        observation += "Last output from code snippet:\n" + truncated_output
        log_entry.observations = observation

        log_entry.action_output = output
        return output if is_final_answer else None


In [None]:
from smolagents.models import OpenAIServerModel


agent = MyCodeAgent(
    tools=[], # Tools are not explicitly needed as CodeAgent can use python_interpreter by default
    model=OpenAIServerModel(model_id="gemini-2.0-flash-exp", api_key="AIzaSyATdFldOK7Id7s3vjRAdNlOR5Z03Pdg9_Y" ,api_base="https://generativelanguage.googleapis.com/v1beta/openai/"),
    additional_authorized_imports=["pandas", "plotly", "numpy"],
    system_prompt= SYSTEM_PROMPT,
    verbosity_level=1
)

In [40]:


# Run 1: store HTML in /tmp/sessionA
agent.run(
    "Plot a random histogram of 1000 points and save it to an HTML file.",
    additional_args={"base_folder": "test_output"},
)




ValueError: Provided prompt template does not contain the managed agents descriptions placeholder '{{managed_agents_descriptions}}'

In [32]:
print(agent.system_prompt)

You are a financial analyst, who analyzes financial data using code blobs to answer the questions of users. Your job is to answer user questions and create graphs. If possible generate plotly graphs and write them as html file. Never show the html graph - you are unable to display them in the environment in which you execute code. 

The dataset that you are provided with is balance_data_2020_2023.csv containing financial fundamentals of US companies from 2020 to 2023. All columns in the dataset are fully populated with no missing values (non-NaN).
All the information you need in regards to the dataset, including all columns follows:

**Dataset Columns**:
1. **Ticker** (`string`): The stock ticker symbol representing each company.
2. **Fiscal Year** (`integer`): The fiscal year for which the data is reported.
3. **Report Date** (`datetime`): The date when the fiscal year report was filed.
4. **Revenue** (`float`): Total revenue generated by the company in the fiscal year.
5. **Cost of R

In [39]:
agent.system_prompt_template = "Hello how are you doing"

In [33]:
agent.logs

[SystemPromptStep(system_prompt='You are a financial analyst, who analyzes financial data using code blobs to answer the questions of users. Your job is to answer user questions and create graphs. If possible generate plotly graphs and write them as html file. Never show the html graph - you are unable to display them in the environment in which you execute code. \n\nThe dataset that you are provided with is balance_data_2020_2023.csv containing financial fundamentals of US companies from 2020 to 2023. All columns in the dataset are fully populated with no missing values (non-NaN).\nAll the information you need in regards to the dataset, including all columns follows:\n\n**Dataset Columns**:\n1. **Ticker** (`string`): The stock ticker symbol representing each company.\n2. **Fiscal Year** (`integer`): The fiscal year for which the data is reported.\n3. **Report Date** (`datetime`): The date when the fiscal year report was filed.\n4. **Revenue** (`float`): Total revenue generated by the 

In [None]:
# Run 2: store HTML in /tmp/sessionB
agent.run(
    "Plot a bar chart of the given data and export to HTML.",
    additional_args={"base_folder": "/tmp/sessionB"}
)