In [1]:
import numpy as np
import pandas as pd
import pinocchio as pin
import scipy
import seaborn as sns

# Set numpy precision to show easier to understand values
np.set_printoptions(formatter={"float_kind": "{:.3f}".format})

## Load data

In [None]:
# Replace the paths with your own data

df_joint_states = pd.read_parquet("data/jtc/franka_joint_states.parquet")
df_ft = pd.read_parquet("data/jtc/force_torque_sensor_broadcaster_wrench.parquet")
df_moving_to_pose = pd.read_parquet("data/jtc/moving_to_new_pose.parquet")
df_pose_reached = pd.read_parquet("data/jtc/pose_reached.parquet")

df_robot_description = pd.read_parquet("data/jtc/robot_description.parquet")

## Extract timestamps of stationary poses

In [3]:
# Add dummy variables to easier detect state transitions after concatenations
df_pose_reached["state"] = 0
df_moving_to_pose["state"] = 1
# Filter moving to pose topics that are after a first pose was reached
df_moving_to_pose = df_moving_to_pose.loc[
    (df_moving_to_pose["timestamp"] - df_pose_reached["timestamp"][0]) > 0
]
# Concatenate both dataframes and sort new dataframe by timestamp
df_stamps = pd.concat((df_moving_to_pose, df_pose_reached)).sort_values(
    by=["timestamp"], ignore_index=True
)
# Filter timestamps where robot stays in place. This means difference between next state and current state is -1
# And time difference between messages appearing has to be larder than 19 second (robot was suppose to wait 20 seconds)
df_stamps_stationary = df_stamps[
    (df_stamps["state"].diff() == -1.0) & (-df_stamps["timestamp"].diff(-1) > 19 * 1e9)
]
df_stamps_stationary.reset_index(drop=True, inplace=True)

## Create more accurate timestamps based on header messages

In [4]:
df_joint_states["header.stamp"] = (
    df_joint_states["header.stamp.sec"] * 1e9 + df_joint_states["header.stamp.nanosec"]
)
df_ft["header.stamp"] = df_ft["header.stamp.sec"] * 1e9 + df_ft["header.stamp.nanosec"]

## Extract parts of the dataframes that correspond only to the measured poses

In [5]:
# Map dataframes to topic names
measurements = {
    "joint_states": df_joint_states,
    "force_torque": df_ft,
}

stationary_data = {}
# Choose offsets for clipping
start_stamp_offset = int(3.0 * 1e9)  # 3 seconds
end_stamp_offset = int(19.0 * 1e9)  # 19 seconds
for i, data in df_stamps_stationary.iterrows():
    # Compute start and end time
    start = data["timestamp"] + start_stamp_offset
    end = data["timestamp"] + end_stamp_offset
    pose_name = data["data"]
    if pose_name not in stationary_data.keys():
        stationary_data[pose_name] = []
    stationary_data[pose_name].append(
        {
            topic: df.loc[(df["header.stamp"] > start) & (df["header.stamp"] < end)]
            for topic, df in measurements.items()
        }
    )

## Load URDF

In [None]:
model_full = pin.buildModelFromXML(df_robot_description["data"][0])
joints_to_lock = [
    model_full.getJointId(name) for name in model_full.names if "finger" in name
]
model = pin.buildReducedModel(model_full, joints_to_lock, np.zeros(model_full.nq))

data = model.createData()

joint_order = [name for name in model.names if name != "universe"]
base_frame_id = model.getFrameId("fer_link0")
tcp_pose_frame_id = model.getFrameId("fer_link8")
sensor_frame_id = model.getFrameId("ati_mini45_measurement_reference")

## Remap joint order
Joint state publisher does not guarantee to be publish joints in the same order all the time

In [7]:
# Remove columns that will not be used
columns_to_drop = [
    "timestamp",
    "header.stamp.sec",
    "header.stamp.nanosec",
    "header.frame_id",
]


def reorder_joints(row):
    joints = row["name"]
    joint_map = [np.argwhere(joints == joint_name)[0][0] for joint_name in joint_order]
    row["name"] = joint_order
    row["position"] = row["position"][joint_map]
    row["velocity"] = row["velocity"][joint_map]
    row["effort"] = row["effort"][joint_map]
    return row


for pose in stationary_data.keys():
    for i in range(len(stationary_data[pose])):
        df1 = stationary_data[pose][i]["force_torque"].drop(columns=columns_to_drop)
        df2 = stationary_data[pose][i]["joint_states"].drop(columns=columns_to_drop)
        df_merged = pd.merge(df1, df2, on=["header.stamp"]).dropna()
        stationary_data[pose][i]["merged"] = df_merged.apply(reorder_joints, axis=1)

## Compute average values for each configuration

In [8]:
averaged_stationary_data = {}

