## Common Func

In [3]:
import os
import re
import json
import statistics
from typing import Callable, Tuple, List, Optional

import torch
import matplotlib.pyplot as plt
import seaborn as sns
from use_eplb import load_csv_to_tensor
import pandas as pd
import time

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


def calculate_gpu_loads(original_weights: torch.Tensor, num_gpus: int):
    num_layers, num_experts = original_weights.shape
    experts_per_gpu = num_experts // num_gpus

    assert num_experts % num_gpus == 0, "Experts cannot be evenly divided among GPUs."

    gpu_loads = torch.zeros((num_layers, num_gpus), dtype=original_weights.dtype)

    for layer_idx in range(num_layers):
        for gpu_idx in range(num_gpus):
            start_idx = gpu_idx * experts_per_gpu
            end_idx = (gpu_idx + 1) * experts_per_gpu
            gpu_loads[layer_idx, gpu_idx] = original_weights[
                layer_idx, start_idx:end_idx
            ].sum()

    return gpu_loads


def save_gpu_loads_to_csv(
    gpu_loads: torch.Tensor, output_folder: str, file_name: str, num_gpus: int
):
    gpu_loads_df = pd.DataFrame(
        gpu_loads.numpy(),
        columns=[f"GPU{i}" for i in range(num_gpus)],
        index=list(range(gpu_loads.shape[0])),
    )
    csv_save_path = os.path.join(output_folder, file_name)
    gpu_loads_df.to_csv(csv_save_path)


def plot_gpu_loads_analysis(
    gpu_loads: torch.Tensor, heatmap_save_path: str, boxplot_save_path: str
):
    """
    Plot normalized heatmap and boxplot of GPU loads.

    Args:
        gpu_loads (torch.Tensor): Tensor of shape [58, 8] (layers x GPUs).
        heatmap_save_path (str): File path to save heatmap.
        boxplot_save_path (str): File path to save boxplot.
    """
    import matplotlib.pyplot as plt
    import seaborn as sns

    # Normalize the gpu loads per layer
    layer_sums = gpu_loads.sum(dim=1, keepdim=True)  # Sum across GPUs for each layer
    normalized_gpu_loads = gpu_loads / layer_sums  # Each row sums to 1

    # 1. Heatmap: Normalized Load per layer per GPU
    plt.figure(figsize=(10, 8))
    sns.heatmap(
        normalized_gpu_loads.numpy(),
        cmap="Reds",
        annot=True,
        fmt=".2f",  # Show 2 decimal places for better precision
        linewidths=0.5,
        cbar_kws={"label": "Normalized Load"},
        yticklabels=list(range(gpu_loads.shape[0])),  # Layer IDs
        annot_kws={"size": 6},
    )
    plt.xlabel("GPU Index")
    plt.ylabel("Layer ID")
    plt.title("Normalized GPU Loads per Layer")
    plt.tight_layout()
    plt.savefig(heatmap_save_path)
    print(f"Normalized heatmap saved to: {heatmap_save_path}")
    plt.close()

    # 2. Boxplot: Distribution of normalized GPU loads across layers
    plt.figure(figsize=(10, 6))
    sns.boxplot(data=normalized_gpu_loads.numpy())
    plt.axhline(
        y=1.0 / normalized_gpu_loads.shape[1], color="blue", linestyle="-"
    )  # Ideal line
    overall_std = normalized_gpu_loads.numpy().flatten().std()
    plt.annotate(
        xy=(0.98, 0.98),
        xycoords="axes fraction",
        text=f"Overall Std Dev: σ = {overall_std:.4f}",
        ha="right",
        va="top",
        fontsize=10,
        color="blue",
        bbox=dict(
            boxstyle="round,pad=0.3", edgecolor="gray", facecolor="white", alpha=0.7
        ),
    )
    plt.xlabel("GPU Index")
    plt.ylabel("Normalized Load")
    plt.title("GPU Load Distribution Across Layers (Normalized Boxplot)")
    plt.tight_layout()
    plt.savefig(boxplot_save_path)
    print(f"Normalized boxplot saved to: {boxplot_save_path}")
    plt.close()


def remap_weights(
    phy2log: torch.Tensor, original_weights: torch.Tensor
) -> torch.Tensor:
    """
    Remap the original logical expert weights according to the phy2log mapping.

    Args:
        phy2log (torch.Tensor): A tensor of shape [58, 256], where each value is a logical_expert_id.
        original_weights (torch.Tensor): A tensor of shape [58, 256], where the index is the logical_expert_id and the value is the corresponding weight.

    Returns:
        torch.Tensor: A new tensor of shape [58, 256], where each physical expert has its remapped weight.
    """
    num_layers, num_physical_experts = phy2log.shape
    new_weights = torch.zeros_like(phy2log, dtype=original_weights.dtype)

    for layer_idx in range(num_layers):
        # Get the mapping from physical experts to logical experts for this layer
        logical_ids = phy2log[layer_idx]  # Shape: [256]
        # Fetch the corresponding weights from original_weights using the logical expert IDs
        new_weights[layer_idx] = original_weights[layer_idx][logical_ids]

    return new_weights


def normalize_gpu_loads(gpu_loads: torch.Tensor) -> torch.Tensor:
    """
    Normalize GPU loads layer-wise so that each row sums to 1.
    """
    layer_sums = gpu_loads.sum(dim=1, keepdim=True)  # Sum across GPUs for each layer
    normalized_gpu_loads = gpu_loads / layer_sums
    return normalized_gpu_loads


