In [None]:
import os
from collections import defaultdict

import clickhouse_connect
import pandas as pd
from dotenv import load_dotenv
from utils import fetch_prev_epoch_info

START_SLOT = 370453692
TX_TYPES = ["frontRun", "backRun", "victim", "transfer"]
ATTACKER_TX_TYPES = ["frontRun", "backRun", "transfer"]
EPS_WIN = 1e-5


def load_env():
    load_dotenv(dotenv_path=".env")
    return {
        "host": os.getenv("NEW_CLICKHOUSE_HOST"),
        "port": int(os.getenv("NEW_CLICKHOUSE_PORT")),
        "username": os.getenv("NEW_CLICKHOUSE_USERNAME"),
        "password": os.getenv("NEW_CLICKHOUSE_PASSWORD"),
    }


# Load credentials from .env
config = load_env()
# Initialize ClickHouse client
client = clickhouse_connect.get_client(
    host=config["host"],
    port=config["port"],
    username=config["username"],
    password=config["password"],
)

In [None]:
def query_current_slots_in_DB(start_slot=0, end_slot=500000000):
    """
    Query the ClickHouse DB to find the min and max slot stored
    """
    if start_slot < START_SLOT:
        print("Warning: start_slot is before the earliest valid slot in DB.")

    query = f"""
    SELECT
        min(slot) AS min_slot,
        max(slot) AS max_slot,
        countDistinct(slot) AS total_slots
    FROM solwich.slot_txs
    WHERE slot BETWEEN {start_slot} AND {end_slot} AND txFetched = true
    """
    result = client.query(query)
    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    min_slot = df["min_slot"].iloc[0]
    max_slot = df["max_slot"].iloc[0]
    total_slots = df["total_slots"].iloc[0]
    return min_slot, max_slot, total_slots


def query_leader_slot_counts(start_slot: int, end_slot: int) -> pd.DataFrame:
    # Total slots led by each validator in the given slot range in schedule
    if start_slot < START_SLOT:
        print("Warning: start_slot is before the earliest valid slot in DB.")
    query = f"""
    SELECT
        leader,
        count() AS slot_count
    FROM solwich.slot_leaders
    WHERE slot BETWEEN {start_slot} AND {end_slot}
    GROUP BY leader
    ORDER BY slot_count DESC
    """
    res_schedule = client.query(query)
    df_schedule = pd.DataFrame(
        res_schedule.result_rows, columns=res_schedule.column_names
    )

    # Total slots actually led by each validator in the given slot range, checked in DB
    query = f"""
    SELECT
        l.leader,
        countDistinct(s.slot) AS actual_slot_count
    FROM solwich.slot_leaders AS l
    LEFT JOIN solwich.slot_txs  AS s
        ON s.slot = l.slot
    WHERE l.slot BETWEEN {start_slot} AND {end_slot} AND txFetched = true
    GROUP BY l.leader
    """
    res_actual = client.query(query)
    df_actual = pd.DataFrame(res_actual.result_rows, columns=res_actual.column_names)

    out = df_schedule.merge(df_actual, on="leader", how="left")
    out["actual_slot_count"] = out["actual_slot_count"].fillna(0).astype(int)
    out["slot_count"] = out["slot_count"].astype(int)
    out = (
        out.rename(columns={"leader": "validator"})
        .sort_values(["actual_slot_count", "slot_count"], ascending=[False, False])
        .reset_index(drop=True)
    )
    return out


def query_sandwiches_group_by_leader(start_slot: int, end_slot: int) -> pd.DataFrame:
    """
    Count sandwiches data per validator in the given slot range.
    """
    if start_slot < START_SLOT:
        print("Warning: start_slot is before the earliest valid slot in DB.")
    query = f"""
    SELECT
        l.leader,
        countDistinct(s.sandwichId) AS sandwich_count,               
        sum(s.victimCount) AS total_victim_count,      
        sumIf(s.profitA, s.tokenA = 'SOL') AS total_SOL_profit
    FROM solwich.sandwiches AS s
    INNER JOIN solwich.slot_leaders AS l ON s.slot = l.slot
    WHERE s.slot BETWEEN {start_slot} AND {end_slot}
    GROUP BY l.leader
    ORDER BY total_SOL_profit DESC
    """
    result = client.query(query)
    res = pd.DataFrame(result.result_rows, columns=result.column_names)
    res.rename(columns={"leader": "validator"}, inplace=True)
    return res


