In [1]:
import numpy as np
import symforce
symforce.set_epsilon_to_symbol()
from symforce.values import Values
from symforce.opt.factor import Factor
import symforce.symbolic as sf


In [2]:
num_landmarks = 3
lm =np.array([
        [0, 2],
        [4, 6],
        [9, 1],
        [4,2],
        [9, 1.5],
        [9,10],
        [-5,15],
        [-7,15],])
x=np.array([[0,0],
             [1.98,.001]])
odom=np.array([[1.98,.001]])

# Maybe do range, bearing, pose, landmark
z = np.array([[2,1.57079633,0,0]])

initial_values = Values(
    poses=[sf.V2(i,j) for i,j in x],
    landmarks=[sf.V2(i,j) for i,j in lm],
    odom=[sf.V2(i,j) for i,j in odom],
    meas=[sf.V2(i,j) for i,j,k,m in z],
    epsilon=sf.numeric_epsilon,
)
print(initial_values)

Values(
  poses: [[0.0]
[0.0]
, [1.98]
[0.001]
],
  landmarks: [[0.0]
[2.0]
, [4.0]
[6.0]
, [9.0]
[1.0]
, [4.0]
[2.0]
, [9.0]
[1.5]
, [9.0]
[10.0]
, [-5.0]
[15.0]
, [-7.0]
[15.0]
],
  odom: [[1.98]
[0.001]
],
  meas: [[2.0]
[1.57079633]
],
  epsilon: 2.220446049250313e-15,
)


In [3]:
def bearing_residual(
    pose: sf.Pose2, landmark: sf.V2, angle: sf.Scalar, epsilon: sf.Scalar
) -> sf.V1:
    t_body = pose.inverse() * landmark
    predicted_angle = sf.atan2(t_body[1], t_body[0], epsilon=epsilon)
    return sf.V1(sf.wrap_angle(predicted_angle - angle))

In [4]:
def range_residual(
    pose: sf.Pose2, landmark: sf.V2, angle: sf.Scalar, epsilon: sf.Scalar
) -> sf.V1:
    t_body = pose.inverse() * landmark
    predicted_angle = sf.atan2(t_body[1], t_body[0], epsilon=epsilon)
    return sf.V1(sf.wrap_angle(predicted_angle - angle))

In [5]:
def meas_residual(
    pose: sf.V2, landmark: sf.V2, meas: sf.V2, epsilon: sf.Scalar, 
) -> sf.V1:
    
    Q = np.array([[0.0,0.0],[0.0,0.0]]) 
    rng_std = .5
    bearing_std = .5
    Q[0, 0] = rng_std**2
    Q[1, 1] = bearing_std**2
    
    Q_I = np.linalg.inv(Q)
    
    rng, bearing = meas
    # predicted measurement
    dm = sf.V2(landmark - pose)
    z_pred = sf.V2([[dm.norm(epsilon=epsilon)],[sf.atan2(dm[1], dm[0],epsilon=epsilon)]])
    print(z_pred)

    # error
    e_z = meas - z_pred
    # cost
    J = e_z@Q_I@e_z.T

    return sf.V1(J)

In [6]:
def odometry_residual(
    pose_0: sf.V2, pose_1: sf.V2, odom: sf.V2, epsilon: sf.Scalar, 
) -> sf.V1:
    
    R = np.array([[0.0,0.0],[0.0,0.0]]) 
    odom_x_std = 3
    odom_y_std = 3
    R[0, 0] = odom_x_std**2
    R[1, 1] = odom_y_std**2
    R_I = np.linalg.inv(R)
    
    odom_pred = pose_1-pose_0    # error
    e_x = sf.V2(odom-odom_pred)
    # cost
    J = e_x@R_I@e_x.T
    print(J)
    return sf.V1(J)

In [7]:
from symforce.opt.factor import Factor

factors = []

# Bearing factors

for j in range(len(z)):
    factors.append(Factor(
        residual=meas_residual,
        keys=[f"poses[{int(z[j][2])}]", f"landmarks[{int(z[j][3])}]", f"meas[{j}]", "epsilon"],
    ))

# Odometry factors
for i in range(len(x) - 1):
    factors.append(Factor(
        residual=odometry_residual,
        keys=[f"poses[{i}]", f"poses[{i + 1}]", f"odom[{i}]", "epsilon"],
    ))
print(factors)

[sqrt(epsilon + (landmark0 - pose0)**2 + (landmark1 - pose1)**2)]
[atan2(landmark1 - pose1, landmark0 - pose0 + epsilon*(0.5 + sign(landmark0 - pose0)))]

0.111111111111111*(odom0 - (-pose_00 + pose_10))**2 + 0.111111111111111*(odom1 - (-pose_01 + pose_11))**2
[<symforce.opt.factor.Factor object at 0x7f5a809ea5f0>, <symforce.opt.factor.Factor object at 0x7f5a809ea920>]


In [8]:
from symforce.opt.optimizer import Optimizer

optimizer = Optimizer(
    factors=factors,
    optimized_keys=[f"poses[{i}]" for i in range(1)],
    debug_stats=True,
)

In [9]:
result = optimizer.optimize(initial_values)

[2023-02-03 12:01:44.228] [info] LM<sym::Optimize> [iter    0] lambda: 1.000e+00, error prev/linear/new: 0.000/0.000/0.000, rel reduction: 0.00000


In [10]:
plot_scuff(optimizer, result)

NameError: name 'plot_scuff' is not defined

