### Mods for Colab Compatibility

In [None]:
# Colab-only dependencies, installed via shell commands (the leading `!`)
!pip install triton # required by TorchInductor (backend compiler in PyTorch)
!pip install cloud-tpu-client # for tpu connection

In [None]:
import os
from google.colab import drive

colab = True # global indicator of colab run

# start from a clean slate: drop any mount left over from an earlier session,
# then mount Google Drive again
drive.flush_and_unmount()
drive.mount('/content/drive')

# make the project's finetuning folder on Drive the working directory
os.chdir(
    os.path.join("/content/drive/My Drive", "agreemate", "finetuning")
)

# show where we ended up and what is available there
print(f"Working directory: {os.getcwd()}")
print(f"Files in directory: {os.listdir()}")

# AgreeMate Model Testing Notebook

A small notebook for testing the baseline and fine-tuned models on CraigslistBargains data.

In [None]:
import os, sys, logging, re, torch
from typing import Optional, Tuple, List, Dict, Any
from dataclasses import dataclass
from logging.handlers import RotatingFileHandler
from tqdm.auto import tqdm
import pandas as pd

project_root = os.path.abspath(os.path.join(os.getcwd(), "..", ".."))
if project_root not in sys.path:
    sys.path.insert(0, project_root)
from agreemate.baseline.utils.data_loader import DataLoader
from model_loader import ModelLoader

Set Up Logging

In [None]:
class TqdmCompatibleStreamHandler(logging.StreamHandler):
    """
    Stream handler that routes formatted records through ``tqdm.write`` so
    log lines do not break up active tqdm progress bars.
    """
    def emit(self, record):
        # any failure while formatting/writing is delegated to the standard
        # logging error hook instead of propagating to the caller
        try:
            tqdm.write(self.format(record))
            self.flush()
        except Exception:
            self.handleError(record)

# file handler for logging everything to "tests.log" in the working directory
# (rotates once a file reaches 10**6 bytes, i.e. ~1 MB, and keeps 5 backups)
file_handler = RotatingFileHandler(
    os.path.join(os.getcwd(), "tests.log"),
    maxBytes=10**6, backupCount=5
)
file_handler.setLevel(logging.DEBUG)

# console handler for logging to notebook's output (INFO and above only)
console_handler = TqdmCompatibleStreamHandler()
console_handler.setLevel(logging.INFO)

# set up the logging configuration: root logger at DEBUG, each handler then
# applies its own level filter
logging.basicConfig(
    handlers=[file_handler, console_handler],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    force=True, # force logging reconfig (replaces handlers from earlier cell runs)
)

# create logger
logger = logging.getLogger(__name__)

# suppress overly verbose logs in notebook output
logging.getLogger("transformers").setLevel(logging.WARNING)
logging.getLogger("datasets").setLevel(logging.WARNING)
logging.getLogger("torch").setLevel(logging.WARNING)
logging.getLogger("accelerate").setLevel(logging.WARNING)

## Scenario Management

In [None]:
class Scenario:
    """Formats raw scenario data into structured buyer/seller knowledge bases.

    This is deliberately a plain class rather than a ``@dataclass``: it
    declares no annotated fields, so the decorator would generate an
    ``__eq__`` under which every scenario compares equal, a ``__repr__`` that
    prints ``Scenario()``, and would make instances unhashable.

    Args:
        scenario_dict: Raw scenario mapping. Must provide ``item`` (with
            ``title``, ``description``, ``list_price``), ``category``,
            ``buyer`` (with ``target_price``, ``relative_price``) and
            ``seller`` (with ``target_price``, ``price_delta_pct``).

    Raises:
        KeyError: If any of the required keys is missing.
    """
    def __init__(self, scenario_dict):
        self.title = scenario_dict['item']['title']
        self.description = scenario_dict['item']['description']
        self.category = scenario_dict['category']
        self.listing_price = scenario_dict['item']['list_price']
        self.buyer_target = scenario_dict['buyer']['target_price']
        self.seller_target = scenario_dict['seller']['target_price']

        # knowledge base handed to the buyer agent
        self.buyer_kb = {
            "role": "buyer",
            "title": self.title,
            "description": self.description,
            "category": self.category,
            "listing_price": self.listing_price,
            "target_price": self.buyer_target,
            "relative_price": scenario_dict['buyer']['relative_price']
        }

        # knowledge base handed to the seller agent
        self.seller_kb = {
            "role": "seller",
            "title": self.title,
            "description": self.description,
            "category": self.category,
            "listing_price": self.listing_price,
            "target_price": self.seller_target,
            "price_delta_pct": scenario_dict['seller']['price_delta_pct']
        }

    def __repr__(self):
        return (
            f"{type(self).__name__}(title={self.title!r}, "
            f"category={self.category!r}, listing_price={self.listing_price!r}, "
            f"buyer_target={self.buyer_target!r}, seller_target={self.seller_target!r})"
        )

