In [1]:
import numpy as np

In [2]:
# Recorded samples to analyse. Recordings are timestamped dumps named
# data_YYYYMMDD_HHMMSS.npy; change DATA_PATH to switch to another recording.
DATA_PATH = "data/data_20251122_200755.npy"

# SECURITY NOTE(review): allow_pickle=True lets np.load unpickle arbitrary
# Python objects, which can execute code embedded in the file. Later cells
# index every record by string key, so the file presumably stores pickled
# dict-like records (confirm). Only load recordings you produced yourself.
raw = np.load(DATA_PATH, allow_pickle=True)


In [3]:
# Pull the two fields of interest out of every record and stack each field
# into a single array with one row per record.
joint_data = np.asarray([record['joint_positions'] for record in raw])
cam_xyz = np.asarray([record['tag_xyz'] for record in raw])

In [4]:

# 1) Finite-value filter: keep a row only if every joint value and every
#    camera coordinate in it is finite (drops NaN / +-inf samples).
joints_finite = np.isfinite(joint_data).all(axis=1)
cam_finite = np.isfinite(cam_xyz).all(axis=1)
mask_finite = joints_finite & cam_finite

joint_data_clean = joint_data[mask_finite]
cam_xyz_clean = cam_xyz[mask_finite]
print("after finite filter:", joint_data_clean.shape[0], "samples")

# 2) Outlier filter: put joints and camera coordinates side by side so the
#    z-score is computed independently for every column.
data_all = np.concatenate([joint_data_clean, cam_xyz_clean], axis=1)

mean = data_all.mean(axis=0)
# The small constant keeps the division defined for a constant column.
std = data_all.std(axis=0) + 1e-8
z = (data_all - mean) / std

# Tunable threshold: 3 is the usual choice, 2 is stricter.
th = 3.0
# A row survives only if its largest absolute z-score over all columns is
# below the threshold.
row_peak_z = np.abs(z).max(axis=1)
mask_z = row_peak_z < th

joint_data_clean = joint_data_clean[mask_z]
cam_xyz_clean = cam_xyz_clean[mask_z]

print("after outlier filter:", joint_data_clean.shape[0], "samples")

after finite filter: 399 samples
after outlier filter: 368 samples


In [5]:
# Re-join the filtered arrays column-wise: joint columns first, camera
# coordinate columns last.
data = np.concatenate((joint_data_clean, cam_xyz_clean), axis=1)

In [6]:
# Joint 1 is the first column of `data`. Encode it as a (cos, sin) pair.
y_joint1 = data[:, 0]                  # (N,)
y_joint1_cos = np.cos(y_joint1)        # (N,)
y_joint1_sin = np.sin(y_joint1)        # (N,)

# Column order is [cos, sin]; result has shape (N, 2).
Y = np.stack((y_joint1_cos, y_joint1_sin), axis=1)

In [7]:
# Regression inputs: the last three columns of `data`, i.e. the camera
# coordinate columns appended after the joint columns. Despite the name,
# this array holds inputs, not joint-1 values.
x_joint1 = data[..., -3:]

In [8]:
from sklearn.linear_model import LinearRegression
# Ordinary least squares from the three input columns to sin(joint 1).
reg = LinearRegression()
reg.fit(x_joint1, y_joint1_sin)

# R^2 on the same samples used for fitting (an in-sample score, not a
# held-out one). Displayed as the cell output.
reg.score(x_joint1, y_joint1_sin)

0.9971932877195544

In [9]:
# Columns 1 and 2 of `data` hold joints 2 and 3; keep them both as a pair
# and as two separate 1-D arrays.
joint_23 = data[:, 1:3]
joint_2, joint_3 = joint_23[:, 0], joint_23[:, 1]

In [10]:
# Sanity check: one row per retained sample, two columns (joints 2 and 3).
joint_23.shape

(368, 2)

In [11]:
import numpy as np
from sklearn.svm import SVR
from sklearn.multioutput import MultiOutputRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor

# ===================== Data preparation =====================
# Assumption: joint_2 and joint_3 are already expressed in radians.
# x_joint1: (N, 3), presumably xyz in the camera frame (confirm).
# joint_2, joint_3: (N,)

