In [None]:
# Load pickle from data folder

import pickle
import json

# Deserialize the two event dumps stored under data/.
# NOTE(review): pickle.load can execute arbitrary code while unpickling —
# only point this at files from a trusted source.
with open("data/curve_trades.pickle", "rb") as trades_file:
    data_trades = pickle.load(trades_file)

with open("data/curve_liquidity.pickle", "rb") as liquidity_file:
    data_liquidity = pickle.load(liquidity_file)

# Merge both streams and order them by block number. The sort is stable, so
# within one block trades keep their position ahead of liquidity events.
all_data_sorted = [*data_trades, *data_liquidity]
all_data_sorted.sort(key=lambda event: event["block_number"])

# Keep only the events that carry a pool_state snapshot.
all_data_sorted = [event for event in all_data_sorted if event["pool_state"] is not None]
print(len(all_data_sorted))

# Pretty-print the last 30 events (the highest block numbers) as JSON.
print(json.dumps(all_data_sorted[-30:], indent=4))

In [None]:
# Run the rebalancing analysis
def analyze_rebalancing(trades_data):
    """Classify every consecutive pair of pool snapshots by rebalance outcome.

    For each index ``i >= 1`` the snapshot at ``i - 1`` ("before") is checked
    against the condition
    ``virtual_price**2 > xcp_profit * (10**18 + 2 * allowed_extra_profit)``.

    * condition false -> counted under ``"not_met"``;
    * condition true  -> counted under ``"attempts"`` and additionally under
      ``"successful"`` when the virtual price at ``i`` is strictly lower than
      at ``i - 1``, otherwise under ``"failed"`` (an unchanged price therefore
      counts as failed).

    Args:
        trades_data: indexable sequence of event dicts providing
            ``"block_number"``, ``"time"``, ``"transaction_hash"`` and a
            ``"pool_state"`` dict with ``"virtual_price"``, ``"xcp_profit"``,
            ``"allowed_extra_profit"``, ``"a"`` and ``"gamma"``.

    Returns:
        dict with the four counters above plus ``"events"``: one detail record
        per consecutive pair (all pairs, not only attempts).
    """
    counters = {"not_met": 0, "attempts": 0, "successful": 0, "failed": 0}
    event_log = []

    for position in range(1, len(trades_data)):
        after_tx = trades_data[position]
        after_state = after_tx["pool_state"]
        before_state = trades_data[position - 1]["pool_state"]

        price_before = before_state["virtual_price"]
        price_after = after_state["virtual_price"]
        price_delta = price_after - price_before

        # The condition is evaluated on the snapshot *before* this event.
        squared_price = price_before**2
        limit = before_state["xcp_profit"] * (10**18 + 2 * before_state["allowed_extra_profit"])
        triggered = squared_price > limit

        if triggered:
            counters["attempts"] += 1
            # Only a strict decrease of the virtual price counts as success.
            outcome = "successful" if price_delta < 0 else "failed"
        else:
            outcome = "not_met"
        counters[outcome] += 1

        # A change in either pool parameter between the snapshots marks a ramp.
        parameters_changed = (
            before_state["a"] != after_state["a"] or before_state["gamma"] != after_state["gamma"]
        )

        event_log.append(
            {
                "index": position,
                "ramp": parameters_changed,
                "block": after_tx["block_number"],
                "time": after_tx["time"],
                "tx_hash": after_tx["transaction_hash"],
                "vp_before": price_before,
                "vp_after": price_after,
                "vp_change": price_delta,
                "xcp_change": after_state["xcp_profit"] - before_state["xcp_profit"],
                "condition_met": triggered,
                "status": outcome,
                # A zero threshold would divide by zero; report 0 instead.
                "condition_ratio": squared_price / limit if limit != 0 else 0,
                "allowed_extra_profit": before_state["allowed_extra_profit"],
                "xcp_profit": before_state["xcp_profit"],
            }
        )

    return {**counters, "events": event_log}

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Classify every consecutive pair of events in the merged, block-ordered list.
rebalance_stats = analyze_rebalancing(all_data_sorted)