def query_sandwiches_with_txs_and_leader(
    start_slot: int, end_slot: int
) -> pd.DataFrame:
    """
    Step 1:
    - From solwich.sandwiches, fetch all sandwiches in [start_slot, end_slot].
    - Ensure each sandwichId appears at most once using LIMIT 1 BY sandwichId.
    Step 2:
    - Fetch all sandwiches' txs (*) from solwich.sandwich_txs.
    - Deduplicate txs where (type, signature) are the same
    - Return the integrated rows: sandwich columns (*) + tx columns (*).
    Step 3:
    - Join slot_leaders on each tx's slot to add leader for every row.
    """
    if start_slot < START_SLOT:
        print("Warning: start_slot is before the earliest valid slot in DB.")
    query = f"""
    WITH dedup_sandwiches AS
    (
        SELECT
            sandwichId, slot as sandwich_slot, crossBlock, tokenA, tokenB, signerSame, ownerSame, profitA, relativeDiffB
        FROM solwich.sandwiches
        WHERE slot BETWEEN {start_slot} AND {end_slot}
        ORDER BY slot DESC, sandwichId DESC
        LIMIT 1 BY sandwichId
    ),
    dedup_txs AS
    (
        SELECT
            *
        FROM solwich.sandwich_txs
        WHERE sandwichId IN (SELECT sandwichId FROM dedup_sandwiches) AND slot BETWEEN {start_slot} AND {end_slot}
        ORDER BY slot DESC
        LIMIT 1 BY sandwichId, type, signature
    ),
    s_leaders AS 
    (
        SELECT
            *
        FROM solwich.slot_leaders
        WHERE slot BETWEEN {start_slot} AND {end_slot}
        ORDER BY slot DESC
    )
    SELECT
        s.sandwichId as sandwichId,
        s.sandwich_slot,
        l.leader,
        s.crossBlock,
        s.tokenA,
        s.tokenB,
        s.signerSame,
        s.ownerSame,
        s.profitA,
        s.relativeDiffB,
        t.type,
        t.slot as tx_slot,
        t.position,
        t.timestamp,
        t.fee,
        t.signature,
        t.signer,
        t.inBundle,
        t.programs,
        t.fromToken,
        t.toToken,
        t.fromAmount,
        t.toAmount,
        t.attackerPreBalanceB,
        t.attackerPostBalanceB,
        t.ownersOfB,
        t.fromTotalAmount,
        t.toTotalAmount,
        t.diffA,
        t.diffB
    FROM dedup_sandwiches AS s
    INNER JOIN dedup_txs AS t
        ON t.sandwichId = s.sandwichId
    LEFT JOIN s_leaders AS l
        ON l.slot = t.slot
    ORDER BY sandwich_slot DESC
    """
    result = client.query(query)
    res = pd.DataFrame(result.result_rows, columns=result.column_names)
    res.rename(columns={"leader": "validator"}, inplace=True)
    return res


In [None]:
min_slot, max_slot, total_slots = query_current_slots_in_DB(start_slot=START_SLOT)
print(
    f"Current fetched slots in DB: min = {min_slot}, max = {max_slot}, total = {total_slots}"
)

prev_epoch, prev_epoch_start, prev_epoch_end = fetch_prev_epoch_info()
print(f"Previous epoch: {prev_epoch}, start: {prev_epoch_start}, end: {prev_epoch_end}")

min_slot, max_slot, total_slots = query_current_slots_in_DB(
    prev_epoch_start, prev_epoch_end
)
print(
    f"Current fetched slots in DB in previous epoch range: min = {min_slot}, max = {max_slot}, total = {total_slots}"
)

