In [1]:
import numpy as np

def explain_weights():
    """解释权重矩阵如何影响成本计算"""
    
    # 定义权重
    Q = np.array([[10.0, 0], [0, 1.0]])  # 状态权重
    R = np.array([[0.1]])                # 控制权重
    target = np.array([5.0, 0.0])        # 目标：位置5m，速度0m/s
    
    print("=== 权重矩阵示例 ===")
    print(f"Q矩阵（状态权重）:\n{Q}")
    print(f"R矩阵（控制权重）:\n{R}")
    print(f"目标状态: {target}")
    
    # 情况1：位置误差大
    state1 = np.array([2.0, 0.1])  # 位置2m，速度0.1m/s
    control1 = np.array([1.0])     # 加速度1m/s²
    
    state_error1 = state1 - target
    state_cost1 = 0.5 * state_error1.T @ Q @ state_error1
    control_cost1 = 0.5 * control1.T @ R @ control1
    total_cost1 = state_cost1 + control_cost1
    
    print(f"\n--- 情况1：位置误差大 ---")
    print(f"当前状态: {state1}")
    print(f"状态误差: {state_error1}")
    print(f"状态成本: {state_cost1:.2f}")
    print(f"控制成本: {control_cost1:.2f}")
    print(f"总成本: {total_cost1:.2f}")
    
    # 情况2：速度误差大
    state2 = np.array([4.9, 2.0])  # 位置4.9m，速度2m/s
    control2 = np.array([1.0])     # 加速度1m/s²
    
    state_error2 = state2 - target
    state_cost2 = 0.5 * state_error2.T @ Q @ state_error2
    control_cost2 = 0.5 * control2.T @ R @ control2
    total_cost2 = state_cost2 + control_cost2
    
    print(f"\n--- 情况2：速度误差大 ---")
    print(f"当前状态: {state2}")
    print(f"状态误差: {state_error2}")
    print(f"状态成本: {state_cost2:.2f}")
    print(f"控制成本: {control_cost2:.2f}")
    print(f"总成本: {total_cost2:.2f}")
    
    # 分析
    print(f"\n=== 分析 ===")
    print(f"位置误差3m的惩罚: {10.0 * 3**2 / 2:.1f}")
    print(f"速度误差2m/s的惩罚: {1.0 * 2**2 / 2:.1f}") 
    print("位置误差的惩罚更重！")

# 运行示例
explain_weights()


=== 权重矩阵示例 ===
Q矩阵（状态权重）:
[[10.  0.]
 [ 0.  1.]]
R矩阵（控制权重）:
[[0.1]]
目标状态: [5. 0.]

--- 情况1：位置误差大 ---
当前状态: [2.  0.1]
状态误差: [-3.   0.1]
状态成本: 45.01
控制成本: 0.05
总成本: 45.05

--- 情况2：速度误差大 ---
当前状态: [4.9 2. ]
状态误差: [-0.1  2. ]
状态成本: 2.05
控制成本: 0.05
总成本: 2.10

=== 分析 ===
位置误差3m的惩罚: 45.0
速度误差2m/s的惩罚: 2.0
位置误差的惩罚更重！


In [2]:
import numpy as np

import numpy as np

"""解释权重矩阵如何影响成本计算"""
    
    # 定义权重
"""解释权重矩阵如何影响成本计算"""
    
# 定义权重
Q = np.array([[10.0, 0], [0, 1.0]])  # 状态权重
R = np.array([[0.1]])                # 控制权重
target = np.array([5.0, 0.0])        # 目标：位置5m，速度0m/s

print("=== 权重矩阵示例 ===")
print(f"Q矩阵（    状态权重）:\n{Q}")
print(f"R矩阵（控制权重）:\n{R}")
print(f"目标状态: {target}")



=== 权重矩阵示例 ===
Q矩阵（    状态权重）:
[[10.  0.]
 [ 0.  1.]]
R矩阵（控制权重）:
[[0.1]]
目标状态: [5. 0.]


In [3]:
# 情况1：位置误差大
state1 = np.array([2.0, 0.1])  # 位置2m，速度0.1m/s
control1 = np.array([1.0])     # 加速度1m/s²

state_error1 = state1 - target
state_cost1 = 0.5 * state_error1.T @ Q @ state_error1
control_cost1 = 0.5 * control1.T @ R @ control1
total_cost1 = state_cost1 + control_cost1

print(f"\n--- 情况1：位置误差大 ---")
print(f"当前状态: {state1}")
print(f"状态误差: {state_error1}")
print(f"状态成本: {state_cost1:.2f}")
print(f"控制成本: {control_cost1:.2f}")
print(f"总成本: {total_cost1:.2f}")

#