for pose in stationary_data.keys():
    averaged_stationary_data[pose] = []
    for i in range(len(stationary_data[pose])):
        position = stationary_data[pose][i]["merged"]["position"]
        averaged_stationary_data[pose].append(
            stationary_data[pose][i]["merged"].drop(columns=["name", "position"]).mean()
        )
        # Position has to be computed separately as it is not a scalar
        averaged_stationary_data[pose][i]["position"] = position.mean()

  stationary_data[pose][i]["merged"].drop(columns=["name", "position"]).mean()


## Precompute rotation matrices and force and torque vectors

In [9]:
processed_stationary_data = {}

for pose in averaged_stationary_data.keys():
    processed_stationary_data[pose] = []
    for i in range(len(averaged_stationary_data[pose])):
        data_sample = averaged_stationary_data[pose][i]
        pin.framesForwardKinematics(model, data, data_sample["position"])

        R_wm = data.oMf[base_frame_id].actInv(data.oMf[tcp_pose_frame_id]).rotation
        R_ws = data.oMf[base_frame_id].actInv(data.oMf[sensor_frame_id]).rotation
        processed_stationary_data[pose].append(
            {
                # Rotation from world to mounting plate
                "R_wm": np.array(R_wm, copy=True),
                # Rotation from world to sensor frame
                "R_ws": np.array(R_ws, copy=True),
                # Negate both force and torque as measurements are flipped
                "f": -np.array([data_sample["wrench.force." + dir] for dir in "xyz"]),
                "tau": -np.array(
                    [data_sample["wrench.torque." + dir] for dir in "xyz"]
                ),
            }
        )

## Validate rotations

Ensure rotation matrices computed from averaged joint configurations match valued expected during the experiment.

In [10]:
import yaml

with open("../config/reference_pose_publisher_params.yaml") as stream:
    try:
        params = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        print(exc)

params = params["/**"]["ros__parameters"]
keys = [key for key in set(params["poses_to_reach_names"]) if "_transition" not in key]
expected_configurations = {
    key: pin.Quaternion(np.array(params[key]["quat"])) for key in keys
}

print(f"{'pose name':15} delta angle in degrees")
print()
for p in processed_stationary_data.keys():
    pose = processed_stationary_data[p][0]
    expected_quat = expected_configurations[p]

    diff = expected_quat * pin.Quaternion(pose["R_wm"].T)
    print(f"{p:15}", np.rad2deg(pin.rpy.matrixToRpy(diff.matrix())))

pose name       delta angle in degrees

down_1          [-0.093 -0.016 0.567]
down_0          [-0.090 -0.017 0.384]
down_2          [-0.074 -0.018 0.238]
up_side_down_1  [-0.155 -0.588 0.536]
up_side_down_2  [-0.240 -0.560 0.116]
up_side_down_0  [-0.095 -0.486 0.190]
curled_1        [0.266 0.467 -0.016]
curled_0        [0.097 0.459 -0.033]
curled_2        [0.098 0.434 -0.033]
up_right_0      [-0.277 -0.267 0.306]
up_right_1      [-0.224 -0.015 0.269]
up_right_2      [-0.172 -0.075 0.230]
up_right_3      [-0.124 -0.052 0.203]
stretched_0     [0.692 0.305 0.140]
stretched_1     [0.283 0.433 0.031]
stretched_2     [0.337 0.400 -0.025]
stretched_3     [0.145 0.355 -0.206]


## Fit the model only on forces

This checks if a simpler model can be fit. Initial mass is larger than what can be found in URDF of the robot. Rotation is between expected frame, based on the datasheet making the whole method do only small refinement, smaller than 5 degree

In [18]:
# Create gravity vector
g = np.array([0.0, 0.0, -9.81])

# Initial guess for bias based on two poses, where expected measured force can be guesses
b_init = (
    processed_stationary_data["down_0"][0]["f"]
    + processed_stationary_data["up_side_down_0"][0]["f"]
) / 2
# Bound expected bias
b_lim = [sorted((0.5 * b, 1.5 * b)) for b in b_init]


# Cost function
def cost(x):
    # Mass
    m = x[0]
    # Angular velocity between expected sensor frame and refined sensor frame
    v = x[1:4]
    # Bias
    b = x[4:]
    sum = 0.0
    j = 0.0
    for p in processed_stationary_data.keys():
        for i in range(len(processed_stationary_data[p])):
            pose = processed_stationary_data[p][i]
            R = pose["R_ws"].copy()
            f = pose["f"].copy()

            diff = m * pin.exp3(v).T @ R.T @ g - (f - b)
            sum += np.sum(diff * diff)
            j += 1.0
    # Return normalized cost
    return sum / j


rot_err = np.deg2rad(5)