# One plotting record per event; on-chain integers are scaled down by 10**18.
data_points = [
    {
        "index": position,
        "block": event["block_number"],
        "time": event["time"],
        "virtual_price": event["pool_state"]["virtual_price"] / 10**18,
        "xcp_profit": event["pool_state"]["xcp_profit"] / 10**18,
        "tx_hash": event["transaction_hash"],
    }
    for position, event in enumerate(all_data_sorted)
]

# Time series frame (row position == index into all_data_sorted).
df = pd.DataFrame(data_points)
df["time"] = pd.to_datetime(df["time"])

# Per-pair classification records produced by analyze_rebalancing.
events_df = pd.DataFrame(rebalance_stats["events"])
events_df["time"] = pd.to_datetime(events_df["time"])

# Subsets used for the plot markers.
successful_rebalances = events_df.loc[events_df["status"] == "successful"]
failed_rebalances = events_df.loc[events_df["status"] == "failed"]

ramping_events = events_df.loc[events_df["ramp"]]

# Single large canvas for the overview chart.
plt.figure(figsize=(16, 10))

timestamps = df["time"]
virtual_price_series = df["virtual_price"]

# Virtual price plus two transformations of xcp_profit for comparison.
plt.plot(timestamps, virtual_price_series, "b-", label="Virtual Price")
plt.plot(timestamps, 1 + (df["xcp_profit"] - 1) / 2, "r:", label="XCP Profit/2")
plt.plot(timestamps, np.sqrt(df["xcp_profit"]), "orange", linestyle=":", label="sqrt(XCP Profit)")

# Green crosses for successful rebalances, red crosses for failed ones.
for outcome_rows, marker_format, legend_label in (
    (successful_rebalances, "gx", "Successful Rebalance"),
    (failed_rebalances, "rx", "Failed Rebalance"),
):
    if outcome_rows.empty:
        continue
    indices = outcome_rows["index"].values
    plt.plot(
        timestamps.iloc[indices],
        virtual_price_series.iloc[indices],
        marker_format,
        markersize=12,
        label=legend_label,
        alpha=0.7,
    )

# Small yellow triangles, lifted slightly above the curve, for ramping events.
if not ramping_events.empty:
    indices = ramping_events["index"].values
    plt.plot(
        timestamps.iloc[indices],
        virtual_price_series.iloc[indices] + 0.0003,
        "y^",
        markersize=2,
        label="Ramping",
        alpha=0.7,
    )

# Titles, axes, grid and legend.
plt.title("Virtual Price and XCP Profit with Rebalance Events", fontsize=16)
plt.xlabel("Time", fontsize=14)
plt.ylabel("Value", fontsize=14)
plt.grid(True, alpha=0.3)
plt.legend(fontsize=12)

# Text box with the rebalance counters; max(1, ...) avoids dividing by zero
# when there were no attempts at all.
attempt_count = rebalance_stats["attempts"]
success_count = rebalance_stats["successful"]
failure_count = rebalance_stats["failed"]
attempt_divisor = max(1, attempt_count)
summary_text = (
    f"Total Rebalance Attempts: {attempt_count}\n"
    f"Successful: {success_count} ({success_count / attempt_divisor * 100:.1f}%)\n"
    f"Failed: {failure_count} ({failure_count / attempt_divisor * 100:.1f}%)"
)
plt.figtext(
    0.35,
    0.85,
    summary_text,
    fontsize=12,
    bbox={"facecolor": "white", "alpha": 0.8},
)

plt.tight_layout()
plt.show()

# Text summary of the same counters shown on the chart.
total_attempts = rebalance_stats["attempts"]
total_successful = rebalance_stats["successful"]
total_failed = rebalance_stats["failed"]
# Guard against a zero denominator when no attempt was recorded.
share_base = max(1, total_attempts)

print(f"Total swaps/liquidity events: {len(all_data_sorted)}")
print(f"Total rebalance attempts: {total_attempts}")
print(f"Successful rebalances: {total_successful} ({total_successful / share_base * 100:.1f}%)")
print(f"Failed rebalances: {total_failed} ({total_failed / share_base * 100:.1f}%)")

In [None]:
# Element-wise gap between the two series (kept for inspection; not plotted here).
diff = df["virtual_price"] - df["xcp_profit"]