--- 情况1：位置误差大 ---
当前状态: [2.  0.1]
状态误差: [-3.   0.1]
状态成本: 45.01
控制成本: 0.05
总成本: 45.05


In [6]:
import time
import numpy as np

def performance_comparison():
    # 大规模测试
    n_states = 100
    n_tests = 10000
    
    # 生成测试数据
    states = np.random.randn(n_tests, n_states)
    targets = np.random.randn(n_tests, n_states)
    Q = np.eye(n_states) * 10
    
    # 矩阵方式
    start = time.time()
    for i in range(n_tests):
        error = states[i] - targets[i]
        cost = 0.5 * error.T @ Q @ error
    matrix_time = time.time() - start
    
    # 单独变量方式
    start = time.time()
    for i in range(n_tests):
        cost = 0
        for j in range(n_states):
            error = states[i,j] - targets[i,j]
            cost += 0.5 * error**2 * Q[j,j]
    separate_time = time.time() - start
    
    print(f"矩阵方式: {matrix_time:.4f}秒")
    print(f"单独变量: {separate_time:.4f}秒")
    print(f"矩阵方式快 {separate_time/matrix_time:.1f}倍")
performance_comparison()  # 矩阵通常快很多


矩阵方式: 0.1478秒
单独变量: 0.4724秒
矩阵方式快 3.2倍


In [3]:
import numpy as np
import time

def comprehensive_performance_test():
    """综合性能测试：CPU矩阵 vs CPU循环 vs GPU(如果可用)"""
    
    print("=== 综合性能对比测试 ===")
    
    # 测试GPU可用性
    try:
        import cupy as cp
        # 尝试简单的GPU操作
        test_array = cp.array([1, 2, 3])
        _ = cp.sum(test_array)
        GPU_AVAILABLE = True
        print("✅ GPU可用 (CuPy)")
    except Exception as e:
        GPU_AVAILABLE = False
        print(f"❌ GPU不可用: {e}")
    
    # 测试配置
    test_configs = [
        (50, 5000),    # 中等规模
        (100, 10000),  # 大规模
        (200, 5000),   # 超大规模
    ]
    
    for n_states, n_tests in test_configs:
        print(f"\n--- 测试规模: {n_states}维状态, {n_tests}次计算 ---")
        
        # 生成测试数据
        np.random.seed(42)
        states = np.random.randn(n_tests, n_states).astype(np.float32)
        targets = np.random.randn(n_tests, n_states).astype(np.float32)
        Q = np.eye(n_states, dtype=np.float32) * 10
        
        # 方法1: CPU矩阵运算
        start_time = time.time()
        cpu_costs_matrix = []
        for i in range(n_tests):
            error = states[i] - targets[i]
            cost = 0.5 * error.T @ Q @ error
            cpu_costs_matrix.append(cost)
        cpu_matrix_time = time.time() - start_time
        
        # 方法2: CPU循环运算
        start_time = time.time()
        cpu_costs_loop = []
        for i in range(n_tests):
            cost = 0
            for j in range(n_states):
                error = states[i,j] - targets[i,j]
                cost += 0.5 * error**2 * Q[j,j]
            cpu_costs_loop.append(cost)
        cpu_loop_time = time.time() - start_time
        
        # 方法3: CPU向量化（最优CPU方法）
        start_time = time.time()
        errors_batch = states - targets
        cpu_costs_vectorized = 0.5 * np.sum(errors_batch * (errors_batch @ Q), axis=1)
        cpu_vectorized_time = time.time() - start_time
        
        # 结果输出
        print(f"CPU矩阵运算:   {cpu_matrix_time:.4f}秒")
        print(f"CPU循环运算:   {cpu_loop_time:.4f}秒")
        print(f"CPU向量化:     {cpu_vectorized_time:.4f}秒")
        print(f"矩阵 vs 循环:  {cpu_loop_time/cpu_matrix_time:.1f}x 加速")
        print(f"向量化加速:    {cpu_matrix_time/cpu_vectorized_time:.1f}x 加速")
        
        # GPU测试（如果可用）
        if GPU_AVAILABLE:
            try:
                # 数据传输到GPU
                states_gpu = cp.asarray(states)
                targets_gpu = cp.asarray(targets)
                Q_gpu = cp.asarray(Q)
                
                # 方法4: GPU逐个计算
                start_time = time.time()
                gpu_costs_individual = []
                for i in range(n_tests):
                    error = states_gpu[i] - targets_gpu[i]
                    cost = 0.5 * error.T @ Q_gpu @ error
                    gpu_costs_individual.append(float(cost))
                gpu_individual_time = time.time() - start_time
                
                # 方法5: GPU批量计算（最优GPU方法）
                cp.cuda.Stream.null.synchronize()  # 同步
                start_time = time.time()
                errors_gpu = states_gpu - targets_gpu
                gpu_costs_batch = 0.5 * cp.sum(errors_gpu * (errors_gpu @ Q_gpu), axis=1)
                cp.cuda.Stream.null.synchronize()
                gpu_batch_time = time.time() - start_time
                
                # 方法6: GPU纯计算时间（数据已在GPU）
                cp.cuda.Stream.null.synchronize()
                start_time = time.time()
                for _ in range(100):  # 多次运行取平均
                    errors_gpu = states_gpu - targets_gpu
                    _ = 0.5 * cp.sum(errors_gpu * (errors_gpu @ Q_gpu), axis=1)
                cp.cuda.Stream.null.synchronize()
                gpu_pure_time = (time.time() - start_time) / 100
                
                print(f"GPU逐个计算:   {gpu_individual_time:.4f}秒")
                print(f"GPU批量计算:   {gpu_batch_time:.4f}秒")
                print(f"GPU纯计算:     {gpu_pure_time:.6f}秒")
                print(f"GPU vs CPU矩阵: {cpu_matrix_time/gpu_batch_time:.1f}x 加速")
                print(f"GPU vs CPU最优: {cpu_vectorized_time/gpu_batch_time:.1f}x 加速")
                
                # 验证结果正确性
                cpu_sum = np.sum(cpu_costs_vectorized)
                gpu_sum = float(cp.sum(gpu_costs_batch))
                print(f"结果验证: CPU={cpu_sum:.2f}, GPU={gpu_sum:.2f}, 误差={abs(cpu_sum-gpu_sum):.6f}")
                
            except Exception as e:
                print(f"GPU计算出错: {e}")
        
        print("-" * 60)