# SLSQP is used without gradient.
# This way it approximates it.
res = scipy.optimize.minimize(
    cost,
    [0.8, 0.0, 0.0, 0.0, *b_init],
    method="SLSQP",
    bounds=[
        (0.78, 1.0),
        (-rot_err, rot_err),
        (-rot_err, rot_err),
        (-rot_err, rot_err),
        *b_lim,
    ],
    tol=1e-7,
)
m = res.x[0]
v = res.x[1:4]
b = res.x[4:]
print(f"m: {m} [kg]")
print(f"v: {np.rad2deg(pin.rpy.matrixToRpy(pin.exp3(v)))} [deg]")
print(f"b: {b} [N]")

m: 0.7881260028387547 [kg]
v: [-1.040 0.089 -0.073] [deg]
b: [0.121 -0.269 7.788] [N]


## Validate results

Convert unbiased, measured force to world frame. Expected values is, all vectors are close to gravity vector and cost being close to zero.

In [19]:
print(f"{'pose name':15} {'Normalized measured force in world frame':42} cost")
print()
for p in processed_stationary_data.keys():
    pose = processed_stationary_data[p][0]
    R = pose["R_ws"].copy()
    f = pose["f"].copy()
    diff = m * pin.exp3(v).T @ R.T @ g - (f - b)
    cost = np.sum(diff * diff)
    g_reconstruct = R @ pin.exp3(v) @ (f - b) / m
    print(f"{p:15} {str(g_reconstruct):42} {cost:.3f}")

pose name       Normalized measured force in world frame   cost

