<a href="https://colab.research.google.com/github/atrotsyuk/MultiAgentFramework/blob/main/TwoAgentsConversation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install edsl

In [2]:
# from edsl import Question
# Question.available()

In [3]:
from google.colab import userdata
import os
# Copy the Expected Parrot API key from Colab's `userdata` store into the
# process environment under the same variable name.
os.environ['EXPECTED_PARROT_API_KEY'] = userdata.get('EXPECTED_PARROT_API_KEY')
# os.environ['DEEP_INFRA_API_KEY'] = userdata.get('DEEP_INFRA_API_KEY')

In [18]:
import textwrap

# Prompt template used to ask the model for an agent persona.
# Placeholders: {{agent_name}}, {{agent_description}}, {{example_traits}}.
# The first sentence now ends with a real newline; previously a trailing
# backslash glued it to the next sentence ("...description.The output...").
create_agent_text = textwrap.dedent(
    """\
    Create an agent persona for {{agent_name}} given agent description as {{agent_description}}.
    The output must be in a form of python dict:
    {
        "name": agent_name,
        "traits": {
            "persona": "...",
            "leadership": "...",
            "negotiation": "...",
            "military_policy": "...",
            "economic_strategy": "...",
            "communication_style": "...",
            "goal": Optional,
            "past_achievements": "..."
        }
    }

    Examples:
    {{example_traits}}
    """
)

In [20]:
import textwrap

# Prompt template for every turn after the opening statement.
# Placeholders: agent_name, other_agent_names, topic, agent_transcript,
# conversation, round_message.
# The transcript section is guarded by a truthiness test so that both None and
# an empty container (e.g. an empty dict of transcripts) skip the section
# instead of rendering an empty "prior statements" block.
# The literal "{{ round_message }}" must stay in this text: Conversation checks
# for it when a per-round message template is supplied.
next_statement_template = textwrap.dedent(
            """\
            You are {{ agent_name }} engaged in a high level conversation with {{other_agent_names}} on Topic: "{{topic}}".

            {% if agent_transcript %}
            Here are your *real* prior statements from transcripts:
            ----------
            {{ agent_transcript }}
            ----------
            {% endif %}

            This is the conversation so far:
            {{ conversation }}

            {% if round_message is not none %}
            Round Info: {{ round_message }}
            {% endif %}

            Only YOU should speak now - continue the conversation from your side **only**. Do NOT simulate what the other person might say.

            Based on your real world positions and past statements, respond as you would in real life.
            What do you say next to move the discussion forward toward a resolution?
            Your output must be in a form of python dict:
            {
                "Statement": "...",
                "grounding_sources": [
                    {
                        "title": "...",
                        "type": "speech/article/News/Book/Quote",
                        "url": "...",
                        "quote": "...",
                        "explanation": "Why it's relevant"
                    }
                ]
            }
            Grounding sources can include quotes, phrases from given transcripts as well.
            Also the output should contain *ONLY* above format without any explanations.
            """
)

In [21]:
# Jinja2 template for the per-turn guidance message. Conversation.get_next_statement
# renders it with two variables, `current_turn` and `max_turns`, and passes the
# rendered text to the question as `round_message`.
# Branches, in order: first turn, final turn, last few turns, everything else.
per_round_message_template = """
        Round {{current_turn}} of {{max_turns}}.
        {% if current_turn == 1 %}
        Lay out your key concerns and priorities. Set the tone for negotiation.
        {% elif current_turn == max_turns %}
        This is the final round. Make or accept a concrete proposal to finalize the agreement.
        {% elif current_turn >= max_turns - 2 %}
        Time is short. Converge toward actionable solutions or compromises.
        {% else %}
        Build on what has been said. Move the negotiation forward constructively.
        {% endif %}
        """

In [22]:
import textwrap

# Prompt template for the opening statement of the conversation.
# Placeholders: agent_name, other_agent_names, topic.
# Fixes the typo "an a potential deal" -> "and a potential deal".
start_statement_template = textwrap.dedent(
            """\
            You are {{agent_name}}, engaging in a negotiation with {{other_agent_names}} on the topic: "{{topic}}".
            Your goal is to propose a clear starting position, outline your priorities, and invite a collaborative resolution.
            Avoid being combative; aim for mutual understanding and a potential deal.
            Your output must be in a form of python dict:
            {
                "Statement": "...",
                "grounding_sources": [
                    {
                        "title": "...",
                        "type": "speech/article/News/Book/Quote",
                        "url": "...",
                        "quote": "...",
                        "explanation": "Why it's relevant"
                    }
                ]
            }
            Also the output should contain *ONLY* above format without any explanations.
            """
)

In [23]:
# Few-shot examples inserted into the persona prompt.
# The model's reply is later parsed with ast.literal_eval, so the examples must
# themselves be valid Python dict literals: keys are separated from values with
# ":" (the previous `"name" = ...` / `"traits"={` forms were not valid syntax).
example_traits = """
{
    "name": "Vladimir Putin",
    "traits": {
        "persona": "President of Russia, former intelligence officer",
        "leadership": "Authoritarian, strategic, and pragmatic",
        "negotiation": "Hardline, calculated and tactical",
        "military_policy": "Strong focus on military dominance and security",
        "economic_strategy": "Energy leverage, sanction resilience, and state-controlled economy",
        "communication_style": "Cold, calculated and propagandist",
        "goal": "Minimise impact of western sanctions and improve diplomatic and economic ties with China to herald a new world order"
    }
},
{
    "name": "Condoleezza Rice",
    "traits": {
        "persona": "Former U.S. Secretary of State and National Security Advisor; Stanford professor and political scientist",
        "leadership": "Principled, composed under pressure, institutionally grounded",
        "economic_strategy": "Market-oriented with support for global liberal trade frameworks",
        "global_diplomacy": "Proponent of rules-based order, U.S. global leadership, multilateral engagement",
        "military_policy": "Believes in strong defense posture backed by diplomatic strategy; post-9/11 security coordination",
        "communication_style": "Disciplined, analytical, persuasive without inflammatory rhetoric",
        "goal": "Promote U.S. leadership through diplomacy, democratic values, and strategic alliances"
    }
},
{
    "name": "Xi Jinping",
    "traits": {
        "persona": "President of China, chairman of communist party",
        "leadership": "Authoritarian, long-term strategist, nationalist",
        "economic_strategy": "State-controlled economy, tech-driven growth, BRI expansion",
        "global_diplomacy": "Soft power strategist, U.S. rival, Russia-China alliance builder",
        "military_policy": "Modernizes PLA, cyber warfare focus, South China sea expansion",
        "communication_style": "Diplomatic, calculated and propagandist",
        # "goal": "Minimize impacts of US tariff war with China by seeking new economic and diplomatic avenues in Russia"
    }
}
"""

In [53]:
import re
from edsl import Model, QuestionFreeText, Scenario
# from backend.source.prompts.find_agent_traits import create_agent_text
# from backend.source.examples.traits import example_traits

