In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from scipy.integrate import solve_ivp
from scipy.optimize import root
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
import joblib  # 用于保存scaler
import time
import os

# -----------------------------------------------------------------
# --- PART 1: GROUND TRUTH PHYSICS SOLVER
# -----------------------------------------------------------------

def model_3d(t, state, m, c, g, wind_vec):
    vx, vy, vz = state[3], state[4], state[5]
    wx, wy, wz = wind_vec
    v_rel_x, v_rel_y, v_rel_z = vx - wx, vy - wy, vz - wz
    v_rel = (v_rel_x**2 + v_rel_y**2 + v_rel_z**2)**0.5
    
    if v_rel < 1e-9:
        ax, ay = 0, 0
        az = -g
    else:
        ax = -(c / m) * v_rel * v_rel_x
        ay = -(c / m) * v_rel * v_rel_y
        az = -g - (c / m) * v_rel * v_rel_z
    return [vx, vy, vz, ax, ay, az]

def objective_function_3d(launch_angles, v0, m, c, g, wind_vec, target_x, target_z):
    theta_deg, phi_deg = launch_angles
    theta_rad, phi_rad = np.radians(theta_deg), np.radians(phi_deg)
    
    v0z = v0 * np.sin(theta_rad)
    v_proj_xy = v0 * np.cos(theta_rad)
    v0x = v_proj_xy * np.cos(phi_rad)
    v0y = v_proj_xy * np.sin(phi_rad)
    
    initial_state = [0, 0, 0, v0x, v0y, v0z]
    
    def stop_at_target_x(t, state, *args):
        return state[0] - target_x
    stop_at_target_x.terminal = True
    stop_at_target_x.direction = 1
    t_max_guess = max(30.0, (2.0 * v0 / g) * 1.5)
    
    sol = solve_ivp(
        model_3d, [0, t_max_guess], initial_state,
        args=(m, c, g, wind_vec), events=stop_at_target_x, dense_output=True
    )

    if sol.status == 1 and len(sol.t_events[0]) > 0:
        final_state = sol.y_events[0][-1]
        error_y = final_state[1] - 0.0
        error_z = final_state[2] - target_z
        return [error_y, error_z]
    else:
        error_x = sol.y[0][-1] - target_x if sol.y[0].size > 0 else -target_x
        return [error_x * 100, error_x * 100]

def find_launch_angles_3d(rho, wind_vector, target_distance, target_altitude_diff, v0, 
                          m_projectile, C_D, projectile_diameter, g,
                          verbose=False): # 设置为 "verbose=False" 以便安静运行
    target_z = target_altitude_diff
    A = np.pi * (projectile_diameter / 2)**2
    c_drag = 0.5 * C_D * rho * A
    
    args_tuple = (v0, m_projectile, c_drag, g, wind_vector, target_distance, target_z)
    
    # 使用为 h=0 找到的 3.7 度作为更好的初始猜测
    initial_guess = [3.7, 0.0] 
    
    if verbose:
        print(f"Solving for: TargetX={target_distance}m, TargetZ={target_z}m, Wind={wind_vector}")
    
    sol = root(
        objective_function_3d, initial_guess, 
        args=args_tuple, method='lm'
    )
    
    if sol.success:
        if verbose:
            print(f"  > Success: Theta={sol.x[0]:.4f}, Phi={sol.x[1]:.4f}")
        return sol.x[0], sol.x[1]  # theta_sol, phi_sol
    else:
        if verbose:
            print("  > Solution Failed.")
        return None, None

# -------------------------------------------------
# --- PART 2: DATA GENERATION FUNCTION (INVERSE) ---
# -------------------------------------------------

def generate_inverse_data(sim_params, n_samples):
    """
    使用“真理”模型生成 (输入) -> (输出) 映射数据
    输入: [target_x, h, Wx, Wy]
    输出: [theta, phi]
    """
    print(f"开始生成 {n_samples} 个训练样本 (这可能需要很长时间)...")
    
    # 展开模拟参数
    v0, m, C_D, diam, g, rho = sim_params
    
    inputs_X = []
    outputs_y = []
    
    pbar = tqdm(total=n_samples)
    while len(inputs_X) < n_samples:
        
        # 1. 随机化输入条件
        target_x = np.random.uniform(800.0, 1500.0)
        h = np.random.uniform(-100.0, 100.0)
        wx = np.random.uniform(-15.0, 15.0)
        wy = np.random.uniform(-15.0, 15.0)
        wind_vec = [wx, wy, 0.0] # 假设 wz=0

        # 2. 运行“慢速但精确”的求解器
        theta, phi = find_launch_angles_3d(
            rho, wind_vec, target_x, h, v0, m, C_D, diam, g, verbose=False
        )
        
        # 3. 如果求解成功，保存数据点
        if theta is not None:
            inputs_X.append([target_x, h, wx, wy])
            outputs_y.append([theta, phi])
            pbar.update(1)
            
    pbar.close()
    print("数据生成完毕。")
    return np.array(inputs_X, dtype=np.float32), np.array(outputs_y, dtype=np.float32)