def memory_and_scaling_test():
    """内存使用和规模化测试"""
    
    print("\n=== 内存使用和规模化测试 ===")
    
    # 测试不同数据规模的内存使用
    sizes = [100, 500, 1000, 2000]
    
    for size in sizes:
        # CPU内存使用
        cpu_data = np.random.randn(size, size).astype(np.float32)
        cpu_memory_mb = cpu_data.nbytes / (1024**2)
        
        print(f"规模 {size}x{size}:")
        print(f"  CPU内存: {cpu_memory_mb:.1f} MB")
        
        # GPU内存使用（如果可用）
        try:
            import cupy as cp
            gpu_data = cp.asarray(cpu_data)
            gpu_memory_mb = gpu_data.nbytes / (1024**2)
            print(f"  GPU内存: {gpu_memory_mb:.1f} MB")
            
            # GPU内存池信息
            mempool = cp.get_default_memory_pool()
            print(f"  GPU池使用: {mempool.used_bytes() / (1024**2):.1f} MB")
            
            # 清理GPU内存
            del gpu_data
            mempool.free_all_blocks()
            
        except:
            print(f"  GPU内存: 不可用")

def advanced_gpu_optimization():
    """高级GPU优化技术演示"""
    
    try:
        import cupy as cp
        print("\n=== 高级GPU优化技术 ===")
        
        # 测试配置
        n_states = 100
        n_trajectories = 1000
        horizon = 50
        
        # 生成测试数据
        states_trajectories = cp.random.randn(n_trajectories, horizon, n_states)
        target = cp.zeros(n_states)
        Q = cp.eye(n_states) * 10.0
        
        # 方法1: 标准GPU计算
        start_time = time.time()
        costs_standard = []
        for i in range(n_trajectories):
            trajectory_cost = 0
            for t in range(horizon):
                error = states_trajectories[i, t] - target
                cost = 0.5 * error.T @ Q @ error
                trajectory_cost += cost
            costs_standard.append(trajectory_cost)
        cp.cuda.Stream.null.synchronize()
        standard_time = time.time() - start_time
        
        # 方法2: 高度优化的GPU批量计算
        start_time = time.time()
        # 一次性计算所有轨迹的所有时间步
        errors_all = states_trajectories - target  # (n_trajectories, horizon, n_states)
        # 批量计算二次型 
        costs_batch = 0.5 * cp.sum(errors_all * cp.tensordot(errors_all, Q, axes=([2], [0])), axis=2)
        trajectory_costs = cp.sum(costs_batch, axis=1)  # 沿时间轴求和
        cp.cuda.Stream.null.synchronize()
        optimized_time = time.time() - start_time
        
        print(f"标准GPU计算:   {standard_time:.4f}秒")
        print(f"优化GPU计算:   {optimized_time:.4f}秒")
        print(f"优化加速比:    {standard_time/optimized_time:.1f}x")
        
        # 验证结果
        print(f"结果验证: 标准={cp.sum(costs_standard):.2f}, 优化={cp.sum(trajectory_costs):.2f}")
        
    except Exception as e:
        print(f"高级GPU优化测试失败: {e}")