# Overlay both raw series on a shared time axis.
for column_name, legend_label in (("virtual_price", "Virtual Price"), ("xcp_profit", "XCP Profit")):
    plt.plot(df["time"], df[column_name], label=legend_label)
plt.legend(fontsize=12)
plt.show()

In [None]:
# Rebuild an xcp_profit-like series purely from virtual-price ratios:
# it starts at 1 and each step multiplies the previous value by vp[i] / vp[i - 1].
vp = df["virtual_price"]
xcp_profit_values = [1]
for i in range(1, len(vp)):
    # Accumulate in a list: np.append copies the whole array on every call,
    # which made the original loop quadratic in the number of events.
    xcp_profit_values.append(xcp_profit_values[-1] * vp[i] / vp[i - 1])
xcp_profit = np.array(xcp_profit_values)

print(vp)
print(xcp_profit)

# Draw both panels on ONE figure. Calling plt.show() between the two
# plt.subplot() calls rendered the first panel alone and pushed the second
# panel onto a fresh, half-empty figure.
fig, (ax_series, ax_diff) = plt.subplots(2, 1)

ax_series.plot(df["time"], vp, label="Virtual Price")
ax_series.plot(df["time"], xcp_profit, label="XCP Profit")
ax_series.legend(fontsize=12)

# NOTE(review): this panel uses the stored df["xcp_profit"] column, not the
# series reconstructed above — confirm which difference was intended.
ax_diff.plot(df["time"], df["virtual_price"] - df["xcp_profit"], label="diff")
ax_diff.legend(fontsize=12)

plt.show()

In [None]:
# Spot-check the first two virtual prices and the first xcp_profit value
# (label-based lookup on the frame's index).
for row_label, column_name in ((0, "virtual_price"), (1, "virtual_price"), (0, "xcp_profit")):
    print(df.loc[row_label, column_name])

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from decimal import Decimal

# Configuration
# Fixed-point scale: main() starts both series at this value and divides by it
# again when converting to floats for display.
PRECISION = Decimal("1e18")  # 10^18 precision as used in Vyper
# main() iterates over range(1, NUM_STEPS), so each simulated series ends up
# with NUM_STEPS points (the initial value plus NUM_STEPS - 1 updates).
NUM_STEPS = 100


# Function to update xcp_profit as in the Curve contract
def update_xcp_profit(old_xcp_profit, virtual_price, old_virtual_price):
    """Scale ``old_xcp_profit`` by the ratio ``virtual_price / old_virtual_price``.

    Computes ``old_xcp_profit * virtual_price // old_virtual_price`` with the
    multiplication performed first and the division truncated to an integer.

    The product of two 1e18-scaled values has about 37 significant digits,
    which exceeds the default 28-digit ``Decimal`` context. Under that context
    the product is silently rounded before the division, so the result can be
    off by one compared with exact integer arithmetic. The computation is
    therefore done in a temporary higher-precision context.

    Args:
        old_xcp_profit: previous xcp_profit (``Decimal`` or ``int``).
        virtual_price: new virtual price (``Decimal`` or ``int``).
        old_virtual_price: previous virtual price; must be non-zero.

    Returns:
        The integer-valued quotient, as ``Decimal`` when any operand is a
        ``Decimal`` and as ``int`` when all operands are ``int``.
    """
    from decimal import localcontext

    with localcontext() as ctx:
        # 80 digits comfortably holds the exact product of two ~19-digit
        # operands; never lower a precision the caller already raised.
        ctx.prec = max(ctx.prec, 80)
        return old_xcp_profit * virtual_price // old_virtual_price