In [None]:
leader_slots = query_leader_slot_counts(prev_epoch_start, prev_epoch_end)
print(
    f"Total slots led in epoch {prev_epoch} (slots {prev_epoch_start} to {prev_epoch_end}):"
)
print(leader_slots.head(10))
print(
    "Total slots:",
    leader_slots["slot_count"].sum(),
    "Total actual slots checked:",
    leader_slots["actual_slot_count"].sum(),
)

# Query victims number, sandwich number and SOL profit per validator in the previous epoch
sandwiches_by_leader = query_sandwiches_group_by_leader(
    prev_epoch_start, prev_epoch_end
)
print(sandwiches_by_leader.head(10))
print("Total sandwiches:", sandwiches_by_leader["sandwich_count"].sum())
print("Total profit in SOL:", sandwiches_by_leader["total_SOL_profit"].sum())
print("Total victims:", sandwiches_by_leader["total_victim_count"].sum())
# Compute victims per slot, sandwiches per slot, profit per slot
leader_statistic = leader_slots.merge(sandwiches_by_leader, on="validator", how="left")
leader_statistic["victims_per_slot"] = leader_statistic.apply(
    lambda row: (
        row["total_victim_count"] / row["actual_slot_count"]
        if row["actual_slot_count"] > 0
        else 0
    ),
    axis=1,
)
leader_statistic["sandwiches_per_slot"] = leader_statistic.apply(
    lambda row: (
        row["sandwich_count"] / row["actual_slot_count"]
        if row["actual_slot_count"] > 0
        else 0
    ),
    axis=1,
)
leader_statistic["profit_per_slot"] = leader_statistic.apply(
    lambda row: (
        row["total_SOL_profit"] / row["actual_slot_count"]
        if row["actual_slot_count"] > 0
        else 0
    ),
    axis=1,
)
leader_statistic = leader_statistic.sort_values(
    by=["sandwiches_per_slot", "profit_per_slot", "victims_per_slot"],
    ascending=False,
)
print("Leader statistic with victims/slot, sandwiches/slot, profit/slot:")
print(leader_statistic.head(10))

Total slots led in epoch 863 (slots 372816000 to 373247999):
                                      validator  slot_count  actual_slot_count
0  DRpbCBMxVnDK7maPM5tGv6MvB3v1sRMC86PZ8okm21hy       14156              14154
1  HEL1USMZKAL2odpNBj2oCjffnFGaYwmbGmyewGv1e2TU       13888              13888
2   JupmVLmA8RoyTUbTMMuTtoPWHEiNQobxgTeGTrPNkzT       11780              11780
3  Fd7btgySsrjuo25CJCj7oE7VPMyezDhnx7pZkj2v69Nk       11028              11026
4  DtdSSG8ZJRZVv5Jx7K1MeWp7Zxcu19GD5wQRGRpQ9uMF        8960               8960
5  5pPRHniefFjkiaArbGX3Y8NUysJmQ9tMZg3FrFGwHzSm        8680               8679
6   q9XWcZ7T1wP4bW9SB4XgNNwjnFEJ982nE8aVbbNuwot        8476               8476
7  JD549HsbJHeEKKUrKgg4Fj2iyv2RGjsV7NTZjZUrHybB        7124               7124
8  Awes4Tr6TX8JDzEhCZY2QVNimT6iD1zWHzf1vNyGvpLM        7076               7064
9  FNKgX9dYUhYQFRTM9bkeKoRpsyEtZGNMxbdQLDzfqB8a        6364               6364
Total slots: 432000 Total actual slots checked: 431647
               

In [None]:
start = prev_epoch_start
end = prev_epoch_end

# start = START_SLOT
# end = 380000000

# Slots with sandwiches and their txs
sandwiches_txs = query_sandwiches_with_txs_and_leader(start, end)
print(
    f"Total rows of sandwiches with txs and leader in slots from {start} to {end}: sandwiches - {sandwiches_by_leader['sandwich_count'].sum()} sandwich_txs - {len(sandwiches_txs)}"
)