def practical_mpc_gpu_demo():
    """实际MPC应用中的GPU加速演示"""
    
    print("\n=== 实际MPC应用GPU加速演示 ===")
    
    try:
        import cupy as cp
        
        class HighPerformanceMPC:
            def __init__(self, n_states, n_controls, horizon, use_gpu=True):
                self.n_states = n_states
                self.n_controls = n_controls
                self.horizon = horizon
                self.use_gpu = use_gpu
                
                if use_gpu:
                    self.xp = cp
                    self.Q = cp.eye(n_states, dtype=cp.float32) * 10.0
                    self.R = cp.eye(n_controls, dtype=cp.float32) * 0.1
                    self.target = cp.zeros(n_states, dtype=cp.float32)
                else:
                    self.xp = np
                    self.Q = np.eye(n_states, dtype=np.float32) * 10.0
                    self.R = np.eye(n_controls, dtype=np.float32) * 0.1
                    self.target = np.zeros(n_states, dtype=np.float32)
            
            def evaluate_multiple_trajectories(self, states_batch, controls_batch):
                """批量评估多条轨迹的成本"""
                n_candidates = states_batch.shape[0]
                
                # 状态成本计算
                state_errors = states_batch - self.target  # (n_candidates, horizon+1, n_states)
                state_costs = 0.5 * self.xp.sum(
                    state_errors * (state_errors @ self.Q), axis=2
                )  # (n_candidates, horizon+1)
                
                # 控制成本计算
                control_costs = 0.5 * self.xp.sum(
                    controls_batch * (controls_batch @ self.R), axis=2
                )  # (n_candidates, horizon)
                
                # 总成本：状态成本 + 控制成本 + 终端成本
                total_costs = (
                    self.xp.sum(state_costs[:, :-1], axis=1) +  # 中间状态成本
                    self.xp.sum(control_costs, axis=1) +        # 控制成本
                    state_costs[:, -1]                          # 终端成本
                )
                
                return total_costs
        
        # 性能测试
        n_states, n_controls, horizon = 12, 4, 30
        n_candidates = 1000
        
        # 生成候选轨迹
        states_batch_cpu = np.random.randn(n_candidates, horizon+1, n_states).astype(np.float32)
        controls_batch_cpu = np.random.randn(n_candidates, horizon, n_controls).astype(np.float32)
        
        # CPU测试
        mpc_cpu = HighPerformanceMPC(n_states, n_controls, horizon, use_gpu=False)
        start_time = time.time()
        costs_cpu = mpc_cpu.evaluate_multiple_trajectories(states_batch_cpu, controls_batch_cpu)
        cpu_time = time.time() - start_time
        
        # GPU测试
        states_batch_gpu = cp.asarray(states_batch_cpu)
        controls_batch_gpu = cp.asarray(controls_batch_cpu)
        mpc_gpu = HighPerformanceMPC(n_states, n_controls, horizon, use_gpu=True)
        
        cp.cuda.Stream.null.synchronize()
        start_time = time.time()
        costs_gpu = mpc_gpu.evaluate_multiple_trajectories(states_batch_gpu, controls_batch_gpu)
        cp.cuda.Stream.null.synchronize()
        gpu_time = time.time() - start_time
        
        print(f"MPC轨迹批量评估 ({n_candidates}条轨迹):")
        print(f"CPU时间: {cpu_time:.4f}秒")
        print(f"GPU时间: {gpu_time:.4f}秒")
        print(f"GPU加速比: {cpu_time/gpu_time:.1f}x")
        
        # 验证结果
        costs_cpu_sum = np.sum(costs_cpu)
        costs_gpu_sum = float(cp.sum(costs_gpu))
        print(f"结果验证: CPU={costs_cpu_sum:.2f}, GPU={costs_gpu_sum:.2f}")
        
    except Exception as e:
        print(f"实际MPC演示失败: {e}")


if __name__ == "__main__":
    # 运行所有测试
    comprehensive_performance_test()
    memory_and_scaling_test()
    advanced_gpu_optimization()
    practical_mpc_gpu_demo()


=== 综合性能对比测试 ===
❌ GPU不可用: cudaErrorUnknown: unknown error

--- 测试规模: 50维状态, 5000次计算 ---
CPU矩阵运算:   0.0163秒
CPU循环运算:   0.3736秒
CPU向量化:     0.0070秒
矩阵 vs 循环:  23.0x 加速
向量化加速:    2.3x 加速
------------------------------------------------------------