# Stage-1 targets: two sine features tied to joints 2 and 3.
sin_j2 = np.sin(joint_2)                    # feature 1: sin(j2)
sin_j2_plus_j3 = np.sin(joint_3 + joint_2)  # feature 2: sin(j2 + j3)
joint_3_post = np.column_stack([sin_j2, sin_j2_plus_j3])  # shape (N, 2)

X, Y = x_joint1, joint_3_post               # (N, 3), (N, 2)

# joint_3 (radians) goes through the same split so its rows stay aligned with
# X and Y for a later second stage.
split_parts = train_test_split(X, Y, joint_3, test_size=0.25, random_state=0)
X_train, X_test, Y_train, Y_test, j3_train, j3_test = split_parts

# ============ Stage 1: SVR fit of sin(j2) / sin(j2 + j3) ============
# SVR is single-output, so MultiOutputRegressor fits one SVR per target column.
base_svr = SVR(kernel='rbf', C=0.1, epsilon=0.01)
svr_reg = make_pipeline(StandardScaler(), MultiOutputRegressor(base_svr))

svr_reg.fit(X_train, Y_train)

train_r2 = svr_reg.score(X_train, Y_train)
test_r2 = svr_reg.score(X_test, Y_test)
print("SVR Train R2:", train_r2)
print("SVR  Test R2:", test_r2)

SVR Train R2: 0.9010571186315196
SVR  Test R2: 0.8813300700042326


In [12]:
# Predict the sine features for every sample in X (training and test rows
# alike), then clamp to [-1, 1] so the values are valid arcsin inputs.
Y_pred = np.clip(svr_reg.predict(X), -1.0, 1.0)


In [13]:
import copy
# Independent copy of the clipped predictions, so the in-place column
# assignments in the following cells leave Y_pred itself untouched.
Y_pred_next = copy.deepcopy(Y_pred)

In [14]:
# Column 0: j2 estimate = arcsin(predicted sin(j2)).
# np.arcsin is used instead of np.asin: the `asin` alias only exists in
# NumPy >= 2.0, while `arcsin` works on every NumPy version and is what the
# rest of this notebook calls.
Y_pred_next[:, 0] = np.arcsin(Y_pred[:, 0])

In [15]:
# Column 1: j3 estimate = arcsin(predicted sin(j2 + j3)) - arcsin(predicted sin(j2)).
# np.arcsin is used instead of np.asin: the `asin` alias only exists in
# NumPy >= 2.0, while `arcsin` works on every NumPy version.
Y_pred_next[:, 1] = np.arcsin(Y_pred[:, 1]) - np.arcsin(Y_pred[:, 0])

In [23]:
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

import copy
# Turn the clipped stage-1 sine predictions (Y_pred) into angle estimates.
# np.arcsin replaces the np.asin alias, which only exists in NumPy >= 2.0.
# NOTE(review): arcsin returns the principal value in [-pi/2, pi/2] only, so
# any sample whose j2 or j2 + j3 lies outside that interval is folded back
# into it. Confirm the joint ranges.
j2_est = np.arcsin(Y_pred[:, 0])
j2_plus_j3_est = np.arcsin(Y_pred[:, 1])

Y_pred_next = copy.deepcopy(Y_pred)
Y_pred_next[:, 0] = j2_est
Y_pred_next[:, 1] = j2_plus_j3_est - j2_est
# X: (N, d_in)
# Y_pred_next: (N, d_out), here (N, 2)

# NOTE(review): the regression targets below are angles derived from the SVR
# predictions, not the measured joint_2 / joint_3 values, so "Test R2" reports
# how well the MLP reproduces the SVR-derived angles.
X_train, X_test, y_train, y_test = train_test_split(
    X, Y_pred_next, test_size=0.25, random_state=1
)

regr = make_pipeline(
    StandardScaler(),
    MLPRegressor(
        hidden_layer_sizes=(256, 256),
        activation='relu',
        solver='adam',
        learning_rate_init=1e-2,
        max_iter=1000,
        tol=1e-5,           # far below 0.1, so training does not stop almost immediately
        random_state=1
    )
)

regr.fit(X_train, y_train)