down_1          [0.024 -0.231 -9.941]                      0.044
down_0          [0.181 -0.281 -9.945]                      0.081
down_2          [0.270 -0.173 -9.959]                      0.077
up_side_down_1  [-0.163 -0.277 -10.277]                    0.199
up_side_down_2  [-0.022 -0.157 -10.237]                    0.129
up_side_down_0  [-0.075 -0.137 -10.240]                    0.130
curled_1        [0.382 -0.255 -9.462]                      0.206
curled_0        [0.221 -0.034 -9.715]                      0.037
curled_2        [0.153 -0.334 -9.650]                      0.100
up_right_0      [-0.248 0.063 -9.750]                      0.043
up_right_1      [0.064 -0.082 -9.471]                      0.078
up_right_2      [-0.101 -0.209 -9.673]                     0.045
up_right_3      [0.217 -0.244 -9.654]                      0.082
stretched_0     [-0.120 -0.401 -9.791]                     0.109
stretched_1     [0.053 -0

## Fit the model with forces and torques

Extend the model to expect both forces and torques

In [20]:
# Create gravity vector
g = np.array([0.0, 0.0, -9.81])

# Initial guess with force and torque
b_init = (
    np.hstack([processed_stationary_data["down_0"][0][val] for val in ("f", "tau")])
    + np.hstack(
        [processed_stationary_data["up_side_down_0"][0][val] for val in ("f", "tau")]
    )
) / 2
# Create bounds for the initial guess
b_lim = [sorted((0.5 * b, 1.5 * b)) for b in b_init]


def cost(x):
    # Mass
    m = x[0]
    # Angular velocity
    vr = np.array(x[1:4])
    # Angular velocity between expected sensor frame and refined sensor frame
    vm = np.array(x[4:7])
    # Bias
    b = np.array(x[7:])
    sum = 0.0
    j = 0.0
    for p in processed_stationary_data.keys():
        for i in range(len(processed_stationary_data[p])):
            pose = processed_stationary_data[p][i]
            R = pose["R_ws"].copy()
            f = pose["f"].copy()
            tau = pose["tau"].copy()

            diff = np.vstack((np.eye(3), pin.skew(vm))) @ (
                m * pin.exp3(vr).T @ R.T @ g
            ) - (np.hstack((f, tau)) - b)
            sum += np.sum(diff * diff)
            j += 1.0
    # Return normalized cost
    return sum / j


rot_err = np.deg2rad(5)

res = scipy.optimize.minimize(
    cost,
    [0.8, 0.0, 0.0, 0.0, 0.0, 0.0, -0.1, *b_init],
    method="SLSQP",
    bounds=[
        (0.78, 1.0),
        (-rot_err, rot_err),
        (-rot_err, rot_err),
        (-rot_err, rot_err),
        (-0.05, 0.05),
        (-0.05, 0.05),
        (-0.2, 0.2),
        *b_lim,
    ],
    tol=1e-7,
)
m = res.x[0]
vr = np.array(res.x[1:4])
vm = np.array(res.x[4:7])
b = np.array(res.x[7:])

print(f"m: {m} [kg]")
print(f"vr: {np.rad2deg(pin.rpy.matrixToRpy(pin.exp3(v)))} [deg]")
print(f"vm: {vm} [m]")
print(f"b: {b} [N, N/m]")

m: 0.7881268685939541 [kg]
vr: [-1.040 0.089 -0.073] [deg]
vm: [0.004 0.006 -0.050] [m]
b: [0.121 -0.269 7.788 0.048 -0.037 -0.003] [N, N/m]




## Validate results

Check if torques with bias removed can be recreated from measured forces without the bias.

In [21]:
print(f"{'pose name':15} {'Measured torque':22} {'Computed torque':22}")
print()
for p in processed_stationary_data.keys():
    for i in range(len(processed_stationary_data[p])):
        pose = processed_stationary_data[p][i]
        R = pose["R_ws"].copy()
        f = pose["f"].copy()
        tau = pose["tau"].copy()

        print(f"{p:15} {str(tau - b[3:]):22} {str(np.cross(vm, f - b[:3])):22}")

pose name       Measured torque        Computed torque       

down_1          [-0.050 0.038 0.004]   [-0.039 0.038 0.002]  
down_1          [-0.048 0.036 0.004]   [-0.044 0.031 0.001]  
down_0          [-0.040 0.030 0.004]   [-0.029 0.029 0.002]  
down_0          [-0.038 0.031 0.003]   [-0.034 0.026 0.001]  
down_2          [-0.047 0.018 0.004]   [-0.037 0.017 -0.001] 
down_2          [-0.039 0.015 0.004]   [-0.039 0.012 -0.001] 
up_side_down_1  [0.035 -0.034 -0.004]  [0.040 -0.041 -0.002] 
up_side_down_1  [0.031 -0.033 -0.002]  [0.036 -0.046 -0.003] 
up_side_down_2  [0.050 -0.020 0.001]   [0.051 -0.024 0.001]  
up_side_down_2  [0.050 -0.018 -0.001]  [0.046 -0.028 -0.000] 
up_side_down_0  [0.040 -0.023 -0.009]  [0.039 -0.026 -0.000] 
up_side_down_0  [0.035 -0.022 -0.003]  [0.032 -0.032 -0.002] 
curled_1        [-0.329 0.194 -0.001]  [-0.319 0.193 0.001]  
curled_1        [-0.324 0.195 -0.001]  [-0.323 0.190 0.000]  
curled_0        [0.193 0.340 0.061]    [0.193 0.332 0.057]   
curled_

## Dynamic calibration. TBD...

In [15]:
df_control = pd.read_parquet("results/control_measurements.parquet")
df_ft = pd.read_parquet(
    "results/force_torque_sensor_broadcaster_wrench_measurements.parquet"
)
df_franka_joint_states = pd.read_parquet(
    "results/franka_joint_states_measurements.parquet"
)
df_sensor = pd.read_parquet("results/sensor_measurements.parquet")


df_joint_states["header.stamp"] = (
    df_joint_states["header.stamp.sec"] * 1e9 + df_joint_states["header.stamp.nanosec"]
)
df_ft["header.stamp"] = df_ft["header.stamp.sec"] * 1e9 + df_ft["header.stamp.nanosec"]

# Map dataframes to topic names
measurements = {
    "joint_states": df_joint_states,
    "force_torque": df_ft,
}

stationary_data = {}
# Choose offsets for clipping
start_stamp_offset = int(3.0 * 1e9)  # 3 seconds
end_stamp_offset = int(19.0 * 1e9)  # 19 seconds
for i, data in df_stamps_stationary.iterrows():
    # Compute start and end time
    start = data["timestamp"] + start_stamp_offset
    end = data["timestamp"] + end_stamp_offset
    pose_name = data["data"]
    if pose_name not in stationary_data.keys():
        stationary_data[pose_name] = []
    stationary_data[pose_name].append(
        {
            topic: df.loc[(df["header.stamp"] > start) & (df["header.stamp"] < end)]
            for topic, df in measurements.items()
        }
    )

# Remove columns that will not be used
columns_to_drop = [
    "timestamp",
    "header.stamp.sec",
    "header.stamp.nanosec",
    "header.frame_id",
]


def reorder_joints(row):
    joints = row["name"]
    joint_map = [np.argwhere(joints == joint_name)[0][0] for joint_name in joint_order]
    row["name"] = joint_order
    row["position"] = row["position"][joint_map]
    row["velocity"] = row["velocity"][joint_map]
    row["effort"] = row["effort"][joint_map]
    return row


for pose in stationary_data.keys():
    for i in range(len(stationary_data[pose])):
        df1 = stationary_data[pose][i]["force_torque"].drop(columns=columns_to_drop)
        df2 = stationary_data[pose][i]["joint_states"].drop(columns=columns_to_drop)
        df_merged = pd.merge(df1, df2, on=["header.stamp"]).dropna()
        stationary_data[pose][i]["merged"] = df_merged.apply(reorder_joints, axis=1)

FileNotFoundError: [Errno 2] No such file or directory: 'results/control_measurements.parquet'