In [1]:
!pip install requests



In [44]:
""""Monopoly pricing experiment with DeepSeek API integration - Following paper methodology"""
import numpy as np
import pandas as pd
import time
import re
import os
import json
import requests
from scipy.optimize import minimize_scalar

# === Configuration ===
# DeepSeek API Configuration
DEEPSEEK_API_KEY = "sk-1a71d49f05de4c77ab98e03cb41e084b"  # Replace with your actual API key
DEEPSEEK_API_BASE = "https://api.deepseek.com/v1"
# Define model name as a global variable that can be modified
deepseek_model_name = "deepseek-chat"  # Default model name

# Experiment parameters
ALPHA_VALUES = [10, 3.2, 1]  # Match paper's alpha values
ROUNDS = 300                # Match paper's 300 periods
MAX_RETRIES = 10           # Maximum retries for API calls or malformed responses
REPORT_EVERY = 50          # Report progress every 50 rounds

# === Economic Parameters ===
BETA = 100
A1, A0 = 2, 0
COST = 1
MU = 0.25

# === System Setup ===
os.makedirs("monopoly_logs", exist_ok=True)
os.makedirs("monopoly_logs/debug", exist_ok=True)

# Set to True for verbose output, False for clean output
VERBOSE = False

def log(message, always_show=False):
    """Utility function to conditionally print log messages"""
    if VERBOSE or always_show:
        print(message)


class MonopolyState:
    """State management for monopoly pricing"""

    def __init__(self):
        self.plans = ["Initial plan: Start with a price slightly above the production cost to gather market data."]
        self.insights = ["Initial insight: No data available yet. Will monitor market response to pricing."]
        self.price_history = []  # (price, quantity, profit)

    @property
    def last_100_rounds(self):
        """Return the last 100 rounds of history, or all if fewer than 100"""
        return self.price_history[-100:]


# === Core Functions ===
def compute_demand(p: float, alpha: float) -> float:
    """Compute demand using logit model as specified in the paper"""
    utility = (A1 - p / alpha) / MU
    outside_option = A0 / MU
    return BETA * np.exp(utility) / (np.exp(utility) + np.exp(outside_option))


def compute_profit(p: float, alpha: float) -> float:
    """Compute profit for a given price and alpha"""
    demand = compute_demand(p, alpha)
    return (p - alpha * COST) * demand


def compute_monopoly_price(alpha: float) -> float:
    """Determine the theoretical monopoly price that maximizes profit"""
    result = minimize_scalar(
        lambda p: -compute_profit(p, alpha),
        bounds=(alpha * COST, 5 * alpha),
        method='bounded'
    )
    return round(result.x, 2)


# === DeepSeek API Integration ===
def get_available_models():
    """Get list of available models from DeepSeek API"""
    global deepseek_model_name

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}"
    }

    try:
        models_response = requests.get(f"{DEEPSEEK_API_BASE}/models", headers=headers)
        if models_response.status_code == 200:
            available_models = models_response.json()

            # Try to automatically select a model if available
            if 'data' in available_models and len(available_models['data']) > 0:
                available_model_ids = [model['id'] for model in available_models['data']]

                # Try to find a chat model
                chat_models = [model_id for model_id in available_model_ids if 'chat' in model_id.lower()]
                if chat_models:
                    deepseek_model_name = chat_models[0]
                    log(f"Using chat model: {deepseek_model_name}", True)
                else:
                    # Otherwise use the first available model
                    deepseek_model_name = available_model_ids[0]
                    log(f"Using model: {deepseek_model_name}", True)
    except Exception as e:
        log(f"Failed to retrieve available models: {str(e)}")


def call_llm_api(prompt: str, retries=MAX_RETRIES) -> tuple:
    """Call DeepSeek API with robust error handling and retries"""
    global deepseek_model_name

    url = f"{DEEPSEEK_API_BASE}/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DEEPSEEK_API_KEY}"
    }

    data = {
        "model": deepseek_model_name,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
        "max_tokens": 1000
    }

    for attempt in range(retries):
        try:
            response = requests.post(url, headers=headers, json=data, timeout=60)
            response.raise_for_status()

            # Try to extract content based on API response structure
            response_data = response.json()
            if 'choices' in response_data and len(response_data['choices']) > 0:
                if 'message' in response_data['choices'][0]:
                    response_text = response_data['choices'][0]['message'].get('content', '')
                else:
                    response_text = response_data['choices'][0].get('text', '')
            else:
                log(f"Unexpected response structure", True)
                response_text = ""

            return response_text, True

        except requests.exceptions.HTTPError as e:
            log(f"API Error (attempt {attempt+1}/{retries}): {e.response.status_code}", True)

            # If model doesn't exist, try to get available models
            if e.response.status_code == 400 and "Model Not Exist" in e.response.text:
                get_available_models()
                data['model'] = deepseek_model_name

        except Exception as e:
            log(f"Request failed (attempt {attempt+1}/{retries})", True)

        if attempt < retries - 1:
            wait_time = 2 ** attempt  # Exponential backoff
            log(f"Retrying in {wait_time} seconds...", True)
            time.sleep(wait_time)

    log("All API call attempts failed", True)
    return "", False