# -------------------------------------------------
# --- PART 3: PYTORCH MODEL DEFINITION (INVERSE) ---
# -------------------------------------------------

class InverseSolverModel(nn.Module):
    def __init__(self):
        super(InverseSolverModel, self).__init__()
        # 4个输入 (target_x, h, Wx, Wy) -> ... -> 2个输出 (theta, phi)
        self.network = nn.Sequential(
            nn.Linear(4, 128),
            nn.ReLU(),
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 2)  # 2个输出
        )

    def forward(self, x):
        return self.network(x)


# -------------------------------------------------
# --- PART 4: MAIN TRAINING SCRIPT ---
# -------------------------------------------------

if __name__ == "__main__":
    
    # --- A. CONFIGURATION ---
    
    # 1. 模拟参数
    v_initial = 450.0
    m_projectile = 5.0
    g = 9.81
    air_density = 1.17
    C_D = 0.92
    projectile_diameter = 0.11
    
    SIM_PARAMS = (v_initial, m_projectile, C_D, projectile_diameter, g, air_density)

    # 2. 数据生成参数
    N_SAMPLES = 100000  # 总样本数
    X_DATA_FILE = 'X_inverse_data.npy'
    Y_DATA_FILE = 'Y_inverse_data.npy'

    # 3. PyTorch 训练参数
    LEARNING_RATE = 0.001
    NUM_EPOCHS = 100
    BATCH_SIZE = 32
    MODEL_FILE = 'inverse_model.pth'
    SCALER_X_FILE = 'scaler_X.pkl'
    SCALER_Y_FILE = 'scaler_Y.pkl'
    
    
    # --- B. GENERATE DATA ---
    
    if not (os.path.exists(X_DATA_FILE) and os.path.exists(Y_DATA_FILE)):
        X_data, y_data = generate_inverse_data(SIM_PARAMS, N_SAMPLES)
        np.save(X_DATA_FILE, X_data)
        np.save(Y_DATA_FILE, y_data)
        print(f"数据已生成并保存到 {X_DATA_FILE}, {Y_DATA_FILE}")
    else:
        print(f"从 .npy 文件加载现有数据...")
        X_data = np.load(X_DATA_FILE)
        y_data = np.load(Y_DATA_FILE)

    print(f"数据形状: X={X_data.shape}, y={y_data.shape}")

    
    # --- C. PREPROCESS DATA ---
    print("预处理数据...")
    
    # 1. 特征缩放 (!!! 关键: X 和 y 都需要 !!!)
    scaler_X = StandardScaler()
    X_scaled = scaler_X.fit_transform(X_data)
    
    scaler_Y = StandardScaler()
    y_scaled = scaler_Y.fit_transform(y_data)
    
    # 2. 训练/验证集拆分
    X_train, X_val, y_train, y_val = train_test_split(
        X_scaled, y_scaled, test_size=0.2, random_state=42
    )

    # 3. 转换为PyTorch Tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32)

    # 4. 创建 DataLoaders
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

    
    # --- D. TRAIN MODEL ---
    print("开始训练反向求解模型...")
    
    model = InverseSolverModel()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    
    best_val_loss = float('inf')

    for epoch in range(NUM_EPOCHS):
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
            
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
        
        train_loss = train_loss / len(train_loader.dataset)
        val_loss = val_loss / len(val_loader.dataset)
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{NUM_EPOCHS}] "
                  f"Train Loss: {train_loss:.6f} "
                  f"Val Loss: {val_loss:.6f}")
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), MODEL_FILE)
            
    print("训练完成。")

    
    # --- E. SAVE SCALERS ---
    joblib.dump(scaler_X, SCALER_X_FILE)
    joblib.dump(scaler_Y, SCALER_Y_FILE)
    print(f"最佳模型已保存到: {MODEL_FILE}")
    print(f"输入scaler已保存到: {SCALER_X_FILE}")
    print(f"输出scaler已保存到: {SCALER_Y_FILE}")

    
    # --- F. INFERENCE EXAMPLE (关键对比) ---
    print("\n" + "="*50)
    print("--- 推理（预测）示例 ---")
    print("="*50)

 

开始生成 100000 个训练样本 (这可能需要很长时间)...


