In [None]:
import os
import subprocess
import time
import bittensor as bt

pow_timeout = 30
pow_min_difficulty = 7
pow_max_difficulty = 12

success_weight = 1
difficulty_weight = 1
time_elapsed_weight = 0.3
failed_penalty_weight = 0.4
allocation_weight = 0.21

max_score_challenge = 100 * (success_weight + difficulty_weight + time_elapsed_weight)
max_score = max_score_challenge + 100 * allocation_weight

failed_penalty_exp = 1.5

# Define the stock python reward function

def normalize(val, min_value, max_value):
    return (val - min_value) / (max_value - min_value)


def prevent_none(val):
    return 0 if not val else val


def percent(a, b):
    if b == 0:
        return 0
    return (a / b) * 100


def percent_yield(a, b):
    if a == 0:
        return 100
    return ((b - a) / b) * 100


def calc_score(
    response,
    hotkey,
    allocated_hotkeys,
    penalized_hotkeys,
    validator_hotkeys,
    jolt_params,
    i
):
    try:
        challenge_attempts = prevent_none(response.get("challenge_attempts", 1))
        challenge_successes = prevent_none(response.get("challenge_successes", 0))
        last_20_challenge_failed = prevent_none(
            response.get("last_20_challenge_failed", 0)
        )
        challenge_elapsed_time_avg = prevent_none(
            response.get("challenge_elapsed_time_avg", pow_timeout)
        )
        challenge_difficulty_avg = prevent_none(
            response.get("challenge_difficulty_avg", pow_min_difficulty)
        )
        has_docker = response.get("has_docker", False)

        assert challenge_attempts == jolt_params["challenge_attempts"][i], f"challenge_attempts: {challenge_attempts} != {jolt_params['challenge_attempts'][i]}"
        assert challenge_successes == jolt_params["challenge_successes"][i], f"challenge_successes: {challenge_successes} != {jolt_params['challenge_successes'][i]}"
        assert last_20_challenge_failed == jolt_params["last_20_challenge_failed"][i], f"last_20_challenge_failed: {last_20_challenge_failed} != {jolt_params['last_20_challenge_failed'][i]}"
        assert challenge_elapsed_time_avg == jolt_params["challenge_elapsed_time_avg"][i], f"challenge_elapsed_time_avg: {challenge_elapsed_time_avg} != {jolt_params['challenge_elapsed_time_avg'][i]}"
        assert challenge_difficulty_avg == jolt_params["challenge_difficulty_avg"][i], f"challenge_difficulty_avg: {challenge_difficulty_avg} != {jolt_params['challenge_difficulty_avg'][i]}"
        assert has_docker == jolt_params["has_docker"][i], f"has_docker: {has_docker} != {jolt_params['has_docker'][i]}"

        difficulty_val = max(
            min(challenge_difficulty_avg, pow_max_difficulty),
            pow_min_difficulty,
        )
        difficulty_modifier = percent(difficulty_val, pow_max_difficulty)

        difficulty = difficulty_modifier * difficulty_weight
        successes_ratio = percent(challenge_successes, challenge_attempts)
        successes = successes_ratio * success_weight

        time_elapsed_modifier = percent_yield(
            challenge_elapsed_time_avg, pow_timeout
        )
        time_elapsed = time_elapsed_modifier * time_elapsed_weight

        last_20_challenge_failed_modifier = percent(
            last_20_challenge_failed, 20
        )
        failed_penalty = (
            failed_penalty_weight
            * (last_20_challenge_failed_modifier / 100) ** failed_penalty_exp
            * 100
        )

        allocation_score = difficulty_modifier * allocation_weight
        allocation_status = hotkey in allocated_hotkeys

        assert max_score_challenge == 100 * (
            success_weight + difficulty_weight + time_elapsed_weight
        )
        max_score_allocation = 100 * allocation_weight
        assert max_score == max_score_challenge + max_score_allocation
        final_score = difficulty + successes + time_elapsed - failed_penalty

        penalty = not (has_docker)

        if allocation_status:
            final_score = (
                max_score_challenge * (1 - allocation_weight) + allocation_score
            )
        else:
            final_score = difficulty + successes + time_elapsed - failed_penalty
            if penalty:
                final_score = final_score / 2

        if (
            last_20_challenge_failed >= 19 or challenge_successes == 0
        ) and not allocation_status:
            print("Returning 0 due to high failure rate or no successes")
            return 0

        if hotkey in penalized_hotkeys:
            penalty_count = penalized_hotkeys.count(hotkey)
            half_validators = len(validator_hotkeys) / 2
            assert half_validators == jolt_params["half_validators"], f"half_validators: {half_validators} != {jolt_params['half_validators']}"

            if penalty_count >= half_validators:
                final_score = 0
            else:
                penalty_multiplier = max(1 - (penalty_count / half_validators), 0)
                final_score *= penalty_multiplier

        final_score = max(0, final_score)

        normalized_score = normalize(final_score, 0, max_score)

        return normalized_score
    except Exception as e:
        bt.logging.error(
            f"An error occurred while calculating score for the following hotkey - {hotkey}: {e}"
        )
        return 0