# Generate a virtual price series with random fluctuations
def main():
    """Simulate virtual_price / xcp_profit series, plot them and print a summary.

    Both series start at PRECISION and receive one new point per iteration of
    range(1, NUM_STEPS). xcp_profit is advanced via update_xcp_profit() from
    the integer-truncated "swap" price; for 50 < i < 70 the stored virtual
    price is then replaced by a randomly lowered value ("tweak_price").

    Draws a two-panel matplotlib figure (values, and the xcp/vp ratio) and
    prints the final values plus the maximum deviation of the ratio from 1.0.
    Returns nothing.

    NOTE(review): no random seed is set in this cell, so each run may differ.
    """
    # Initialize both metrics at 10^18 (1.0)
    virtual_prices = [PRECISION]
    xcp_profits = [PRECISION]

    # Generate random virtual price changes
    for i in range(1, NUM_STEPS):
        # Random change between 0% and +1.5% (slightly biased toward growth)
        # emulate swap
        # NOTE(review): for 50 <= i <= 70 no new value is drawn here, so the
        # change_pct left over from the previous iteration is reused below —
        # from i = 52 on that is the negative "tweak" value. Confirm intent.
        if i < 50 or i > 70:
            change_pct = Decimal(np.random.uniform(0.00, 0.015))

        # Calculate new virtual price (simplified from contract)
        new_vp = virtual_prices[-1] * (Decimal("1") + change_pct)

        # Ensure integer arithmetic like in contract
        new_vp = Decimal(int(new_vp))

        # Calculate new xcp_profit
        # (uses the truncated swap price, before any tweak is applied below)
        new_xcp = update_xcp_profit(xcp_profits[-1], new_vp, virtual_prices[-1])

        # emulate tweak_price
        if i > 50 and i < 70:
            change_pct = Decimal(np.random.uniform(-0.005, -0.001))
            # Calculate new virtual price (simplified from contract)
            # This overwrites the swap result and is NOT truncated to an
            # integer, so non-integer values can enter virtual_prices.
            new_vp = virtual_prices[-1] * (Decimal("1") + change_pct)

        virtual_prices.append(new_vp)
        xcp_profits.append(new_xcp)

    # Convert to regular floats for plotting (normalize to 1.0)
    vp_display = [float(v / PRECISION) for v in virtual_prices]
    xcp_display = [float(x / PRECISION) for x in xcp_profits]

    # Calculate the ratio between them
    ratios = [xcp / vp for xcp, vp in zip(xcp_display, vp_display)]

    # Visualization
    plt.figure(figsize=(10, 8))

    # Plot values
    plt.subplot(2, 1, 1)
    plt.plot(vp_display, label="Virtual Price", linewidth=2)
    plt.plot(xcp_display, label="XCP Profit", linewidth=2, linestyle="--")
    plt.title("Virtual Price and XCP Profit Over Time")
    plt.ylabel("Value")
    plt.legend()
    plt.grid(True)

    # Plot ratio
    plt.subplot(2, 1, 2)
    plt.plot(ratios, label="XCP Profit / Virtual Price")
    # Reference line at ratio == 1.0
    plt.axhline(y=1.0, color="r", linestyle="--")
    plt.title("Ratio of XCP Profit to Virtual Price")
    plt.ylabel("Ratio")
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    # Print basic statistics
    print(f"Final VP: {vp_display[-1]:.6f}, Final XCP: {xcp_display[-1]:.6f}")
    # "Proportional" here means every ratio is within 1e-10 of 1.0
    print(f"Are VP and XCP proportional? {all(abs(r - 1.0) < 1e-10 for r in ratios)}")
    print(f"Max deviation from 1.0: {max(abs(r - 1.0) for r in ratios):.10f}")


# Run the simulation when this code is executed as the main module.
if __name__ == "__main__":
    main()

In [None]:
# Scaling constant used by newton_D, which multiplies by it right before
# dividing by A (`... * A_MULTIPLIER // A`).
# NOTE(review): presumably A is expected to be pre-multiplied by this factor —
# confirm against the contract this code mirrors.
A_MULTIPLIER = 10000


def geometric_mean(x):
    """Integer geometric mean of ``x`` computed by fixed-point iteration.

    Starts from the largest element and repeatedly applies
    ``D <- D * ((N - 1) * 1e18 + prod(x_k / D) * 1e18) // (N * 1e18)``
    using floor division only, for at most 255 rounds.

    Args:
        x: non-empty sequence of positive integers.

    Returns:
        The first iterate whose distance to its predecessor is at most 1 or
        smaller than ``D / 1e18``.

    Raises:
        ValueError: if 255 rounds pass without meeting the stop criterion.
        IndexError: if ``x`` is empty.
    """
    unit = 10**18
    count = len(x)
    ordered = sorted(x, reverse=True)  # largest first helps convergence
    estimate = ordered[0]

    for _ in range(255):
        previous = estimate

        # unit * prod(value / previous), truncated after every factor
        scaled_product = unit
        for value in ordered:
            scaled_product = scaled_product * value // previous

        estimate = previous * ((count - 1) * unit + scaled_product) // (count * unit)

        step = abs(estimate - previous)
        if step <= 1 or step * unit < estimate:
            return estimate

    raise ValueError("Did not converge")