# Profit (now in SOL only)
profit_view = sandwiches_txs.drop_duplicates(subset=["sandwichId"]).copy()
profit_view = profit_view[["sandwichId", "tokenA", "profitA"]]
profit_view["profit_SOL"] = profit_view["profitA"].where(
    profit_view["tokenA"] == "SOL", 0.0
)
profit_view["is_SOL"] = profit_view["tokenA"] == "SOL"

Total rows of sandwiches with txs and leader in slots from 370453692 to 380000000: sandwiches - 71555 sandwich_txs - 4876119


In [None]:
# Jito bundle coverage analysis
jito_view = sandwiches_txs.copy()
jito_stat = jito_view.groupby("sandwichId", as_index=False).agg(
    tx_count=("signature", "count"),
    inbundle_count=("inBundle", lambda x: x.fillna(False).astype(int).sum()),
)
jito_stat = jito_stat.merge(profit_view, on="sandwichId", how="left")
jito_stat["tokenA"] = jito_stat["tokenA"].fillna("UNKNOWN")
jito_stat["profitA"] = jito_stat["profitA"].fillna(0.0)


def bundle_status(row_tx_count, row_inbundle_count):
    if row_tx_count == 0:
        return "no_tx"
    if row_inbundle_count == 0:
        return "none_in_bundle"
    if row_inbundle_count == row_tx_count:
        return "all_in_bundle"
    return "partial_in_bundle"


jito_stat["bundle_status"] = jito_stat.apply(
    lambda row: bundle_status(row["tx_count"], row["inbundle_count"]), axis=1
)
summary = (
    jito_stat.groupby("bundle_status", dropna=False)
    .apply(
        lambda r: pd.Series(
            {
                "sandwich_count": r["sandwichId"].nunique(),
                "total_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].sum(),
                "sol_sandwich_count": (r["tokenA"] == "SOL").sum(),
                "avg_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].mean(),
            }
        )
    )
    .reset_index()
)
total_with_tx = int(summary["sandwich_count"].sum())
summary["sandwich_count_share"] = (
    summary["sandwich_count"] / total_with_tx if total_with_tx > 0 else 0.0
)
print(
    f"Jito bundle coverage in epoch {prev_epoch} "
    f"(slots {start} to {end}): total sandwiches = {total_with_tx}"
)
print(summary)

Jito bundle coverage in epoch 863 (slots 370453692 to 380000000): total sandwiches = 636778
       bundle_status  sandwich_count  total_profit_SOL  sol_sandwich_count  \
0      all_in_bundle        203568.0        455.193930            172512.0   
1     none_in_bundle        336645.0       -363.809689            206969.0   
2  partial_in_bundle         96565.0       2681.696485             80454.0   

   avg_profit_SOL  sandwich_count_share  
0        0.002639              0.319684  
1       -0.001758              0.528669  
2        0.033332              0.151646  


In [None]:
# Distance analysis
distance_view = sandwiches_txs.copy()

dist_rows = []
for sid, txs in distance_view.groupby("sandwichId"):
    cross_block = bool(txs["crossBlock"].iloc[0])
    tx_count = int(len(txs))
    inbundle_count = int(txs["inBundle"].sum())
    status = bundle_status(tx_count, inbundle_count)

    # Consecutive
    pos_unique = sorted(txs["position"].dropna().unique().tolist())
    consecutive = False
    if not cross_block and len(pos_unique) > 0:
        consecutive = pos_unique[-1] - pos_unique[0] + 1 == len(pos_unique)

    # Distance
    fr = txs[txs["type"] == "frontRun"]
    br = txs[txs["type"] == "backRun"]
    last_front_pos = fr["position"].max() if not fr.empty else None
    first_back_pos = br["position"].min() if not br.empty else None
    last_front_slot = fr["tx_slot"].max() if not fr.empty else None
    first_back_slot = br["tx_slot"].min() if not br.empty else None

    inblock_distance = None
    if not cross_block and last_front_pos is not None and first_back_pos is not None:
        inblock_distance = int(first_back_pos - last_front_pos)

    crossblock_gap_slots = None
    if cross_block and (last_front_slot is not None) and (first_back_slot is not None):
        crossblock_gap_slots = int(first_back_slot - last_front_slot)

    dist_rows.append(
        {
            "sandwichId": sid,
            "cross_block": cross_block,
            "consecutive": bool(consecutive),
            "bundle_status": status,  # all/partial/none/no_tx
            "tx_count": tx_count,
            "inbundle_count": inbundle_count,
            "inblock_distance": inblock_distance,
            "crossblock_gap_slots": crossblock_gap_slots,
        }
    )

