In [None]:
import torch
import time
from new_model.model_segments import load_segment_models_nlcc

def test_pipeline_local(n_workers, k_workers):
    """
    测试本地的 NLCC 完整流程
    """
    print(f"\n--- Testing with N={n_workers}, K={k_workers} ---")

    # 1. 加载模型
    # 确保 load_segment_models_nlcc 接受 n 和 k
    try:
        master_models, worker_models = load_segment_models_nlcc(n_workers, k_workers)
    except Exception as e:
        print(f"Error loading models: {e}")
        print("确保 'load_segment_models_nlcc' 接受 (n, k) 参数")
        return

    # 假设只有一个 segment 'seg1'
    encoder, coder, final_decoder = master_models['seg1']
    worker_decoder = worker_models['seg1']

    # 2. 创建一个虚拟输入
    # (batch_size, channels, H, W)
    x = torch.randn((1, 3, 224, 224))
    print(f"Input shape: {x.shape}")

    # 3. 模拟 Master 端的 Encoder 和 Coder
    with torch.no_grad():
        z = encoder(x)
        print(f"Latent shape (z): {z.shape}")
        
        # zs_coded 是一个包含 N 个张量的列表
        zs_coded = coder(z) 
        print(f"Coded latent pieces: {len(zs_coded)}, Shape of one: {zs_coded[0].shape}")

    # 4. 模拟 K 个 Worker 的工作
    # 我们从 N 个编码块中只选择 K 个
    # 关键：选择任意 K 个，例如前 K 个
    k_indices_to_use = list(range(k_workers))
    k_coded_outputs = []

    print(f"Simulating {k_workers} workers (indices: {k_indices_to_use})...")
    with torch.no_grad():
        for i in k_indices_to_use:
            z_coded_i = zs_coded[i]
            # 模拟 Worker i 的解码
            y_coded_i = worker_decoder(z_coded_i)
            k_coded_outputs.append(y_coded_i)
            
    print(f"Worker intermediate outputs: {len(k_coded_outputs)}, Shape of one: {k_coded_outputs[0].shape}")

    # 5. 模拟 Master 端的 FinalDecoder
    with torch.no_grad():
        start_time = time.time()
        # FinalDecoder 必须能够从这 K 个块和它们的索引中重建
        y_pred = final_decoder(k_coded_outputs, k_indices_to_use)
        duration = time.time() - start_time

    print(f"Final output shape (y_pred): {y_pred.shape}")
    print(f"FinalDecoder duration: {duration:.4f}s")
    
    # 检查最终形状是否符合预期
    # (例如，您在 model_segments_nlcc.py 中定义的 final_output_shape)
    expected_shape = (1, 64, 112, 112) 
    assert y_pred.shape == expected_shape
    print(f"Test N={n_workers}, K={k_workers} PASSED!")

In [7]:

# --- 运行测试 ---
if __name__ == "__main__":
    # 测试 1: "uncoded" / "repetition" 场景 (k=n)
    # n=2, k=2
    test_pipeline_local(n_workers=2, k_workers=2)

    # 测试 2: *真正的 NLCC 容错场景* (k < n)
    # n=3, k=2
    test_pipeline_local(n_workers=3, k_workers=2)


--- Testing with N=2, K=2 ---
Input shape: torch.Size([1, 3, 224, 224])
Latent shape (z): torch.Size([1, 16, 56, 56])
Coded latent pieces: 2, Shape of one: torch.Size([1, 512])
Simulating 2 workers (indices: [0, 1])...
Worker intermediate outputs: 2, Shape of one: torch.Size([1, 64, 28, 28])
Final output shape (y_pred): torch.Size([1, 64, 112, 112])
FinalDecoder duration: 4.2502s
Test N=2, K=2 PASSED!

--- Testing with N=3, K=2 ---
Input shape: torch.Size([1, 3, 224, 224])
Latent shape (z): torch.Size([1, 16, 56, 56])
Coded latent pieces: 3, Shape of one: torch.Size([1, 512])
Simulating 2 workers (indices: [0, 1])...
Worker intermediate outputs: 2, Shape of one: torch.Size([1, 64, 28, 28])
Final output shape (y_pred): torch.Size([1, 64, 112, 112])
FinalDecoder duration: 7.1004s
Test N=3, K=2 PASSED!