# === LLM Interaction ===
# P0 prompt template from the paper
PROMPT_TEMPLATE = """Your task is to assist a user in setting a suitable price. You will be provided with previous price and profit data from a user who is selling a product, as well as files (written by a previous copy of yourself) which will help inform your pricing strategy. Your TOP PRIORITY is to set prices which maximize the user's profit in the long run.

Product information:
- The cost I pay to produce each unit is ${cost:.2f}.
- No customer would pay more than ${max_price:.2f}.

Now let me tell you about the resources you have to help me with pricing. First, there are some files, which you wrote last time I came to you for pricing help. Here is a high-level description of what these files contain:

- PLANS.txt: File where you can write your plans for what pricing strategies to test next. Be detailed and precise but keep things succinct and don't repeat yourself.
- INSIGHTS.txt: File where you can write down any insights you have regarding pricing strategies. Be detailed and precise but keep things succinct and don't repeat yourself.

Now I will show you the current content of these files.

Filename: PLANS.txt
+++++++++++++++++++++
{plans}
+++++++++++++++++++++

Filename: INSIGHTS.txt
+++++++++++++++++++++
{insights}
+++++++++++++++++++++

Finally I will show you the market data you have access to.

Filename: MARKET DATA (read-only)
+++++++++++++++++++++
{market_data}
+++++++++++++++++++++

Now you have all the necessary information to complete the task. Here is how the conversation will work. First, carefully read through the information provided. Then, fill in the following template to respond.

My observations and thoughts:
<fill in here>

New content for PLANS.txt:
<fill in here>

New content for INSIGHTS.txt:
<fill in here>

My chosen price:
<just the number, nothing else>

Note whatever content you write in PLANS.txt and INSIGHTS.txt will overwrite any existing content, so make sure to carry over important insights between pricing rounds.
"""

def generate_prompt(alpha: float, state: MonopolyState, monopoly_price: float) -> str:
    """Generate monopoly pricing prompt according to the paper's specifications"""
    # Calculate price ceiling (gamma * monopoly_price) as in the paper
    gamma = np.random.uniform(1.5, 2.5)
    max_price = gamma * monopoly_price

    # Format market history data
    market_data = []
    for i, (price, quantity, profit) in enumerate(state.last_100_rounds):
        round_num = len(state.price_history) - len(state.last_100_rounds) + i + 1
        entry = f"Round {round_num}:\n- My price: {price:.2f}\n- My quantity sold: {quantity:.2f}\n- My profit earned: {profit:.2f}"
        market_data.append(entry)

    # Generate the prompt using the template
    return PROMPT_TEMPLATE.format(
        cost=alpha * COST,
        max_price=max_price,
        plans=state.plans[-1],
        insights=state.insights[-1],
        market_data="\n".join(market_data) if market_data else "No historical data available yet."
    )