def newton_D(A, gamma, x, D0):
    """Iteratively refine ``D`` for a two-element ``x``, starting from ``D0``.

    All arithmetic is integer with floor division; quantities such as K0 are
    scaled by 10**18. Up to 255 update steps are performed (the code labels
    the update as ``D -= f / fprime``, i.e. a Newton step).
    NOTE(review): presumably this mirrors the Curve crypto-pool invariant
    solver — confirm against the reference implementation.

    Args:
        A: amplification parameter; divided into ``A_MULTIPLIER``-scaled terms.
        gamma: gamma parameter, used as a divisor and in ``gamma + 10**18 - K0``.
        x: sequence of exactly two balances (asserted).
        D0: initial guess for D.

    Returns:
        The first iterate D with ``abs(D - D_prev) <= max(100, D // 10**14)``.

    Raises:
        AssertionError: if ``len(x) != 2`` or the computed ``-f'`` is not positive.
        ValueError: if no iterate meets the stop criterion within 255 steps.
    """
    D = D0

    # S is taken before sorting; the sum does not depend on order.
    S = sum(x)
    x = sorted(x, reverse=True)
    N = len(x)

    assert N == 2

    for i in range(255):
        D_prev = D

        # K0 = 10**18 * prod(x_k * N / D), truncated after each factor
        K0 = 10**18
        for _x in x:
            K0 = K0 * _x * N // D

        # NOTE(review): if K0 equals gamma + 10**18 exactly this is 0 and the
        # division in mul2 below raises ZeroDivisionError.
        _g1k0 = abs(gamma + 10**18 - K0)

        # D / (A * N**N) * _g1k0**2 / gamma**2
        mul1 = 10**18 * D // gamma * _g1k0 // gamma * _g1k0 * A_MULTIPLIER // A

        # 2*N*K0 / _g1k0
        mul2 = (2 * 10**18) * N * K0 // _g1k0

        neg_fprime = (S + S * mul2 // 10**18) + mul1 * N // K0 - mul2 * D // 10**18
        assert neg_fprime > 0  # Python only: -f' > 0

        # D -= f / fprime
        D = (D * neg_fprime + D * S - D**2) // neg_fprime - D * (mul1 // neg_fprime) // 10**18 * (
            10**18 - K0
        ) // K0

        # A negative iterate is reflected and halved before continuing.
        if D < 0:
            D = -D // 2
        # Stop once the step is within max(100, D / 1e14) of the previous iterate.
        if abs(D - D_prev) <= max(100, D // 10**14):
            return D

    raise ValueError("Did not converge")


def solve_D(A, gamma, x):
    """Solve for D via ``newton_D``, seeded with ``len(x) * geometric_mean(x)``.

    Args:
        A: amplification parameter forwarded to ``newton_D``.
        gamma: gamma parameter forwarded to ``newton_D``.
        x: sequence of balances.

    Returns:
        Whatever ``newton_D`` returns for the computed starting point.
    """
    # XXX (from the original author): fuzz this starting point to make sure it's ok
    initial_guess = len(x) * geometric_mean(x)
    return newton_D(A, gamma, x, initial_guess)


# Probe solve_D across a sweep of balanced pool sizes.
# Original note: D is initialized at token add.
# NOTE(review): this was described as BTC at 100_000 paired with USD (both
# 18 decimals), but both entries below are 10**18 — confirm the intended prices.
p = [10**18, 10**18]

for i in range(1, 100):
    # Balances are probe_x times each scale entry, truncated to int.
    probe_x = i / 10
    x = [int(probe_x * scale) for scale in p]

    A, gamma = 10**18, 2 * 10**25

    D = solve_D(A, gamma, x)

    print(f"For x={probe_x}, D={D}")
    # Report whether D reaches at least 10**17.
    print("D is good" if D > 10**17 - 1 else "D is too small")