--- 测试规模: 100维状态, 10000次计算 ---
CPU矩阵运算:   0.5226秒
CPU循环运算:   1.7716秒
CPU向量化:     0.0030秒
矩阵 vs 循环:  3.4x 加速
向量化加速:    175.7x 加速
------------------------------------------------------------

--- 测试规模: 200维状态, 5000次计算 ---
CPU矩阵运算:   0.2036秒
CPU循环运算:   2.4831秒
CPU向量化:     0.0055秒
矩阵 vs 循环:  12.2x 加速
向量化加速:    36.8x 加速
------------------------------------------------------------

=== 内存使用和规模化测试 ===
规模 100x100:
  CPU内存: 0.0 MB
  GPU内存: 不可用
规模 500x500:
  CPU内存: 1.0 MB
  GPU内存: 不可用
规模 1000x1000:
  CPU内存: 3.8 MB
  GPU内存: 不可用
规模 2000x2000:
  CPU内存: 15.3 MB
  GPU内存: 不可用

=== 高级GPU优化技术 ===
高级GPU优化测试失败: cudaErrorUnknown: unknown error

=== 实际MPC应用GPU加速演示 ===
实际MPC演示失败: cudaErrorUnknown: unknown error


In [4]:
import numpy as np
import time
import matplotlib.pyplot as plt

def vectorization_vs_tensorization_comparison():
    """向量化 vs 张量化性能对比"""
    
    print("=== 向量化 vs 张量化性能对比 ===")
    
    # 测试场景1: 批量矩阵-向量乘法
    print("\n场景1: 批量矩阵-向量乘法")
    test_batch_matrix_vector()
    
    # 测试场景2: 批量二次型计算
    print("\n场景2: 批量二次型计算")
    test_batch_quadratic_form()
    
    # 测试场景3: 多维数据处理
    print("\n场景3: 多维轨迹数据处理")
    test_multidimensional_processing()
    
    # 测试场景4: MPC中的实际应用
    print("\n场景4: MPC实际应用对比")
    test_mpc_application()

def test_batch_matrix_vector():
    """测试批量矩阵-向量乘法"""
    
    batch_sizes = [100, 500, 1000, 2000]
    matrix_size = 50
    
    for batch_size in batch_sizes:
        print(f"\n批量大小: {batch_size}, 矩阵大小: {matrix_size}x{matrix_size}")
        
        # 生成测试数据
        matrices = np.random.randn(batch_size, matrix_size, matrix_size).astype(np.float32)
        vectors = np.random.randn(batch_size, matrix_size).astype(np.float32)
        
        # 方法1: 循环方式（基准）
        start_time = time.time()
        results_loop = []
        for i in range(batch_size):
            result = matrices[i] @ vectors[i]
            results_loop.append(result)
        results_loop = np.array(results_loop)
        loop_time = time.time() - start_time
        
        # 方法2: 向量化方式
        start_time = time.time()
        # 使用einsum进行向量化计算
        results_vectorized = np.einsum('bij,bi->bj', matrices, vectors)
        vectorized_time = time.time() - start_time
        
        # 方法3: 张量化方式
        start_time = time.time()
        # 使用批量矩阵乘法
        vectors_expanded = vectors[:, :, np.newaxis]  # (batch, matrix_size, 1)
        results_tensorized = np.matmul(matrices, vectors_expanded).squeeze(-1)
        tensorized_time = time.time() - start_time
        
        # 方法4: 高度优化的张量操作
        start_time = time.time()
        results_optimized = np.sum(matrices * vectors[:, np.newaxis, :], axis=2)
        optimized_time = time.time() - start_time
        
        print(f"  循环方式:     {loop_time:.6f}秒")
        print(f"  向量化:       {vectorized_time:.6f}秒 ({loop_time/vectorized_time:.1f}x)")
        print(f"  张量化:       {tensorized_time:.6f}秒 ({loop_time/tensorized_time:.1f}x)")
        print(f"  优化张量:     {optimized_time:.6f}秒 ({loop_time/optimized_time:.1f}x)")
        
        # 验证结果正确性
        error1 = np.max(np.abs(results_loop - results_vectorized))
        error2 = np.max(np.abs(results_loop - results_tensorized))
        error3 = np.max(np.abs(results_loop - results_optimized))
        print(f"  误差检查: 向量化={error1:.2e}, 张量化={error2:.2e}, 优化={error3:.2e}")