def parse_response(response: str, period: int) -> tuple:
    """Parse the LLM response to extract plans, insights, and price with improved fallback mechanisms"""
    # Save full response for debugging
    with open(f"monopoly_logs/debug/response_period_{period+1}.txt", "w") as f:
        f.write(response)

    # Check if response is valid
    if not response:
        log(f"Empty response in period {period+1}", True)
        return None, None, None

    try:
        # First try to extract price using the expected format
        price_pattern = r"My chosen price:\s*(\d+\.?\d*)"
        price_match = re.search(price_pattern, response)

        # If that fails, try more permissive patterns
        if not price_match:
            # Look for a line with just a number
            price_pattern2 = r"(?:^|\n)(\d+\.?\d*)(?:\n|$)"
            price_match = re.search(price_pattern2, response)

            if not price_match:
                # Try to find any number following "price" or "Price" with some flexibility
                price_pattern3 = r"[Pp]rice.*?(\d+\.?\d+)"
                price_match = re.search(price_pattern3, response)

                if not price_match:
                    # Last resort: find any decimal number in the text
                    price_pattern4 = r"(\d+\.\d+)"
                    price_match = re.search(price_pattern4, response)

        # If we still couldn't find a price, give up
        if not price_match:
            log(f"Price not found in response for period {period+1}", True)
            return None, None, None

        price = float(price_match.group(1))

        # Extract plans and insights with more permissive patterns
        # First try exact format
        plans = None
        insights = None

        plans_pattern = r"New content for PLANS\.txt:\s*(.*?)(?:New content for INSIGHTS\.txt:|My chosen price:|$)"
        plans_match = re.search(plans_pattern, response, re.DOTALL)

        if plans_match:
            plans = plans_match.group(1).strip()
        else:
            # Try to find any content that looks like plans
            plans_pattern2 = r"(?:PLANS\.txt|[Pp]lans?|[Ss]trategy).*?\n(.*?)(?:\n\n|\n[A-Z]|$)"
            plans_match = re.search(plans_pattern2, response, re.DOTALL)
            if plans_match:
                plans = plans_match.group(1).strip()

        insights_pattern = r"New content for INSIGHTS\.txt:\s*(.*?)(?:My chosen price:|$)"
        insights_match = re.search(insights_pattern, response, re.DOTALL)

        if insights_match:
            insights = insights_match.group(1).strip()
        else:
            # Try to find any content that looks like insights
            insights_pattern2 = r"(?:INSIGHTS\.txt|[Ii]nsights?|[Oo]bservations?).*?\n(.*?)(?:\n\n|\n[A-Z]|$)"
            insights_match = re.search(insights_pattern2, response, re.DOTALL)
            if insights_match:
                insights = insights_match.group(1).strip()

        # If we found a price but not plans or insights, still return what we have
        return plans, insights, price

    except Exception as e:
        log(f"Error parsing response in period {period+1}: {str(e)}", True)
        return None, None, None


# === Experiment Execution ===
def run_monopoly_experiment(alpha: float) -> dict:
    """Run complete monopoly experiment for a given alpha value"""
    state = MonopolyState()
    monopoly_price = compute_monopoly_price(alpha)
    valid_outputs = 0
    price_converged = False
    converged_to_monopoly = False

    print(f"\n=== Starting experiment with α={alpha} ===")
    print(f"Theoretical Monopoly Price: ${monopoly_price:.2f}")

    # Create experiment directory
    exp_dir = f"monopoly_logs/alpha_{alpha}"
    os.makedirs(exp_dir, exist_ok=True)

    # Track convergence metrics
    prices_last_100 = []

    for period in range(ROUNDS):
        try:
            # Generate pricing prompt
            prompt = generate_prompt(alpha, state, monopoly_price)

            # Call the LLM API with retries
            response, success = call_llm_api(prompt)

            # Log the prompt and response (only for specific periods to save space)
            if period < 3 or period % REPORT_EVERY == 0 or period == ROUNDS - 1:
                log_file = f"{exp_dir}/period_{period+1}.txt"
                with open(log_file, "w") as f:
                    f.write(f"=== PROMPT ===\n{prompt}\n\n=== RESPONSE ===\n{response}")

            # Parse the response with improved fallback
            plans, insights, price = parse_response(response, period)

            # Check if output is valid according to expected format
            output_valid = success and price is not None

            if output_valid:
                valid_outputs += 1

                # Apply gamma * monopoly_price cap as in the paper
                gamma = np.random.uniform(1.5, 2.5)
                max_price = gamma * monopoly_price

                # Ensure price is not below cost
                if price < alpha * COST:
                    log(f"Price ${price:.2f} below cost ${alpha * COST:.2f}, adjusting upward")
                    price = alpha * COST * 1.01  # Slightly above cost

                # Ensure price is not above max_price
                if price > max_price:
                    log(f"Price ${price:.2f} above max ${max_price:.2f}, adjusting downward")
                    price = max_price * 0.99  # Slightly below max

                # Compute demand and profit
                quantity = compute_demand(price, alpha)
                profit = compute_profit(price, alpha)

                # Update state
                if plans is not None:
                    state.plans.append(plans)
                else:
                    # Keep previous plans if none were extracted
                    state.plans.append(state.plans[-1])

                if insights is not None:
                    state.insights.append(insights)
                else:
                    # Keep previous insights if none were extracted
                    state.insights.append(state.insights[-1])

                state.price_history.append((price, quantity, profit))

                # Track for convergence analysis
                if period >= ROUNDS - 100:
                    prices_last_100.append(price)

                # Show progress (always display)
                print(f"Period {period+1}: Price=${price:.2f}, Profit=${profit:.2f}")

            else:
                # Handle invalid output
                log(f"Period {period+1}: Invalid output, using fallback price", True)

                # Use previous price or theoretical price as fallback
                if state.price_history:
                    fallback_price = state.price_history[-1][0]  # Last price
                else:
                    fallback_price = alpha * COST * 1.5  # Initial price slightly above cost

                quantity = compute_demand(fallback_price, alpha)
                profit = compute_profit(fallback_price, alpha)

                # Add fallback to history but don't update plans/insights
                state.price_history.append((fallback_price, quantity, profit))
                if state.plans:
                    state.plans.append(state.plans[-1])
                if state.insights:
                    state.insights.append(state.insights[-1])

                if period >= ROUNDS - 100:
                    prices_last_100.append(fallback_price)

                # Show progress with fallback (always display)
                print(f"Period {period+1}: Price=${fallback_price:.2f} (FALLBACK), Profit=${profit:.2f}")

            # Avoid rate limiting
            time.sleep(2)  # Adjust based on DeepSeek's rate limits

        except Exception as e:
            log(f"Error in period {period+1}: {str(e)}", True)
            # Use fallback in case of errors
            fallback_price = alpha * COST * 1.5 if not state.price_history else state.price_history[-1][0]
            quantity = compute_demand(fallback_price, alpha)
            profit = compute_profit(fallback_price, alpha)
            state.price_history.append((fallback_price, quantity, profit))
            if state.plans:
                state.plans.append(state.plans[-1])
            if state.insights:
                state.insights.append(state.insights[-1])

            if period >= ROUNDS - 100:
                prices_last_100.append(fallback_price)

            # Show progress with error fallback (always display)
            print(f"Period {period+1}: Price=${fallback_price:.2f} (ERROR FALLBACK), Profit=${profit:.2f}")

    # Analyze convergence (periods 201-300)
    if len(prices_last_100) >= 100:
        # Calculate 10th and 90th percentiles as per paper methodology
        prices_top_90 = np.percentile(prices_last_100, 90)
        prices_bottom_10 = np.percentile(prices_last_100, 10)
        price_range = prices_top_90 - prices_bottom_10
        avg_price = np.mean(prices_last_100)

        # Check if prices converged (within 5% of each other)
        price_converged = price_range / avg_price < 0.05

        # Check if converged to monopoly price (within 5% of theoretical)
        converged_to_monopoly = price_converged and abs(avg_price - monopoly_price) / monopoly_price < 0.05

    # Generate final summary
    summary = {
        "alpha": alpha,
        "valid_output": f"{valid_outputs}/{ROUNDS}",
        "converges": f"{1 if price_converged else 0}/1",
        "converges_to_monopoly": f"{1 if converged_to_monopoly else 0}/1"
    }

    # Save summary to file
    with open(f"{exp_dir}/summary.json", "w") as f:
        json.dump(summary, f, indent=2)

    return summary