dist_stat = pd.DataFrame(dist_rows)
dist_stat = dist_stat.merge(profit_view, on="sandwichId", how="left")
dist_stat["tokenA"] = dist_stat["tokenA"].fillna("UNKNOWN")
dist_stat["profitA"] = dist_stat["profitA"].fillna(0.0)

# -------- In block (consecutive) --------
inblock_consecutive = dist_stat[
    (~dist_stat["cross_block"]) & (dist_stat["consecutive"])
]
print("\n[] In-block & consecutive sandwiches")
print(f"Count: {len(inblock_consecutive)}")
cov1 = inblock_consecutive.groupby("bundle_status").apply(
    lambda r: pd.Series(
        {
            "count": len(r),
            "total_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].sum(),
            "sol_sandwich_count": (r["tokenA"] == "SOL").sum(),
            "avg_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].mean(),
        }
    ),
)
total1 = int(cov1["count"].sum()) if not cov1.empty else 0
cov1["share"] = cov1["count"] / total1
print("Jito bundle coverage (all/partial/none):")
print(cov1)

# -------- In block (non-contiguous) --------
inblock_non_contig = dist_stat[
    (~dist_stat["cross_block"]) & (~dist_stat["consecutive"])
]
print("\n[2] In-block & non-contiguous sandwiches")
print(f"Count: {len(inblock_non_contig)}")
dist = inblock_non_contig["inblock_distance"].dropna().astype(int)
print("Distance summary (first_backrun.position - last_frontrun.position):")
print(dist.describe())
print("Top distances (value counts):")
print(dist.value_counts().head(10))
cov2 = inblock_non_contig.groupby("bundle_status").apply(
    lambda r: pd.Series(
        {
            "count": len(r),
            "total_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].sum(),
            "sol_sandwich_count": (r["tokenA"] == "SOL").sum(),
            "avg_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].mean(),
        }
    ),
)
total2 = int(cov2["count"].sum()) if not cov2.empty else 0
cov2["share"] = cov2["count"] / total2
print("Jito bundle coverage (all/partial/none):")
print(cov2)

# -------- Cross block --------
cross_block_df = dist_stat[dist_stat["cross_block"]]
print("\n[3] Cross-block sandwiches")
print(f"Count: {len(cross_block_df)}")
gaps = cross_block_df["crossblock_gap_slots"].dropna().astype(int)
print("Gap slots summary (min backrun slot - max frontrun slot):")
print(gaps.describe())
print("Top gap slots (value counts):")
print(gaps.value_counts().head(10))
cov3 = cross_block_df.groupby("bundle_status").apply(
    lambda r: pd.Series(
        {
            "count": len(r),
            "total_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].sum(),
            "sol_sandwich_count": (r["tokenA"] == "SOL").sum(),
            "avg_profit_SOL": r.loc[r["tokenA"] == "SOL", "profitA"].mean(),
        }
    ),
)
total3 = int(cov3["count"].sum()) if not cov3.empty else 0
cov3["share"] = cov3["count"] / total3
print("Jito bundle coverage (all/partial/none):")
print(cov3)


[] In-block & consecutive sandwiches
Count: 28475
Jito bundle coverage (all/partial/none):
                     count  total_profit_SOL  sol_sandwich_count  \
bundle_status                                                      
all_in_bundle      27519.0         87.328122             27399.0   
none_in_bundle       953.0         88.310859               924.0   
partial_in_bundle      3.0          0.138196                 3.0   

                   avg_profit_SOL     share  
bundle_status                                
all_in_bundle            0.003187  0.966427  
none_in_bundle           0.095575  0.033468  
partial_in_bundle        0.046065  0.000105  