In [8]:
import torch
from torchvision import models
import sys

# 确保 model_utils 和 model_segments 可以在 Python 路径中被找到
# 假设这些文件与本测试脚本在同一目录下
try:
    from model_utils import auto_segment_model
    from model_segments import load_segment_models_dynamically
except ImportError:
    print("错误: 请确保 'model_utils.py' 和 'model_segments.py' 文件与此测试脚本在同一目录下。")
    sys.exit(1)

def run_test_for_model(model_name, input_shape, k, r):
    """
    对指定的模型运行一套完整的分割和加载测试。
    """
    print(f"\n{'='*20} 开始测试: {model_name.upper()} {'='*20}")
    
    try:
        # --- 步骤 1: 调用动态加载器 ---
        # 这一步同时测试了 model_utils.py 和 model_segments.py
        print(f"--> 正在调用 load_segment_models_dynamically for '{model_name}'...")
        master_models, worker_models, pooling_layers = load_segment_models_dynamically(
            model_name=model_name,
            k_workers=k,
            r_workers=r,
            input_shape=input_shape
        )
        print("--> 动态加载成功！")

        # --- 步骤 2: 验证输出的正确性 ---
        num_blocks = len(master_models)
        print(f"\n--- 验证结果 ---")
        print(f"识别出的卷积块数量: {num_blocks}")
        print(f"识别出的池化层数量: {len(pooling_layers)}")
        
        # 断言检查
        assert num_blocks > 0, f"测试失败: 未能为 '{model_name}' 识别出任何卷积块。"
        assert len(master_models) == len(worker_models), "测试失败: Master 和 Worker 的模型数量不匹配。"
        # 通常，池化层的数量比卷积块少一个
        assert abs(len(master_models) - len(pooling_layers)) <= 1, "测试失败: 池化层和卷积块的数量关系不正确。"

        print("\n--- 详细配置检查 ---")
        for i, block_name in enumerate(master_models.keys()):
            print(f"  - 块 {i+1}: '{block_name}'")
            assert block_name in worker_models, f"测试失败: Worker 模型中缺少 '{block_name}'。"
            
            encoder, final_decoder = master_models[block_name]
            worker_decoder = worker_models[block_name]
            
            # 检查模型类型是否正确
            assert isinstance(encoder, torch.nn.Module), f"测试失败: '{block_name}' 的 Encoder 不是一个 nn.Module。"
            assert isinstance(final_decoder, torch.nn.Module), f"测试失败: '{block_name}' 的 FinalDecoder 不是一个 nn.Module。"
            assert isinstance(worker_decoder, torch.nn.Module), f"测试失败: '{block_name}' 的 WorkerDecoder 不是一个 nn.Module。"
            print(f"    - Encoder, FinalDecoder, WorkerDecoder 实例已成功创建。")

        print(f"\n{'='*20} 测试成功: {model_name.upper()} {'='*20}")
        return True

    except Exception as e:
        print(f"\n{'!'*20} 测试失败: {model_name.upper()} {'!'*20}")
        print(f"错误信息: {e}")
        import traceback
        traceback.print_exc()
        return False

if __name__ == '__main__':
    # 定义测试参数
    k = 4  # 系统化任务数
    r = 2  # 校验任务数
    
    # --- 测试 VGG16 ---
    vgg_input_shape = (1, 3, 224, 224)
    vgg_success = run_test_for_model('vgg16', vgg_input_shape, k, r)
    
    print("\n" + "-"*50 + "\n")
    
    # --- 测试 AlexNet ---
    alexnet_input_shape = (1, 3, 224, 224) # AlexNet 也使用 224x224
    alexnet_success = run_test_for_model('alexnet', alexnet_input_shape, k, r)

    print("\n" + "#"*50)
    if vgg_success and alexnet_success:
        print("所有测试均已通过！自动化分割和动态加载功能工作正常。")
    else:
        print("部分或全部测试失败。请检查上面的错误日志。")
    print("#"*50)


ImportError: DLL load failed while importing _imaging: 操作系统无法运行 %1。