100%|██████████████████████████████████████████████████████████████████████████| 100000/100000 [41:34<00:00, 40.08it/s]


数据生成完毕。
数据已生成并保存到 X_inverse_data.npy, Y_inverse_data.npy
数据形状: X=(100000, 4), y=(100000, 2)
预处理数据...
开始训练反向求解模型...
Epoch [10/100] Train Loss: 0.000119 Val Loss: 0.000052
Epoch [20/100] Train Loss: 0.000066 Val Loss: 0.000045
Epoch [30/100] Train Loss: 0.000049 Val Loss: 0.000054
Epoch [40/100] Train Loss: 0.000044 Val Loss: 0.000022
Epoch [50/100] Train Loss: 0.000036 Val Loss: 0.000013
Epoch [60/100] Train Loss: 0.000033 Val Loss: 0.000013
Epoch [70/100] Train Loss: 0.000031 Val Loss: 0.000040
Epoch [80/100] Train Loss: 0.000029 Val Loss: 0.000114
Epoch [90/100] Train Loss: 0.000025 Val Loss: 0.000040
Epoch [100/100] Train Loss: 0.000023 Val Loss: 0.000014
训练完成。
最佳模型已保存到: inverse_model.pth
输入scaler已保存到: scaler_X.pkl
输出scaler已保存到: scaler_Y.pkl

--- 推理（预测）示例 ---


In [32]:
# --- F. INFERENCE EXAMPLE (关键对比) ---
print("\n" + "="*50)
print("--- 推理（预测）示例 ---")
print("="*50)

# 1. 加载保存的模型和scaler
print("加载保存的模型和scaler...")
loaded_model = InverseSolverModel()
loaded_model.load_state_dict(torch.load(MODEL_FILE))
loaded_model.eval()  # 设置为评估模式

loaded_scaler_X = joblib.load(SCALER_X_FILE)
loaded_scaler_Y = joblib.load(SCALER_Y_FILE)

# 2. 准备一个新问题
test_target_x = 1200.0
test_h = 100
test_wx = -8 # 5m/s 逆风
test_wy = 0  # 3m/s 侧风

print(f"\n测试问题: X={test_target_x}m, h={test_h}m, Wx={test_wx}m/s, Wy={test_wy}m/s")

# a. 使用"真理"模型 
print("正在运行真理Scipy求解器 (可能需要几秒钟)...")
start_sim_time = time.time()

true_theta, true_phi = find_launch_angles_3d(
    air_density, [test_wx, test_wy, 0.0], test_target_x, test_h,
    v_initial, m_projectile, C_D, projectile_diameter, g, verbose=False
)
sim_time = time.time() - start_sim_time
print(f"  > Scipy (真理) 结果: Theta={true_theta:.4f}°, Phi={true_phi:.4f}°")
print(f"  > Scipy 求解耗时: {sim_time * 1000:.4f} 毫秒")

# b. 使用PyTorch代理模型 
print("正在运行 PyTorch 代理求解器 (几乎瞬时)...")
start_nn_time = time.time()

# (i) 组织输入
input_data = np.array([[test_target_x, test_h, test_wx, test_wy]], dtype=np.float32)

# (ii) *必须* 使用 X_scaler 转换输入
input_data_scaled = loaded_scaler_X.transform(input_data)

# (iii) 转换为Tensor
input_tensor = torch.tensor(input_data_scaled, dtype=torch.float32)

# (iv) 预测 (得到的是缩放后的值)
with torch.no_grad():
    predicted_scaled_output = loaded_model(input_tensor)

# (v) *必须* 使用 Y_scaler 逆转换输出
predicted_angles = loaded_scaler_Y.inverse_transform(predicted_scaled_output.numpy())

nn_time = time.time() - start_nn_time

nn_theta, nn_phi = predicted_angles[0]
print(f"  > PyTorch (代理) 预测结果: Theta={nn_theta:.4f}°, Phi={nn_phi:.4f}°")
print(f"  > PyTorch 推理耗时: {nn_time * 1000:.4f} 毫秒")

# 3. 计算误差
if true_theta is not None and true_phi is not None:
    theta_error = abs(nn_theta - true_theta)
    phi_error = abs(nn_phi - true_phi)
    print(f"\n--- 误差分析 ---")
    print(f"Theta 误差: {theta_error:.4f}°")
    print(f"Phi 误差: {phi_error:.4f}°")

# 4. 生成参数组合表格 (限定 wy=0)
print("\n" + "="*60)
print("--- 参数组合表格 (Wy=0) ---")
print("="*60)

# 定义参数范围
h_values = [-100, -50, 0, 50, 100]
wx_values = [-8, -6, -4, -2, 0, 2, 4, 6, 8]
target_x = 1200.0
wy = 0.0