def test_batch_quadratic_form():
    """测试批量二次型计算 x^T Q x"""
    
    print("\n不同维度下的二次型计算:")
    
    dimensions = [10, 50, 100, 200]
    batch_size = 1000
    
    for dim in dimensions:
        print(f"\n维度: {dim}, 批量: {batch_size}")
        
        # 生成测试数据
        X = np.random.randn(batch_size, dim).astype(np.float32)
        Q = np.random.randn(dim, dim).astype(np.float32)
        Q = Q.T @ Q  # 确保正定
        
        # 方法1: 循环计算
        start_time = time.time()
        results_loop = []
        for i in range(batch_size):
            result = X[i].T @ Q @ X[i]
            results_loop.append(result)
        results_loop = np.array(results_loop)
        loop_time = time.time() - start_time
        
        # 方法2: 向量化计算
        start_time = time.time()
        results_vectorized = np.sum(X * (X @ Q), axis=1)
        vectorized_time = time.time() - start_time
        
        # 方法3: 张量化计算
        start_time = time.time()
        results_tensorized = np.einsum('bi,ij,bj->b', X, Q, X)
        tensorized_time = time.time() - start_time
        
        # 方法4: 矩阵乘法张量化
        start_time = time.time()
        XQ = X @ Q  # (batch_size, dim)
        results_matmul = np.sum(X * XQ, axis=1)
        matmul_time = time.time() - start_time
        
        print(f"  循环方式:     {loop_time:.6f}秒")
        print(f"  向量化:       {vectorized_time:.6f}秒 ({loop_time/vectorized_time:.1f}x)")
        print(f"  张量化:       {tensorized_time:.6f}秒 ({loop_time/tensorized_time:.1f}x)")
        print(f"  矩阵乘法:     {matmul_time:.6f}秒 ({loop_time/matmul_time:.1f}x)")
        
        # 验证结果
        error1 = np.max(np.abs(results_loop - results_vectorized))
        error2 = np.max(np.abs(results_loop - results_tensorized))
        error3 = np.max(np.abs(results_loop - results_matmul))
        print(f"  误差: 向量化={error1:.2e}, 张量化={error2:.2e}, 矩阵={error3:.2e}")

def test_multidimensional_processing():
    """测试多维数据处理"""
    
    print("\n多维轨迹数据处理:")
    
    # 模拟MPC轨迹数据: (n_trajectories, horizon, n_states)
    n_trajectories = 500
    horizon = 20
    n_states = 10
    
    # 生成测试数据
    trajectories = np.random.randn(n_trajectories, horizon, n_states).astype(np.float32)
    targets = np.random.randn(n_trajectories, horizon, n_states).astype(np.float32)
    Q = np.eye(n_states, dtype=np.float32) * 10
    
    print(f"数据形状: {trajectories.shape}")
    
    # 方法1: 三重循环
    start_time = time.time()
    costs_loop = []
    for i in range(n_trajectories):
        trajectory_cost = 0
        for t in range(horizon):
            error = trajectories[i, t] - targets[i, t]
            cost = 0.5 * error.T @ Q @ error
            trajectory_cost += cost
        costs_loop.append(trajectory_cost)
    costs_loop = np.array(costs_loop)
    loop_time = time.time() - start_time
    
    # 方法2: 部分向量化（轨迹内向量化）
    start_time = time.time()
    costs_partial_vec = []
    for i in range(n_trajectories):
        errors = trajectories[i] - targets[i]  # (horizon, n_states)
        costs = 0.5 * np.sum(errors * (errors @ Q), axis=1)  # (horizon,)
        trajectory_cost = np.sum(costs)
        costs_partial_vec.append(trajectory_cost)
    costs_partial_vec = np.array(costs_partial_vec)
    partial_vec_time = time.time() - start_time
    
    # 方法3: 完全向量化
    start_time = time.time()
    errors_all = trajectories - targets  # (n_trajectories, horizon, n_states)
    costs_all = 0.5 * np.sum(errors_all * (errors_all @ Q), axis=2)  # (n_trajectories, horizon)
    costs_vectorized = np.sum(costs_all, axis=1)  # (n_trajectories,)
    vectorized_time = time.time() - start_time
    
    # 方法4: 张量化计算
    start_time = time.time()
    errors_tensor = trajectories - targets
    # 使用einsum进行高效张量计算
    costs_tensor = 0.5 * np.einsum('ijk,kl,ijl->ij', errors_tensor, Q, errors_tensor)
    costs_tensorized = np.sum(costs_tensor, axis=1)
    tensorized_time = time.time() - start_time
    
    # 方法5: 优化的张量操作
    start_time = time.time()
    errors_opt = trajectories - targets
    # 重塑为2D进行批量矩阵乘法
    errors_2d = errors_opt.reshape(-1, n_states)  # (n_trajectories*horizon, n_states)
    costs_2d = 0.5 * np.sum(errors_2d * (errors_2d @ Q), axis=1)
    costs_optimized = costs_2d.reshape(n_trajectories, horizon).sum(axis=1)
    optimized_time = time.time() - start_time
    
    print(f"  三重循环:     {loop_time:.6f}秒")
    print(f"  部分向量化:   {partial_vec_time:.6f}秒 ({loop_time/partial_vec_time:.1f}x)")
    print(f"  完全向量化:   {vectorized_time:.6f}秒 ({loop_time/vectorized_time:.1f}x)")
    print(f"  张量化:       {tensorized_time:.6f}秒 ({loop_time/tensorized_time:.1f}x)")
    print(f"  优化张量:     {optimized_time:.6f}秒 ({loop_time/optimized_time:.1f}x)")
    
    # 验证结果
    print(f"  结果验证:")
    print(f"    循环 vs 部分向量化: {np.max(np.abs(costs_loop - costs_partial_vec)):.2e}")
    print(f"    循环 vs 完全向量化: {np.max(np.abs(costs_loop - costs_vectorized)):.2e}")
    print(f"    循环 vs 张量化:     {np.max(np.abs(costs_loop - costs_tensorized)):.2e}")
    print(f"    循环 vs 优化张量:   {np.max(np.abs(costs_loop - costs_optimized)):.2e}")