# Show the predictions for the first two held-out rows.
y_hat = regr.predict(X_test[:2])
print("y_hat[:2] =\n", y_hat)

print("Test R2:", regr.score(X_test, y_test))


y_hat[:2] =
 [[ 0.12963036 -1.4452468 ]
 [ 0.02565547 -1.4439438 ]]
Test R2: 0.9835980413871828


In [55]:
import numpy as np
from sklearn.svm import SVR
from sklearn.multioutput import MultiOutputRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


# ---------- Small helper functions ----------
def sin_pair_to_angles(sin_pair: np.ndarray) -> np.ndarray:
    """Convert [sin(j2), sin(j2 + j3)] pairs into [j2_hat, j3_hat] in radians.

    Parameters
    ----------
    sin_pair : array-like, shape (..., 2) or wider
        Element 0 along the last axis is sin(j2) and element 1 is
        sin(j2 + j3). Values outside [-1, 1] are clipped before arcsin is
        applied. A single pair of shape (2,) and plain nested lists are
        accepted as well as the usual (N, 2) array; extra trailing columns
        are ignored.

    Returns
    -------
    np.ndarray
        Same leading shape as the input with a last axis of length 2:
        [j2_hat, j3_hat], where j3_hat = arcsin(s12) - arcsin(s1).

    Notes
    -----
    arcsin only returns the principal value in [-pi/2, pi/2]. If the true j2
    or j2 + j3 lies outside that interval, the recovered angle is folded back
    into it and j3_hat is wrong for that sample.
    """
    pairs = np.asarray(sin_pair)
    # Clip first so regressor overshoot (|value| > 1) does not become NaN.
    clipped = np.clip(pairs[..., :2], -1.0, 1.0)
    principal = np.arcsin(clipped)

    j2_hat = principal[..., 0]
    j3_hat = principal[..., 1] - j2_hat
    return np.stack([j2_hat, j3_hat], axis=-1)


def print_metrics(name, y_true, y_pred):
    """Print R2, MAE and RMSE of `y_pred` against `y_true` on one line.

    `name` is the label shown in square brackets. MAE and RMSE are passed
    through rad2deg before printing, so the inputs are treated as radians;
    R2 is printed as is. Nothing is returned.
    """
    r2 = r2_score(y_true, y_pred)
    mae_deg = np.rad2deg(mean_absolute_error(y_true, y_pred))
    rmse_deg = np.rad2deg(np.sqrt(mean_squared_error(y_true, y_pred)))
    print(f"[{name}]  R2 = {r2:.4f},  MAE = {mae_deg:.3f} deg,  RMSE = {rmse_deg:.3f} deg")


# ===================== Data preparation =====================
# x_joint1: (N, 3), presumably xyz in the camera frame (confirm).
# joint_2, joint_3: (N,), assumed to be radians.
sin_j2 = np.sin(joint_2)
sin_j2_plus_j3 = np.sin(joint_3 + joint_2)
joint_3_post = np.column_stack((sin_j2, sin_j2_plus_j3))   # (N, 2) stage-1 targets

X, Y = x_joint1, joint_3_post                              # (N, 3), (N, 2)

# A single call splits all four arrays with the same row selection, so the
# raw angles stay aligned with their features and sine targets.
(X_train, X_test,
 Y_train, Y_test,
 j2_train, j2_test,
 j3_train, j3_test) = train_test_split(
    X, Y, joint_2, joint_3, test_size=0.25, random_state=0
)

# ============ Stage 1: SVR fit of the sine features ============
# One RBF SVR per target column, fed with standardised inputs.
per_output_svr = MultiOutputRegressor(SVR(kernel='rbf', C=0.1, epsilon=0.01))
svr_reg = make_pipeline(StandardScaler(), per_output_svr)
svr_reg.fit(X_train, Y_train)

svr_r2_train = svr_reg.score(X_train, Y_train)
svr_r2_test = svr_reg.score(X_test, Y_test)
print("SVR Train R2 (sin-space):", svr_r2_train)
print("SVR  Test R2 (sin-space):", svr_r2_test)

# SVR predictions in sine space, then converted to angle estimates.
Y_train_sin_pred = svr_reg.predict(X_train)
Y_test_sin_pred = svr_reg.predict(X_test)
angles_train_svr = sin_pair_to_angles(Y_train_sin_pred)
angles_test_svr = sin_pair_to_angles(Y_test_sin_pred)