[2] In-block & non-contiguous sandwiches
Count: 98702
Distance summary (first_backrun.position - last_frontrun.position):
count    98702.000000
mean       277.924865
std        363.991929
min          3.000000
25%         24.000000
50%        103.000000
75%        378.000000
max       2327.000000
Name: inblock_distance, dtype: float64

In [None]:
# Program analysis
program_view = sandwiches_txs.copy()
program_view = program_view[program_view["type"].isin(ATTACKER_TX_TYPES)]

program_rows = []
for sid, txs in program_view.groupby("sandwichId"):
    programs_in_sandwich = {}
    for _, tx in txs.iterrows():
        # programs is like [program1, program1, program2, ...], not string
        programs = tx["programs"] if isinstance(tx["programs"], (list, tuple)) else []
        # Count each program's occurrence in the sandwich
        for p in programs:
            programs_in_sandwich[p] = programs_in_sandwich.get(p, 0) + 1
    program_rows.append(
        {
            "sandwichId": sid,
            "programs_count": programs_in_sandwich,
            "programs": list(programs_in_sandwich.keys()),
        }
    )

program_view = pd.DataFrame(program_rows)
program_view = program_view.merge(profit_view, on="sandwichId", how="left")

program_total_occurrence = defaultdict(int)
for sid, r in program_view.iterrows():
    for p, count in r["programs_count"].items():
        program_total_occurrence[p] += count
program_total_occurrence = dict(
    sorted(program_total_occurrence.items(), key=lambda item: item[1], reverse=True)
)
print("\n[Program Analysis] Total program occurrences in all sandwiches:")
for p, count in list(program_total_occurrence.items())[:20]:
    print(f"{p}: {count}")

NameError: name 'sandwiches_txs' is not defined

In [None]:
# Union-Find
parent = {}
rank = {}


def find(x):
    parent.setdefault(x, x)
    if parent[x] != x:
        parent[x] = find(parent[x])
    return parent[x]


def union(a, b):
    ra, rb = find(a), find(b)
    if ra == rb:
        return
    rank.setdefault(ra, 0)
    rank.setdefault(rb, 0)
    if rank[ra] < rank[rb]:
        parent[ra] = rb
    elif rank[ra] > rank[rb]:
        parent[rb] = ra
    else:
        parent[rb] = ra
        rank[ra] += 1


# Attacker analysis
attacker_view = sandwiches_txs.copy()
attacker_profit_view = profit_view.copy().set_index("sandwichId")

# Link every signer to an attacker using union-find
for sid, txs in attacker_view[attacker_view["type"].isin(ATTACKER_TX_TYPES)].groupby(
    "sandwichId"
):
    attacker_txs = txs[txs["type"].isin(ATTACKER_TX_TYPES)]
    attacker_signers = sorted(set(attacker_txs["signer"].dropna().astype(str).tolist()))

    signer_same = (
        bool(txs["signerSame"].dropna().iloc[0])
        if not txs["signerSame"].empty
        else False
    )
    if signer_same and len(attacker_signers) != 1:
        print(
            f"Warning: sandwichId {sid} has signerSame={signer_same} but multiple signers: {attacker_signers}"
        )
        continue

    if len(attacker_signers) <= 1:
        if len(attacker_signers) == 1:
            _ = find(attacker_signers[0])
        continue
    else:
        base = attacker_signers[0]
        for other in attacker_signers[1:]:
            union(base, other)

comp_members = defaultdict(set)
for s in sorted(set(attacker_view["signer"].dropna().astype(str))):
    comp_members[find(s)].add(s)

# Map root to attacker key (hashing sorted concatenation of members)
root_to_attacker_key = {
    root: hex(abs(hash(",".join(sorted(members)))))
    for root, members in comp_members.items()
}
signer_to_attacker_key = {
    s: root_to_attacker_key[find(s)]
    for s in set(attacker_view["signer"].dropna().astype(str))
}