def natural_sort_key(s: str) -> List:
    """Generate a natural sort key for strings.

    Args:
        s: Input string to be sorted

    Returns:
        A list that can be used for natural sorting
    """
    return [
        int(text) if text.isdigit() else text.lower() for text in re.split(r"(\d+)", s)
    ]


def load_csv_to_tensor(input_folder: str) -> Optional[torch.Tensor]:
    """Load CSV files from a folder and convert them to a tensor.

    Each CSV file represents one layer, and is converted to one row in the output tensor.

    Args:
        input_folder: Path to folder containing CSV files

    Returns:
        torch.Tensor of shape [num_layers, num_experts] or None if an error occurs
    """
    # Get all csv files and sort them naturally
    csv_files = sorted(
        [
            os.path.join(input_folder, f)
            for f in os.listdir(input_folder)
            if f.endswith(".csv")
        ],
        key=natural_sort_key,
    )
    num_layers = len(csv_files)

    if num_layers == 0:
        print(f"No CSV files found in {input_folder}")
        return None

    # Initialize a list to store rows
    rows = []

    for csv_file in csv_files:
        try:
            # Read the CSV file
            df = pd.read_csv(csv_file, header=None)
            # Sum up all rows in the CSV file
            row_sums = df.sum(axis=0).values
            # Append to the list as a row
            rows.append(row_sums)
        except Exception as e:
            print(f"Error processing file {csv_file}: {e}")
            return None

    # Stack all rows into a single tensor of shape [num_layers, num_experts]
    return torch.tensor(rows, dtype=torch.float32)


