In [4]:
import numpy as np

# =========================
# 模拟 Flower 的 FedAvg 调用格式:
# results = [(num_examples, parameters, metrics), ...]
#
# parameters 是一个: List[np.ndarray]
# 每个 ndarray 对应一层参数
# =========================

# 假设模型有 3 层参数（layer0, layer1, layer2）
client1_params = [
    np.array([1.0, 2.0, 3.0]),        # layer 0
    np.array([10.0, 20.0, 30.0]),     # layer 1
    np.array([5.0])                   # layer 2
]

client2_params = [
    np.array([2.0, 4.0, 6.0]),
    np.array([15.0, 25.0, 35.0]),
    np.array([7.0])
]

client3_params = [
    np.array([9.0, 8.0, 7.0]),
    np.array([50.0, 40.0, 30.0]),
    np.array([9.0])
]

# Flower 的 results 结构（完全匹配你的服务器）
results = [
    (100, client1_params, {}),   # (num_examples, parameters, metrics)
    (100, client2_params, {}),
    (100, client3_params, {})
]

# =========================
# 你的 FedMedian 逻辑（原封不动）
# =========================

def fedmedian_aggregate(results):
    weights = [np.array([p for p in params], dtype=object) for _, params, _ in results]
    weights = np.array(weights, dtype=object)

    aggregated = []
    for layer in range(len(weights[0])):
        layer_vals = np.array([client[layer] for client in weights], dtype=float)
        median_layer = np.median(layer_vals, axis=0)
        aggregated.append(median_layer)

    return aggregated

# =========================
# 执行测试
# =========================

output = fedmedian_aggregate(results)

print("\n=== FedMedian Test Output ===")
for i, layer in enumerate(output):
    print(f"Layer {i}: {layer}")



=== FedMedian Test Output ===
Layer 0: [2. 4. 6.]
Layer 1: [15. 25. 30.]
Layer 2: [7.]


In [None]:
import numpy as np

# =========================
# 模拟 Flower 的 FedAvg 调用格式:
# results = [(num_examples, parameters, metrics), ...]
# 每个参数是一个 np.ndarray，对应模型的一层
# =========================

# 造与前面一样的 3 个客户端参数
client1_params = [
    np.array([1.0, 2.0, 3.0]),
    np.array([10.0, 20.0, 30.0]),
    np.array([5.0])
]

client2_params = [
    np.array([2.0, 4.0, 6.0]),
    np.array([15.0, 25.0, 35.0]),
    np.array([7.0])
]

client3_params = [
    np.array([9.0, 8.0, 7.0]),
    np.array([50.0, 40.0, 30.0]),
    np.array([9.0])
]

# Flower 的 results 结构
results = [
    (100, client1_params, {}),
    (100, client2_params, {}),
    (100, client3_params, {})
]

# =========================
# 你的 FedTrimmedAvg 聚合逻辑（独立测试版本）
# =========================

def trimmed_mean_aggregate(results, trim_ratio=0.2):
    weights = [np.array([p for p in params], dtype=object) for _, params, _ in results]
    weights = np.array(weights, dtype=object)

    aggregated = []
    num_clients = len(weights)
    k = int(num_clients * trim_ratio)

    for layer in range(len(weights[0])):
        layer_vals = np.array([client[layer] for client in weights], dtype=float)

        # sort per coordinate
        sorted_vals = np.sort(layer_vals, axis=0)

        # trim k smallest and k largest
        if k > 0:
            trimmed = sorted_vals[k: num_clients - k]
        else:
            trimmed = sorted_vals

        trimmed_mean = np.mean(trimmed, axis=0)
        aggregated.append(trimmed_mean)

    return aggregated

# =========================
# 执行测试
# =========================

output = trimmed_mean_aggregate(results, trim_ratio=0.2)

print("\n=== FedTrimmedAvg Test Output ===")
for i, layer in enumerate(output):
    print(f"Layer {i}: {layer}")