class EdslAgent:
    """Asks an EDSL model to draft a persona (name + traits) for a named agent.

    The persona is returned as *text* that is expected to be a Python dict
    literal; callers parse it themselves.
    """

    def __init__(self, model_name="google/gemma-2-9b-it", service_name=None, examples=None):
        """Configure the model and prompt material.

        Args:
            model_name: Name passed to ``edsl.Model``.
            service_name: Optional inference service; omitted from the
                ``Model`` call when falsy.
            examples: Few-shot example text; falls back to the module-level
                ``example_traits`` when falsy.
        """
        self.model = Model(model_name=model_name, service_name=service_name) if service_name else Model(model_name=model_name)
        self.create_agent_text = create_agent_text
        self.examples = examples or example_traits

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a leading ```lang fence and a trailing ``` fence, if present."""
        return re.sub(r"^```[A-Za-z]*\s*|\s*```$", "", text.strip()).strip()

    def find(self, agent_name: str, agent_description: str) -> str:
        """Return persona text for ``agent_name``.

        Args:
            agent_name: Display name of the agent.
            agent_description: Free-text description given to the model.

        Returns:
            The model's answer with any Markdown code fence removed. If the
            request or the clean-up fails for any reason, a minimal persona
            dict (as a string) built from the two arguments is returned
            instead, so callers always receive parseable text.
        """
        try:
            question = QuestionFreeText(
                question_text=self.create_agent_text,
                question_name="create_agent_q"
            )

            scenario = Scenario({
                "agent_name": agent_name,
                "agent_description": agent_description,
                # Key must match the {{example_traits}} placeholder in the
                # prompt; the former "example_trais" spelling left it unfilled.
                "example_traits": self.examples
            })

            response = question.by(scenario).by(self.model).run()
            traits_code = response[0]["answer"]["create_agent_q"]
            return self._strip_code_fence(traits_code)
        except Exception as e:
            # Best-effort: report the failure and fall back to a persona that
            # only carries the caller-supplied description.
            print(f"Fetching agent {agent_name} failed: {e}")
            return str({"name": agent_name, "traits": {"persona": agent_description}})

# Shared instance built with the default model; AgentManager.build_agents calls its find().
edsl_find_agent = EdslAgent()


In [54]:
from collections import UserList
import asyncio
import inspect
from typing import Optional, Callable, TYPE_CHECKING
from edsl import QuestionFreeText, Results, AgentList, ScenarioList, Scenario, Model
from edsl.questions import QuestionBase
from edsl.results.result import Result
from jinja2 import Template
from edsl.caching import Cache

if TYPE_CHECKING:
    from edsl.language_models.model import Model

from edsl.conversation.next_speaker_utilities import (
    default_turn_taking_generator,
    speaker_closure,
)


class AgentStatement:
    """Thin wrapper around one ``Result`` representing a single spoken turn."""

    def __init__(self, statement: Result):
        # The wrapped result is kept as-is; accessors read from it on demand.
        self.statement = statement

    @classmethod
    def from_dict(cls, data):
        """Rebuild a statement from the dict produced by ``to_dict``."""
        restored = Result.from_dict(data)
        return cls(restored)

    def to_dict(self):
        """Serialize the wrapped result to a plain dict."""
        serialized = self.statement.to_dict()
        return serialized

    @property
    def agent_name(self):
        """Name of the agent that produced this statement."""
        agent_info = self.statement["agent"]
        return agent_info["name"]

    @property
    def text(self):
        """The answer stored under the ``dialogue`` question name."""
        answers = self.statement["answer"]
        return answers["dialogue"]


class AgentStatements(UserList):
    """Ordered list of statement objects with transcript and (de)serialization helpers."""

    def __init__(self, data=None):
        super().__init__(data)

    @classmethod
    def from_dict(cls, data):
        """Rebuild the list from the output of ``to_dict``."""
        restored = list(map(AgentStatement.from_dict, data))
        return cls(restored)

    def to_dict(self):
        """Serialize every contained statement, preserving order."""
        serialized = []
        for entry in self.data:
            serialized.append(entry.to_dict())
        return serialized

    @property
    def transcript(self):
        """One single-key ``{agent_name: text}`` dict per statement, in order."""
        turns = []
        for entry in self.data:
            turns.append({entry.agent_name: entry.text})
        return turns


class Conversation:
    """A conversation between a list of agents. The first agent in the list is the first speaker.
    After that, order is determined by the next_speaker function.
    The question asked to each agent is determined by the next_statement_question.

    If the user has passed in a "per_round_message_template", this will be displayed at the beginning of each round.
    {{ round_message }} must be in the question_text.

    Turn numbering: the scenario field ``index`` is 0-based (0 for the opening
    statement), while the ``current_turn`` value given to the per-round message
    template is 1-based, so the last turn satisfies ``current_turn == max_turns``.
    """

    def __init__(
        self,
        agent_list: AgentList,
        max_turns: int = 20,
        stopping_function: Optional[Callable] = None,
        start_statement_question: Optional[QuestionBase] = None,
        next_statement_question: Optional[QuestionBase] = None,
        next_speaker_generator: Optional[Callable] = None,
        verbose: bool = False,
        per_round_message_template: Optional[str] = None,
        conversation_index: Optional[int] = None,
        cache=None,
        disable_remote_inference=False,
        default_model: Optional[Model] = None,
        topic: Optional[str] = None,
        transcript: Optional[str] = None
    ):
        """Set up the conversation.

        Args:
            agent_list: Participants; agents lacking a ``model`` attribute get
                ``default_model`` (or ``Model()`` when that is None).
            max_turns: Maximum number of statements before the loop stops.
            stopping_function: Optional sync or async callable receiving the
                statements so far; a truthy return ends the conversation.
            start_statement_question: Question used for the opening statement.
                When None, ``next_statement_question`` is used for turn 0 too.
            next_statement_question: Question used for every later statement.
            next_speaker_generator: Generator function choosing the speaker;
                defaults to ``default_turn_taking_generator``.
            verbose: Print each statement as it is produced.
            per_round_message_template: Jinja2 template rendered per turn with
                ``max_turns`` and ``current_turn``.
            conversation_index: Optional identifier stored in each scenario.
            cache: Cache passed to each run; a new ``Cache()`` when None.
            disable_remote_inference: Forwarded to ``run_async``.
            default_model: Model assigned to agents that have none.
            topic: Topic string inserted into the prompts.
            transcript: Prior-statement material exposed to the prompt as
                ``agent_transcript``.

        Raises:
            ConversationValueError: A per-round template was given but the
                next-statement question is missing or its text lacks
                ``{{ round_message }}``.
        """
        self.disable_remote_inference = disable_remote_inference
        self.per_round_message_template = per_round_message_template

        self.cache = Cache() if cache is None else cache

        self.agent_list = agent_list

        from edsl import Model

        for agent in self.agent_list:
            if not hasattr(agent, "model"):
                agent.model = default_model if default_model is not None else Model()

        self.verbose = verbose
        self._conversation_index = conversation_index
        self.agent_statements = AgentStatements()

        self.max_turns = max_turns
        self.topic = topic
        self.agent_transcript = transcript

        self.start_statement_question = start_statement_question
        self.next_statement_question = next_statement_question

        # The rendered round message can only reach the model through the
        # {{ round_message }} placeholder, so insist that it is present. A
        # missing question is reported the same way instead of failing with an
        # AttributeError on None.
        if per_round_message_template and (
            next_statement_question is None
            or "{{ round_message }}" not in next_statement_question.question_text
        ):
            from edsl.conversation.exceptions import ConversationValueError

            raise ConversationValueError(
                "If you pass in a per_round_message_template, you must include {{ round_message }} in the question_text."
            )

        # Determine how the next speaker is chosen
        if next_speaker_generator is None:
            func = default_turn_taking_generator
        else:
            func = next_speaker_generator

        # Choose the next speaker
        self.next_speaker = speaker_closure(
            agent_list=self.agent_list, generator_function=func
        )

        # Determine when the conversation ends
        if stopping_function is None:
            self.stopping_function = lambda agent_statements: False
        else:
            self.stopping_function = stopping_function

    async def continue_conversation(self, **kwargs) -> bool:
        """Return True while another turn should be taken.

        Stops once ``max_turns`` statements exist or the stopping function
        (awaited if it is a coroutine function) returns a truthy value.
        """
        if len(self.agent_statements) >= self.max_turns:
            return False

        if inspect.iscoroutinefunction(self.stopping_function):
            should_stop = await self.stopping_function(self.agent_statements, **kwargs)
        else:
            should_stop = self.stopping_function(self.agent_statements, **kwargs)

        return not should_stop

    def add_index(self, index) -> None:
        """Assign the conversation's index (used by ConversationList)."""
        self._conversation_index = index

    @property
    def conversation_index(self):
        """Index of this conversation, or None if never assigned."""
        return self._conversation_index

    def to_dict(self):
        """Serialize agents, limits and statements (questions/templates are not included)."""
        return {
            "agent_list": self.agent_list.to_dict(),
            "max_turns": self.max_turns,
            "verbose": self.verbose,
            "agent_statements": [d.to_dict() for d in self.agent_statements],
            "conversation_index": self.conversation_index,
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild a conversation from ``to_dict`` output.

        Only the serialized fields are restored; questions, templates and topic
        are not part of the dict, so they keep their constructor defaults.
        """
        conversation = cls(
            agent_list=AgentList.from_dict(data["agent_list"]),
            max_turns=data["max_turns"],
            verbose=data["verbose"],
            conversation_index=data["conversation_index"],
        )
        # Statements are attached after construction: __init__ has no
        # parameter for them (the old ``results_data=`` keyword raised TypeError).
        conversation.agent_statements = AgentStatements.from_dict(data["agent_statements"])
        return conversation

    def to_results(self):
        """Collect the raw results of all statements into a ``Results`` object."""
        return Results(data=[s.statement for s in self.agent_statements])

    def summarize(self):
        """Return a ``Scenario`` with counts, limits and the transcript."""
        d = {
            "num_agents": len(self.agent_list),
            "max_turns": self.max_turns,
            "conversation_index": self.conversation_index,
            "transcript": self.to_results().select("agent_name", "dialogue").to_list(),
            "number_of_agent_statements": len(self.agent_statements),
        }
        return Scenario(d)

    def _other_agent_names(self, speaker) -> str:
        """Names of every agent that does not compare equal to ``speaker``, comma-separated.

        With two distinct agents this is exactly the other agent's name; it is
        an empty string when no other agent exists.
        """
        return ", ".join(a.name for a in self.agent_list if a != speaker)

    async def get_start_statement(self, *, speaker, others, topic) -> "Result":
        """Get the first statement from the speaker related to the topic"""
        q = self.start_statement_question
        from edsl import Scenario
        s = Scenario(
            {
                "agent_name": speaker.name,
                "other_agent_names": others,
                "topic": topic,
                "conversation_index": self.conversation_index,
                "index": 0,
                "round_message": None,
            }
        )
        jobs = q.by(s).by(speaker).by(speaker.model)
        jobs.show_prompts()
        results = await jobs.run_async(
            cache = self.cache, disable_remote_inference=self.disable_remote_inference
        )
        return results[0]

    async def get_next_statement(self, *, index, speaker, topic, conversation, other_agent_names) -> "Result":
        """Get the next statement from the speaker.

        ``index`` is the 0-based position of this statement. The per-round
        template receives ``current_turn = index + 1`` so that "Round N of M"
        is 1-based and the final-round branch is reachable.
        """
        q = self.next_statement_question
        # assert q.parameters == {"agent_name", "conversation"}, q.parameters
        from edsl import Scenario

        if self.per_round_message_template is None:
            round_message = None
        else:
            round_message = Template(self.per_round_message_template).render(
                {"max_turns": self.max_turns, "current_turn": index + 1}
            )

        s = Scenario(
            {
                "agent_name": speaker.name,
                "topic": topic,
                "conversation": conversation,
                "conversation_index": self.conversation_index,
                "index": index,
                "other_agent_names": other_agent_names,
                "round_message": round_message,
                "agent_transcript": self.agent_transcript
            }
        )
        jobs = q.by(s).by(speaker).by(speaker.model)
        jobs.show_prompts()
        results = await jobs.run_async(
            cache=self.cache, disable_remote_inference=self.disable_remote_inference
        )
        return results[0]

    def converse(self):
        """Run the whole conversation synchronously via ``asyncio.run``."""
        return asyncio.run(self._converse())

    async def _converse(self):
        """Take turns until ``continue_conversation`` says stop, appending each statement."""
        i = 0
        while await self.continue_conversation():
            speaker = self.next_speaker()
            print(f"Agent {speaker.name} speaking at index: {i}\n")
            others = self._other_agent_names(speaker)

            if i == 0 and self.start_statement_question is not None:
                next_statement = AgentStatement(
                    statement=await self.get_start_statement(
                        speaker=speaker,
                        others=others,
                        topic=self.topic,
                    )
                )
            else:
                # One "name:text" line per previous statement.
                history = "\n".join(
                    f"{k}:{v}"
                    for d in self.agent_statements.transcript
                    for k, v in d.items()
                )
                next_statement = AgentStatement(
                    statement=await self.get_next_statement(
                        index=i,
                        speaker=speaker,
                        conversation=history,
                        topic=self.topic,
                        other_agent_names=others,
                    )
                )
            self.agent_statements.append(next_statement)
            if self.verbose:
                print(f"'{speaker.name}':{next_statement.text}")
                print("\n")
            i += 1


class ConversationList:
    """A collection of conversations to be run in parallel."""

    def __init__(self, conversations: list[Conversation], cache=None):
        """Index the conversations by position and give them one shared cache.

        Args:
            conversations: Conversations to manage; each gets its list
                position as ``conversation_index``.
            cache: Cache shared by all conversations; a new ``Cache()`` when None.
        """
        self.conversations = conversations
        for i, conversation in enumerate(self.conversations):
            conversation.add_index(i)

        self.cache = Cache() if cache is None else cache

        for c in self.conversations:
            c.cache = self.cache

    async def run_conversations(self):
        """Await all conversations concurrently."""
        await asyncio.gather(*[c._converse() for c in self.conversations])

    def run(self) -> None:
        """Run all conversations in parallel"""
        asyncio.run(self.run_conversations())

    def to_dict(self) -> dict:
        """Serialize every conversation.

        Returns ``{"conversations": [<conversation dict>, ...]}``, the shape
        ``from_dict`` iterates over. (The previous dict comprehension reused
        one key and therefore kept only the last conversation.)
        """
        return {"conversations": [c.to_dict() for c in self.conversations]}

    @classmethod
    def from_dict(cls, data):
        """Rebuild the list from ``to_dict`` output."""
        conversations = [Conversation.from_dict(d) for d in data["conversations"]]
        return cls(conversations)

    def to_results(self) -> Results:
        """Return the results of all conversations as a single Results"""
        first_convo = self.conversations[0]
        results = first_convo.to_results()
        for conv in self.conversations[1:]:
            results += conv.to_results()
        return results

    def summarize(self) -> ScenarioList:
        """Return one summary ``Scenario`` per conversation."""
        return ScenarioList([c.summarize() for c in self.conversations])

In [55]:
import ast
from edsl import Agent, AgentList
# from backend.source.edsl_client.edsl_find_agent_traits import edsl_find_agent

class AgentManager:
    """Builds EDSL ``Agent`` objects from model-generated persona text."""

    def __init__(self):
        pass

    def _create_agent_persona(self, agent_traits: dict) -> Agent:
        """Create an ``Agent`` from a dict with ``name`` and ``traits`` keys."""
        return Agent(
            name=agent_traits['name'],
            traits=agent_traits['traits']
        )

    @staticmethod
    def _parse_traits(traits_str: str, descriptor: dict) -> dict:
        """Turn persona text into a ``{"name": ..., "traits": {...}}`` dict.

        The text comes from a language model, so it may not be a valid dict
        literal. In that case (or when the parsed value has no ``traits``
        dict) a minimal persona built from ``descriptor`` is returned rather
        than raising.
        """
        fallback = {
            "name": descriptor['agent_name'],
            "traits": {"persona": descriptor['agent_description']},
        }
        try:
            parsed = ast.literal_eval(traits_str)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
            print(f"Could not parse traits for {descriptor['agent_name']}: {e}")
            return fallback

        if not isinstance(parsed, dict) or not isinstance(parsed.get("traits"), dict):
            print(f"Unexpected traits format for {descriptor['agent_name']}; using description only")
            return fallback

        # Keep the model's name when it supplied one, otherwise the caller's.
        parsed.setdefault("name", descriptor['agent_name'])
        return parsed

    def build_agents(self, agent_1: dict, agent_2: dict) -> AgentList:
        """
        Accepts two agent descriptors as dicts:
        {
            'agent_name': <str>,
            'agent_description': <str>
        }
        Returns an AgentList of EDSL Agent objects with extracted traits.
        """
        agents = []
        for descriptor in (agent_1, agent_2):
            traits_str = edsl_find_agent.find(
                descriptor['agent_name'], descriptor['agent_description']
            )
            traits = self._parse_traits(traits_str, descriptor)
            agents.append(self._create_agent_persona(traits))

        return AgentList(agents)

# Shared instance; ConversationManager.start_conversation calls its build_agents().
agent_manager = AgentManager()


In [56]:
from collections import defaultdict
import re

def group_transcripts_by_speaker(raw_text: str) -> dict:
    """Collect the lines of a "Speaker: text" transcript per speaker.

    Each line is stripped and matched from its start against
    ``<word/space characters>: <text>``; lines that do not match are ignored.

    Returns:
        Dict mapping each speaker (in order of first appearance) to that
        speaker's lines joined with newlines.
    """
    line_re = re.compile(r"(?P<speaker>[\w\s]+): (?P<line>.+)")
    grouped = defaultdict(list)

    for candidate in map(str.strip, raw_text.splitlines()):
        found = line_re.match(candidate)
        if found is None:
            continue
        who = found["speaker"].strip()
        grouped[who].append(found["line"].strip())

    return {who: "\n".join(parts) for who, parts in grouped.items()}


import os
# Relative folder that load_all_transcripts() reads; created on cell execution
# so that listing it does not fail when nothing has been saved there yet.
TRANSCRIPT_FOLDER = os.path.join("backend", "static", "transcripts")
os.makedirs(TRANSCRIPT_FOLDER, exist_ok=True)
def load_all_transcripts(folder=None, extensions=(".txt", ".vtt")):
    """Read every transcript file in a folder.

    Args:
        folder: Directory to scan; defaults to ``TRANSCRIPT_FOLDER``.
        extensions: File suffixes to accept, compared case-insensitively.
            ``.vtt`` is included by default because the notebook's upload
            widget accepts both ``.txt`` and ``.vtt``.

    Returns:
        Dict mapping file name to its UTF-8 text content, in sorted file-name
        order. Empty when the folder does not exist. Sub-directories are
        skipped even if their name carries an accepted suffix.
    """
    if folder is None:
        folder = TRANSCRIPT_FOLDER
    if not os.path.isdir(folder):
        return {}

    wanted = tuple(ext.lower() for ext in extensions)
    transcripts = {}
    # Sorted so the result order does not depend on the filesystem.
    for filename in sorted(os.listdir(folder)):
        path = os.path.join(folder, filename)
        if filename.lower().endswith(wanted) and os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as f:
                transcripts[filename] = f.read()
    return transcripts

In [57]:
from typing import List
from edsl import Model, QuestionFreeText, Agent

# from backend.source.edsl_client.edsl_conversation import Conversation
# from backend.source.tools.agent_manager import agent_manager
# from backend.source.prompts import start_statement, next_statement_template, per_round_message_template

class ConversationManager:
    """Wires personas, prompt templates and a model into one two-agent Conversation."""

    def __init__(self, max_turns: int, agent_params: List[dict], topic: str, model_name="google/gemma-2-9b-it", service_name="deep_infra"):
        """Store the settings and capture the module-level templates and transcripts.

        Args:
            max_turns: Turn limit handed to the Conversation.
            agent_params: Agent descriptors; the first two entries are used.
            topic: Conversation topic.
            model_name: Model name for ``edsl.Model``.
            service_name: Inference service for ``edsl.Model``.
        """
        self.model = Model(model_name=model_name, service_name=service_name)

        # Caller-supplied conversation settings.
        self.topic = topic
        self.agent_params = agent_params
        self.turns = max_turns

        # Prompt templates defined earlier in the notebook.
        self.per_round_message_template = per_round_message_template
        self.next_statement_template = next_statement_template
        self.start_statement_template = start_statement_template

        # All transcript files currently on disk, keyed by file name.
        self.agent_transcript = load_all_transcripts()

    def start_conversation(
        self,
        verbose: bool = True
    ):
        """Create the two personas, run the conversation and return it."""
        print("-----Creating Agent Personas------\n")
        first_params = self.agent_params[0]
        second_params = self.agent_params[1]
        agents = agent_manager.build_agents(agent_1=first_params, agent_2=second_params)
        print("-----Agent Personas Created\n")

        # Both questions share the name "dialogue" so every answer is read the same way.
        start_question, next_question = [
            QuestionFreeText(question_text=template, question_name="dialogue")
            for template in (self.start_statement_template, self.next_statement_template)
        ]

        settings = {
            "agent_list": agents,
            "start_statement_question": start_question,
            "next_statement_question": next_question,
            "max_turns": self.turns,
            "verbose": verbose,
            "default_model": self.model,
            "topic": self.topic,
            "per_round_message_template": self.per_round_message_template,
            "transcript": self.agent_transcript,
        }
        conversation = Conversation(**settings)

        conversation.converse()
        return conversation  # returned so callers can inspect results afterwards

# conversation_manager = ConversationManager()

In [58]:
import math
import re
import ast
import os
import pandas as pd
from edsl import Results
# from backend.source.tools.conversation_manager import ConversationManager

class ConversationResultExtractor:
    """Runs a conversation, then fetches its results and tabulates them.

    Typical use: ``run_conversation()`` -> ``fetch_and_parse_results()`` ->
    ``save_to_csv()`` / ``get_results_df()``.
    """

    # Opening ```lang fence or closing ``` fence around a model reply.
    _FENCE_RE = re.compile(r"^```[A-Za-z]*\s*|\s*```$")
    # Column order of the results table (also used when there are no rows).
    _COLUMNS = ["iteration", "agent", "answer", "sources"]

    def __init__(self, agent_params, topic, max_turns=3, results_per_page=10):
        """
        Args:
            agent_params: List of ``{'agent_name', 'agent_description'}`` dicts.
            topic: Conversation topic.
            max_turns: Number of turns to run and number of results to fetch.
            results_per_page: Upper bound on results requested per page.
        """
        self.agent_params = agent_params
        self.topic = topic
        self.max_turns = max_turns
        self.results_per_page = results_per_page
        self.res_arr = []
        self.conversation = None
        self.df = None

    def run_conversation(self):
        """Create a ConversationManager and run the conversation to completion."""
        self.conversation_manager = ConversationManager(
            max_turns=self.max_turns,
            agent_params=self.agent_params,
            topic=self.topic
        )
        self.conversation = self.conversation_manager.start_conversation()

    @classmethod
    def _parse_dialogue(cls, response_raw):
        """Split one raw model reply into ``(answer, sources)``.

        The reply is expected to be a (possibly fenced) Python dict literal
        with ``Statement`` and ``grounding_sources`` keys. When it is not, the
        fence-stripped text is returned as the answer with ``""`` as sources.
        """
        text = "" if response_raw is None else str(response_raw)
        cleaned = cls._FENCE_RE.sub("", text.strip()).strip()
        try:
            parsed = ast.literal_eval(cleaned)
            return parsed["Statement"], parsed["grounding_sources"]
        except Exception as e:
            print(f"Failed to parse response: {e}")
            return cleaned, ""

    def fetch_and_parse_results(self):
        """Fetch the most recent ``max_turns`` results and build ``self.df``.

        Results are requested newest-first, page by page. The page size is the
        same for every page (a smaller size on the last page would change that
        page's offset); surplus entries on the last page are ignored instead.
        Each call starts from an empty record list.
        """
        # NOTE(review): this relies on the newest remote results being exactly
        # this conversation's turns - confirm against the account's activity.
        records = []
        remaining = self.max_turns
        page_size = min(self.results_per_page, self.max_turns)
        total_pages = math.ceil(self.max_turns / self.results_per_page)

        for page in range(1, total_pages + 1):
            if remaining <= 0:
                break
            print(f"\nFetching page: {page}")
            results = Results.list(page=page, page_size=page_size, sort_ascending=False).fetch()

            for item in results:
                if remaining <= 0:
                    break
                remaining -= 1
                try:
                    first = item[0]
                    iteration = first['scenario']['index']
                    agent_name = first['agent']['name']
                    response_raw = first['answer']['dialogue']
                except Exception as e:
                    # Not a conversation turn (or an unexpected shape): skip it.
                    print(f"Failed to parse response: {e}")
                    continue

                answer, sources = self._parse_dialogue(response_raw)
                records.append({
                    'iteration': iteration,
                    'agent': agent_name,
                    'answer': answer,
                    'sources': sources
                })

        self.res_arr = records
        # Explicit columns keep sorting valid even when nothing was fetched.
        self.df = pd.DataFrame(records, columns=self._COLUMNS).sort_values(by="iteration")

    def save_to_csv(self, output_path=None):
        """Write the results table to ``backend/static/<output_path>``.

        Args:
            output_path: File name inside ``backend/static``; defaults to
                ``output-<agent1>_<agent2>_<max_turns>.csv``.

        Raises:
            ValueError: ``fetch_and_parse_results()`` has not been run.
        """
        if self.df is None:
            raise ValueError("No results found. Run fetch_and_parse_results() first.")

        base_directory_path = os.path.join("backend", "static")
        os.makedirs(base_directory_path, exist_ok=True)

        if output_path is None:
            agent_1_name = self.agent_params[0]['agent_name']
            agent_2_name = self.agent_params[1]['agent_name']
            output_path = f"output-{agent_1_name}_{agent_2_name}_{self.max_turns}.csv"
        filename = os.path.join(base_directory_path, output_path)

        self.df.to_csv(filename, index=False)
        print(f"\n✅ Results saved to {filename}")

    def get_results_df(self):
        """Return the results as a list of row dicts.

        Raises:
            ValueError: ``fetch_and_parse_results()`` has not been run.
        """
        if self.df is None:
            raise ValueError("No results found. Run fetch_and_parse_results() first.")
        return self.df.to_dict(orient="records")


In [59]:
from pydantic import BaseModel
from typing import List

# from fastapi import APIRouter, HTTPException, Request, Query
import os
# from fastapi.responses import FileResponse

# from app.backend.source.tools.conversation_results_manager import ConversationResultExtractor
# from backend.source.tools.conversation_results_manager import ConversationResultExtractor


# from source import get_results
# from source.agent_conversation import

# client = MongoClient("mongodb://localhost:27017/")
# db = client["mydb"]
# collection = db["items"]


class AgentParam(BaseModel):
    """Name and free-text description of one conversation participant."""
    agent_name: str
    agent_description: str

class ConversationRequest(BaseModel):
    """Input for run(): number of turns, topic, and the participating agents."""
    turns: str  # declared as text; run() converts it with int()
    topic: str
    agent_params: List[AgentParam]  # run() uses the first two entries' names for the CSV file name



# router = APIRouter()
# # Serve index.html on root path
# @router.get("/")
# async def serve_index():
#     index_path = os.path.join("frontend", "dist", "index.html")
#     return FileResponse(index_path)

# @router.post("/run")
def run(request: ConversationRequest):
    """Run a conversation end to end and save its results as CSV.

    Args:
        request: Turn count (as text), topic and agent descriptors.

    Returns:
        On success, a dict with ``message``, ``file`` (the CSV file name) and
        ``conversation`` (list of result rows). On any failure, a dict with
        only a ``message`` describing the error; nothing is raised.
    """
    try:
        agent_params = [
            {
                "agent_name": agent.agent_name,
                "agent_description": agent.agent_description
            }
            for agent in request.agent_params
        ]
        if len(agent_params) < 2:
            raise ValueError("At least two agents are required to run a conversation.")

        # Convert once so the saved file and the reported name always agree
        # (e.g. turns="03" is saved and reported as 3).
        max_turns = int(request.turns)

        extractor = ConversationResultExtractor(
            agent_params=agent_params,
            topic=request.topic,
            max_turns=max_turns
        )

        extractor.run_conversation()
        extractor.fetch_and_parse_results()

        agent1 = agent_params[0]['agent_name']
        agent2 = agent_params[1]['agent_name']
        filename = f"output-{agent1}_{agent2}_{max_turns}.csv"
        extractor.save_to_csv(output_path=filename)

        results_dict = extractor.get_results_df()

        return {
            "message": f"Conversation complete. Saved to {filename}",
            "file": filename,
            "conversation": results_dict
        }

    except Exception as e:
        return {
            "message": f"Conversation failed: {str(e)}"
        }
        # raise HTTPException(status_code=500, detail=f"Conversation failed: {str(e)}")

# @router.get("/download")
# async def download_csv(filename: str = Query(...)):
#     file_path = os.path.join("backend", "static", filename)

#     if not os.path.exists(file_path):
#         raise HTTPException(status_code=404, detail="File not found")

#     return FileResponse(path=file_path, filename=filename, media_type='text/csv')

In [62]:
import ipywidgets as widgets
from IPython.display import display

# def start_agent(upload_widget)


# Collect the conversation settings interactively. Every value is the raw
# string returned by input(); nothing is validated or converted here.
agent_1_name = input("Name of First Agent(This Agent Starts the conversation):\n")
agent_1_description = input("Description of First Agent:\n")

agent_2_name = input("Name of Second Agent in the conversation:\n")
agent_2_description = input("Description of Second Agent:\n")

# `turns` stays a string; ConversationRequest declares it as `str`.
turns = input("How many turns should the conversation have?\n")
topic = input("What is the topic of the conversation?\n")

# ************* WARNING ******************
# File upload is disabled by default. Uncomment the lines below to create and
# display the `upload_widget` that the next cell reads uploaded files from.
# upload_widget = widgets.FileUpload(
#     accept='.txt, .vtt',
#     multiple=True
# )
# display(upload_widget)


Name of First Agent(This Agent Starts the conversation):
asda
Description of First Agent:
asda
Name of Second Agent in the conversation:
asda
Description of Second Agent:
asda
How many turns should the conversation have?
3
What is the topic of the conversation?
adas


In [61]:

# `upload_widget` only exists when the optional FileUpload lines in the input
# cell were uncommented, so look it up defensively instead of raising
# NameError on a fresh kernel.
uploaded_files = getattr(globals().get("upload_widget"), "value", None)

if not uploaded_files:
    print("No file uploaded")
else:
    upload_dir = os.path.join("backend", "static", "transcripts")
    os.makedirs(upload_dir, exist_ok=True)

    # ipywidgets 7 exposes `.value` as {filename: {"content": ...}};
    # ipywidgets 8 exposes a tuple of {"name": ..., "content": ...} dicts.
    if isinstance(uploaded_files, dict):
        named_files = [(name, meta["content"]) for name, meta in uploaded_files.items()]
    else:
        named_files = [(meta["name"], meta["content"]) for meta in uploaded_files]

    for uploaded_name, content in named_files:
        # basename() keeps a crafted name such as "../../x" inside upload_dir.
        filename = os.path.basename(uploaded_name)
        with open(os.path.join(upload_dir, filename), "wb") as f:
            f.write(content)


# The first agent in the list opens the conversation.
agent_params = [
    AgentParam(agent_name=agent_1_name, agent_description=agent_1_description),
    AgentParam(agent_name=agent_2_name, agent_description=agent_2_description),
]

conversation_request = ConversationRequest(
    turns=turns,
    topic=topic,
    agent_params=agent_params,
)

# Last expression of the cell, so the returned summary dict is displayed.
run(conversation_request)

ERROR:edsl:
An API Key for model `google/gemma-2-9b-it` is missing from the .env file.
This key is associated with the inference service `deep_infra`.
Please see https://docs.expectedparrot.com/en/latest/api_keys.html for more information.



<class 'dict'>
-----Creating Agent Personas------



Exception Type,Service,Model,Question,Count
TimeoutError,deep_infra,google/gemma-2-9b-it,create_agent_q,5
LanguageModelNoResponseError,deep_infra,google/gemma-2-9b-it,create_agent_q,1


ERROR:edsl:
An API Key for model `google/gemma-2-9b-it` is missing from the .env file.
This key is associated with the inference service `deep_infra`.
Please see https://docs.expectedparrot.com/en/latest/api_keys.html for more information.



Fetching agent David Magnus failed


Service,Model,Input Tokens,Input Cost,Output Tokens,Output Cost,Total Cost,Total Credits
deep_infra,google/gemma-2-9b-it,133,$0.0001,695,$0.0001,$0.0002,0.02
Totals,Totals,133,$0.0001,695,$0.0001,$0.0002,0.02


ERROR:edsl:
An API Key for model `google/gemma-2-9b-it` is missing from the .env file.
This key is associated with the inference service `deep_infra`.
Please see https://docs.expectedparrot.com/en/latest/api_keys.html for more information.



-----Agent Personas Created

Agent David Magnus speaking at index: 0



Service,Model,Input Tokens,Input Cost,Output Tokens,Output Cost,Total Cost,Total Credits
deep_infra,google/gemma-2-9b-it,201,$0.0001,653,$0.0001,$0.0002,0.02
Totals,Totals,201,$0.0001,653,$0.0001,$0.0002,0.02


ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
Exceptions were raised.



ERROR:edsl:
An API Key for model `google/gemma-2-9b-it` is missing from the .env file.
This key is associated with the inference service `deep_infra`.
Please see https://docs.expectedparrot.com/en/latest/api_keys.html for more information.



'David Magnus':None


Agent Russ Altman speaking at index: 1



Service,Model,Input Tokens,Input Cost,Output Tokens,Output Cost,Total Cost,Total Credits
deep_infra,google/gemma-2-9b-it,15941,$0.0008,681,$0.0001,$0.0009,0.09
Totals,Totals,15941,$0.0008,681,$0.0001,$0.0009,0.09

Exception Type,Service,Model,Question,Count
TimeoutError,deep_infra,google/gemma-2-9b-it,dialogue,1


ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
Exceptions were raised.



ERROR:edsl:
An API Key for model `google/gemma-2-9b-it` is missing from the .env file.
This key is associated with the inference service `deep_infra`.
Please see https://docs.expectedparrot.com/en/latest/api_keys.html for more information.



'Russ Altman':None


Agent David Magnus speaking at index: 2



Service,Model,Input Tokens,Input Cost,Output Tokens,Output Cost,Total Cost,Total Credits
deep_infra,google/gemma-2-9b-it,15298,$0.0008,423,$0.0001,$0.0009,0.09
Totals,Totals,15298,$0.0008,423,$0.0001,$0.0009,0.09


ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
ERROR:edsl:No key found for service 'deep_infra'
ERROR:edsl:Language model did not return a response for question 'dialogue.'
Exceptions were raised.



'David Magnus':None



Fetching page: 1
Failed to parse response: malformed node or string on line 7: <ast.Name object at 0x7f2c6de2f610>

✅ Results saved to backend/static/output-David Magnus_Russ Altman_3.csv


{'message': 'Conversation complete. Saved to output-David Magnus_Russ Altman_3.csv',
 'file': 'output-David Magnus_Russ Altman_3.csv',
 'conversation': [{'iteration': 0,
   'agent': 'David Magnus',
   'answer': "Russ, it's a pleasure to be discussing this crucial intersection of AI ethics and biosecurity with you. I believe we both recognize the profound potential – and inherent risks – that AI presents in this domain. My initial proposal is that we focus our collaborative efforts on establishing a framework for *responsible innovation* within AI-driven biosecurity applications. This means prioritizing transparency, accountability, and robust safety protocols from the very inception of these technologies. Specifically, I'd like to see us address the dual-use dilemma – ensuring AI tools designed for beneficial purposes aren’t easily repurposed for malicious intent – and develop mechanisms for ongoing ethical review and adaptation as the technology evolves. My priorities are, first, miti

In [None]:
 # That's it. DO NOT RUN ANY OF THE CELLS BELOW THIS POINT!

In [None]:
from collections import UserList
import asyncio
import inspect
from typing import Optional, Callable, TYPE_CHECKING
from edsl import QuestionFreeText, Results, AgentList, ScenarioList, Scenario, Model
from edsl.questions import QuestionBase
from edsl.results import Result
from jinja2 import Template
from edsl.caching import Cache

if TYPE_CHECKING:
    from edsl.language_models.model import Model

from edsl.conversation.next_speaker_utilities import (
    default_turn_taking_generator,
    speaker_closure,
)


class AgentStatement:
    """Wrapper around the edsl ``Result`` produced for one conversation turn."""

    def __init__(self, statement: Result):
        self.statement = statement

    @classmethod
    def from_dict(cls, data):
        """Rebuild a statement from the output of :meth:`to_dict`."""
        restored = Result.from_dict(data)
        return cls(restored)

    def to_dict(self):
        """Serialise the wrapped result via its own ``to_dict``."""
        return self.statement.to_dict()

    @property
    def agent_name(self):
        """The ``name`` entry of the result's ``agent`` section."""
        agent_section = self.statement["agent"]
        return agent_section["name"]

    @property
    def text(self):
        """The ``dialogue`` entry of the result's ``answer`` section."""
        answer_section = self.statement["answer"]
        return answer_section["dialogue"]


class AgentStatements(UserList):
    """List of statements, in the order they were appended."""

    def __init__(self, data=None):
        super().__init__(data)

    @classmethod
    def from_dict(cls, data):
        """Rebuild the list from the output of :meth:`to_dict`."""
        restored = []
        for entry in data:
            restored.append(AgentStatement.from_dict(entry))
        return cls(restored)

    def to_dict(self):
        """Return one serialised dict per statement, in list order."""
        serialised = []
        for statement in self.data:
            serialised.append(statement.to_dict())
        return serialised

    @property
    def transcript(self):
        """One single-key ``{agent_name: text}`` dict per statement, in list order."""
        turns = []
        for statement in self.data:
            turns.append({statement.agent_name: statement.text})
        return turns


class Conversation:
    """A conversation between a list of agents. The first agent in the list is the first speaker.
    After that, order is determined by the next_speaker function.
    The question asked to each agent is determined by the next_statement_question.

    If a ``topic`` is given, the very first turn uses a dedicated opening
    question about that topic instead of next_statement_question.

    If the user has passed in a "per_round_message_template", this will be displayed at the beginning of each round.
    {{ round_message }} must be in the question_text.
    """

    def __init__(
        self,
        agent_list: AgentList,
        max_turns: int = 20,
        stopping_function: Optional[Callable] = None,
        next_statement_question: Optional[QuestionBase] = None,
        next_speaker_generator: Optional[Callable] = None,
        verbose: bool = False,
        per_round_message_template: Optional[str] = None,
        conversation_index: Optional[int] = None,
        cache=None,
        disable_remote_inference=False,
        default_model: Optional[Model] = None,
        topic: Optional[str] = None,
    ):
        """Configure the conversation; nothing is sent to a model here.

        Args:
            agent_list: Participants; agents without a ``model`` attribute get one assigned.
            max_turns: Upper bound on the number of recorded statements.
            stopping_function: Optional sync or async callable receiving the
                statements so far; a truthy result ends the conversation.
            next_statement_question: Question asked on every turn (except the
                opening turn when ``topic`` is set). A default is built if omitted.
            next_speaker_generator: Turn-taking generator handed to
                ``speaker_closure``; defaults to ``default_turn_taking_generator``.
            verbose: Print each statement as it is produced.
            per_round_message_template: Jinja2 template rendered each turn with
                ``max_turns`` and ``current_turn``.
            conversation_index: Optional identifier stored with every scenario.
            cache: edsl cache to use; a fresh ``Cache()`` is created if omitted.
            disable_remote_inference: Forwarded to ``jobs.run_async``.
            default_model: Model assigned to agents that have no ``model``.
            topic: If given, the first speaker is asked to open on this topic.

        Raises:
            ConversationValueError: A per-round template was supplied but the
                custom question text lacks ``{{ round_message }}``.
        """
        # Imported locally because this cell does not import textwrap itself.
        import textwrap

        self.disable_remote_inference = disable_remote_inference
        self.per_round_message_template = per_round_message_template
        self.cache = Cache() if cache is None else cache
        self.agent_list = agent_list

        # Agents without their own model fall back to default_model, or to
        # a default-constructed Model() when no default_model was given.
        for agent in self.agent_list:
            if not hasattr(agent, "model"):
                agent.model = default_model if default_model is not None else Model()

        self.verbose = verbose
        self._conversation_index = conversation_index
        self.agent_statements = AgentStatements()
        self.max_turns = max_turns
        self.topic = topic

        if topic is not None:
            start_question = textwrap.dedent(
                """\
You are {{agent_name}}. You are starting the conversation on {{conversation}} with {{other_agent_names}}.
How do you want to start?
                """
            )
            self.start_statement_question = QuestionFreeText(
                question_text=start_question,
                question_name="dialogue",
            )
        else:
            self.start_statement_question = None

        if next_statement_question is None:
            base_question = textwrap.dedent(
                """\
You are {{ agent_name }}. This is the conversation so far: {{ conversation }}
{% if round_message is not none %}
{{ round_message }}
{% endif %}
What do you say next?"""
            )
            self.next_statement_question = QuestionFreeText(
                question_text=base_question,
                question_name="dialogue",
            )
        else:
            self.next_statement_question = next_statement_question
            if (
                per_round_message_template
                and "{{ round_message }}" not in next_statement_question.question_text
            ):
                from edsl.conversation.exceptions import ConversationValueError
                raise ConversationValueError(
                    "If you pass in a per_round_message_template, you must include {{ round_message }} in the question_text."
                )

        # Determine how the next speaker is chosen
        if next_speaker_generator is None:
            func = default_turn_taking_generator
        else:
            func = next_speaker_generator

        # Choose the next speaker
        self.next_speaker = speaker_closure(
            agent_list=self.agent_list, generator_function=func
        )

        # Determine when the conversation ends
        if stopping_function is None:
            self.stopping_function = lambda agent_statements: False
        else:
            self.stopping_function = stopping_function

    async def continue_conversation(self, **kwargs) -> bool:
        """Return True while another turn should be taken.

        Stops once ``max_turns`` statements exist or the stopping function
        (awaited if it is a coroutine function) returns a truthy value.
        """
        if len(self.agent_statements) >= self.max_turns:
            return False

        if inspect.iscoroutinefunction(self.stopping_function):
            should_stop = await self.stopping_function(self.agent_statements, **kwargs)
        else:
            should_stop = self.stopping_function(self.agent_statements, **kwargs)

        return not should_stop

    def add_index(self, index) -> None:
        """Set the identifier reported as ``conversation_index``."""
        self._conversation_index = index

    @property
    def conversation_index(self):
        return self._conversation_index

    def to_dict(self):
        """Serialise the agents, limits, topic and recorded statements."""
        return {
            "agent_list": self.agent_list.to_dict(),
            "max_turns": self.max_turns,
            "verbose": self.verbose,
            "agent_statements": [d.to_dict() for d in self.agent_statements],
            "conversation_index": self.conversation_index,
            # Stored so a restored conversation keeps its opening question.
            "topic": self.topic,
        }

    @classmethod
    def from_dict(cls, data):
        """Rebuild a conversation from the output of :meth:`to_dict`.

        ``topic`` is optional so dictionaries written before it was
        serialised still load.
        """
        conversation = cls(
            agent_list=AgentList.from_dict(data["agent_list"]),
            max_turns=data["max_turns"],
            verbose=data["verbose"],
            conversation_index=data["conversation_index"],
            topic=data.get("topic"),
        )
        # __init__ has no parameter for prior statements, so restore them
        # afterwards (the old code passed an unknown `results_data=` keyword,
        # wrapped in a stray one-element tuple, and always raised TypeError).
        conversation.agent_statements = AgentStatements.from_dict(
            data["agent_statements"]
        )
        return conversation

    def to_results(self):
        """Return the recorded statements as a single ``Results`` object."""
        return Results(data=[s.statement for s in self.agent_statements])

    def summarize(self):
        """Return a ``Scenario`` with the conversation's size and transcript."""
        d = {
            "num_agents": len(self.agent_list),
            "max_turns": self.max_turns,
            "conversation_index": self.conversation_index,
            "transcript": self.to_results().select("agent_name", "dialogue").to_list(),
            "number_of_agent_statements": len(self.agent_statements),
        }
        return Scenario(d)

    async def get_start_statement(self, *, speaker, others, topic) -> "Result":
        """Get the first statement from the speaker related to the topic.

        Requires ``start_statement_question`` to be set, i.e. the
        conversation was created with a ``topic``.
        """
        q = self.start_statement_question
        s = Scenario(
            {
                "agent_name": speaker.name,
                "other_agent_names": others,
                # The opening template reuses the `conversation` slot for the topic.
                "conversation": topic,
                "conversation_index": self.conversation_index,
                "index": 0,
                "round_message": None,
            }
        )
        jobs = q.by(s).by(speaker).by(speaker.model)
        jobs.show_prompts()
        results = await jobs.run_async(
            cache=self.cache, disable_remote_inference=self.disable_remote_inference
        )
        return results[0]

    async def get_next_statement(self, *, index, speaker, conversation) -> "Result":
        """Get the next statement from the speaker.

        ``conversation`` is the transcript so far; ``index`` is the turn
        number, also exposed to the per-round template as ``current_turn``.
        """
        q = self.next_statement_question
        # assert q.parameters == {"agent_name", "conversation"}, q.parameters

        if self.per_round_message_template is None:
            round_message = None
        else:
            round_message = Template(self.per_round_message_template).render(
                {"max_turns": self.max_turns, "current_turn": index}
            )

        s = Scenario(
            {
                "agent_name": speaker.name,
                "conversation": conversation,
                "conversation_index": self.conversation_index,
                "index": index,
                "round_message": round_message,
            }
        )
        jobs = q.by(s).by(speaker).by(speaker.model)
        jobs.show_prompts()
        results = await jobs.run_async(
            cache=self.cache, disable_remote_inference=self.disable_remote_inference,
        )
        return results[0]

    def converse(self):
        """Run the whole conversation synchronously via ``asyncio.run``."""
        return asyncio.run(self._converse())

    async def _converse(self):
        """Take turns until :meth:`continue_conversation` returns False."""
        i = 0
        while await self.continue_conversation():
            speaker = self.next_speaker()
            print(f"Agent {speaker.name} speaking at index: {i}\n")
            # The dedicated opening question only exists when a topic was
            # given; without one, turn 0 uses the regular question (with an
            # empty transcript) instead of failing on a None question.
            if i == 0 and self.start_statement_question is not None:
                result = await self.get_start_statement(
                    speaker=speaker,
                    others=[a.name for a in self.agent_list if a != speaker],
                    topic=self.topic,
                )
            else:
                result = await self.get_next_statement(
                    index=i,
                    speaker=speaker,
                    conversation=self.agent_statements.transcript,
                )
            next_statement = AgentStatement(statement=result)
            self.agent_statements.append(next_statement)
            if self.verbose:
                print(f"'{speaker.name}':{next_statement.text}")
                print("\n")
            i += 1


class ConversationList:
    """A collection of conversations to be run in parallel."""

    def __init__(self, conversations: "list[Conversation]", cache=None):
        """Index the conversations by position and give them one shared cache.

        Args:
            conversations: Conversations to manage; each receives its list
                position through ``add_index``.
            cache: Cache shared by all conversations; a fresh ``Cache()`` is
                created if omitted.
        """
        self.conversations = conversations
        for i, conversation in enumerate(self.conversations):
            conversation.add_index(i)

        self.cache = Cache() if cache is None else cache

        # Overrides whatever cache each conversation was constructed with.
        for c in self.conversations:
            c.cache = self.cache

    async def run_conversations(self):
        """Await every conversation's ``_converse`` concurrently."""
        await asyncio.gather(*[c._converse() for c in self.conversations])

    def run(self) -> None:
        """Run all conversations in parallel"""
        asyncio.run(self.run_conversations())

    def to_dict(self) -> dict:
        """Serialise every conversation, in order, under ``"conversations"``.

        The previous dict comprehension reused one constant key, so only the
        last conversation survived and ``from_dict`` could not read it back.
        """
        return {"conversations": [c.to_dict() for c in self.conversations]}

    @classmethod
    def from_dict(cls, data):
        """Rebuild the list from the output of :meth:`to_dict`."""
        raw = data["conversations"]
        # Older payloads stored a single conversation dict rather than a list.
        if isinstance(raw, dict):
            raw = [raw]
        conversations = [Conversation.from_dict(d) for d in raw]
        return cls(conversations)

    def to_results(self) -> "Results":
        """Return the results of all conversations as a single Results"""
        first_convo = self.conversations[0]
        results = first_convo.to_results()
        for conv in self.conversations[1:]:
            results += conv.to_results()
        return results

    def summarize(self) -> "ScenarioList":
        """Return one summary scenario per conversation."""
        return ScenarioList([c.summarize() for c in self.conversations])

In [None]:
import sqlite3
from edsl import Agent, AgentList, Model, QuestionFreeText, Scenario
# from edsl.conversation import Conversation

# Model passed to Conversation as `default_model` further down in this cell:
# Llama 3.2 1B Instruct, requested through the `deep_infra` service.
edsl_model = Model(model_name="meta-llama/Llama-3.2-1B-Instruct", service_name='deep_infra')

def negotiation_agents():
    """Build the two hard-coded negotiation personas.

    Returns:
        AgentList: Vladimir Putin first, then Xi Jinping.
    """
    putin_traits = {
        "persona": "President of Russia, former KGB officer",
        "leadership": "Authoritarian, strategic, and pragmatic",
        "negotiation": "Hardline, calculated, and tactical",
        "military_policy": "Strong focus on military dominance and security",
        "economic_strategy": "Energy leverage, sanctions resilience, and state-controlled economy",
        "communication_style": "Cold, calculated, and propagandist",
        "goal": "Minimize impacts of western sanctions and improve diplomatic and economic ties with China to herald a new world order",
    }
    xi_traits = {
        "persona": "President of China, Chairman of the Communist Party",
        "leadership": "Authoritarian, long-term strategist, nationalist",
        "economic_strategy": "State-controlled economy, tech-driven growth, BRI expansion",
        "global_diplomacy": "Soft power strategist, U.S. rival, Russia-China alliance builder",
        "military_policy": "Modernizes PLA, cyber warfare focus, South China Sea expansion",
        "communication_style": "Diplomatic, calculated, and propagandist",
        "goal": "Minimize impacts of US Tariff war with China by seeking new economic and diplomatic avenues in Russia",
    }

    # List order matters: Conversation treats the first agent as the opening speaker.
    leaders = [
        Agent(name="Vladimir Putin", traits=putin_traits),
        Agent(name="Xi Jinping", traits=xi_traits),
    ]
    return AgentList(leaders)


# Generate Putin & Xi personas
# putin_traits = generate_realistic_persona("Vladimir Putin")
# xi_traits = generate_realistic_persona("Xi Jinping")

# # Define agents dynamically
# putin = Agent(name="Putin", traits={"persona": "You are Vladimir Putin, President of Russia."})
# xi = Agent(name="Xi Jinping", traits={"persona": "You are Xi Jinping, President of China."})

# agents = AgentList([putin, xi])
agents = negotiation_agents()
# NOTE: rebinds the notebook-level `topic` that the input() cell also assigns.
topic = "How should Russia and China together respond to western economic sanctions?"

# Define the conversation: up to 10 turns, each statement printed as it arrives,
# with `edsl_model` supplied as the default model.
conversation = Conversation(
    agent_list=agents,
    max_turns=10,
    verbose=True,
    default_model=edsl_model,
    topic = topic,
)

# Run the conversation and collect its statements as an edsl Results object.
# Nothing is written to SQLite here; the snippet below is disabled.
conversation.converse()
results = conversation.to_results()

# Disabled: save conversation to SQLite (needs a `conn`/`cursor` that this cell never creates)
# for row in results.to_dict():
#     cursor.execute("INSERT INTO conversations (agent_name, dialogue, topic) VALUES (?, ?, ?)",
#                    (row["agent_name"], row["dialogue"], row.get("topic", "")))
# conn.commit()

# Print conversation results
results.select("index", "agent_name", "dialogue").print(format="rich")
# results.select("agent.*", "answer.*").print(format="rich")

# Disabled: close database connection (belongs to the SQLite snippet above)
# conn.close()


In [None]:
from edsl import Results
# Identifiers handed to Results.pull below, in the order they are fetched.
# NOTE(review): presumably one stored Results object per conversation turn; confirm.
res_arr = ["bfc2881a-f230-4be9-8311-ff519adbee10",
           "9115af2c-e541-412e-a7b4-e161989f371c",
           "097fad39-8479-4ded-adb4-1d73f5794b85",
           "8b7d9c84-f8a5-40c0-8ebe-f9c4932af2b8",
           "a6538ab4-7a6e-41e4-9d2d-3c63700966a6",
           "b621823f-e261-4c99-8ae3-ae0f72fbf82a",
           "e9337113-cf13-48c7-ab81-86d142803467",
           "0d6da67d-942e-4762-9217-6c4fa825e33e",
           "02c0e85f-9b3d-45cf-b0e5-b41c4ffa4723",
           "0e6648db-851d-4ebd-9168-43f8f4a34b9a"]

# Pull each stored result and keep only the three columns used downstream.
# `dialogue_list` ends up with one selection per identifier, in `res_arr` order.
dialogue_list = []
for item in res_arr:
  res = Results.pull(item)
  dial = res.select("index", "agent_name", "dialogue")
  # dial.print(format="rich")
  dialogue_list.append(dial)

In [None]:
# Flatten every pulled selection into one plain record.
# Only element [0] of each column is read, so any further rows in a pulled
# result are ignored.
# NOTE(review): assumes `item.data` lists the columns in select() order
# (index, agent_name, dialogue); confirm against the edsl Dataset layout.
dl_2 = []
for item in dialogue_list:
  dl_2.append(
      {
          "index": item.data[0]["scenario.index"][0],
          "agent_name": item.data[1]["agent.agent_name"][0],
          "dialogue": item.data[2]["answer.dialogue"][0]
      }
  )

print(dl_2)

In [None]:
import pandas as pd
# One row per record in `dl_2`, with a fixed column order.
df = pd.DataFrame.from_records(dl_2, columns=['index', 'agent_name', 'dialogue'])
df.to_csv('dialogue.csv', index=False)
# Keep the preview as the cell's last expression: a bare `df.head()` in the
# middle of a cell is evaluated and discarded, so nothing was displayed.
df.head()