In [None]:
class Dialogue:
    """Single dialogue turn in negotiation.

    Args:
        utterance: The message spoken by the agent.
        action: The structured action label for the turn.
        price: Price attached to the turn; kept only for price-bearing
            actions (see ``PRICED_ACTIONS``) and dropped otherwise.
        agent: Role label of the speaker.
    """
    # actions whose price is retained; "offer" is the label the prompts ask
    # agents to use, so it must be included or every offer loses its price
    PRICED_ACTIONS = frozenset({"price", "offer", "counter", "emphasize"})

    # dollar amount with optional thousands separators and decimals
    _PRICE_PATTERN = re.compile(r'\$\s?(\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)')

    def __init__(self, utterance, action, price=None, agent="unknown"):
        self.utterance = utterance
        self.action = action
        self.price = price if action in self.PRICED_ACTIONS else None
        self.agent = agent
        self.current_price = None

    def extract_price(self):
        """Extracts price from utterance if present.

        Sets ``self.price`` to the first ``$``-prefixed amount found in the
        utterance (``$1,200.50`` is read as ``1200.5``); leaves it untouched
        when no amount is present.
        """
        matched_price = self._PRICE_PATTERN.search(self.utterance)
        if matched_price:
            self.price = float(matched_price.group(1).replace(",", ""))

In [None]:
class Negotiation:
    """Runs one buyer/seller exchange for a scenario and records the outcome."""
    def __init__(self, scenario, buyer, seller, base_model):
        self.scenario, self.base_model = scenario, base_model
        self.buyer, self.seller = buyer, seller

        # turn objects plus the flattened transcript that is fed to the agents
        self.conversation_history = []
        self.prepared_conversation_history = ""

        # outcome state, filled in by run()
        self.accepted = False
        self.accepted_price = None

    def add_to_history(self, dialogue):
        """Records a turn and rebuilds the "Role: utterance" transcript."""
        self.conversation_history.append(dialogue)
        transcript_lines = [
            f"{entry.agent.capitalize()}: {entry.utterance}"
            for entry in self.conversation_history
        ]
        self.prepared_conversation_history = "\n".join(transcript_lines)

    def run(self, max_turns=10):
        """Alternates buyer and seller turns (buyer first) until one side
        accepts or rejects, or ``max_turns`` turns have been taken, then
        returns a summary dict of the outcome."""
        logger.info(f"\nNegotiation for: {self.scenario.title}")
        logger.info(f"List Price: ${self.scenario.listing_price}")
        logger.info("="*80)

        latest_price = None
        turn = 0
        while turn < max_turns:
            # even turns belong to the buyer, odd turns to the seller
            speaker = self.seller if turn % 2 else self.buyer
            rounds_remaining = (max_turns - turn) // 2
            dialogue = speaker(self.prepared_conversation_history,
                               rounds_remaining, self.base_model)

            logger.info(f"{dialogue.agent.capitalize()}: {dialogue.utterance}")
            self.add_to_history(dialogue)
            turn += 1

            # remember the most recent price put on the table
            if dialogue.price:
                latest_price = dialogue.price

            if dialogue.action == "accept":
                self.accepted = True
                self.accepted_price = latest_price
                break
            elif dialogue.action == "reject":
                self.accepted = False
                break

        outcome_label = 'Accepted' if self.accepted else 'Rejected'
        logger.info("~"*80)
        logger.info(f"Outcome: {outcome_label}")
        if self.accepted:
            logger.info(f"Final Price: ${self.accepted_price}")
        logger.info("="*80 + "\n")

        summary = {
            "accepted": self.accepted,
            "final_price": self.accepted_price,
            "num_turns": len(self.conversation_history),
            "listing_price": self.scenario.listing_price,
            "buyer_target": self.scenario.buyer_target,
            "seller_target": self.scenario.seller_target
        }
        return summary