print(f"\n目标距离: {target_x}m, 侧风 Wy: {wy}m/s")
print("\nTheta 预测结果表 (°):")
print("h\\wx   ", end="")
for wx in wx_values:
    print(f"{wx:>8}", end="")
print()

# 存储所有结果用于后续分析
results = []

for h in h_values:
    print(f"{h:>5} ", end="")
    row_results = []
    for wx in wx_values:
        # 准备输入数据
        input_data = np.array([[target_x, h, wx, wy]], dtype=np.float32)
        input_data_scaled = loaded_scaler_X.transform(input_data)
        input_tensor = torch.tensor(input_data_scaled, dtype=torch.float32)
        
        # 使用PyTorch模型预测
        with torch.no_grad():
            predicted_scaled_output = loaded_model(input_tensor)
        
        predicted_angles = loaded_scaler_Y.inverse_transform(predicted_scaled_output.numpy())
        theta_pred = predicted_angles[0][0]
        
        # 存储结果
        row_results.append(theta_pred)
        print(f"{theta_pred:>8.3f}", end="")
    
    results.append(row_results)
    print()

# 可选：生成对应的真实值表格（如果需要对比）
print("\n" + "-"*60)
print("是否生成对应的Scipy真实值表格进行对比？(这可能需要较长时间)")
user_input = input("输入 'y' 继续，其他键跳过: ")

if user_input.lower() == 'y':
    print("\n生成Scipy真实值表格...")
    print("h\\wx   ", end="")
    for wx in wx_values:
        print(f"{wx:>8}", end="")
    print()
    
    true_results = []
    for i, h in enumerate(h_values):
        print(f"{h:>5} ", end="")
        true_row = []
        for j, wx in enumerate(wx_values):
            try:
                true_theta, true_phi = find_launch_angles_3d(
                    air_density, [wx, wy, 0.0], target_x, h,
                    v_initial, m_projectile, C_D, projectile_diameter, g, verbose=False
                )
                if true_theta is not None:
                    true_row.append(true_theta)
                    print(f"{true_theta:>8.3f}", end="")
                else:
                    true_row.append(float('nan'))
                    print(f"{'N/A':>8}", end="")
            except:
                true_row.append(float('nan'))
                print(f"{'N/A':>8}", end="")
        true_results.append(true_row)
        print()
    
    # 计算并显示误差表格
    print("\nTheta 误差表格 (预测值 - 真实值):")
    print("h\\wx   ", end="")
    for wx in wx_values:
        print(f"{wx:>8}", end="")
    print()
    
    for i, h in enumerate(h_values):
        print(f"{h:>5} ", end="")
        for j, wx in enumerate(wx_values):
            if (i < len(true_results) and j < len(true_results[i]) and 
                not np.isnan(true_results[i][j])):
                error = results[i][j] - true_results[i][j]
                print(f"{error:>8.3f}", end="")
            else:
                print(f"{'N/A':>8}", end="")
        print()

print("\n表格生成完成！")
    


--- 推理（预测）示例 ---
加载保存的模型和scaler...

测试问题: X=1200.0m, h=100m, Wx=-8m/s, Wy=0m/s
正在运行真理Scipy求解器 (可能需要几秒钟)...
  > Scipy (真理) 结果: Theta=9.6059°, Phi=0.0000°
  > Scipy 求解耗时: 37.9965 毫秒
正在运行 PyTorch 代理求解器 (几乎瞬时)...
  > PyTorch (代理) 预测结果: Theta=9.6084°, Phi=0.0010°
  > PyTorch 推理耗时: 2.0006 毫秒

--- 误差分析 ---
Theta 误差: 0.0026°
Phi 误差: 0.0010°

--- 参数组合表格 (Wy=0) ---

目标距离: 1200.0m, 侧风 Wy: 0.0m/s

Theta 预测结果表 (°):
h\wx         -8      -6      -4      -2       0       2       4       6       8
 -100    0.148   0.053  -0.043  -0.138  -0.225  -0.302  -0.380  -0.460  -0.540
  -50    2.486   2.393   2.301   2.222   2.148   2.071   1.996   1.923   1.856
    0    4.841   4.760   4.695   4.626   4.556   4.486   4.416   4.361   4.306
   50    7.218   7.155   7.091   7.027   6.962   6.907   6.858   6.808   6.754
  100    9.608   9.546   9.485   9.429   9.387   9.345   9.300   9.254   9.208

------------------------------------------------------------
是否生成对应的Scipy真实值表格进行对比？(这可能需要较长时间)
输入 'y' 继续，其他键跳过: y

生成