# Create and test your own forge

In [None]:
import asyncio
import random
import sys
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal

import nest_asyncio
from pydantic import BaseModel
nest_asyncio.apply()

# load environment variables from a local .env file, if one is present
from dotenv import load_dotenv
load_dotenv()

from ebiose.core.agent import Agent
from ebiose.core.agent_forge import AgentForge
from ebiose.core.evo_forging_cycle import EvoForgingCycleConfig

from loguru import logger
# Reset loguru's handlers first so the handler added below is the only sink
# (presumably this drops loguru's default stderr handler — confirm against loguru docs).
logger.remove()
# Send log records of level DEBUG and above to stderr.
logger.add(sys.stderr, level="DEBUG")

### 1. Describe the problem you wish to solve with an Ebiose agent

In [None]:
# Natural-language statement of the task; used as the forge's description.
# Written as explicit pieces so the trailing spaces and line breaks that are
# part of the text stay visible.
PROBLEM_DESCRIPTION = (
    "Given my CRM API connection, \n"
    "run a mailing campaign to all my customers who have not purchased anything \n"
    "in the last 6 months. The email should be personalized and include a discount \n"
    "code for their next purchase. The email should also include a link to my website \n"
    "and a call to action to encourage them to make a purchase."
)

### 2. Define the I/O model of your agent

In [None]:
# Input schema of the agent. It declares no fields yet: add the fields your
# problem needs. Documented with comments rather than a docstring so the
# model class itself stays exactly as it was.
class AgentInput(BaseModel):
    ...

# Output schema of the agent. It declares no fields yet: add the fields your
# problem needs. Documented with comments rather than a docstring so the
# model class itself stays exactly as it was.
class AgentOutput(BaseModel):
    ...

### 3. Create the forge class

In [None]:
# Template forge: ties the problem description to the agent I/O models and
# supplies the fitness function used to score agents.
class YourForge(AgentForge):
    name: str = "YourForge"
    description: str = PROBLEM_DESCRIPTION

    # Models describing what an agent receives and what it must produce.
    agent_input_model: type[BaseModel] = AgentInput
    agent_output_model: type[BaseModel] = AgentOutput
    default_generated_agent_engine_type: str = "langgraph_engine"

    async def compute_fitness(self, agent: Agent, compute_token_id: str, **kwargs: Any) -> float:  # noqa: C901
        """Return a fitness score for ``agent``.

        Placeholder implementation: every argument is ignored and a random
        float in the half-open interval [0.0, 1.0) is returned. Replace the
        body with a real evaluation of the agent.

        Args:
            agent: The agent to score (unused by the placeholder).
            compute_token_id: Identifier passed in by the caller (unused by
                the placeholder).
            **kwargs: Extra keyword arguments (unused by the placeholder).

        Returns:
            The fitness value as a float.
        """
        return random.random()

### 4. Setup a forge cycle

In [None]:
# run parameters
BUDGET = 0.5 # budget in dollars
N_AGENTS = 2 # number of agents by generation

# the path where results will be saved
timestamp = datetime.now(tz=UTC).strftime("%Y-%m-%d_%H-%M-%S")
SAVE_PATH = Path(f"./data/{timestamp}/")
if not SAVE_PATH.exists():
    SAVE_PATH.mkdir(parents=True)

# node types
NODE_TYPES = ["StartNode", "EndNode", "LLMNode"]
# these nodes are not implemented yet
MOCK_NODE_TYPES = [
    "PythonNode",
    "DatabaseNode",
    "APINode",
    "WebScraperNode",
    "FileNode",
    "UserQueryNode",
]

# Configuration of the forging cycle: budget, population settings, output
# directory, and the full list of node types (regular ones first, then mocks).
cycle_config = EvoForgingCycleConfig(
    save_path=SAVE_PATH,
    budget=BUDGET,
    node_types=[*NODE_TYPES, *MOCK_NODE_TYPES],
    n_agents_in_population=N_AGENTS,
    n_selected_agents_from_ecosystem=0,
    n_best_agents_to_return=2,
    replacement_ratio=0.5,
)

### 5. Run a forge cycle

In [None]:
# instantiate the forge
forge = YourForge()

best_agents, best_finess = asyncio.run(
    forge.run_new_cycle(
        config=cycle_config
    ),
)

### 6. Display final results

In [None]:
forge.display_results(best_agents, best_finess)