runtimes = [0]
time_now = time.time()
existing_dir = os.getcwd()
os.chdir(
    "../../neurons/deployment_layer/model_1d60d545b7c5123fd60524dcbaf57081ca7dc4a9ec36c892927a3153328d17c0"
)
subprocess.run(["cargo", "build", "--release"], check=True)
runtimes.append(time.time() - time_now)
time_now = time.time()

In [None]:
from typing import List, Dict, Any
import random
import json
import subprocess
import numpy as np
from rich.console import Console
from rich.table import Table
import matplotlib.pyplot as plt

BATCH_SIZE = 256

def generate_test_data(batch_size: int):
    # Generate random test data tensors
    def generate_tensor(min_val, max_val, batch_size):
        return torch.tensor(
            [[float(rand.randint(min_val, max_val))] for _ in range(batch_size)]
        )

    def generate_random_tensor(min_val, max_val, batch_size):
        return torch.tensor(
            [[min_val + rand.random() * (max_val - min_val)] for _ in range(batch_size)]
        )

    def generate_uid_tensor(batch_size, prob=0.1):
        return torch.tensor(
            [
                [rand.randint(0, 256) if rand.random() < prob else 0]
                for _ in range(batch_size)
            ]
        )

    challenge_attempts = generate_tensor(5, 10, batch_size)
    challenge_successes = generate_tensor(4, 8, batch_size)
    last_20_challenge_failed = generate_tensor(0, 3, batch_size)
    challenge_elapsed_time_avg = generate_random_tensor(4.0, 8.0, batch_size)
    last_20_difficulty_avg = generate_random_tensor(1.5, 2.5, batch_size)
    has_docker = torch.tensor([[True] for _ in range(batch_size)])
    uid = generate_tensor(0, 256, batch_size)
    allocated_uids = generate_uid_tensor(batch_size)
    penalized_uids = generate_uid_tensor(batch_size)
    validator_uids = generate_uid_tensor(batch_size)

    return {
        "challenge_attempts": challenge_attempts,
        "challenge_successes": challenge_successes,
        "last_20_challenge_failed": last_20_challenge_failed,
        "challenge_elapsed_time_avg": challenge_elapsed_time_avg,
        "last_20_difficulty_avg": last_20_difficulty_avg,
        "has_docker": has_docker,
        "uid": uid,
        "allocated_uids": allocated_uids,
        "penalized_uids": penalized_uids,
        "validator_uids": validator_uids
    }

def compare_scores(circuit_scores: torch.Tensor, jolt_scores: List[float]) -> None:
    table = Table(title="Score Comparison")
    table.add_column("UID", justify="right", style="cyan")
    table.add_column("Circuit Score", justify="right", style="magenta")
    table.add_column("Jolt Score", justify="right", style="green")
    table.add_column("Difference", justify="right", style="red")

    red_diff_count = 0
    total_diff = 0.0

    for i, (circ, jolt) in enumerate(zip(circuit_scores, jolt_scores)):
        diff = abs(float(circ) - float(jolt))
        total_diff += diff
        diff_color = "red" if diff > 1e-6 else "green"
        red_diff_count += diff_color == "red"

        table.add_row(
            f"{i}",
            f"{float(circ):.6f}",
            f"{float(jolt):.6f}",
            f"[{diff_color}]{diff:+.6f}[/{diff_color}]"
        )

    total_count = len(circuit_scores)
    red_percentage = (red_diff_count / total_count) * 100
    avg_diff = total_diff / total_count

    table.add_row("", "", "", "")
    table.add_row("", "", "High Loss %:", f"[red]{red_percentage:.2f}%[/red]")
    table.add_row("", "", "Avg Difference:", f"[yellow]{avg_diff:.6f}[/yellow]")

    console = Console()
    console.width = 140
    console.print(table)

def plot_scores(circuit_scores: torch.Tensor, jolt_scores: List[float]) -> None:
    def sort_and_unzip(scores):
        return zip(*sorted(enumerate(scores), key=lambda x: x[1]))

    circuit_indices, circuit_sorted = sort_and_unzip([float(x) for x in circuit_scores])
    jolt_indices, jolt_sorted = sort_and_unzip(jolt_scores)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

    ax1.scatter(range(len(circuit_sorted)), circuit_sorted, alpha=0.6)
    ax1.set_title('Circuit Scores (Sorted)')
    ax1.set_xlabel('Index (Sorted by Score)')
    ax1.set_ylabel('Score')

    ax2.scatter(range(len(jolt_sorted)), jolt_sorted, alpha=0.6)
    ax2.set_title('Jolt Scores (Sorted)')
    ax2.set_xlabel('Index (Sorted by Score)')
    ax2.set_ylabel('Score')

    plt.tight_layout()
    plt.show()

def main():
    circuit = Circuit()
    test_data = generate_test_data(BATCH_SIZE)

    # Run circuit
    circuit_scores = circuit(**test_data)

    # Run Jolt comparison
    with open("input.json", "r") as f:
        jolt_inputs = json.load(f)

    subprocess.run(
        ["cargo", "run", "--release", "prove", "--input", "input.json", "--output", "output.json", "--proof", "proof.bin"],
        check=True
    )

    with open("output.json", "r") as f:
        jolt_scores = json.load(f)

    compare_scores(circuit_scores, jolt_scores)
    plot_scores(circuit_scores, jolt_scores)

if __name__ == "__main__":
    main()