## Prompting and Response Generation

Define the system prompt and the role-specific instructions.

In [None]:
def get_buyer_instructions(kb: dict) -> str:
    """Builds the buyer's role instructions.

    Args:
        kb: Buyer knowledge base; only ``kb['target_price']`` is read.

    Returns:
        The instruction text with the target price filled in, followed by two
        example buyer responses.
    """
    target_price = kb['target_price']
    instructions = f"""
    ### Your Role as the Buyer:
    - Goal: Negotiate as close to your target price (${target_price}) as possible.
    - Strategy:
        1. Begin with offers below your target price but reasonable enough to engage the seller.
        2. Highlight flaws or alternatives to justify lower offers.
        3. Avoid exceeding your target price, even if pressured.
    - Be respectful but firm. Do not overcommit.

    ### Example Buyer Responses:
    1. Thought: I believe the item is worth $150 based on its flaws.
       Action: offer
       Utterance: I can offer $150 considering the wear and tear.
    2. Thought: The seller's offer aligns with my target price.
       Action: accept
       Utterance: $200 works for me. Let's finalize this deal.
    """
    return instructions

def get_seller_instructions(kb: dict) -> str:
    """Builds the seller's role instructions.

    Args:
        kb: Seller knowledge base; ``kb['listing_price']`` and
            ``kb['target_price']`` are read.

    Returns:
        The instruction text with both prices filled in, followed by two
        example seller responses.
    """
    listing_price = kb['listing_price']
    target_price = kb['target_price']
    instructions = f"""
    ### Your Role as the Seller:
    - Goal: Maximize the final price while keeping the buyer engaged.
    - Strategy:
        1. Anchor the negotiation around the listing price (${listing_price}).
        2. Justify your price by emphasizing the item's quality or rarity.
        3. Counter offers strategically, staying above your target price (${target_price}).

    ### Example Seller Responses:
    1. Thought: The buyer's offer is too low, but I want to keep negotiating.
       Action: counter
       Utterance: I can't do $150, but I can offer $180 given the item's excellent condition.
    2. Thought: This is my lowest acceptable price; I'm ready to conclude.
       Action: emphasize
       Utterance: This item is priced competitively at $200 due to its rarity.
    """
    return instructions


def get_base_prompt(kb: dict, conversation_history: str, turns_left: int) -> str:
    """Builds the role-independent part of an agent prompt.

    Args:
        kb: Agent knowledge base; ``title``, ``description``,
            ``listing_price`` and ``role`` are read.
        conversation_history: Transcript so far, inserted verbatim.
        turns_left: Number shown to the agent as turns left to conclude.

    Returns:
        The scenario summary, the previous conversation and the structured
        response-format instructions as one string.
    """
    item_title = kb['title']
    item_description = kb['description']
    list_price = kb['listing_price']
    role_label = kb['role'].capitalize()
    base_prompt = f"""
    ### Negotiation Scenario:
    - Item: {item_title}
    - Description: {item_description}
    - List Price: ${list_price}
    - Your Role: {role_label}
    - Turns Left to Conclude: {turns_left}

    ### Previous Conversation:
    {conversation_history}

    ### Response Instructions:
    1. Consider the negotiation context and role-specific goals.
    2. Think aloud about your reasoning before deciding your action.
    3. Respond strictly in the following structured format:
    ---
    Thought: <your reasoning>
    Action: <offer|counter|accept|reject>
    Utterance: <your message>
    (Optional) Price: $<price>
    ---
    Respond below:
    """
    return base_prompt