def test_mpc_application():
    """MPC实际应用中的性能对比"""
    
    print("\nMPC实际应用场景:")
    
    class MPCPerformanceTest:
        def __init__(self, n_states, n_controls, horizon):
            self.n_states = n_states
            self.n_controls = n_controls
            self.horizon = horizon
            self.Q = np.eye(n_states, dtype=np.float32) * 10
            self.R = np.eye(n_controls, dtype=np.float32) * 0.1
            self.target = np.zeros(n_states, dtype=np.float32)
        
        def evaluate_trajectories_loop(self, states_batch, controls_batch):
            """循环方式评估轨迹"""
            n_candidates = states_batch.shape[0]
            costs = []
            
            for i in range(n_candidates):
                total_cost = 0
                for t in range(self.horizon):
                    # 状态成本
                    state_error = states_batch[i, t] - self.target
                    state_cost = 0.5 * state_error.T @ self.Q @ state_error
                    
                    # 控制成本
                    control_cost = 0.5 * controls_batch[i, t].T @ self.R @ controls_batch[i, t]
                    
                    total_cost += state_cost + control_cost
                
                # 终端成本
                final_error = states_batch[i, -1] - self.target
                total_cost += 0.5 * final_error.T @ self.Q @ final_error
                costs.append(total_cost)
            
            return np.array(costs)
        
        def evaluate_trajectories_vectorized(self, states_batch, controls_batch):
            """向量化方式评估轨迹"""
            # 状态成本
            state_errors = states_batch - self.target
            state_costs = 0.5 * np.sum(state_errors * (state_errors @ self.Q), axis=2)
            
            # 控制成本
            control_costs = 0.5 * np.sum(controls_batch * (controls_batch @ self.R), axis=2)
            
            # 总成本
            total_costs = (
                np.sum(state_costs[:, :-1], axis=1) +  # 中间状态成本
                np.sum(control_costs, axis=1) +        # 控制成本
                state_costs[:, -1]                     # 终端成本
            )
            
            return total_costs
        
        def evaluate_trajectories_tensorized(self, states_batch, controls_batch):
            """张量化方式评估轨迹"""
            # 使用einsum进行张量计算
            state_errors = states_batch - self.target
            
            # 状态成本张量计算
            state_costs = 0.5 * np.einsum('ijk,kl,ijl->ij', state_errors, self.Q, state_errors)
            
            # 控制成本张量计算
            control_costs = 0.5 * np.einsum('ijk,kl,ijl->ij', controls_batch, self.R, controls_batch)
            
            # 总成本
            total_costs = (
                np.sum(state_costs[:, :-1], axis=1) +
                np.sum(control_costs, axis=1) +
                state_costs[:, -1]
            )
            
            return total_costs
    
    # 测试不同规模
    test_configs = [
        (6, 3, 20, 100),   # 小规模
        (10, 4, 30, 500),  # 中规模
        (15, 6, 50, 200),  # 大规模
    ]
    
    for n_states, n_controls, horizon, n_candidates in test_configs:
        print(f"\n配置: {n_states}状态, {n_controls}控制, {horizon}步, {n_candidates}候选")
        
        mpc = MPCPerformanceTest(n_states, n_controls, horizon)
        
        # 生成测试数据
        states_batch = np.random.randn(n_candidates, horizon+1, n_states).astype(np.float32)
        controls_batch = np.random.randn(n_candidates, horizon, n_controls).astype(np.float32)
        
        # 循环方式
        start_time = time.time()
        costs_loop = mpc.evaluate_trajectories_loop(states_batch, controls_batch)
        loop_time = time.time() - start_time
        
        # 向量化方式
        start_time = time.time()
        costs_vectorized = mpc.evaluate_trajectories_vectorized(states_batch, controls_batch)
        vectorized_time = time.time() - start_time
        
        # 张量化方式
        start_time = time.time()
        costs_tensorized = mpc.evaluate_trajectories_tensorized(states_batch, controls_batch)
        tensorized_time = time.time() - start_time
        
        print(f"  循环方式:   {loop_time:.6f}秒")
        print(f"  向量化:     {vectorized_time:.6f}秒 ({loop_time/vectorized_time:.1f}x)")
        print(f"  张量化:     {tensorized_time:.6f}秒 ({loop_time/tensorized_time:.1f}x)")
        
        # 验证结果
        error1 = np.max(np.abs(costs_loop - costs_vectorized))
        error2 = np.max(np.abs(costs_loop - costs_tensorized))
        print(f"  误差: 向量化={error1:.2e}, 张量化={error2:.2e}")