In [24]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import animation
from matplotlib.widgets import Slider

import sym
from symforce.opt.optimizer import Optimizer
from symforce.python_util import AttrDict
from symforce.values import Values


def plot_scuff(optimizer: Optimizer, result: Optimizer.Result, animated: bool = False) -> None:
    """
    Visualize the optimization problem along its iterations. If animated is True, displays a
    matplotlib animation instead of providing an interactive slider.
    """
    # Pull out values from the result
    values_per_iter = [
        optimizer.load_iteration_values(stats.values) for stats in result.iteration_stats
    ]

    # Create the layout
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.set_aspect("equal", adjustable="box")
    plt.grid(True, alpha=0.5)
    plt.tight_layout()

    # Pull out quantities to plot
    data = get_data_to_plot(result.optimized_values)

    # Draw a circle at the origin
    plt.scatter(x=[0], y=[0], color="black", s=50, zorder=2)

    # Draw landmark locations
    plt.scatter(data.landmark_xy[:, 0], data.landmark_xy[:, 1], color="orange", s=250, zorder=3)

    # Draw poses
    poses_circles = plt.scatter(
        data.pose_xy[:, 0], data.pose_xy[:, 1], color="skyblue", zorder=3, s=500
    )

    # Draw lines connecting poses
    poses_lines = plt.plot(
        data.pose_xy[:, 0], data.pose_xy[:, 1], color="black", zorder=2, alpha=0.8
    )

    # Draw X/Y axes for pose locations
    pose_vectors_x = plt.quiver(
        data.pose_xy[:, 0],
        data.pose_xy[:, 1],
        zorder=4,
        width=0.003,
        color="blue",
    )
    pose_vectors_y = plt.quiver(
        data.pose_xy[:, 0],
        data.pose_xy[:, 1],
        zorder=4,
        width=0.003,
        color="red",
    )

    # Draw dotted lines from poses to their landmark heading measurements
    heading_arrows = [
        plt.quiver(
            data.pose_xy[:, 0],
            data.pose_xy[:, 1],
            scale=1.0,
            zorder=2,
            width=0.003,
            linestyle=":",
            facecolor="none",
            linewidth=0.8,
            alpha=0.5,
            headwidth=0,
            headlength=0,
            capstyle="butt",
        )
        for landmark_inx in range(data.heading_vectors.shape[1])
    ]

    # Text box to write iteration stats
    text = ax.text(3.0, -2.6, "-", fontsize=10)

    def update_plot(slider_value: np.float64) -> None:
        """
        Update the plot using the given iteration.
        """
        num = int(slider_value)

        # Set iteration text and abort if we rejected this iteration
        stats = result.iteration_stats[num]
        if num > 0 and not stats.update_accepted:
            text.set_text(f"Iteration: {num} (rejected)\nError: {stats.new_error:.6f}")
            return
        text.set_text(f"Iteration: {num}\nError: {stats.new_error:.6f}")

        # Get plottable data for this iteration
        v = values_per_iter[num]
        data = get_data_to_plot(v)

        # Update the pose locations and connecting lines
        poses_circles.set_offsets(data.pose_xy)
        poses_lines[0].set_data(data.pose_xy.T)

        # Update pose axes
        pose_vectors_x.set_offsets(data.pose_xy)
        pose_vectors_y.set_offsets(data.pose_xy)

        # Update heading measurement vectors to landmarks
        # for landmark_inx in range(len(v["landmarks"])):
        #     heading_arrows[landmark_inx].set_offsets(data.pose_xy)
        #     heading_arrows[landmark_inx].set_UVC(
        #         data.heading_vectors[:, landmark_inx, 0], data.heading_vectors[:, landmark_inx, 1]
        #     )

    if animated:
        _ = animation.FuncAnimation(
            fig, update_plot, len(values_per_iter), fargs=tuple(), interval=250
        )
        plt.show()
    else:
        # Add a slider for iterations at the bottom of the plot
        plt.subplots_adjust(bottom=0.2)
        ax_slider = plt.axes([0.25, 0.1, 0.65, 0.03])
        iteration_slider = Slider(
            ax=ax_slider,
            label="Iteration",
            valmin=0,
            valmax=len(values_per_iter) - 1,
            valinit=len(values_per_iter) - 1,
            valfmt="%0.0f",
        )
        iteration_slider.on_changed(update_plot)
        iteration_slider.set_val(len(values_per_iter) - 1)
        plt.show()


def get_data_to_plot(v: Values) -> AttrDict:
    """
    Compute direct quantities needed for plotting.
    """
    data = AttrDict()

    # Landmark positions
    data.landmark_xy = np.array(v["landmarks"])

    # Pose positions
    data.pose_xy = np.array(v["poses"])

    # Pose x/y axis vectors
    # data.pose_x_axes = np.array([p.rotation() * np.array([1, 0]) for p in v["poses"]])
    # data.pose_y_axes = np.array([p.rotation() * np.array([0, 1]) for p in v["poses"]])

    # Measurement heading vectors from each pose to each landmark
    # data.heading_vectors = np.array(
    #     [
    #         [
    #             v["poses"][i].rotation()
    #             * sym.Rot2.from_tangent(np.array([v["angles"][i][landmark_inx]]))
    #             * np.array([50, 0])
    #             for landmark_inx in range(len(v["landmarks"]))
    #         ]
    #         for i in range(len(v["poses"]))
    #     ]
    # )

    return 