# Ground-truth angles laid out as [j2, j3] columns, matching the estimates.
angles_train_true = np.column_stack((j2_train, j3_train))
angles_test_true = np.column_stack((j2_test, j3_test))

# ============ Stage 2: MLP learns the angle error (delta theta) ============
# Residual the MLP has to predict: true angles minus the SVR-derived angles.
# NOTE(review): these residuals come from SVR predictions on the same rows the
# SVR was fitted on, so they may be smaller than the residuals on unseen data.
err_train = angles_train_true - angles_train_svr

# MLP inputs: the raw xyz columns followed by the SVR's sine predictions.
X_mlp_train = np.concatenate((X_train, Y_train_sin_pred), axis=1)
X_mlp_test = np.concatenate((X_test, Y_test_sin_pred), axis=1)

residual_net = MLPRegressor(
    hidden_layer_sizes=(64, 64, 16),
    activation='relu',
    solver='adam',
    learning_rate_init=1e-3,
    max_iter=2000,
    tol=1e-6,
    random_state=1
)
mlp_err_reg = make_pipeline(StandardScaler(), residual_net)
mlp_err_reg.fit(X_mlp_train, err_train)

# Add the predicted correction on top of the SVR-only angles.
err_pred_test = mlp_err_reg.predict(X_mlp_test)
angles_test_final = angles_test_svr + err_pred_test

# ============ Metric report (held-out split) ============
# Transposing the (M, 2) angle arrays lets each one unpack into its j2 and
# j3 columns.
j2_gt, j3_gt = j2_test, j3_test
j2_svr, j3_svr = angles_test_svr.T
j2_final, j3_final = angles_test_final.T

print("=== joint2 ===")
print_metrics("SVR only    (j2)", j2_gt, j2_svr)
print_metrics("SVR + MLP   (j2)", j2_gt, j2_final)

print("=== joint3 ===")
print_metrics("SVR only    (j3)", j3_gt, j3_svr)
print_metrics("SVR + MLP   (j3)", j3_gt, j3_final)

# R2 computed over both angle columns together.
r2_all_svr = r2_score(angles_test_true, angles_test_svr)
r2_all_final = r2_score(angles_test_true, angles_test_final)
print("R2(all angles) SVR only  :", r2_all_svr)
print("R2(all angles) SVR+MLP   :", r2_all_final)


# ============ Single public prediction entry point ============

def predict_joint2_joint3_from_xyz(X_query: np.ndarray) -> np.ndarray:
    """Run the two-stage model (SVR + MLP error correction) on new inputs.

    X_query: (M, 3) camera-frame xyz, with the same column layout as the
             training inputs.
    return:  (M, 2) [j2_hat, j3_hat] in radians, i.e. the SVR-derived angles
             plus the MLP-predicted correction.

    Relies on the fitted module-level `svr_reg` and `mlp_err_reg` pipelines.
    """
    sin_features = svr_reg.predict(X_query)
    # The MLP was trained on [xyz, SVR sine predictions], so rebuild that layout.
    residual_inputs = np.hstack([X_query, sin_features])
    coarse_angles = sin_pair_to_angles(sin_features)
    correction = mlp_err_reg.predict(residual_inputs)
    return coarse_angles + correction


SVR Train R2 (sin-space): 0.9010571186315196
SVR  Test R2 (sin-space): 0.8813300700042326
=== joint2 ===
[SVR only    (j2)]  R2 = 0.8744,  MAE = 1.212 deg,  RMSE = 1.712 deg
[SVR + MLP   (j2)]  R2 = 0.8824,  MAE = 1.200 deg,  RMSE = 1.656 deg
=== joint3 ===
[SVR only    (j3)]  R2 = -2.6091,  MAE = 10.214 deg,  RMSE = 15.474 deg
[SVR + MLP   (j3)]  R2 = 0.8983,  MAE = 1.928 deg,  RMSE = 2.598 deg
R2(all angles) SVR only  : -0.8673940307493729
R2(all angles) SVR+MLP   : 0.8903612769921961