# Collect attacker's sandwich data
def choose_attacker_for_sandwich(df):
    ss = df[df["type"].isin(ATTACKER_TX_TYPES)]["signer"].dropna().astype(str).tolist()
    keys = sorted(
        {signer_to_attacker_key.get(s) for s in ss if s in signer_to_attacker_key}
    )
    # Only one attacker
    if len(keys) != 1:
        print(
            f"Warning: sandwichId {df['sandwichId'].iloc[0]} has multiple attackers: {keys}"
        )

    if len(keys) == 0:
        return "UNKNOWN"
    if len(keys) == 1:
        return keys[0]
    return " + ".join(sorted(set(keys)))


sandwich_to_attacker = (
    attacker_view.groupby("sandwichId", as_index=False)
    .apply(choose_attacker_for_sandwich)
    .rename(columns={None: "attacker_key"})
)

attacker_stat = sandwich_to_attacker.merge(
    profit_view, on="sandwichId", how="left"
).merge(jito_stat[["sandwichId", "bundle_status"]], on="sandwichId", how="left")
attacker_stat["profit_SOL"] = attacker_stat["profit_SOL"].fillna(0.0)
attacker_stat["is_SOL"] = attacker_stat["is_SOL"].fillna(False)
attacker_stat["win"] = (attacker_stat["profit_SOL"] > 1e-5).astype(int)

attacker_summary = (
    attacker_stat.groupby("attacker_key", dropna=False)
    .apply(
        lambda r: pd.Series(
            {
                "sandwich_count": r["sandwichId"].nunique(),
                "total_profit_SOL": r.loc[r["is_SOL"], "profit_SOL"].sum(),
                "sol_sandwich_count": r["is_SOL"].sum(),
                "avg_profit_SOL": r.loc[r["is_SOL"], "profit_SOL"].mean(),
                "win_count": r["win"].sum(),
                "win_rate": r["win"].mean(),
                "all_in_bundle": (r["bundle_status"] == "all_in_bundle").sum()
                / r["sandwichId"].nunique(),
                "partial_in_bundle": (r["bundle_status"] == "partial_in_bundle").sum()
                / r["sandwichId"].nunique(),
                "none_in_bundle": (r["bundle_status"] == "none_in_bundle").sum()
                / r["sandwichId"].nunique(),
            }
        )
    )
    .reset_index()
)
attacker_addresses = (
    pd.Series(
        {
            root_to_attacker_key[root]: sorted(list(members))
            for root, members in comp_members.items()
        },
        name="signer_addresses",
    )
    .reset_index()
    .rename(columns={"index": "attacker_key"})
)
attacker_addresses["signer_address_count"] = attacker_addresses[
    "signer_addresses"
].apply(len)
attacker_addresses["signer_addresses_str"] = attacker_addresses[
    "signer_addresses"
].apply(lambda xs: ", ".join(xs))
attacker_summary = attacker_summary.merge(
    attacker_addresses, on="attacker_key", how="left"
).sort_values(by=["win_count", "win_rate"], ascending=False)

print("\n[Attacker Entities] summary (top 20):")
print(attacker_summary.head(20))


[Attacker Entities] summary (top 20):
             attacker_key  sandwich_count  total_profit_SOL  \
77128  0x667a24bd76f53bc7         20250.0        709.161398   
45965  0x43725d9d46925c2d          8860.0         86.856486   
47043  0x44ab68bd93c2ac35          2043.0         67.565764   
98397  0x7e3f8081bca0fe8e          2294.0          0.991218   
11982  0x1d8195b593391dce          1656.0         39.545827   
73493  0x625ceb426e2446cc          1402.0         29.070816   
82424  0x6c771ca98aa28c9a          1273.0         44.263536   
22316  0x28e5a169e0deef11          1427.0         11.999036   
92468  0x77bb804a40089075           986.0         29.649107   
5193   0x15f36dc0c56a3d43           963.0         36.827631   
87178  0x71dfba9a76b22761           870.0          2.259811   
30358  0x31ec1d9476d10266           984.0         47.203334   
58262  0x5146fcf1e1c75c18           850.0         12.956545   
24532  0x2b6076ef6b5822a1           862.0         63.813952   
21044  0x276a3c9