In [14]:
import json
import csv
import polars as pl
import seaborn as sns
from datetime import datetime
from dateutil import parser
import os

price_file = "../../../benchmark/prices.json"
with open(price_file) as f:
    prices = json.load(f)

vcpu_price = 0.004
mem_gb_price = 0.001


def get_price_map():
    price_map = {}
    for gpu_obj in prices["items"]:
        gpu_name = gpu_obj["name"].lower()
        price_map[gpu_name] = {}
        for price_obj in gpu_obj["prices"]:
            price_map[gpu_name][price_obj["priority"]
                                ] = float(price_obj["price"])
    return price_map


price_map = get_price_map()
gpt4o_mini_price_per_million = {
    "input": .15,
    "output": .6
}
gpt4o_mini_token_multiplier = 2.0


def get_df(gpu):
    datafile = f"{gpu}.jsonl"
    node_counts = f"{gpu}-node-count.csv"
    test_config = f"{gpu}-test-config.json"

    df_file = f"{gpu}-df.csv"

    with open(test_config) as f:
        test_config = json.load(f)

    gpu_uuid = test_config["container"]["resources"]["gpu_classes"][0]
    gpu_obj = next((x for x in prices["items"] if x["id"] == gpu_uuid), None)
    gpu_name = gpu_obj["name"].lower()

    cost = {}
    for priority in price_map[gpu_name]:
        cost[priority] = price_map[gpu_name][priority] + (vcpu_price * test_config["container"]["resources"]["cpu"]) + (
            mem_gb_price * test_config["container"]["resources"]["memory"] // 1024)

    if os.path.exists(df_file):
        return pl.read_csv(df_file), cost

    all_results = []

    with open(datafile) as f:
        for line in f:
            if not line:
                continue
            data = json.loads(line)
            if "data" in data and "time" in data["data"]:
                data["data"]["time"] = parser.isoparse(data["data"]["time"])
                all_results.append(data)

    tz = all_results[0]["data"]["time"].tzinfo

    with open(node_counts) as f:
        reader = csv.reader(f)
        for row in reader:
            if not row:
                continue
            time, count = row
            if not time or not count:
                continue
            all_results.append({
                "type": "Point",
                "metric": "node_count",
                "data": {
                    "time": datetime.fromtimestamp(int(time), tz=tz),
                    "value": int(count)
                }
            })

    metrics = ["http_req_duration", "http_req_failed",
               "vus", "node_count", "inputTokens", "outputTokens"]

    all_results = sorted(all_results, key=lambda x: x["data"]["time"])

    first_time = all_results[0]["data"]["time"]
    all_results = [x for x in all_results if x["type"]
                   == "Point" and x["metric"] in metrics]
    results = []
    for result in all_results:
        time_from_start = (result["data"]["time"] - first_time).total_seconds()
        value = result["data"]["value"]
        metric = result["metric"]
        results.append({
            "time_from_start": time_from_start,
            "value": value,
            "metric": metric,
            "gpu": gpu_name,
            "cpu": test_config["container"]["resources"]["cpu"],
            "memory": test_config["container"]["resources"]["memory"],
        })

    df = pl.DataFrame(results)
    
    
    df.sort("time_from_start", multithreaded=True)
    df.write_csv(df_file)
    return df, cost


import polars as pl

def tokens_per_second_by_vu(df, window_size=5.0, min_vu=30):
    """
    Calculate rolling average input and output tokens per second grouped by number of VUs,
    with times rounded to the nearest second and filtering out VU counts below a minimum.
    
    Parameters:
    df (pl.DataFrame): Polars dataframe with schema containing time_from_start, value, metric
    window_size (float): Size of the rolling window in seconds (default: 5.0)
    min_vu (int): Minimum VU count to include in results (default: 30)
    
    Returns:
    pl.DataFrame: Dataframe with rolling token throughput statistics by VU count
    """
    # Round times to the nearest second 
    df = df.with_columns(
        pl.col("time_from_start").round(0).alias("time_rounded")
    )
    
    # Extract the metrics using the rounded time
    vu_counts = df.filter(pl.col("metric") == "vus")
    input_tokens = df.filter(pl.col("metric") == "inputTokens")
    output_tokens = df.filter(pl.col("metric") == "outputTokens")
    
    # Sort all dataframes by rounded time
    vu_counts = vu_counts.sort("time_rounded")
    input_tokens = input_tokens.sort("time_rounded")
    output_tokens = output_tokens.sort("time_rounded")
    
    # Create a combined dataframe with all unique rounded time points
    all_times = pl.concat([
        vu_counts.select("time_rounded"),
        input_tokens.select("time_rounded"),
        output_tokens.select("time_rounded")
    ]).unique().sort("time_rounded")
    
    # For each time point, we'll calculate the tokens in the past window_size seconds
    result_rows = []
    
    for time_point in all_times["time_rounded"]:
        window_start = time_point - window_size
        
        # Find the latest VU count at or before this time
        vu_at_time = vu_counts.filter(pl.col("time_rounded") <= time_point)
        if len(vu_at_time) > 0:
            latest_vu = vu_at_time.sort("time_rounded", descending=True).head(1)["value"][0]
            
            # Skip if VU count is below minimum threshold
            if latest_vu < min_vu:
                continue
        else:
            continue  # Skip if no VU data available
        
        # Calculate input tokens in the window
        input_in_window = input_tokens.filter(
            (pl.col("time_rounded") > window_start) & 
            (pl.col("time_rounded") <= time_point)
        )
        input_token_count = input_in_window["value"].sum() if len(input_in_window) > 0 else 0
        
        # Calculate output tokens in the window
        output_in_window = output_tokens.filter(
            (pl.col("time_rounded") > window_start) & 
            (pl.col("time_rounded") <= time_point)
        )
        output_token_count = output_in_window["value"].sum() if len(output_in_window) > 0 else 0
        
        # Calculate tokens per second
        input_tokens_per_second = input_token_count / window_size
        output_tokens_per_second = output_token_count / window_size
        
        result_rows.append({
            "time_from_start": time_point,
            "vu_count": latest_vu,
            "input_tokens_per_second": input_tokens_per_second,
            "output_tokens_per_second": output_tokens_per_second,
            "total_tokens_per_second": input_tokens_per_second + output_tokens_per_second
        })
    
    # Convert to Polars DataFrame
    rolling_df = pl.DataFrame(result_rows)
    
    # Group by VU count to get averages
    if len(rolling_df) > 0:
        tokens_by_vu = rolling_df.group_by("vu_count").agg(
            pl.col("input_tokens_per_second").mean().alias("avg_input_tokens_per_second"),
            pl.col("output_tokens_per_second").mean().alias("avg_output_tokens_per_second"),
            pl.col("total_tokens_per_second").mean().alias("avg_total_tokens_per_second"),
            pl.count().alias("sample_count")
        ).sort("vu_count")
        
        # Additional filter to ensure we only include VU counts >= min_vu
        tokens_by_vu = tokens_by_vu.filter(pl.col("vu_count") >= min_vu)
        
        return tokens_by_vu
    else:
        # Return empty DataFrame with correct schema if no data
        return pl.DataFrame({
            "vu_count": [],
            "avg_input_tokens_per_second": [],
            "avg_output_tokens_per_second": [],
            "avg_total_tokens_per_second": [],
            "sample_count": []
        })


def get_gpt4o_mini_price(df):
    total_input_tokens = df.filter(pl.col("metric") == "inputTokens").select(
        pl.col("value")).sum().item()
    total_output_tokens = df.filter(pl.col("metric") == "outputTokens").select(
        pl.col("value")).sum().item()
    gpt4o_mini_price = gpt4o_mini_price_per_million["input"] * total_input_tokens * \
        gpt4o_mini_token_multiplier / 1e6 + \
        gpt4o_mini_price_per_million["output"] * total_output_tokens / 1e6
    return gpt4o_mini_price


def find_best_throughput(tokens_by_vu_df):
    """
    Finds the best total token throughput and the corresponding VU count.
    
    Parameters:
    tokens_by_vu_df (pl.DataFrame): Output from tokens_per_second_by_vu function
    
    Returns:
    tuple: (best_throughput, optimal_vu_count, throughput_data)
        - best_throughput (float): Maximum total tokens per second
        - optimal_vu_count (float): VU count that achieved the maximum throughput
        - throughput_data (dict): Dictionary with detailed throughput metrics
    """
    if len(tokens_by_vu_df) == 0:
        return (0, 0, {
            "best_throughput": 0,
            "optimal_vu_count": 0,
            "input_throughput": 0,
            "output_throughput": 0,
            "sample_count": 0
        })
    
    # Sort by total throughput in descending order and get the top row
    best_row = tokens_by_vu_df.sort("avg_total_tokens_per_second", descending=True).head(1)
    
    # Extract the values
    best_throughput = best_row["avg_total_tokens_per_second"][0]
    optimal_vu_count = best_row["vu_count"][0]
    input_throughput = best_row["avg_input_tokens_per_second"][0]
    output_throughput = best_row["avg_output_tokens_per_second"][0]
    sample_count = best_row["sample_count"][0]
    
    # Create a dictionary with detailed metrics
    throughput_data = {
        "best_total_throughput": best_throughput,
        "optimal_vu_count": optimal_vu_count,
        "input_throughput": input_throughput,
        "output_throughput": output_throughput,
        "sample_count": sample_count
    }
    
    return throughput_data


def calculate_token_prices(throughput_data, cost_of_cluster_per_second, output_multiplier=4):
    """
    Calculate the price per token given cluster cost and throughput data, with output tokens costing 
    more than input tokens.
    
    Parameters:
    throughput_data (dict): Output from find_best_throughput function
    cost_of_cluster_per_second (float): Cost of running the cluster per second in dollars
    output_multiplier (float): How much more output tokens cost vs input (default: 4)
    
    Returns:
    tuple: (input_price_per_token, output_price_per_token, price_details)
    """
    # Extract the values from throughput_data
    input_tokens_per_second = throughput_data["input_throughput"]
    output_tokens_per_second = throughput_data["output_throughput"]
    
    # Calculate token prices using constraint: cost = input_price*input_tokens + output_price*output_tokens
    # where output_price = input_price * output_multiplier
    
    weighted_token_sum = input_tokens_per_second + (output_multiplier * output_tokens_per_second)
    
    if weighted_token_sum > 0:
        input_price_per_token = cost_of_cluster_per_second / weighted_token_sum
        output_price_per_token = input_price_per_token * output_multiplier
        
        # Calculate price per million tokens
        input_price_per_million = input_price_per_token * 1_000_000
        output_price_per_million = output_price_per_token * 1_000_000
        
        # Calculate the cost allocation
        input_cost = input_tokens_per_second * input_price_per_token
        output_cost = output_tokens_per_second * output_price_per_token
        
        # Calculate percentage of cluster cost allocated to each token type
        input_cost_percentage = (input_cost / cost_of_cluster_per_second) * 100
        output_cost_percentage = (output_cost / cost_of_cluster_per_second) * 100
        
        price_details = {
            "input_price_per_token": input_price_per_token,
            "output_price_per_token": output_price_per_token,
            "input_price_per_million": input_price_per_million,
            "output_price_per_million": output_price_per_million,
            "input_cost_percentage": input_cost_percentage,
            "output_cost_percentage": output_cost_percentage,
            "input_tokens_per_second": input_tokens_per_second,
            "output_tokens_per_second": output_tokens_per_second,
            "total_tokens_per_second": input_tokens_per_second + output_tokens_per_second
        }
        
        return (input_price_per_token, output_price_per_token, price_details)
    else:
        # Handle the case where there are no tokens
        return (0, 0, {
            "error": "No token throughput data available"
        })


def process_gpu(gpu):
    df, cost = get_df(gpu)
    tokens_by_vu = tokens_per_second_by_vu(df, window_size=60.0)
    throughput_data = find_best_throughput(tokens_by_vu)
    print(throughput_data)
    # gpt4o_mini_price = get_gpt4o_mini_price(df)
    # avg_input_tokens_per_image = df.filter(pl.col("metric") == "inputTokens").select(pl.col("value")).mean().item()
    # test_duration = df.filter(pl.col("metric") == "http_req_duration").select(pl.col("time_from_start")).max().item()
    max_nodes = df.filter(pl.col("metric") == "node_count").select(pl.col("value")).max().item()
    cost_of_cluster_per_second = max_nodes * cost["batch"] / 3600
    input_price_per_token, output_price_per_token, price_details = calculate_token_prices(
        throughput_data, cost_of_cluster_per_second)
    print(price_details)
    


gpus = ["3090"]
for gpu in gpus:
    print(json.dumps(process_gpu(gpu), indent=2))

  pl.count().alias("sample_count")


{'best_total_throughput': 16150.352500000003, 'optimal_vu_count': 193.0, 'input_throughput': 13654.386666666667, 'output_throughput': 2495.9658333333327, 'sample_count': 20}
{'input_price_per_token': 1.363139074264052e-08, 'output_price_per_token': 5.452556297056208e-08, 'input_price_per_million': 0.01363139074264052, 'output_price_per_million': 0.05452556297056208, 'input_cost_percentage': 57.76394896689335, 'output_cost_percentage': 42.23605103310664, 'input_tokens_per_second': 13654.386666666667, 'output_tokens_per_second': 2495.9658333333327, 'total_tokens_per_second': 16150.3525}
null