def count_expert_moves(assignment: torch.Tensor) -> int:
    """input one-hot assignment"""
    nb_experts_per_gpu = assignment.shape[1] // assignment.shape[0]
    return sum(
        [
            sum(
                [
                    int(assignment[gpu][expert].item())
                    for expert in range(assignment.shape[1])
                    if (expert // nb_experts_per_gpu) != gpu
                ]
            )
            for gpu in range(assignment.shape[0])
        ]
    )


def get_naive_assignment(nb_gpus: int, nb_experts: int) -> torch.Tensor:
    """return vanilla one-hot assignment of experts to GPUs"""
    nb_experts_per_gpu = nb_experts // nb_gpus
    return torch.Tensor(
        [
            [
                (
                    1
                    if i < (j + 1) * nb_experts_per_gpu and i >= j * nb_experts_per_gpu
                    else 0
                )
                for i in range(nb_experts)
            ]
            for j in range(nb_gpus)
        ]
    ).long()


def get_readable_assignment(assignment: torch.Tensor) -> str:
    """return a readable string of the one-hot assignment"""
    assignment_str = ""
    for gpu in range(assignment.shape[0]):
        assignment_str += f"GPU {gpu}:"
        for expert, value in enumerate(assignment[gpu]):
            if value:
                assignment_str += f" {expert}"
        assignment_str += "\n"
    return assignment_str


def phy_to_assignment(
    phy2log: torch.Tensor, nb_gpus: int, nb_experts: int
) -> torch.Tensor:
    """
    convert phy2log to one-hot assignment, if assignment[0][32] = 1, then in this layer, logical_expert 32 is located on gpu0.
    """
    assert nb_experts % nb_gpus == 0, "Experts cannot be evenly divided among GPUs."
    return torch.sum(
        torch.nn.functional.one_hot(phy2log).view(nb_gpus, nb_experts // nb_gpus, -1),
        dim=1,
    )


def get_gpu_load(
    assignment: torch.Tensor,
    hotness: torch.Tensor,
):
    return torch.matmul(assignment.float(), hotness.float())


def adjust_hotness(hotness: torch.Tensor, assignment: torch.Tensor) -> torch.Tensor:
    """
    Args:
        - hotness: [E], total logical hotness
        - assignment: [G, E], 1 if logical expert is on GPU

    Returns:
        - adjusted_hotness: [E], hotness per replica (logical expert / replica count)
    """
    assignment = assignment.float()
    replica_count = assignment.sum(dim=0)  # [E]
    adjusted_hotness = hotness.float() / replica_count.clamp(min=1)
    return adjusted_hotness


def measure_execution_time(func: Callable[[], dict], warmup: int, repeat: int):
    for _ in range(warmup):
        func()

    times = []
    final_result = None
    for _ in range(repeat):
        start = time.perf_counter()
        final_result = func()
        end = time.perf_counter()
        times.append((end - start) * 1000)  # ms

    return {
        "result": final_result,
        "avg": statistics.mean(times),
        "std": statistics.stdev(times) if repeat > 1 else 0,
        "min": min(times),
        "max": max(times),
        "all_times": times,
    }


def minimize_moves(self, hotness: torch.Tensor, assignments: torch.Tensor) -> torch.Tensor:
    bins = torch.arange(
        0,
        int(torch.max(hotness).item() + 11),
        # int(0.5 * self.nb_experts),
        10,
    )
    digitized = torch.bucketize(hotness, bins, right=True)
    # Try sort approach rather than bin approach

    binned = [
        torch.arange(self.nb_experts)[digitized == i].tolist()
        for i in range(1, len(bins))
    ]
    assignment_indices = torch.arange(self.nb_experts)[torch.argsort(assignments)]
    for i, expert in enumerate(assignments):
        gpu = i // self.nb_experts_per_gpu
        # Nothing to change. The expert is on the original GPU
        if (
            gpu * self.nb_experts_per_gpu <= expert
            and expert < (gpu + 1) * self.nb_experts_per_gpu
        ):
            continue
        current_expert_bin = digitized[expert]
        experts_in_same_bin = binned[current_expert_bin - 1]
        for i, same_bin_expert in enumerate(experts_in_same_bin):
            # We found an expert with the same kind of hotness that belongs to the current GPU
            if (
                same_bin_expert != expert
                and gpu * self.nb_experts_per_gpu <= same_bin_expert
                and same_bin_expert < (gpu + 1) * self.nb_experts_per_gpu
            ):
                # We get the index of the original gpu
                # gpu_ind = np.where(assignments[:, same_bin_expert] == 1)[0]
                # assert len(gpu_ind) == 1
                # gpu_ind = gpu_ind[0]
                # if gpu_ind == gpu:
                # continue
                (
                    assignments[assignment_indices[expert]],
                    assignments[assignment_indices[same_bin_expert]],
                ) = (
                    assignments[assignment_indices[same_bin_expert]],
                    assignments[assignment_indices[expert]],
                )

                # We don't want this expert to be swapped again
                del experts_in_same_bin[i]

                break

    return assignments

## vanilla

In [None]:
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
input_folder = r"C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist"
output_folder = os.path.join(input_folder, "vanilla")
os.makedirs(output_folder, exist_ok=True)
original_weights = load_csv_to_tensor(input_folder)
num_gpus = 8
vanilla_gpu_loads = calculate_gpu_loads(original_weights, num_gpus)
csv_save_path = os.path.join(output_folder, "vanilla_gpu_loads.csv")
save_gpu_loads_to_csv(vanilla_gpu_loads, output_folder, csv_save_path, num_gpus)

heatmap_save_path = os.path.join(output_folder, "vanilla_gpu_loads_heatmap.png")
boxplot_save_path = os.path.join(output_folder, "vanilla_gpu_loads_boxplot.png")
plot_gpu_loads_analysis(
    gpu_loads=vanilla_gpu_loads,
    heatmap_save_path=heatmap_save_path,
    boxplot_save_path=boxplot_save_path,
)

  return torch.tensor(rows, dtype=torch.float32)


Normalized heatmap saved to: C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist\vanilla\vanilla_gpu_loads_heatmap.png
Normalized boxplot saved to: C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist\vanilla\vanilla_gpu_loads_boxplot.png


## EPLB

### EPLB ALgorithm Code

In [None]:
from typing import Tuple

import torch


def balanced_packing(
    weight: torch.Tensor, num_packs: int
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Pack n weighted objects to m packs, such that each bin contains exactly n/m objects and the weights of all packs
    are as balanced as possible.

    Parameters:
        weight: [X, n], the weight of each item
        num_packs: number of packs

    Returns:
        pack_index: [X, n], the pack index of each item
        rank_in_pack: [X, n], the rank of the item in the pack
    """
    num_layers, num_groups = weight.shape
    assert num_groups % num_packs == 0
    groups_per_pack = num_groups // num_packs

    if groups_per_pack == 1:
        pack_index = torch.arange(
            weight.size(-1), dtype=torch.int64, device=weight.device
        ).expand(weight.shape)
        rank_in_pack = torch.zeros_like(weight, dtype=torch.int64)
        return pack_index, rank_in_pack

    indices = weight.float().sort(-1, descending=True).indices.cpu()  # O(n log n)
    pack_index = torch.full_like(weight, fill_value=-1, dtype=torch.int64, device="cpu")
    rank_in_pack = torch.full_like(pack_index, fill_value=-1)
    for i in range(num_layers):
        pack_weights = [0] * num_packs
        pack_items = [0] * num_packs
        for group in indices[i]:
            pack = min(
                (i for i in range(num_packs) if pack_items[i] < groups_per_pack),
                key=pack_weights.__getitem__,
            )
            assert pack_items[pack] < groups_per_pack
            pack_index[i, group] = pack
            rank_in_pack[i, group] = pack_items[pack]
            pack_weights[pack] += weight[i, group]
            pack_items[pack] += 1
    return pack_index, rank_in_pack


def replicate_experts(
    weight: torch.Tensor, num_phy: int
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """
    Replicate `num_log` experts to `num_phy` replicas, such that the maximum load of all replicas is minimized.

    Parameters:
        weight: [X, num_log]
        num_phy: total number of experts after replication

    Returns:
        phy2log: [X, num_phy], logical expert id of each physical expert
        rank: [X, num_phy], the replica rank
        logcnt: [X, num_log], number of replicas for each logical expert
    """
    n, num_log = weight.shape
    num_redundant = num_phy - num_log
    assert num_redundant >= 0
    device = weight.device
    phy2log = torch.arange(num_phy, dtype=torch.int64, device=device).repeat(n, 1)
    rank = torch.zeros(n, num_phy, dtype=torch.int64, device=device)
    logcnt = torch.ones(n, num_log, dtype=torch.int64, device=device)
    arangen = torch.arange(n, dtype=torch.int64, device=device)
    for i in range(num_log, num_phy):
        redundant_indices = (weight / logcnt).max(dim=-1).indices
        phy2log[:, i] = redundant_indices
        rank[:, i] = logcnt[arangen, redundant_indices]
        logcnt[arangen, redundant_indices] += 1
    return phy2log, rank, logcnt


def rebalance_experts_hierarchical(
    weight: torch.Tensor,
    num_physical_experts: int,
    num_groups: int,
    num_nodes: int,
    num_gpus: int,
):
    """
    Parameters:
        weight: [num_moe_layers, num_logical_experts]
        num_physical_experts: number of physical experts after replication
        num_groups: number of expert groups
        num_nodes: number of server nodes, where the intra-node network (e.g, NVLink) is faster
        num_gpus: number of GPUs, must be a multiple of `num_nodes`

    Returns:
        physical_to_logical_map: [num_moe_layers, num_physical_experts]
        logical_to_physical_map: [num_moe_layers, num_logical_experts, X]
        logical_count: [num_moe_layers, num_logical_experts]
    """
    num_layers, num_logical_experts = weight.shape
    assert num_logical_experts % num_groups == 0
    group_size = num_logical_experts // num_groups
    assert num_groups % num_nodes == 0
    groups_per_node = num_groups // num_nodes
    assert num_gpus % num_nodes == 0
    assert num_physical_experts % num_gpus == 0
    phy_experts_per_gpu = num_physical_experts // num_gpus

    def inverse(perm: torch.Tensor) -> torch.Tensor:
        inv = torch.empty_like(perm)
        inv.scatter_(
            1,
            perm,
            torch.arange(perm.size(1), dtype=torch.int64, device=perm.device).expand(
                perm.shape
            ),
        )
        return inv

    # Step 1: pack groups to nodes
    tokens_per_group = weight.unflatten(-1, (num_groups, group_size)).sum(-1)
    group_pack_index, group_rank_in_pack = balanced_packing(tokens_per_group, num_nodes)
    log2mlog = (
        (
            (group_pack_index * groups_per_node + group_rank_in_pack) * group_size
        ).unsqueeze(-1)
        + torch.arange(group_size, dtype=torch.int64, device=group_pack_index.device)
    ).flatten(-2)
    mlog2log = inverse(log2mlog)

    # Step 2: construct redundant experts within nodes
    # [num_layers * num_nodes, num_logical_experts // num_nodes]
    tokens_per_mlog = weight.gather(-1, mlog2log).view(
        -1, num_logical_experts // num_nodes
    )
    phy2mlog, phyrank, mlogcnt = replicate_experts(
        tokens_per_mlog, num_physical_experts // num_nodes
    )

    # Step 3: pack physical_experts to GPUs
    # [num_layers * num_nodes, num_physical_experts // num_nodes]
    tokens_per_phy = (tokens_per_mlog / mlogcnt).gather(-1, phy2mlog)
    pack_index, rank_in_pack = balanced_packing(tokens_per_phy, num_gpus // num_nodes)
    phy2pphy = pack_index * phy_experts_per_gpu + rank_in_pack
    pphy2phy = inverse(phy2pphy)

    pphy2mlog = phy2mlog.gather(
        -1, pphy2phy
    )  # [num_layers * num_nodes, num_log_per_nodes]
    pphy2mlog = (
        pphy2mlog.view(num_layers, num_nodes, -1)
        + torch.arange(
            0,
            num_logical_experts,
            num_logical_experts // num_nodes,
            device=group_pack_index.device,
        ).view(1, -1, 1)
    ).flatten(-2)
    pphy2log = mlog2log.gather(-1, pphy2mlog)
    pphyrank = phyrank.gather(-1, pphy2phy).view(num_layers, -1)
    logcnt = mlogcnt.view(num_layers, -1).gather(-1, log2mlog)
    return pphy2log, pphyrank, logcnt


def rebalance_experts(
    weight: torch.Tensor,
    num_replicas: int,
    num_groups: int,
    num_nodes: int,
    num_gpus: int,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """
    Entry point for expert-parallelism load balancer.

    Parameters:
        weight: [layers, num_logical_experts], the load statistics for all logical experts
        num_replicas: number of physical experts, must be a multiple of `num_gpus`
        num_groups: number of expert groups
        num_nodes: number of server nodes, where the intra-node network (e.g, NVLink) is faster
        num_gpus: number of GPUs, must be a multiple of `num_nodes`

    Returns:
        physical_to_logical_map: [layers, num_replicas], the expert index of each replica
        logical_to_physical_map: [layers, num_logical_experts, X], the replica indices for each expert
        expert_count: [layers, num_logical_experts], number of physical replicas for each logical expert
    """
    num_layers, num_logical_experts = weight.shape
    weight = weight.float().cpu()
    if num_groups % num_nodes == 0:
        # use hierarchical load-balance policy
        phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
            weight, num_replicas, num_groups, num_nodes, num_gpus
        )
    else:
        # use global load-balance policy
        phy2log, phyrank, logcnt = rebalance_experts_hierarchical(
            weight, num_replicas, 1, 1, num_gpus
        )
    maxlogcnt = logcnt.max().item()
    log2phy: torch.Tensor = torch.full(
        (num_layers, num_logical_experts, maxlogcnt),
        -1,
        dtype=torch.int64,
        device=logcnt.device,
    )
    log2phy.view(num_layers, -1).scatter_(
        -1,
        phy2log * maxlogcnt + phyrank,
        torch.arange(num_replicas, dtype=torch.int64, device=log2phy.device).expand(
            num_layers, -1
        ),
    )
    return phy2log, log2phy, logcnt


__all__ = ["rebalance_experts"]

### DeepSeek-v3 EPLB 256 replicas new token dist

In [None]:
input_folder = r"C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist"
output_folder = os.path.join(input_folder, "eplb")
os.makedirs(output_folder, exist_ok=True)
weights = load_csv_to_tensor(input_folder)
if weights is None:
    raise ValueError(f"Failed to load weights from {input_folder}")

# ========== EPLB 测试配置 ==========
eplb_config = {
    "num_replicas": 256,  # Number of physical experts
    "num_groups": 1,
    "num_nodes": 1,
    "num_gpus": 8,
}


def run_eplb_all_layers():
    phy2log, log2phy, logcnt = rebalance_experts(
        weight=weights,
        num_replicas=eplb_config["num_replicas"],
        num_groups=eplb_config["num_groups"],
        num_nodes=eplb_config["num_nodes"],
        num_gpus=eplb_config["num_gpus"],
    )
    return {"phy2log": phy2log, "log2phy": log2phy, "logcnt": logcnt}


# ========== EPLB 执行时间测量 ==========
print("\nMeasuring total EPLB execution time across all layers...")


repeat_runs = 5
warmup_runs = 2
eplb_stats = measure_execution_time(
    run_eplb_all_layers, warmup=warmup_runs, repeat=repeat_runs
)

# ========== 打印 EPLB 时间统计 ==========
print(f"\nEPLB Execution Time Stats (over {repeat_runs} runs):")
print(f"  • Average: {eplb_stats['avg']:.2f} ms")
print(f"  • Std Dev: {eplb_stats['std']:.2f} ms")
print(f"  • Min:     {eplb_stats['min']:.2f} ms")
print(f"  • Max:     {eplb_stats['max']:.2f} ms")

# ========== 后续处理 ==========
phy2log = eplb_stats["result"]["phy2log"]
log2phy = eplb_stats["result"]["log2phy"]
logcnt = eplb_stats["result"]["logcnt"]

# 原始 expert load 重映射
bldm_256_replicas_new_weights = remap_weights(
    phy2log=phy2log, original_weights=original_weights
)

# 计算负载
bldm_256_replicas_gpu_loads = calculate_gpu_loads(
    bldm_256_replicas_new_weights, num_gpus
)
csv_save_path = os.path.join(output_folder, "eplb_256_replicas_gpu_loads.csv")
save_gpu_loads_to_csv(
    bldm_256_replicas_gpu_loads, output_folder, csv_save_path, num_gpus
)

# 可视化
heatmap_save_path = os.path.join(
    output_folder, "eplb_256_replicas_gpu_loads_heatmap.png"
)
boxplot_save_path = os.path.join(
    output_folder, "eplb_256_replicas_gpu_loads_boxplot.png"
)
plot_gpu_loads_analysis(
    gpu_loads=bldm_256_replicas_gpu_loads,
    heatmap_save_path=heatmap_save_path,
    boxplot_save_path=boxplot_save_path,
)

# experts moves
all_assignments = []
all_expert_moves = 0
print(f"phy2log: {phy2log}")
for i in range(phy2log.shape[0]):
    assignment = phy_to_assignment(
        phy2log=phy2log[i],
        nb_gpus=eplb_config["num_gpus"],
        nb_experts=eplb_config["num_replicas"],
    )

    layerwise_num_expert_moves = count_expert_moves(assignment)
    print(
        f"\nLayer {i} expert moves:{layerwise_num_expert_moves} \nassignment:\n{get_readable_assignment(assignment)}"
    )
    all_expert_moves += layerwise_num_expert_moves
    all_assignments.append(assignment)


print(f"Number of expert moves: {all_expert_moves}")


Measuring total EPLB execution time across all layers...

EPLB Execution Time Stats (over 5 runs):
  • Average: 600.54 ms
  • Std Dev: 32.18 ms
  • Min:     567.04 ms
  • Max:     652.90 ms


NameError: name 'original_weights' is not defined

### Profile

In [None]:
from viztracer import VizTracer

with VizTracer(output_file="eplb_profile.json") as tracer:
    tracer.start()
    results = run_eplb_all_layers()
    tracer.stop()
    tracer.save()

## BLDM

### BLDM Tensor Version

In [None]:
from typing import List, Tuple, Union

import torch
from torch import Tensor
import os
import pandas as pd
from balancing.balancers.load_balancer import LoadBalancer


class BLDMBalancer(LoadBalancer):
    def __init__(
        self,
        number_of_gpus: int,
        number_of_experts: int,
        **kwargs,
    ):
        super().__init__(number_of_gpus, number_of_experts, **kwargs)
        self.device = torch.device("cpu")

    # Section 2: https://link.springer.com/chapter/10.1007/3-540-36494-3_51
    @staticmethod
    def argsort(seq):
        # http://stackoverflow.com/questions/3071415/efficient-method-to-calculate-the-rank-vector-of-a-list-in-python
        return sorted(range(len(seq)), key=seq.__getitem__)

    def get_graph_name(self) -> str:
        return "BLDM"

    @staticmethod
    def __algo(buckets: List[List[List[Tuple[int, int]]]]):
        d_diff = []
        sums = []

        for partition in buckets:
            local_sums = torch.tensor(
                [sum(elem[0] for elem in subset) for subset in partition],
                dtype=torch.float32,
            )
            sorted_sums = torch.argsort(local_sums)
            sums.append(sorted_sums)
            d_diff.append(local_sums[sorted_sums[-1]] - local_sums[sorted_sums[0]])
        sorted_d_diff = torch.argsort(torch.stack(d_diff))
        p1_ind = sorted_d_diff[-1].item()
        p2_ind = sorted_d_diff[-2].item()
        p1 = buckets[p1_ind]
        p2 = buckets[p2_ind]
        new_partition = [
            p1[sums[p1_ind][j].item()]
            + p2[sums[p2_ind][len(sums[p2_ind]) - j - 1].item()]
            for j in range(len(sums[p1_ind]))
        ]

        del buckets[p1_ind]
        if p1_ind < p2_ind:
            p2_ind -= 1
        del buckets[p2_ind]
        buckets.append(new_partition)

    def balance(self, hotness: Tensor) -> Union[Tensor, None]:
        # From wikipedia description
        buckets: List[List[List[Tuple[int, int]]]] = []
        sorted_vals, sorted_idx = torch.sort(hotness)  # 已是升序
        hotnesses = list(
            zip(sorted_vals.tolist(), sorted_idx.tolist())  # 仅用于后面构造 buckets
        )
        assert len(hotnesses) % self.nb_gpus == 0

        # Initialization phase
        m = len(hotnesses) // self.nb_gpus
        k = 0
        for i in range(m):  # O(n)
            local_bucket = []
            for j in range(self.nb_gpus):
                local_bucket.append([hotnesses[k]])
                k = k + 1
            buckets.append(local_bucket)

        # Applying algo
        while len(buckets) > 1:
            BLDMBalancer.__algo(buckets)

        # Retrieving solution
        solution = buckets[0]
        self.solution = solution
        # if self.minimize_moves:
        # return LoadBalancer.minimize_moves(hotness, np.array(assignment))
        sequential_solution = []
        for partition in self.solution:
            for _, expert in partition:
                sequential_solution.append(expert)

        self.solution = torch.Tensor(sequential_solution)
        return self.solution

### BLDM Algorithm Code

In [19]:
from typing import List, Tuple, Union

import torch
from torch import Tensor
import os
import pandas as pd
from balancing.balancers.load_balancer import LoadBalancer


class BLDMBalancer(LoadBalancer):
    def __init__(
        self,
        number_of_gpus: int,
        number_of_experts: int,
        **kwargs,
    ):
        super().__init__(number_of_gpus, number_of_experts, **kwargs)
        self.device = torch.device("cpu")

    # Section 2: https://link.springer.com/chapter/10.1007/3-540-36494-3_51
    @staticmethod
    def argsort(seq):
        # http://stackoverflow.com/questions/3071415/efficient-method-to-calculate-the-rank-vector-of-a-list-in-python
        return sorted(range(len(seq)), key=seq.__getitem__)

    def get_graph_name(self) -> str:
        return "BLDM"

    @staticmethod
    def __algo(buckets: List[List[List[Tuple[int, int]]]]):
        d_diff = []
        sums = []

        for partition in buckets:
            local_sums = [sum(elem[0] for elem in subset) for subset in partition]
            sorted_sums = BLDMBalancer.argsort(local_sums)
            sums.append(sorted_sums)
            d_diff.append(local_sums[sorted_sums[-1]] - local_sums[sorted_sums[0]])
        sorted_d_diff = BLDMBalancer.argsort(d_diff)
        p1_ind = sorted_d_diff[-1]
        p2_ind = sorted_d_diff[-2]
        p1 = buckets[p1_ind]
        p2 = buckets[p2_ind]
        new_partition = [
            p1[sums[p1_ind][j]] + p2[sums[p2_ind][len(sums[p2_ind]) - j - 1]]
            for j in range(len(sums[p1_ind]))
        ]

        del buckets[p1_ind]
        if p1_ind < p2_ind:
            p2_ind = p2_ind - 1
        del buckets[p2_ind]
        buckets.append(new_partition)

    def balance(self, hotness: Tensor) -> Union[Tensor, None]:
        # From wikipedia description
        buckets: List[List[List[Tuple[int, int]]]] = []
        hotnesses = hotness.tolist()
        hotnesses = [(hotness, expert) for expert, hotness in enumerate(hotnesses)]
        hotnesses.sort(key=lambda x: x[0])  # O(n log n)
        assert len(hotnesses) % self.nb_gpus == 0

        # Initialization phase
        m = len(hotnesses) // self.nb_gpus
        k = 0
        for i in range(m):  # O(n)
            local_bucket = []
            for j in range(self.nb_gpus):
                local_bucket.append([hotnesses[k]])
                k = k + 1
            buckets.append(local_bucket)

        # Applying algo
        while len(buckets) > 1:
            BLDMBalancer.__algo(buckets)

        # Retrieving solution
        solution = buckets[0]
        self.solution = solution
        # if self.minimize_moves:
        # return LoadBalancer.minimize_moves(hotness, np.array(assignment))
        sequential_solution = []
        for partition in self.solution:
            for _, expert in partition:
                sequential_solution.append(expert)

        self.solution = torch.Tensor(sequential_solution)
        return self.solution

### Run BLDM

In [None]:
import os
import time
import torch
import pandas as pd
import statistics
from typing import Callable


# config
input_folder = r"C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist"
output_folder = os.path.join(input_folder, "bldm")
os.makedirs(output_folder, exist_ok=True)

num_gpus = 8
num_experts = 256
warmup_runs = 2
repeat_runs = 5

# load hotness data
all_hotness = load_csv_to_tensor(input_folder)  # [num_layers, 256]
num_layers = all_hotness.shape[0]
balancer = BLDMBalancer(
    number_of_gpus=num_gpus, number_of_experts=num_experts, minimize_moves=False
)


# wrap the function to run all layers
def run_all_layers_bldm() -> list:
    all_seq = []
    for layer_idx in range(num_layers):
        hotness = all_hotness[layer_idx]
        sequential = balancer.balance(hotness).to(torch.int64)
        all_seq.append(sequential.cpu().numpy())
    return all_seq


# execute the function and measure time
print("Measuring total BLDM execution time across all layers...")
stats = measure_execution_time(
    run_all_layers_bldm, warmup=warmup_runs, repeat=repeat_runs
)

# save phy2log mapping
all_phy2log = torch.tensor(stats["results"], dtype=torch.int64)
all_assignments = [
    phy_to_assignment(all_phy2log[i], num_gpus, num_experts)
    for i in range(all_phy2log.shape[0])
]
all_assignments_tensor = torch.stack(all_assignments, dim=0)

all_gpu_loads = []
for i in range(all_assignments_tensor.shape[0]):
    layerwise_gpu_loads = get_gpu_load(all_assignments_tensor[i], all_hotness[i])
    all_gpu_loads.append(layerwise_gpu_loads)
all_gpu_loads_tensor = torch.stack(all_gpu_loads, dim=0)


df = pd.DataFrame(all_phy2log.numpy())
csv_path = os.path.join(output_folder, "bldm_phy2log.csv")
df.to_csv(csv_path, index=False, header=False)
print(f"Saved balanced phy2log mapping to {csv_path}")

# print results
print(f"\nBLDM Execution Time Stats (over {repeat_runs} runs):")
print(f"  • Average: {stats['avg']:.2f} ms")
print(f"  • Std Dev: {stats['std']:.2f} ms")
print(f"  • Min:     {stats['min']:.2f} ms")
print(f"  • Max:     {stats['max']:.2f} ms")

Measuring total BLDM execution time across all layers...
Saved balanced phy2log mapping to C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist\bldm\bldm_phy2log.csv

BLDM Execution Time Stats (over 5 runs):
  • Average: 150.75 ms
  • Std Dev: 8.15 ms
  • Min:     141.73 ms
  • Max:     163.94 ms


In [21]:
input_folder = r"C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist"
output_folder = os.path.join(input_folder, "bldm")
os.makedirs(output_folder, exist_ok=True)

heatmap_save_path = os.path.join(output_folder, "bldm_256_replicas_gpu_loads_heatmap.png")
boxplot_save_path = os.path.join(output_folder, "bldm_256_replicas_gpu_loads_boxplot.png")
plot_gpu_loads_analysis(gpu_loads=all_gpu_loads_tensor, heatmap_save_path=heatmap_save_path, boxplot_save_path=boxplot_save_path)

Normalized heatmap saved to: C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist\bldm\bldm_256_replicas_gpu_loads_heatmap.png
Normalized boxplot saved to: C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist\bldm\bldm_256_replicas_gpu_loads_boxplot.png


### Profile 

In [None]:
from viztracer import VizTracer

with VizTracer(output_file="bldm_profile.json") as tracer:
    tracer.start()
    results = run_all_layers_bldm()
    tracer.stop()
    tracer.save()

## Simple Replication

### code

In [None]:
import os
from typing import List, Union

import pandas as pd
import torch
from torch import Tensor


class ReplicationBalancer(LoadBalancer):
    additional_experts_per_gpu: int

    def __init__(
        self,
        number_of_gpus: int,
        number_of_experts: int,
        **kwargs,
    ):
        super().__init__(number_of_gpus, number_of_experts, **kwargs)
        self.additional_experts_per_gpu = kwargs["additional_experts_per_gpu"]

    def __str__(self) -> str:
        return f"""{super().__str__()}
                Max experts on one GPU: {self.additional_experts_per_gpu}
                """

    def get_graph_name(self) -> str:
        return f"Replication_Max_{self.additional_experts_per_gpu}"

    def balance(self, hotness: Tensor) -> Union[Tensor, None]:
        hottest_experts = torch.argsort(hotness).flip(dims=[0])[
            : self.additional_experts_per_gpu
        ]
        ones = torch.ones(self.nb_experts_per_gpu)
        assignment = torch.zeros(
            size=(self.nb_gpus, self.nb_experts), dtype=torch.int64
        )
        for gpu in range(self.nb_gpus):
            assignment[gpu][
                gpu * self.nb_experts_per_gpu : (gpu + 1) * self.nb_experts_per_gpu
            ] = ones
        for hottest_expert in hottest_experts:
            for gpu in range(self.nb_gpus):
                # We are not on the original GPU the expert has been assigned to
                if gpu != hottest_expert // self.nb_experts_per_gpu:
                    assignment[gpu][hottest_expert] = 1
        return assignment

### max 8 extra experts per GPU

In [None]:
def run_simple_replication():
    all_assignments = []
    all_gpu_loads = []

    for layer_idx in range(vanilla_hotness.shape[0]):
        hotness = vanilla_hotness[layer_idx]
        assignment = balancer.balance(hotness)
        assert assignment is not None
        adjusted_hot = adjust_hotness(hotness, assignment)
        all_assignments.append(assignment)
        layerwise_gpu_load = get_gpu_load(assignment, adjusted_hot)
        all_gpu_loads.append(layerwise_gpu_load)

    return {
        "assignments": all_assignments,
        "gpu_loads": all_gpu_loads,
    }
    

input_folder = r"C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist"
output_folder = os.path.join(input_folder, f"8_simple_replication")
os.makedirs(output_folder, exist_ok=True)

balancer = ReplicationBalancer(
    number_of_gpus=8,
    number_of_experts=256,
    additional_experts_per_gpu=8,
    minimize_moves=False,
)
vanilla_hotness = load_csv_to_tensor(input_folder)

print("Measuring total ReplicationBalancer execution time across all layers...")
replication_stats = measure_execution_time(run_simple_replication, warmup=warmup_runs, repeat=repeat_runs)


print(f"\nReplicationBalancer Execution Time Stats (over {repeat_runs} runs):")
print(f"  • Average: {replication_stats['avg']:.2f} ms")
print(f"  • Std Dev: {replication_stats['std']:.2f} ms")
print(f"  • Min:     {replication_stats['min']:.2f} ms")
print(f"  • Max:     {replication_stats['max']:.2f} ms")


# to tensor [num_layers, nb_gpus, nb_experts]
all_assignments_tensor = torch.stack(replication_stats['results']['assignments'], dim=0)
all_gpu_loads_tensor = torch.stack(replication_stats['results']['gpu_loads'], dim=0)

# print(f"all gpu loads: {all_gpu_loads}")
all_gpu_loads_df = pd.DataFrame(all_gpu_loads_tensor.numpy())
csv_path = os.path.join(output_folder, "8_simple_replication_gpu_loads.csv")
all_gpu_loads_df.to_csv(csv_path, index=False, header=False)

print(f"all_assignments_tensor shape: {all_assignments_tensor.shape}")
print(f"all_gpu_loads_tensor shape: {all_gpu_loads_tensor.shape}")


heatmap_save_path = os.path.join(output_folder, "8_simple_replication_gpu_loads_heatmap.png")
boxplot_save_path = os.path.join(output_folder, "8_simple_replication_gpu_loads_boxplot.png")
plot_gpu_loads_analysis(gpu_loads=all_gpu_loads_tensor, heatmap_save_path=heatmap_save_path, boxplot_save_path=boxplot_save_path)


# experts moves
all_assignments = []
all_expert_moves = 0
for i in range(phy2log.shape[0]):
    assignment = all_assignments_tensor[i]
    layerwise_num_expert_moves = count_expert_moves(assignment)
    all_expert_moves += layerwise_num_expert_moves
    all_assignments.append(assignment)
    


print(f"Number of expert moves: {all_expert_moves}")

### max 16 extra experts per GPU

In [None]:
def run_simple_replication():
    all_assignments = []
    all_gpu_loads = []

    for layer_idx in range(vanilla_hotness.shape[0]):
        hotness = vanilla_hotness[layer_idx]
        assignment = balancer.balance(hotness)
        assert assignment is not None
        adjusted_hot = adjust_hotness(hotness, assignment)
        all_assignments.append(assignment)
        layerwise_gpu_load = get_gpu_load(assignment, adjusted_hot)
        all_gpu_loads.append(layerwise_gpu_load)

    return {
        "assignments": all_assignments,
        "gpu_loads": all_gpu_loads,
    }
    

input_folder = r"C:\Users\bingxche\data\log\deepseek-v3_tp8_mixtral_dataset_5000_prompts_vanilla\moe_token_dist"
output_folder = os.path.join(input_folder, f"16_simple_replication")
os.makedirs(output_folder, exist_ok=True)

balancer = ReplicationBalancer(
    number_of_gpus=8,
    number_of_experts=256,
    additional_experts_per_gpu=16,
    minimize_moves=False,
)
vanilla_hotness = load_csv_to_tensor(input_folder)

print("Measuring total ReplicationBalancer execution time across all layers...")
replication_stats = measure_execution_time(run_simple_replication, warmup=warmup_runs, repeat=repeat_runs)


print(f"\nReplicationBalancer Execution Time Stats (over {repeat_runs} runs):")
print(f"  • Average: {replication_stats['avg']:.2f} ms")
print(f"  • Std Dev: {replication_stats['std']:.2f} ms")
print(f"  • Min:     {replication_stats['min']:.2f} ms")
print(f"  • Max:     {replication_stats['max']:.2f} ms")


# to tensor [num_layers, nb_gpus, nb_experts]
all_assignments_tensor = torch.stack(replication_stats['results']['assignments'], dim=0)
all_gpu_loads_tensor = torch.stack(replication_stats['results']['gpu_loads'], dim=0)

# print(f"all gpu loads: {all_gpu_loads}")
all_gpu_loads_df = pd.DataFrame(all_gpu_loads_tensor.numpy())
csv_path = os.path.join(output_folder, "8_simple_replication_gpu_loads.csv")
all_gpu_loads_df.to_csv(csv_path, index=False, header=False)

print(f"all_assignments_tensor shape: {all_assignments_tensor.shape}")
print(f"all_gpu_loads_tensor shape: {all_gpu_loads_tensor.shape}")


heatmap_save_path = os.path.join(output_folder, "8_simple_replication_gpu_loads_heatmap.png")
boxplot_save_path = os.path.join(output_folder, "8_simple_replication_gpu_loads_boxplot.png")
plot_gpu_loads_analysis(gpu_loads=all_gpu_loads_tensor, heatmap_save_path=heatmap_save_path, boxplot_save_path=boxplot_save_path)


# experts moves
all_assignments = []
all_expert_moves = 0
for i in range(phy2log.shape[0]):
    assignment = all_assignments_tensor[i]
    layerwise_num_expert_moves = count_expert_moves(assignment)
    all_expert_moves += layerwise_num_expert_moves
    all_assignments.append(assignment)
    


print(f"Number of expert moves: {all_expert_moves}")