In [None]:
# Imports
import os
import asyncio
from typing import cast, List, Tuple
from dotenv import load_dotenv

# Agent Framework
from agents import Agent, Runner, OpenAIChatCompletionsModel, AsyncOpenAI
from opentelemetry import trace as otel_trace
from phoenix.otel import register

# Markdown output display
from IPython.display import Markdown, display

In [None]:
# Cast to String
def env_to_str(env: str, default: str = "") -> str:
    """Return the value of environment variable `env` as a string.

    Unlike `cast(str, os.getenv(env))`, which only silences the type checker
    and still hands `None` to callers for unset variables, this always returns
    a real `str`. Emptiness checks such as `value == ""` or `value or fallback`
    therefore behave consistently.

    Args:
        env: Name of the environment variable to read.
        default: Value returned when the variable is not set (defaults to "").

    Returns:
        The variable's value. If the variable is set but empty, "" is returned;
        `default` is not used in that case.
    """
    return os.getenv(env, default)

In [None]:
# Environment Variables
# Populate os.environ from a local .env file before anything is read;
# override=True lets values in the file replace variables already set in the shell.
load_dotenv(override=True)

# Ollama connection settings
ollama_api_key, ollama_base_url = (
    env_to_str('OLLAMA_API_KEY'),
    env_to_str('OLLAMA_BASE_URL'),
)

# Gemini connection settings
gemini_api_key, gemini_base_url = (
    env_to_str("GEMINI_API_KEY"),
    env_to_str("GEMINI_BASE_URL"),
)

# Phoenix collector endpoint; falls back to a local collector when unset or empty
phoenix_collector_endpoint = (
    env_to_str("PHOENIX_COLLECTOR_ENDPOINT")
    or "http://localhost:6006/v1/traces"
)

# Maileroo settings
maileroo_api_key, maileroo_base_url, maileroo_template_url = (
    env_to_str("MAILEROO_API_KEY"),
    env_to_str("MAILEROO_BASE_URL"),
    env_to_str("MAILEROO_TEMPLATE_URL"),
)

In [None]:
# Available AI Models
# Model identifiers are read from the environment so they can be swapped
# without editing the notebook.

# Ollama general-purpose models
model_mistral, model_llama, model_mistral_nemo, model_gemma = (
    env_to_str("MODEL_MISTRAL"),
    env_to_str('MODEL_LLAMA3'),
    env_to_str("MODEL_MISTRAL_NEMO"),
    env_to_str("MODEL_GEMMA3"),
)

# Ollama reasoning models
model_phi, model_qwen, model_deepseek = (
    env_to_str("MODEL_PHI3.5"),
    env_to_str("MODEL_QWEN3"),
    env_to_str("MODEL_DEEPSEEK_R1"),
)

# Gemini models
model_gemini_flash = env_to_str("MODEL_GEMINI_FLASH")

In [None]:
# Dot Notation Class
class DotNotation:
    """Wrap a (possibly nested) dict so its keys can be read as attributes.

    Nested dicts are converted recursively, so
    ``DotNotation({"a": {"b": 1}}).a.b == 1``. Item access (``obj["a"]``),
    item assignment and ``"a" in obj`` are supported too, and ``to_dict``
    converts back to plain nested dicts. Dicts stored inside lists/tuples are
    left untouched.

    NOTE(review): keys are stored as instance attributes, so a key that matches
    a method name (e.g. "to_dict") shadows that method on the instance.
    """

    def __init__(self, dictionary: dict):
        for key, value in dictionary.items():
            # Recurse so nested mappings also get dot access.
            if isinstance(value, dict):
                value = DotNotation(value)

            setattr(self, key, value)

    def __getitem__(self, key):
        # Read only stored keys (never methods/class attributes). A missing key
        # raises KeyError, as mapping-style callers expect, rather than AttributeError.
        return self.__dict__[key]

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __contains__(self, key):
        # Without this, `in` falls back to integer __getitem__ probing and fails.
        return key in self.__dict__

    def __repr__(self):
        return f"{type(self).__name__}({self.to_dict()!r})"

    def to_dict(self):
        """Return a plain (nested) dict copy of the stored keys and values."""
        return {
            key: value.to_dict() if isinstance(value, DotNotation) else value
            for key, value in self.__dict__.items()
        }

In [None]:
# Configurations

# Set Current Model
current_model = model_llama

# Multi Model Flag
multi_model = False

# Sanity-check the setup: a multi-model run expects `current_model` to be left
# empty, while a single-model run needs one selected. Truthiness is used so an
# unset environment variable (None) also counts as "no model selected".
if multi_model:
    if current_model:
        print("This is a multi model setup. Leave current model as empty string: ''.")
    else:
        print("This is a multi model setup. ")
elif current_model:
    # Report the model that was actually selected, not a hard-coded one.
    print(f"This is a single model setup. The current model is: {current_model}")
else:
    print("This is a single model setup. Please set a current model.")

In [None]:
# Agent Workflow Instructions

# NOTE(review): `agent1` appears unused in this notebook; looks like a leftover placeholder.
agent1 = ("", "")

# Company pitch shared verbatim by the three sales personas.
company_pitch = (
    "working for ComplAI, "
    "a company that provides a SaaS tool for ensuring SOC2 compliance "
    "and preparing for audits, powered by AI. "
)

# Sales persona prompts: "<persona> <company pitch><writing style>".
instructions1 = f"You are a sales agent {company_pitch}You write professional, serious cold emails."

instructions2 = (
    f"You are a humorous, engaging sales agent {company_pitch}"
    "You write witty, engaging cold emails that are likely to get a response."
)

instructions3 = f"You are a busy sales agent {company_pitch}You write concise, to the point cold emails."

# Evaluator prompt: choose one email and return it without commentary.
instructions4 = (
    "You pick the best cold sales email from the given options. "
    "Imagine you are a customer and pick the one you are most likely to respond to. "
    "Do not give an explanation; reply with the selected email only."
)

In [None]:
# Configure Phoenix Tracer

# One Phoenix project per model. A multi-model setup leaves `current_model`
# empty; truthiness also covers None from an unset environment variable.
phoenix_project_name = f"{current_model}_phoenix" if current_model else "multi_model_phoenix"

# Best-effort: a failed registration is reported but does not stop the notebook.
# `tracer_provider` stays defined (as None) so later cells can test for it.
tracer_provider = None
try:
    tracer_provider = register(
        project_name=phoenix_project_name,
        auto_instrument=True,
        endpoint=phoenix_collector_endpoint,
        set_global_tracer_provider=True,
    )
    print("✅ Phoenix tracer registered successfully")
except Exception as e:
    print(f"❌ Phoenix registration failed: {e}")

In [None]:
# Llama3 Model
# Chat-completions model served from the Ollama endpoint, reached through the
# OpenAI-compatible AsyncOpenAI client.
llama = OpenAIChatCompletionsModel(
    openai_client=AsyncOpenAI(base_url=ollama_base_url, api_key=ollama_api_key),
    model=model_llama,
)

In [None]:
# Agents

# The three sales personas share one model and differ only in name and instructions.
sales_agent1, sales_agent2, sales_agent3 = (
    Agent(name=persona_name, instructions=persona_instructions, model=llama)
    for persona_name, persona_instructions in (
        ("Professional Sales Agent", instructions1),
        ("Engaging Sales Agent", instructions2),
        ("Busy Sales Agent", instructions3),
    )
)

# Evaluator agent that selects the best of the generated emails.
sales_picker = Agent(name="sales_picker", instructions=instructions4, model=llama)

In [None]:
# Agentic Workflow

class AgenticWorkflow:
    """Fan a prompt out to several generation agents, then let an evaluator pick one.

    Each run is traced with OpenTelemetry: one parent span named `tracer_name`,
    with child spans for the generation and evaluation phases.
    """

    # Constructor
    def __init__(
            self,
            tracer_name: str,
            model_name: str,
            generation_agents: List[Agent],
            evaluator_agent: Agent):
        """
        Args:
            tracer_name: Name of the parent span; also prefixes the phase span names.
            model_name: Model identifier recorded on the generation span.
            generation_agents: Agents run concurrently on the same prompt.
            evaluator_agent: Agent that receives all generated outputs and selects one.
        """
        self.tracer_name = tracer_name
        self.model_name = model_name
        self.generation_agents = generation_agents
        self.evaluator_agent = evaluator_agent

        # Arize Phoenix / OpenTelemetry tracer, obtained from the global tracer provider.
        self.tracer = otel_trace.get_tracer(__name__)

    # Run Generation
    async def _run_generation(self, prompt: str) -> List[str]:
        """Run every generation agent concurrently on `prompt`.

        Returns:
            The agents' final outputs, in the same order as `generation_agents`,
            or an empty list when no agents are configured.
        """
        with self.tracer.start_as_current_span(f"{self.tracer_name} - Generation") as span:
            span.set_attribute("phase", "generation")
            span.set_attribute("input.prompt", prompt)
            span.set_attribute("model.used", self.model_name)

            if not self.generation_agents:
                print(f"Warning: No generation agents provided for '{self.tracer_name}'.")
                return []

            # asyncio.gather returns results in argument order, so outputs line up with agents.
            agent_run_results = await asyncio.gather(
                *(Runner.run(agent, prompt) for agent in self.generation_agents)
            )
            generated_outputs = [result.final_output for result in agent_run_results]

            # Non-empty here: there is one output per (non-empty) agent list.
            span.set_attribute("generation.count", len(generated_outputs))
            span.set_attribute("generation.total_length", sum(len(output) for output in generated_outputs))
            span.set_attribute("generation.first_preview", generated_outputs[0][:100] if generated_outputs[0] else "")

            return generated_outputs

    # Run Evaluation
    async def _run_evaluation(self, generated_content: List[str]) -> str:
        """Ask the evaluator agent to pick the best item from `generated_content`.

        The items are joined with a "--- Option ---" separator and sent as a
        single message.

        Returns:
            The evaluator's final output, or "" when there is nothing to evaluate.
        """
        with self.tracer.start_as_current_span(f"{self.tracer_name} - Evaluation") as span:
            span.set_attribute("phase", "evaluation")
            span.set_attribute("evaluator.agent_name", self.evaluator_agent.name)

            # Nothing to choose from: do not prompt the evaluator with an empty message.
            if not generated_content:
                print(f"Warning: No generated content to evaluate for '{self.tracer_name}'.")
                return ""

            evaluator_input = "\n\n--- Option ---\n\n".join(generated_content)
            span.set_attribute("evaluator.input_preview", evaluator_input[:500])

            # The evaluator runs INSIDE the span. Its work is then attributed to
            # this phase, and the attributes below are recorded before the span ends.
            eval_result = await Runner.run(self.evaluator_agent, evaluator_input)
            final_output = eval_result.final_output

            span.set_attribute("evaluation.output_length", len(final_output or ""))
            span.set_attribute("evaluation.output_preview", final_output[:100] if final_output else "")

            return final_output

    # Display Results
    def _display_results(self, generated_items: List[str], evaluated_item: str) -> None:
        """Render every generated item and the evaluator's pick as notebook Markdown."""
        print(f"\n--- All Generated Items for '{self.tracer_name}' ---")
        if generated_items:
            for item_number, item in enumerate(generated_items, start=1):
                display(Markdown(f"## Generated Item {item_number}:\n\n{item}\n\n---"))
        else:
            print("No items were generated.")

        print(f"\n--- Best Evaluated Item for '{self.tracer_name}' ---")
        if evaluated_item:
            display(Markdown(evaluated_item))
        else:
            print("No item was evaluated/picked.")

    # Execute Workflow
    async def execute_workflow(self, initial_prompt: str) -> Tuple[List[str], str]:
        """Run generation, then evaluation, under one parent span, and display the results.

        Returns:
            A tuple `(generated_items, evaluated_item)`: all generated outputs and
            the evaluator's pick ("" when nothing was picked).
        """
        with self.tracer.start_as_current_span(self.tracer_name) as main_span:
            main_span.set_attribute("workflow.type", "generic_generation_evaluation")
            main_span.set_attribute("workflow.initial_prompt", initial_prompt)

            generated_items = await self._run_generation(initial_prompt)
            evaluated_item = await self._run_evaluation(generated_items)

            self._display_results(generated_items, evaluated_item)

            # Final Summary Attributes
            main_span.set_attribute("final.generated_count", len(generated_items))
            main_span.set_attribute("final.evaluated_item_length", len(evaluated_item or ""))
            main_span.set_attribute("final.evaluated_item_preview", evaluated_item[:100] if evaluated_item else "")

        return generated_items, evaluated_item

In [None]:
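# # Phoenix Tracer (legacy version of the generate-then-pick flow now implemented by AgenticWorkflow; kept for reference)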
# async def phoenix_tracer(trace_name, message, *agents):
#     tracer = otel_trace.get_tracer(__name__)
#     with tracer.start_as_current_span(trace_name) as current_span:
#         if agents:
            
#             # Trace Attributes
#             current_span.set_attribute("user.request", message)
#             current_span.set_attribute("model.name", model_llama)
            
#             for index, agent in enumerate(agents):
                
#                 # Trace Attributes
#                 current_span.set_attribute("agent.name", agent.name)
        
#                 results = await asyncio.gather(
#                     *[Runner.run(agent, message) for agent in agents]
#                 )

#                 outputs = []

#                 for result in results:
#                     outputs.append(result.final_output)

#                 combined_outputs = "\n\n--- Option ---\n\n".join(outputs)
#                 best = await Runner.run(agents[-1], combined_outputs)

#                 if outputs:
#                     total_length = sum(len(output) for output in outputs)
                    
#                     # Add response attributes
#                     current_span.set_attribute("response.length", total_length)
#                     # current_span.set_attribute("response.preview", outputs[0][:100] if outputs[0] else "")
#                     current_span.set_attribute("response.preview", outputs[0] if outputs[0] else "")
#         else:
#             print("No additional arguments provided.")

#         # outputs = [result.final_output for result in results]
        
        
#         for output in outputs:
#             display(Markdown(f"# Sales Agent:\n\n{output}\n\n"))
#             display(Markdown(f"# Sales Picker:\n\n{best}"))
#         return outputs, best

In [None]:
# Result
# result, best = await phoenix_tracer(
#     "Parallel Cold Emails", 
#     "Write a cold sales email.", 
#     sales_agent1, 
#     sales_agent2, 
#     sales_agent3,
#     sales_picker
# )