## 🪣 Clone the Prompt-Learning Repository

This section clones the **Arize Prompt-Learning** repo and adds it to `sys.path`  
so that modules like `optimizer_sdk`, `phoenix`, and utilities can be imported directly.

In [None]:
# Uncomment on the first run to clone the repo:
# !git clone https://github.com/Arize-ai/prompt-learning.git
%cd prompt-learning
import sys
sys.path.append('.')
%cd notebooks
# From notebooks/, '..' points at the repo root
sys.path.append('..')
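
Relative entries such as `'.'` and `'..'` in `sys.path` are interpreted relative to the current working directory, so their meaning can change when a later cell changes directory. A minimal sketch of a more robust variant follows. It assumes a hypothetical helper named `add_to_sys_path`, which is not part of the repo.

```python
import sys
from pathlib import Path


def add_to_sys_path(directory: str) -> str:
    """Append the absolute form of `directory` to sys.path once and return it."""
    absolute = str(Path(directory).resolve())
    if absolute not in sys.path:
        sys.path.append(absolute)
    return absolute


# Register the current directory by its absolute path
# (the repo root if run right after `%cd prompt-learning`).
repo_root = add_to_sys_path(".")
```

Calling the helper again with the same directory leaves `sys.path` unchanged, so the cell is safe to re-run.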

## ⚙️ Install Dependencies

Install all required packages for Google ADK, LiteLLM, Phoenix SDK, and Vertex AI integration.  
These libraries enable you to run and evaluate LLMs through **Vertex AI** and track results in **Arize Phoenix**.


In [None]:
%pip install --upgrade --quiet "google-cloud-aiplatform[agent_engines,adk]" "a2a-sdk[http-server]" litellm pydantic pydantic-ai fastmcp numpy scipy python-dotenv nest-asyncio arize-phoenix

## 🔐 Authenticate and Connect Phoenix

Authenticate your Google Cloud account and connect to your Phoenix workspace.  
You'll be prompted for:
- **Phoenix Collector Endpoint**
- **Phoenix API Key**

These allow experiment tracking and dataset creation within Phoenix.


In [None]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

In [None]:
%pip install --upgrade arize-phoenix numpy scipy

In [None]:
import os, getpass
# Enter your Phoenix collector endpoint; if you self-host Phoenix, use your own instance URL
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = getpass.getpass('Phoenix Collector Endpoint:')

PHOENIX_API_KEY = getpass.getpass('Phoenix API Key:')
os.environ["PHOENIX_API_KEY"] = PHOENIX_API_KEY

from phoenix.client import Client
phoenix_client = Client()
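
When the cell above is re-run, it prompts again even if the variables are already set. One way to avoid that is sketched below, assuming a hypothetical helper `ensure_env` that only prompts when a variable is missing or empty. The `prompt` parameter is an illustrative addition so the helper can be exercised without interactive input.

```python
import getpass
import os
from typing import Callable


def ensure_env(name: str, prompt: Callable[[str], str] = getpass.getpass) -> str:
    """Return os.environ[name], prompting only when it is unset or empty."""
    value = os.environ.get(name, "").strip()
    if not value:
        # Ask the user and store the trimmed answer for later cells
        value = prompt(f"{name}: ").strip()
        os.environ[name] = value
    return value


# In the notebook this could replace the direct getpass calls, for example:
# ensure_env("PHOENIX_COLLECTOR_ENDPOINT")
# ensure_env("PHOENIX_API_KEY")
```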

In [None]:
import sys, os, getpass
import re
import openai
import pandas as pd
import vertexai

from google.adk.models.lite_llm import litellm
from google.adk.models.lite_llm import LiteLlm
from google.adk.agents import LlmAgent

from phoenix.client.types import PromptVersion

import nest_asyncio
nest_asyncio.apply()

## ☁️ Configure Vertex AI Environment

Set up your Google Cloud project, region, and GCS bucket for Vertex AI.  
This ensures that all Vertex and ADK calls use your correct project context.


In [None]:
# fmt: off
PROJECT_ID = "primal-oxide-268801"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
LOCATION = "us-central1" # @param {type: "string", placeholder: "[your-location]", isTemplate: true}
# fmt: on

# Define the bucket name and URI (this cell does not create the bucket itself)
BUCKET_NAME = f"{PROJECT_ID}-agent"
BUCKET_URI = f"gs://{BUCKET_NAME}"

# Set environment variables for ADK
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["GOOGLE_CLOUD_LOCATION"] = LOCATION
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"

# For notebook async support
nest_asyncio.apply()

# Initiate the client
client = vertexai.Client(project=PROJECT_ID, location=LOCATION)

## 🧠 Define the System Prompt and Upload to Phoenix

Here we define the classifier prompt listing all supported **support-ticket categories**.  
Then we upload it to Phoenix Prompt Hub using `upload_prompt_phoenix()`,  
which versions and stores the prompt for tracking.


In [None]:
system_prompt = """
Account Creation
Login Issues
Password Reset
Two-Factor Authentication
Profile Updates
Billing Inquiry
Refund Request
Subscription Upgrade/Downgrade
Payment Method Update
Invoice Request
Order Status
Shipping Delay
Product Return
Warranty Claim
Technical Bug Report
Feature Request
Integration Help
Data Export
Security Concern
Terms of Service Question
Privacy Policy Question
Compliance Inquiry
Accessibility Support
Language Support
Mobile App Issue
Desktop App Issue
Email Notifications
Marketing Preferences
Beta Program Enrollment
General Feedback

Return just the category, no other text for the support query.
"""

def upload_prompt_phoenix(system_prompt, name, iteration, prompt_versions):
    """Upload `system_prompt` to Phoenix and append a record of it to `prompt_versions`."""
    prompt_version = PromptVersion(
        [{"role": "system", "content": system_prompt}],  # System message
        model_name="llama-3.3-70b-instruct-maas",  # Model being used
        description="Prompt for support query classification",
    )

    # Create prompt in Phoenix
    initial_prompt_version = phoenix_client.prompts.create(
        name=name,
        version=prompt_version,
    )

    prompt_versions.append({
        "iteration": iteration,
        "prompt": system_prompt,
        "phoenix_id": initial_prompt_version.id if hasattr(initial_prompt_version, 'id') else None,
    })
    return prompt_versions

prompt_versions = upload_prompt_phoenix(system_prompt, "support_query_classification", 0, [])


## 📊 Load and Split Dataset

Load the `support_queries.csv` dataset, then split it into **training** (70%) and **test** (30%) sets.  
Each dataset is uploaded to Phoenix as a tracked dataset for experimentation.


In [None]:
data = pd.read_csv("../datasets/support_queries.csv")

train_set = data.sample(frac=0.7, random_state=42)
test_set = data.drop(train_set.index)

train_dataset = phoenix_client.datasets.create_dataset(
        name="training_data_support_query_classification",
        dataframe=train_set,
        input_keys=['query'],
        output_keys=['ground_truth'],
    )

test_dataset = phoenix_client.datasets.create_dataset(
        name="test_data_support_query_classification",
        dataframe=test_set,
        input_keys=['query'],
        output_keys=['ground_truth'],
    )
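
The split above relies on `sample` to pick the training rows and `drop` to keep the remainder, so the two sets never overlap. The same logic is sketched below in plain Python, without pandas, purely as an illustration. `split_rows` and the sample rows are hypothetical and do not come from `support_queries.csv`.

```python
import random


def split_rows(rows, train_frac=0.7, seed=42):
    """Split `rows` into disjoint train and test lists using a seeded shuffle."""
    rng = random.Random(seed)
    indices = list(range(len(rows)))
    # Pick the training indices; everything else becomes the test set
    train_idx = set(rng.sample(indices, k=round(train_frac * len(rows))))
    train = [rows[i] for i in indices if i in train_idx]
    test = [rows[i] for i in indices if i not in train_idx]
    return train, test


# Made-up rows that mimic the two columns used by the notebook
rows = [{"query": f"query {i}", "ground_truth": "Billing Inquiry"} for i in range(10)]
train_rows, test_rows = split_rows(rows)
```

Fixing the seed keeps the split reproducible across runs, which matters when accuracies from different prompt iterations are compared.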

## 🤖 Create Agent, Runner, and Session Utilities

Initialize the **Llama-3.3-70B-Instruct (Vertex)** model via `LiteLlm`.  
Define helper functions to:
- Create a reusable agent + runner  
- Manage sessions (`get_or_create_session`)  
- Generate completions  
- Wrap tasks for experiment execution


In [None]:
import os, asyncio
from google.genai import types
from google.adk.agents.llm_agent import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
import litellm

litellm.vertex_project = os.environ.get("GOOGLE_CLOUD_PROJECT")
litellm.vertex_location = os.environ.get("GOOGLE_CLOUD_LOCATION")

def create_agent_runner(system_prompt: str):
    # Build the classifier agent around the current system prompt
    agent = LlmAgent(
        model=LiteLlm(model="vertex_ai/meta/llama-3.3-70b-instruct-maas"),
        name="support_query_classifier",
        instruction=system_prompt,
    )

    # Each agent gets its own in-memory session store and runner
    session_service = InMemorySessionService()
    runner = Runner(app_name=agent.name, agent=agent, session_service=session_service)
    return agent, session_service, runner

async def get_or_create_session(user_id: str, session_id: str, agent, session_service) -> str:
    sid = session_id or f"{user_id}-default"
    sess = await session_service.get_session(app_name=agent.name, user_id=user_id, session_id=sid)
    if sess is None:
        await session_service.create_session(app_name=agent.name, user_id=user_id, session_id=sid)
    return sid

async def completion(user_id: str, query: str, session_id: str, agent, session_service, runner) -> str:
    sid = await get_or_create_session(user_id, session_id, agent, session_service)
    content = types.Content(role="user", parts=[types.Part(text=query)])
    final_text = None
    async for event in runner.run_async(session_id=sid, user_id=user_id, new_message=content):
        if event.is_final_response() and event.content:
            for part in event.content.parts:
                if getattr(part, "text", None):
                    final_text = part.text
            break
    return final_text or ""
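
The get-or-create pattern in `get_or_create_session` can be exercised without Vertex AI. The sketch below uses a hypothetical `FakeSessionService` (a dict-backed stand-in, not the ADK `InMemorySessionService`) to show that a second call with the same ids reuses the existing session instead of creating a new one.

```python
import asyncio


class FakeSessionService:
    """Hypothetical in-memory stand-in for a session service (not the ADK class)."""

    def __init__(self):
        self._sessions = {}
        self.created = 0

    async def get_session(self, app_name, user_id, session_id):
        return self._sessions.get((app_name, user_id, session_id))

    async def create_session(self, app_name, user_id, session_id):
        self.created += 1
        session = {"id": session_id, "events": []}
        self._sessions[(app_name, user_id, session_id)] = session
        return session


async def get_or_create(service, app_name, user_id, session_id=None):
    # Fall back to a per-user default id, as the notebook helper does
    sid = session_id or f"{user_id}-default"
    if await service.get_session(app_name, user_id, sid) is None:
        await service.create_session(app_name, user_id, sid)
    return sid


async def demo():
    service = FakeSessionService()
    first = await get_or_create(service, "classifier", "train_5")
    second = await get_or_create(service, "classifier", "train_5")
    return first, second, service.created


first_sid, second_sid, created_count = asyncio.run(demo())
```

Because the notebook passes one `session_id` per experiment, every example in that experiment resolves to the same session.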


# 🧪 Phoenix Experiments

To iterate on a prompt, we need to measure how it performs at each iteration. Phoenix lets you do this at scale by running the prompt over large datasets as an experiment.

Phoenix can also evaluate each experiment with LLM-based and code-based evaluators.

To run an experiment, you define a task function and one or more evaluator functions.

**task function**: The task defines how each output of the experiment is generated. Here, it calls our `completion` function with the example's `query`.

**evaluators**: We define two evaluators.

*test_evaluator*: This is a simple code evaluator that compares the generated class with the ground truth class. This gives us accuracy.

*output_evaluator*: This is an LLM evaluator that generates our feedback for optimization. It looks to answer questions like why certain outputs were wrong, and why the model made the wrong decision. You can see the entire prompt used for evaluation in `prompts` -> `support_query_classification` -> `simple_evaluator_prompt.txt`.

We use **Llama-3.3-70B-Instruct** for evals, the same model that backs the classifier agent in this notebook. A capable evaluator produces more useful feedback, which helps us build a prompt that also works with a smaller model.


In [None]:
def generate_task(user_id, session_id, agent, session_service, runner):
    # Bind the agent, runner, and session ids so the task only needs the example input
    async def task(input):
        output = await completion(user_id, input["query"], session_id, agent, session_service, runner)
        return output
    return task
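
The closure pattern used by `generate_task` can be tried in isolation. The sketch below swaps the real `completion` call for a hypothetical `fake_completion` that returns a canned label, so it runs without an agent or network access. The helper names are illustrative only.

```python
import asyncio


async def fake_completion(user_id, query, session_id):
    """Hypothetical stand-in for `completion`; returns a canned label."""
    return "Password Reset" if "password" in query.lower() else "General Feedback"


def make_task(user_id, session_id):
    # The returned coroutine function remembers user_id and session_id
    async def task(input):
        return await fake_completion(user_id, input["query"], session_id)
    return task


task = make_task("train_1", "train_1")
label = asyncio.run(task({"query": "I forgot my password"}))
```

The factory keeps the task signature down to a single `input` argument while still giving it access to the ids bound at creation time.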

In [None]:
def normalize(label):
    # Treat a missing label as an empty string, then drop whitespace, quotes, and case
    return str(label or "").strip().strip('"').strip("'").lower()
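
To see what the normalization buys, the sketch below compares a few made-up labels and computes accuracy over them. This `normalize` mirrors the notebook's helper with a guard for missing labels added as an assumption, and `accuracy` is a hypothetical helper added for illustration.

```python
def normalize(label):
    """Lower-case a label and drop surrounding whitespace and quotes."""
    return str(label or "").strip().strip('"').strip("'").lower()


def accuracy(expected, predicted):
    """Fraction of positions where the normalized labels match."""
    if not expected:
        return 0.0
    matches = sum(normalize(e) == normalize(p) for e, p in zip(expected, predicted))
    return matches / len(expected)


# Made-up labels: two differ only in quoting, case, or whitespace; one is wrong
expected = ["Refund Request", "Login Issues", "Data Export"]
predicted = ['"refund request"', " Login Issues\n", "Invoice Request"]
score = accuracy(expected, predicted)
```

Without normalization, only an exact string match would count, so the first two predictions would be scored as errors.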

In [None]:
import json
from phoenix.evals import create_evaluator

with open("../prompts/support_query_classification/simple_evaluator_prompt.txt", "r") as f:
    evaluator_template = f.read()

evaluator = LlmAgent(
    model=LiteLlm(model="vertex_ai/meta/llama-3.3-70b-instruct-maas"),
    name="output_evaluator_agent",
    instruction="Return only JSON with keys: correctness ('correct'|'incorrect') and explanation.",
)

evaluator_session_service = InMemorySessionService()
evaluator_runner = Runner(app_name=evaluator.name, agent=evaluator, session_service=evaluator_session_service)

async def _get_or_create_session(user_id: str, session_id: str) -> None:
    sess = await evaluator_session_service.get_session(app_name=evaluator.name, user_id=user_id, session_id=session_id)
    if sess is None:
        await evaluator_session_service.create_session(app_name=evaluator.name, user_id=user_id, session_id=session_id)

async def run_vertex_llama(prompt: str) -> str:
    user_id, session_id = "evaluator_user", "evaluator_session"
    await _get_or_create_session(user_id, session_id)
    content = types.Content(role="user", parts=[types.Part(text=prompt)])
    final_text = None
    async for event in evaluator_runner.run_async(session_id=session_id, user_id=user_id, new_message=content):
        if event.is_final_response() and event.content:
            for part in event.content.parts:
                if getattr(part, "text", None):
                    final_text = part.text
            break
    return final_text or ""

@create_evaluator(name="output_evaluator", source="llm")
def output_evaluator(input, expected, output):
    prompt = (evaluator_template
              .replace("{query}", input["query"])
              .replace("{ground_truth}", expected["ground_truth"])
              .replace("{output}", output))
    text = asyncio.run(run_vertex_llama(prompt))
    try:
        obj = json.loads(text)
    except json.JSONDecodeError:
        low = text.lower()
        correctness = "correct" if ("correct" in low and "incorrect" not in low) else "incorrect"
        obj = {"correctness": correctness, "explanation": text.strip()}
    correctness = obj.get("correctness", "").strip().lower()
    explanation = obj.get("explanation", "").strip()
    score = 1.0 if correctness == "correct" else 0.0
    label = "correct" if correctness == "correct" else "incorrect"
    return {"score": score, "label": label, "explanation": explanation}

@create_evaluator(name="test_evaluator")
async def test_evaluator(expected, output):
    return normalize(expected.get("ground_truth")) == normalize(output)
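
The parsing step inside `output_evaluator` can be tested on its own. The sketch below is a hypothetical standalone variant, `parse_evaluator_reply`, that applies the same JSON-then-keyword fallback. It also strips a Markdown code fence around the JSON and handles non-object JSON; both are added assumptions about how a model might reply, not behavior taken from the notebook.

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence, built here to keep this snippet readable


def parse_evaluator_reply(text: str) -> dict:
    """Turn a raw evaluator reply into a score, label, and explanation."""
    # Remove an optional leading/trailing code fence before parsing
    cleaned = re.sub(rf"^{FENCE}(?:json)?\s*|\s*{FENCE}$", "", text.strip())
    try:
        obj = json.loads(cleaned)
    except json.JSONDecodeError:
        obj = None
    if not isinstance(obj, dict):
        # Fallback: infer the verdict from keywords in the raw text
        low = text.lower()
        verdict = "correct" if ("correct" in low and "incorrect" not in low) else "incorrect"
        obj = {"correctness": verdict, "explanation": text.strip()}
    correctness = str(obj.get("correctness", "")).strip().lower()
    label = "correct" if correctness == "correct" else "incorrect"
    return {
        "score": 1.0 if label == "correct" else 0.0,
        "label": label,
        "explanation": str(obj.get("explanation", "")).strip(),
    }
```

Anything that is not clearly marked correct is scored as incorrect, which keeps ambiguous replies from inflating the score.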


## 🧾 Process Experiment Results

Fetch completed experiment data from Phoenix via its REST API and merge the results  
back into a pandas DataFrame. The function fills in columns such as `feedback`, `ground_truth`, and `output`  
for use in later optimization.


In [None]:
import requests

def process_experiment(experiment, train_set, input_column_name, output_column_name):
    """
    Fetch an experiment's results from Phoenix and merge them into the training data.

    Args:
        experiment (dict): Experiment returned by `async_run_experiment`; its
            "experiment_id" is used to fetch the results.
        train_set (pd.DataFrame): Training data the experiment was run on.
        input_column_name (str): Column to fill with each run's input.
        output_column_name (str): Column to fill with each run's output.

    Returns:
        pd.DataFrame: Re-indexed copy of `train_set` with `ground_truth`, `feedback`,
            and the input and output columns filled in from the experiment.
    """

    experiment_id = experiment["experiment_id"]
    url = f"{os.environ['PHOENIX_COLLECTOR_ENDPOINT']}/v1/experiments/{experiment_id}/json"
    headers = {
        "Authorization": f"Bearer {os.environ['PHOENIX_API_KEY']}"
    }

    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to fetch experiment data: {response.status_code} {response.text}")

    results = response.json()

    # reset_index returns a new DataFrame, so the caller's train_set is not modified
    train_set = train_set.reset_index(drop=True)
    train_set["ground_truth"] = None
    train_set["feedback"] = None

    for i, entry in enumerate(results):
        # Use the first annotation that carries an explanation (the LLM evaluator's feedback)
        explanations = [a.get("explanation") for a in entry["annotations"] if a.get("explanation")]
        train_set.loc[i, "ground_truth"] = entry["reference_output"]["ground_truth"]
        # Assign per row; assigning to the whole column would overwrite every row
        train_set.loc[i, "feedback"] = explanations[0] if explanations else None

    train_set[output_column_name] = [entry.get("output") for entry in results]

    train_set[input_column_name] = [entry.get("input") for entry in results]

    return train_set
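
The mapping from experiment entries to training rows can be illustrated without Phoenix or pandas. In the sketch below, the entry shape is an assumption inferred from the fields `process_experiment` reads (`input`, `output`, `reference_output`, `annotations`); the values are invented for illustration, and `rows_from_results` is a hypothetical helper.

```python
def rows_from_results(results):
    """Build one flat row per experiment entry, keeping per-row feedback."""
    rows = []
    for entry in results:
        # Keep only annotations that actually carry an explanation
        explanations = [
            a.get("explanation") for a in entry.get("annotations", []) if a.get("explanation")
        ]
        rows.append({
            "query": entry.get("input"),
            "output": entry.get("output"),
            "ground_truth": entry.get("reference_output", {}).get("ground_truth"),
            "feedback": explanations[0] if explanations else None,
        })
    return rows


# Invented entries in the assumed shape
sample_results = [
    {
        "input": {"query": "I was charged twice"},
        "output": "Refund Request",
        "reference_output": {"ground_truth": "Billing Inquiry"},
        "annotations": [
            {"name": "test_evaluator", "explanation": None},
            {"name": "output_evaluator", "explanation": "Double charge is a billing question."},
        ],
    },
    {
        "input": {"query": "Cannot log in"},
        "output": "Login Issues",
        "reference_output": {"ground_truth": "Login Issues"},
        "annotations": [{"name": "output_evaluator", "explanation": "Labels match."}],
    },
]
rows = rows_from_results(sample_results)
```

Each row keeps its own feedback string, which is what the optimizer needs in order to relate an explanation to a specific query and output.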

As of now, Prompt Learning supports only OpenAI models for its meta-prompting stage, so an OpenAI API key is required. Support for other models, such as Llama, is planned.

In [None]:
import getpass
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

## 🔁 Define the Optimization Loop

The `optimize_loop()` function:
1. Runs model + evaluators on the dataset.  
2. Retrieves feedback from Phoenix.  
3. Uses GPT-4o to refine the system prompt.  
4. Uploads the improved prompt to Phoenix.  
5. Repeats for multiple loops (default = 5).
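
The steps above can be sketched as a skeleton with stubs in place of the model, Phoenix, and GPT-4o calls. `run_experiment`, `refine_prompt`, and `optimize` are hypothetical names used only to show the control flow and how the version list grows. They are not the notebook's functions.

```python
def run_experiment(prompt, dataset):
    """Hypothetical stub: return one feedback string per dataset row."""
    return [f"feedback for {row!r} under a {len(prompt)}-character prompt" for row in dataset]


def refine_prompt(prompt, feedback):
    """Hypothetical stub for the meta-prompting step."""
    return prompt + f"\n# revised using {len(feedback)} feedback rows"


def optimize(prompt, dataset, loops=5):
    # Iteration 0 records the starting prompt before any optimization
    versions = [{"iteration": 0, "prompt": prompt}]
    for iteration in range(1, loops + 1):
        feedback = run_experiment(prompt, dataset)   # steps 1 and 2
        prompt = refine_prompt(prompt, feedback)     # step 3
        versions.append({"iteration": iteration, "prompt": prompt})  # step 4
    return versions


versions = optimize("Return just the category.", ["q1", "q2"], loops=3)
```

Each pass feeds the prompt produced by the previous pass back into the experiment, so the list ends with `loops + 1` entries.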


In [None]:
from optimizer_sdk.prompt_learning_optimizer import PromptLearningOptimizer
from phoenix.client.experiments import async_run_experiment
import copy
import asyncio

prompt_name = "support_query_classification"

system_prompt = ("Account Creation\nLogin Issues\nPassword Reset\nTwo-Factor Authentication\n"
          "Profile Updates\nBilling Inquiry\nRefund Request\nSubscription Upgrade/Downgrade\n"
          "Payment Method Update\nInvoice Request\nOrder Status\nShipping Delay\nProduct Return\n"
          "Warranty Claim\nTechnical Bug Report\nFeature Request\nIntegration Help\nData Export\n"
          "Security Concern\nTerms of Service Question\nPrivacy Policy Question\nCompliance Inquiry\n"
          "Accessibility Support\nLanguage Support\nMobile App Issue\nDesktop App Issue\n"
          "Email Notifications\nMarketing Preferences\nBeta Program Enrollment\nGeneral Feedback\n\n"
          "Return just the category, no other text for the support query.")

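async def optimize_loop(
    train_dataset,
    test_dataset,
    system_prompt,
    feedback_columns,
    loops=5,
    scorer="accuracy",
    prompt_versions=None,
):
    """
    Run `loops` rounds of experiment, feedback collection, and prompt optimization.

    scorer: one of "accuracy", "f1", "precision", "recall" (currently unused in this loop)
    prompt_versions: currently ignored; tracking restarts from the iteration-0 upload below
    """
    curr_loop = 1

    # Upload the starting prompt as iteration 0; this begins a fresh version list
    prompt_versions = upload_prompt_phoenix(system_prompt, prompt_name, 0, [])

    while loops > 0:
        print(f"📊 Loop {curr_loop}: Optimizing prompt...")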

        agent, session_service, runner = create_agent_runner(system_prompt)

        train_experiment = await async_run_experiment(
            dataset=train_dataset,
            task=generate_task(f"train_{loops}", f"train_{loops}", agent, session_service, runner),
            evaluators=[test_evaluator, output_evaluator],
            concurrency=10
        )

        train_df = process_experiment(train_experiment, train_set, "query", "output")

        test_experiment = await async_run_experiment(
            dataset=test_dataset,
            task=generate_task(f"test_{loops}", f"test_{loops}", agent, session_service, runner),
            evaluators=[test_evaluator],
            concurrency=10
        )

        optimizer = PromptLearningOptimizer(
            prompt=system_prompt,
            model_choice="gpt-4o",
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )

        system_prompt = optimizer.optimize(
            dataset=train_df,
            output_column="output",
            feedback_columns=feedback_columns,
            context_size_k=90000,
        )

        prompt_versions = upload_prompt_phoenix(system_prompt, prompt_name, curr_loop, prompt_versions)

        loops -= 1
        curr_loop += 1

    return prompt_versions

# Main execution: asyncio.run() works inside the notebook because nest_asyncio.apply() was called earlier
feedback_columns = ["ground_truth", "feedback"]
prompts = asyncio.run(optimize_loop(train_dataset, test_dataset, system_prompt, feedback_columns, loops=5, scorer="accuracy"))

# See your prompts and their accuracies!

In the Phoenix UI, you'll be able to visualize your experiments and their accuracies, to see how much your prompts improved after each iteration of prompt optimization.

To retrieve the prompts associated with those experiment runs, index into the `prompts` list generated above. Each entry holds the `iteration`, the `prompt` text, and the `phoenix_id`.

In [None]:
prompts
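
A minimal sketch of looking up a prompt by iteration follows. It assumes entries shaped like the dictionaries built in `upload_prompt_phoenix`; the example data is made up, and `prompt_for_iteration` is a hypothetical helper.

```python
def prompt_for_iteration(prompt_versions, iteration):
    """Return the prompt text recorded for a given iteration."""
    for version in prompt_versions:
        if version["iteration"] == iteration:
            return version["prompt"]
    raise KeyError(f"no prompt recorded for iteration {iteration}")


# Made-up entries in the same shape as the notebook's prompt records
example_versions = [
    {"iteration": 0, "prompt": "baseline prompt", "phoenix_id": None},
    {"iteration": 1, "prompt": "refined prompt", "phoenix_id": None},
]
baseline = prompt_for_iteration(example_versions, 0)
latest = max(example_versions, key=lambda v: v["iteration"])
```

In the notebook, the same calls would take `prompts` in place of `example_versions`.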