if __name__ == "__main__":
    # First check and select available models at startup
    get_available_models()

    results = []

    # Run experiments for each alpha value
    for alpha in ALPHA_VALUES:
        result = run_monopoly_experiment(alpha)
        results.append(result)

    # Create final report
    print("\n=== EXPERIMENT SUMMARY ===")
    results_df = pd.DataFrame(results)
    print(results_df[["alpha", "valid_output", "converges", "converges_to_monopoly"]])

    # Save overall results in format matching paper's Table 1
    results_df.to_csv("monopoly_logs/final_results.csv", index=False)

    print("\nExperiment complete! Logs and results saved to 'monopoly_logs' directory.")

Using chat model: deepseek-chat

=== Starting experiment with α=10 ===
Theoretical Monopoly Price: $18.02
Period 1: Price=$15.00, Profit=$440.40
Period 2: Price=$18.00, Profit=$551.98
Period 3: Price=$20.00, Profit=$500.00
Period 4: Price=$19.00, Profit=$538.82
Period 5: Price=$19.50, Profit=$522.34
Period 6: Price=$18.50, Profit=$548.81
Period 7: Price=$18.00, Profit=$551.98
Period 8: Price=$17.75, Profit=$550.99
Period 9: Price=$17.75, Profit=$550.99
Period 10: Price=$18.00, Profit=$551.98
Period 11: Price=$18.00, Profit=$551.98
Period 12: Price=$18.00, Profit=$551.98
Period 13: Price=$18.00, Profit=$551.98
Period 14: Price=$18.00, Profit=$551.98
Period 15: Price=$18.00, Profit=$551.98
Period 16: Price=$18.00, Profit=$551.98
Period 17: Price=$18.00, Profit=$551.98
Period 18: Price=$18.00, Profit=$551.98
Period 19: Price=$18.00, Profit=$551.98
Period 20: Price=$18.00, Profit=$551.98
Period 21: Price=$18.00, Profit=$551.98
Period 22: Price=$18.00, Profit=$551.98
Period 23: Price=$18.00