def get_parser_prompt(response: str) -> str:
    """Wraps an agent's free-form reply in instructions asking the model to
    restate it as Action / Utterance / optional Price lines.

    Args:
        response: The agent's raw reply, inserted verbatim.

    Returns:
        The complete parsing prompt, ending with a "Structured Output:" cue.
    """
    parser_prompt = f"""
    ### Agent's Response (Free-form):
    ---
    {response}
    ---

    ### Parsing Instructions:
    - Extract and structure the response using the following format:
    ---
    Action: <offer|counter|accept|reject>
    Utterance: <the agent's message>
    (Optional) Price: $<price>
    ---
    Respond in this exact format without any additional information.

    Structured Output:
    """
    return parser_prompt

## Agent Classes

In [None]:
@dataclass
class ModelConfig:
    """Describes one model under test: a display name, its type label, the
    on-disk path, and a slot for the loaded model object."""
    # short identifier, also used in log messages and result labels
    name: str
    # model family / size label
    model_type: str
    # filesystem location of the model
    path: str
    # loaded model object; stays None until a model has been assigned
    model: Optional[object] = None

In [None]:
class Agent:
    """Base agent class for negotiation participants.

    An agent pairs a model configuration with a role ("buyer" or "seller"),
    that role's knowledge base and a tokenizer. Calling the agent produces one
    `Dialogue` turn: it builds the prompt, samples a free-form reply from its
    own model, and asks the base model to restate that reply as
    action / utterance / price.
    """
    # new-token budget for the agent's own reply; get_prompt reserves exactly
    # this much room so truncation and generation agree with each other
    RESPONSE_TOKEN_BUDGET = 150
    # new-token budget for the parser pass; must be able to hold the action,
    # the full utterance and the price line
    PARSER_TOKEN_BUDGET = 100
    # fields extracted from a structured response
    _COMPONENT_KEYS = ("action", "utterance", "price")

    def __init__(self, config: ModelConfig, role: str, kb: dict, tokenizer: object):
        self.role = role
        self.config = config
        self.kb = kb
        self.tokenizer = tokenizer

    #! PROMPTING
    def _role_instructions(self) -> str:
        """Returns the buyer or seller instruction text for this agent's role."""
        if self.role == "buyer":
            return get_buyer_instructions(self.kb)
        return get_seller_instructions(self.kb)

    def truncate_conversation(self, conversation_history: str, max_response_tokens: int = 100) -> str:
        """Truncates conversation history dynamically to fit within model token limits.

        Args:
            conversation_history: Transcript of the negotiation so far.
            max_response_tokens: Tokens to keep free for the generated reply.

        Returns:
            The history unchanged if it fits, otherwise its most recent tokens
            decoded back to text.

        Raises:
            ValueError: If the fixed prompt parts alone leave no room for history.
        """
        # get model's max token limit (falls back to 8192 when the config lacks it)
        max_tokens = getattr(self.config.model.config, "max_position_embeddings", 8192)

        # tokenize base prompt and role instructions to calculate their size
        base_prompt_tokens = len(self.tokenizer.encode(get_base_prompt(self.kb, "", 0), add_special_tokens=False))
        role_instructions_tokens = len(self.tokenizer.encode(self._role_instructions(), add_special_tokens=False))

        # calculate available space for conversation history
        reserved_tokens = 5 # BOS + EOS + any other special tokens (safe margin)
        available_tokens = max_tokens - reserved_tokens - base_prompt_tokens - role_instructions_tokens - max_response_tokens
        if available_tokens <= 0:
            raise ValueError("Prompt components exceed the model's token limit!")

        conversation_tokens = self.tokenizer.encode(conversation_history, add_special_tokens=False) # tokenize history

        if len(conversation_tokens) > available_tokens: # truncate conversation history if necessary and return
            logger.info("Truncating conversation history to fit within token limits")
            logger.info(f"Available tokens: {available_tokens} < {len(conversation_tokens)} (history length)")
            # keep the tail: the most recent turns matter most
            return self.tokenizer.decode(conversation_tokens[-available_tokens:], skip_special_tokens=True)

        return conversation_history # return original history if it fits

    def get_prompt(self, conversation_history: str, turns_left: int) -> str:
        """Stitches together role-specific prompt.

        The history is truncated first, reserving the same number of reply
        tokens that generate_response produces by default.
        """
        truncated_history = self.truncate_conversation(
            conversation_history, max_response_tokens=self.RESPONSE_TOKEN_BUDGET
        )

        # get base prompt and role-specific instructions
        base_prompt = get_base_prompt(self.kb, truncated_history, turns_left)
        role_instructions = self._role_instructions()

        # combine role-specific instructions with base prompt
        return f"{role_instructions}\n{base_prompt}"


    #! RESPONSE GENERATION AND PARSING
    def generate_response(self, prompt: str, max_tokens: int = 150) -> str:
        """Generates response to prompt using the model.

        Samples up to ``max_tokens`` new tokens (temperature 0.7) and returns
        only the newly generated text, without the prompt.
        """
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            padding=True
        ).to(self.config.model.device)

        with torch.no_grad():
            outputs = self.config.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                do_sample=True,
                temperature=0.7,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # the output sequence starts with the prompt tokens; skip past them
        prompt_length = inputs["input_ids"].shape[-1]
        response = self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        return response

    @staticmethod
    def _extract_components(text: str) -> dict:
        """Pulls ``action``, ``utterance`` and ``price`` out of "Key: value" lines.

        Keys are matched case-insensitively and an "(Optional)" prefix is
        ignored. When a key occurs more than once the last occurrence wins.
        The action is lower-cased, the utterance keeps its original casing,
        and the price becomes a float (None when no number can be read).
        """
        components = {key: None for key in Agent._COMPONENT_KEYS}
        for line in text.split("\n"):
            stripped = line.strip()
            if not stripped:
                continue
            # the prompts show the price line as "(Optional) Price: ..."
            if stripped.lower().startswith("(optional)"):
                stripped = stripped[len("(optional)"):].lstrip()
            lowered = stripped.lower()
            for key in components:
                if not lowered.startswith(f"{key}:"):
                    continue
                value = stripped.split(":", 1)[1].strip()
                if key == "action":
                    value = value.lower() # actions are compared in lower case
                elif key == "price":
                    number = re.search(r'\d+(?:\.\d+)?', value.replace(",", ""))
                    if number:
                        value = float(number.group(0))
                    else:
                        logger.warning(f"Invalid price format: {value}")
                        value = None
                components[key] = value
                break
        return components

    def parse_response(self, raw_response: str, base_model) -> tuple:
        """Parse raw agent response into structured components (action, utterance, price) using the base model.

        Args:
            raw_response: The agent's free-form reply.
            base_model: Model used to restate the reply in structured form.

        Returns:
            ``(utterance, action, price)``; ``price`` is None when absent.

        Raises:
            ValueError: If neither the parser output nor the raw reply yields
                both an action and an utterance.
        """
        logger.info(f"Parsing Raw Response:\n{raw_response}\n")

        # prepare for parsing
        parser_prompt = get_parser_prompt(raw_response)

        # generate a structured response using the base model
        inputs = self.tokenizer(
            parser_prompt,
            return_tensors="pt",
            padding=True
        ).to(base_model.device)
        with torch.no_grad():
            outputs = base_model.generate(
                **inputs,
                max_new_tokens=self.PARSER_TOKEN_BUDGET,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # decode only the newly generated tokens; decoding the whole sequence
        # would feed the prompt's own "Action:"/"Utterance:" template lines
        # into the extraction below
        prompt_length = inputs["input_ids"].shape[-1]
        parsed_response = self.tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        logger.info(f"Parsed Response:\n{parsed_response}\n")

        # extract components directly from the structured response
        components = self._extract_components(parsed_response)

        # anything the parser pass did not provide is looked up in the raw
        # reply, which the agent prompt already asks to be in the same format
        missing = [key for key in self._COMPONENT_KEYS if components[key] in (None, "")]
        if missing:
            raw_components = self._extract_components(raw_response)
            for key in missing:
                components[key] = raw_components[key]

        if not components["action"] or not components["utterance"]:
            raise ValueError(f"Invalid response format: {parsed_response}")
        return (components["utterance"], components["action"], components["price"])


    #! TURN
    def __call__(self, conversation_history: str, turns_left: int, base_model) -> Dialogue:
        """Generate and parse agent response.

        Args:
            conversation_history: Transcript of the negotiation so far.
            turns_left: Number of turns left, shown to the agent in its prompt.
            base_model: Model used to structure the free-form reply.

        Returns:
            A `Dialogue` for this agent's turn.
        """
        # generate agent response
        prompt = self.get_prompt(conversation_history, turns_left)
        raw_response = self.generate_response(prompt)

        # parse into structured components
        utterance, action, price = self.parse_response(raw_response, base_model)
        logger.info(f"Parsed Response - Action: {action}, Utterance: {utterance}, Price: {price if price else 'None'}")

        return Dialogue(utterance, action, price, self.role)


class Buyer(Agent):
    """Agent preconfigured with the "buyer" role."""
    def __init__(self, config: ModelConfig, kb: dict, tokenizer: object):
        super().__init__(config=config, role="buyer", kb=kb, tokenizer=tokenizer)

class Seller(Agent):
    """Agent preconfigured with the "seller" role."""
    def __init__(self, config: ModelConfig, kb: dict, tokenizer: object):
        super().__init__(config=config, role="seller", kb=kb, tokenizer=tokenizer)

## Run Test Negotiations

In [None]:
@dataclass
class TestResult:
    """Outcome of a single negotiation between one buyer model and one seller model."""
    # labels identifying the models that played each role
    buyer_model: str
    seller_model: str
    # whether the negotiation ended in an accepted deal
    accepted: bool
    # number of dialogue turns that were taken
    num_turns: int
    # agreed price, or None when there is none
    final_price: Optional[float]

In [None]:
def run_test_scenarios(data_loader: object, model_configs: Dict[str, ModelConfig], tokenizer: object, num_scenarios: int):
    """Runs test negotiations using both baseline and finetuned models.

    For every scenario in the test batch, two pairings are played out:
    baseline buyer vs. baseline seller, and finetuned buyer vs. baseline
    seller. The baseline model also acts as the response parser.

    Args:
        data_loader: Object providing ``get_batch(split=..., batch_size=...)``.
        model_configs: Configs keyed by name; ``"baseline"`` and
            ``"buyer_finetuned"`` are required and must have a model loaded.
        tokenizer: Tokenizer shared by all agents.
        num_scenarios: Number of test scenarios to request.

    Returns:
        A list with one `TestResult` per (scenario, pairing).

    Raises:
        ValueError: If the loader returns a different number of scenarios than
            requested, or if a required model is not loaded.
    """
    scenarios = data_loader.get_batch(split='test', batch_size=num_scenarios)
    # explicit check instead of `assert`, which is skipped under `python -O`
    if len(scenarios) != num_scenarios:
        raise ValueError(
            f"Expected {num_scenarios} test scenarios but the data loader returned {len(scenarios)}"
        )
    logger.info(f"Running {num_scenarios} test scenarios...")

    # test configurations to evaluate (the same for every scenario)
    test_pairs = [
        # Baseline Buyer vs Baseline Seller
        (model_configs["baseline"], model_configs["baseline"]),
        # Finetuned Buyer vs Baseline Seller
        (model_configs["buyer_finetuned"], model_configs["baseline"])
    ]

    # fail fast: verify every required model before running any negotiation
    for buyer_config, seller_config in test_pairs:
        for role, config in (("buyer", buyer_config), ("seller", seller_config)):
            if config.model is None:
                raise ValueError(f"Model not loaded for {role}: {config.name.upper()} ({config.model_type})")

    results = []
    for scenario_dict in scenarios:
        scenario = Scenario(scenario_dict)

        for buyer_config, seller_config in test_pairs: # run negotiation for each pair
            buyer_label = f"{buyer_config.name.upper()}_{buyer_config.model_type}"
            seller_label = f"{seller_config.name.upper()}_{seller_config.model_type}"

            # initialize agents and negotiation instance
            buyer = Buyer(buyer_config, scenario.buyer_kb, tokenizer)
            seller = Seller(seller_config, scenario.seller_kb, tokenizer)
            negotiation = Negotiation(
                scenario=scenario,
                buyer=buyer,
                seller=seller,
                base_model=model_configs["baseline"].model
            )
            logger.info(f"Running negotiation for: {scenario.title}")
            logger.info(f"with Buyer: {buyer_label}")
            logger.info(f"and Seller: {seller_label}")

            # run negotiation and store results
            result = negotiation.run()
            results.append(TestResult(
                buyer_model=buyer_label,
                seller_model=seller_label,
                accepted=result["accepted"],
                num_turns=result["num_turns"],
                final_price=result["final_price"]
            ))

    return results

### Initialize Loaders and Models

In [None]:
# define directories and model configs: every entry shares the same model type
# and differs only in its name and in the middle segment of its folder name
finetuning_dir = os.getcwd()
model_configs = {
    config_name: ModelConfig(
        name=config_name,
        model_type="llama-3.1-8B-instruct",
        path=os.path.join(finetuning_dir, f"models--{folder_tag}--Llama-3.1-8B-Instruct"),
    )
    for config_name, folder_tag in (
        ("baseline", "meta-llama"),
        ("buyer_finetuned", "buyer-finetuned"),
        ("seller_finetuned", "seller-finetuned"),
        ("generalist_finetuned", "generalist-finetuned"),
    )
}

# initialize loaders (the model loader's cache dir is the baseline model's path)
data_loader = DataLoader()
model_loader = ModelLoader(cache_dir=model_configs["baseline"].path)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
loaded_models = [] # names of the configs whose model loaded successfully

# load models
logger.info("Loading models...")
for config_name, config in model_configs.items():
    try:
        if config_name == "baseline":
            # the baseline load also yields the tokenizer used later for all agents
            config.model, base_tokenizer = model_loader.load_model_and_tokenizer(local_only=True)
            # set padding token to eos token if not already set
            if base_tokenizer.pad_token_id is None:
                base_tokenizer.pad_token = base_tokenizer.eos_token
                base_tokenizer.pad_token_id = base_tokenizer.eos_token_id
        else:
            config.model = model_loader.reload_model(config.path)
        # switch to eval mode and move the model to the selected device
        config.model.eval().to(device)
        loaded_models.append(config.name)
        logger.info(f"Loaded {config.name.upper()} model to {config.model.device}")
    except Exception as e: # don't halt execution if a model fails to load
        logger.error(f"Failed to load {config.name.upper()} model: {e}")

# the baseline model is mandatory; the other models are optional at this point
if "baseline" not in loaded_models:
    raise ValueError("✗ Failed to load baseline model, which is required for testing.")
logger.info(f"✓ Models loaded successfully: {', '.join(model.upper() for model in loaded_models)}")

### Run Tests and Analyze Results

In [None]:
# run the test negotiations on a single test scenario, using the baseline tokenizer for all agents
results = run_test_scenarios(data_loader=data_loader, model_configs=model_configs, tokenizer=base_tokenizer, num_scenarios=1)

In [None]:
def analyze_results(results: List[TestResult]) -> None:
    """Analyzes and logs negotiation test results.

    Groups the results by (buyer model, seller model) and logs, per pairing,
    the acceptance rate, the average number of turns and the average final
    price over accepted negotiations.

    Args:
        results: Test results to summarize. An empty list logs a warning and
            returns without building a summary.
    """
    if not results:
        logger.warning("No negotiation results to analyze.")
        return

    df = pd.DataFrame([vars(r) for r in results])

    # only accepted negotiations contribute to the average price; coercing to
    # numeric first keeps the mean well-defined when every price is None
    df['final_price'] = pd.to_numeric(df['final_price'], errors='coerce').where(df['accepted'])

    summary = df.groupby(['buyer_model', 'seller_model']).agg({
        'accepted': 'mean',
        'num_turns': 'mean',
        'final_price': 'mean' # NaN prices are skipped by mean
    }).rename(columns={
        'accepted': 'Acceptance Rate',
        'num_turns': 'Average Turns',
        'final_price': 'Average Final Price'
    })

    logger.info("\nResults Summary:")
    logger.info(summary)

# log the per-pairing summary for the results gathered by the test run
analyze_results(results)