def performance_summary():
    """性能总结和建议"""
    
    print("\n" + "="*60)
    print("性能总结和建议:")
    print("="*60)
    
    print("""
1. 向量化 vs 张量化的选择原则:
   
   • 低维数据 (< 100维): 向量化通常更快
     - 内存访问模式更友好
     - 缓存利用率更高
     - 函数调用开销更小
   
   • 高维数据 (> 100维): 张量化可能更快
     - 更好的并行化
     - 减少中间结果存储
     - 更高效的内存使用
   
   • 批量大小影响:
     - 小批量: 向量化优势明显
     - 大批量: 张量化优势显现

2. 具体应用建议:
   
   • MPC轨迹评估: 向量化通常最优
   • 大规模优化: 张量化更有优势
   • 实时控制: 向量化延迟更低
   • 离线训练: 张量化吞吐量更高

3. 优化策略:
   
   • 避免不必要的数据复制
   • 合理使用einsum和matmul
   • 考虑内存布局(C-order vs F-order)
   • 利用BLAS库的优化
   
4. GPU加速:
   
   • 张量化在GPU上通常更快
   • 向量化在CPU上通常更快
   • 数据传输成本需要考虑
    """)

if __name__ == "__main__":
    vectorization_vs_tensorization_comparison()
    performance_summary()


=== 向量化 vs 张量化性能对比 ===

场景1: 批量矩阵-向量乘法

批量大小: 100, 矩阵大小: 50x50
  循环方式:     0.000210秒
  向量化:       0.000070秒 (3.0x)
  张量化:       0.000034秒 (6.2x)
  优化张量:     0.000264秒 (0.8x)
  误差检查: 向量化=4.19e+01, 张量化=0.00e+00, 优化=3.81e-06

批量大小: 500, 矩阵大小: 50x50
  循环方式:     0.000901秒
  向量化:       0.000300秒 (3.0x)
  张量化:       0.000194秒 (4.7x)
  优化张量:     0.001339秒 (0.7x)
  误差检查: 向量化=4.15e+01, 张量化=0.00e+00, 优化=7.63e-06

批量大小: 1000, 矩阵大小: 50x50
  循环方式:     0.001741秒
  向量化:       0.000592秒 (2.9x)
  张量化:       0.000404秒 (4.3x)
  优化张量:     0.002753秒 (0.6x)
  误差检查: 向量化=4.81e+01, 张量化=0.00e+00, 优化=7.63e-06

批量大小: 2000, 矩阵大小: 50x50
  循环方式:     0.003966秒
  向量化:       0.001318秒 (3.0x)
  张量化:       0.001082秒 (3.7x)
  优化张量:     0.009218秒 (0.4x)
  误差检查: 向量化=4.78e+01, 张量化=0.00e+00, 优化=5.72e-06

场景2: 批量二次型计算

不同维度下的二次型计算:

维度: 10, 批量: 1000
  循环方式:     0.001974秒
  向量化:       0.000055秒 (36.0x)
  张量化:       0.004968秒 (0.4x)
  矩阵乘法:     0.000090秒 (21.9x)
  误差: 向量化=9.16e-05, 张量化=1.83e-04, 矩阵=9.16e-05

维度: 50, 批量: 1000
  循环

总结：向量化 vs 张量化的性能差异

向量化更快的情况：

低维数据（< 100维）

小批量处理

CPU计算

需要低延迟的实时应用

张量化更快的情况：

高维数据（> 100维）

大批量处理

GPU计算

复杂的多维运算

实际经验：

在MPC应用中，向量化通常是最佳选择

张量化在深度学习和大规模数值计算中更有优势

具体性能取决于数据形状、硬件和具体操作