<h1>Creating Custom Cost Functions</h1>

In this tutorial, we show how to create a custom cost function that might be needed for an application. While we can always use the `AutoDiffCostFunction` by simply writing an error function, it is often more efficient for compute-intensive applications to derive a new `CostFunction` subclass and use closed-form Jacobians. 

We will show how to write a custom `VectorDifference` cost function in this tutorial. This cost function provides the difference between two `Vector`s as the error. 

Note: `VectorDifference` is a simplified version of the `Difference` cost function already provided in the Theseus library, and shown in Tutorial 0. `Difference` can be used on any LieGroup, while `VectorDifference` can only be used on Vectors.

<h2>Initialization</h2> 

Any `CostFunction` subclass should be initialized with a `CostWeight` and all arguments needed to compute the cost function. In this example, we set up `__init__` function for `VectorDifference` to require as input the two `Vector`s whose difference we wish to compute: the `Vector` to be optimized, `var`, and the `Vector` that is the reference for comparison, `target`. 

In addition, the `__init__` function also needs to register the optimization variables and all the auxiliary variables. In this example, optimization variable `var` is registered with `register_optim_vars`. The other input necessary to evaluate the cost, `target` is registered with `register_aux_vars`. This is required for the nonlinear optimizers to work correctly: these functions register the optimization and auxiliary variables into internal lists, and then are easily used by the relevant `Objective` to add them, ensure no name collisions, and to update them with new values.

The `CostWeight` is used to weight the errors and jacobians, and is required by every `CostFunction` sub-class (the error and jacobian weighting functions are inherited from the parent `CostFunction` class.)

In [1]:
from typing import List, Optional, Tuple
import theseus as th

class VectorDifference(th.CostFunction):
    def __init__(
        self,
        cost_weight: th.CostWeight,
        var: th.Vector,
        target: th.Vector,
        name: Optional[str] = None,
    ):
        super().__init__(cost_weight, name=name) 

        # add checks to ensure the input arguments are of the same class and dof:
        if not isinstance(var, target.__class__):
            raise ValueError(
                "Variable for the VectorDifference inconsistent with the given target."
            )
        if not var.dof() == target.dof():
            raise ValueError(
                "Variable and target in the VectorDifference must have identical dof."
            )

        self.var = var
        self.target = target

        # register variable and target
        self.register_optim_vars(["var"])
        self.register_aux_vars(["target"])

<h2>Implement abstract functions</h2> 

Next, we need to implement the abstract functions of `CostFunction`: `dim`, `error`, `jacobians`, and `_copy_impl`:
- `dim`: returns the degrees of freedom (`dof`) of the error; in this case, this is the `dof` of the optimization variable `var`
- `error`: returns the difference of Vectors i.e. `var` - `target`
- `jacobian`: returns the Jacobian of the error with respect to the `var`
- `_copy_impl`: creates a deep copy of the internal class members

We illustrate these below (including once again the `__init__` function from above, so the class is fully defined.)

In [2]:
import torch 

class VectorDifference(th.CostFunction):
    def __init__(
        self,
        cost_weight: th.CostWeight,
        var: th.Vector,
        target: th.Vector,
        name: Optional[str] = None,
    ):
        super().__init__(cost_weight, name=name) 
        self.var = var
        self.target = target
        # to improve readability, we have skipped the data checks from code block above
        self.register_optim_vars(["var"])
        self.register_aux_vars(["target"])

    def error(self) -> torch.Tensor:
        return (self.var - self.target).tensor

    def jacobians(self) -> Tuple[List[torch.Tensor], torch.Tensor]:
        return [
            # jacobian of error function wrt var is identity matrix I
            torch.eye(self.dim(), dtype=self.var.dtype)  
            # repeat jacobian across each element in the batch
            .repeat(self.var.shape[0], 1, 1)  
            # send to variable device
            .to(self.var.device)  
        ], self.error()

    def dim(self) -> int:
        return self.var.dof()

    def _copy_impl(self, new_name: Optional[str] = None) -> "VectorDifference":
        return VectorDifference(  # type: ignore
            self.var.copy(), self.weight.copy(), self.target.copy(), name=new_name
        )

<h2>Usage</h2>

We show now that the `VectorDifference` cost function works as expected. 

For this, we create a set of `VectorDifference` cost functions each over a pair of `Vector`s <i>a_i</i> and <i>b_i</i>, and add them to an `Objective`. We then create the data for each `Vector` <i>a_i</i> and <i>b_i</i> of the `VectorDifference` cost functions, and `update` the `Objective` with it. The code snippet below shows that the `Objective` error is correctly computed.

We use a `ScaleCostWeight` as the input `CostWeight` here: this is a scalar real-valued `CostWeight` used to weight the `CostFunction`; for simplicity we use a fixed value of 1. in this example.

In [3]:
cost_weight = th.ScaleCostWeight(1.0)

# construct cost functions and add to objective
objective = th.Objective()
num_test_fns = 10
for i in range(num_test_fns):
    a = th.Vector(2, name=f"a_{i}")
    b = th.Vector(2, name=f"b_{i}")
    cost_fn = VectorDifference(cost_weight, a, b)
    objective.add(cost_fn)
    
# create data for adding to the objective
theseus_inputs = {}
for i in range(num_test_fns):
    # each pair of var/target has a difference of [1, 1]
    theseus_inputs.update({f"a_{i}": torch.ones((1,2)), f"b_{i}": 2 * torch.ones((1,2))})

objective.update(theseus_inputs)
# sum of squares of errors [1, 1] for 10 cost fns: the result should be 20
error_sq = objective.error_squared_norm()
print(f"Sample error squared norm: {error_sq.item()}")

Sample error squared norm: 20.0


# Auto check Jacobian

You can also check your hand-written Jacobian by comparing the Jacobian results from AutoDiffCostFunction()

In [4]:
import torch
import numpy as np
import theseus as th
from typing import Any, Dict, List, Optional, Tuple, Type, Union, cast
from theseus.core import CostFunction, CostWeight, Variable, as_variable
from theseus.geometry import LieGroup, Vector

This is a cost function define a simple problem as below: Assuming you know a certain number of the base stations and their position, and you also got the distance between the user and the base station. Then you can calculate the user location.   

In [5]:
def generateInput(SampleNum = 100):

    BasePos_x = th.Variable(torch.randint(100,(SampleNum,3),dtype = torch.float64), name="BasePos_x")
    BasePos_y = th.Variable(torch.randint(100,(SampleNum,3),dtype = torch.float64), name="BasePos_y")
    Pos_x = th.Variable(torch.randint(100,(SampleNum,1),dtype = torch.float64), name="Pos_x")
    Pos_y = th.Variable(torch.randint(100,(SampleNum,1),dtype = torch.float64), name="Pos_y")
    Distance_x = BasePos_x.tensor - Pos_x.tensor
    Distance_y = BasePos_y.tensor - Pos_y.tensor
    #Distance = torch.sum((BasePos_x.tensor - Pos_x.tensor).square() + (BasePos_y.tensor - Pos_y.tensor).square(), dim = 1, keepdim = True).sqrt()
    Distance = (Distance_x.square() + Distance_y.square()).sqrt()
    Range = th.Variable(Distance, name = "Range")
    
    
    return Range, BasePos_x, BasePos_y


def test_jacobian_Cost(Pos_x, Pos_y, Range, BasePos_x, BasePos_y, cost_weight, name):

    cost_function = toyCost(Pos_x, Pos_y, Range, BasePos_x, BasePos_y, cost_weight)

    aux_vars = Range,BasePos_x,BasePos_y
    optim_vars = Pos_x, Pos_y
    iteration = Range.tensor.shape[1]

    cost_function_numeric = th.AutoDiffCostFunction(optim_vars, cost_function.err_func_numeric, iteration, aux_vars=aux_vars)
    
    expected_jacs, error_jac = cost_function_numeric.jacobians()
    #print(expected_jacs)
    jacobians, error_jac = cost_function.jacobians()
    #print(jacobians)
    error = cost_function.error()
    print("checking Jacobian for Cost...")
    
    for i in range(len(jacobians)):
        assert torch.allclose(jacobians[i], expected_jacs[i], atol=1e-8)

In [6]:
class toyCost(CostFunction):
    def __init__(
        self,
        Pos_x: Vector,
        Pos_y: Vector,

        Range: Vector, 
        BasePos_x: Vector,
        BasePos_y: Vector,


        cost_weight: CostWeight,
        
        name: Optional[str] = None,
    ):
        super().__init__(cost_weight, name=name)
     

        self.Pos_x = Pos_x
        self.Pos_y = Pos_y
        
        self.Range = as_variable(Range)
        self.BasePos_x = as_variable(BasePos_x)
        self.BasePos_y = as_variable(BasePos_y)

        self.register_optim_vars(["Pos_x", "Pos_y"])
       
        self.register_aux_vars(["Range", "BasePos_x", "BasePos_y"])
        self.weight = cost_weight


    # Add this function for autochecking Jocobian. What you need to do is to warp the inputs to two types: optim_vars and aux_vars    
    def err_func_numeric(self, optim_vars, aux_vars):
        Pos_x,Pos_y = optim_vars
        Pos_x = Pos_x.tensor
        Pos_y = Pos_y.tensor
        
        Range, BasePos_x, BasePos_y = aux_vars
        Range = Range.tensor
        BasePos_x = BasePos_x.tensor
        BasePos_y = BasePos_y.tensor
        
        # Copy it from the self-defined cost function _err_func
        distance_0 = ((Pos_x - BasePos_x).square()+ (Pos_y - BasePos_y).square()).sqrt()


        err = distance_0 - Range

        return err
    
    
    def _err_func(self):
        
        Pos_x = self.Pos_x.tensor
        Pos_y = self.Pos_y.tensor
        Range = self.Range.tensor
        BasePos_x = self.BasePos_x.tensor
        BasePos_y = self.BasePos_y.tensor
      
        distance_0 = ((Pos_x - BasePos_x).square()+ (Pos_y - BasePos_y).square()).sqrt()


        err = distance_0 - Range
        print(err.shape)
       
        #print(err_0.shape)
        return err
    
    
    def dim(self):
        return BasePos_x.shape[1]

    def error(self) -> torch.Tensor:
        return self._err_func()

    def jacobians(self) -> Tuple[List[torch.Tensor], torch.Tensor]:
        
        batch_size = self.Range.shape[0]
        
        
        dtype = self.Range.dtype
        device = self.Range.device
        

        
        Pos_x = self.Pos_x.tensor
        Pos_y = self.Pos_y.tensor
        

        Range = self.Range.tensor
        BasePos_x = self.BasePos_x.tensor
        BasePos_y = self.BasePos_y.tensor
        dof = BasePos_x.shape[1]
        
        interTerm = (Pos_x - BasePos_x).square()+ (Pos_y - BasePos_y).square()
        
        
        Jacob_Pos_x = torch.zeros(batch_size, dof, 1, dtype=dtype, device=device)
        Jacob_Pos_y = torch.zeros(batch_size, dof, 1, dtype=dtype, device=device)
        
        dErr_x = torch.pow(interTerm, -0.5) * (Pos_x - BasePos_x)

        dErr_y = torch.pow(interTerm, -0.5) * (Pos_y - BasePos_y)
       
        Jacob_Pos_x[:,:,0] = dErr_x
        Jacob_Pos_y[:,:,0] = dErr_y
        
        error = self.error()
        
        return [Jacob_Pos_x, Jacob_Pos_y], error

    def _copy_impl(self, new_name: Optional[str] = None) -> "toyCost":
        return toyCost(
            self.Pos_x.copy(),
            self.Pos_y.copy(),
            self.Range.copy(),
            self.BasePos_x.copy(),
            self.BasePos_y.copy(),
            self.weight.copy(),
            name=new_name,
        )

In [7]:
torch.set_default_dtype(torch.float64)

In [8]:
numSample = 100

In [9]:
Range, BasePos_x, BasePos_y = generateInput(numSample)

In [10]:
Pos_x = th.Vector(tensor=torch.zeros(numSample, 1), name="Pos_x")
Pos_y = th.Vector(tensor=torch.zeros(numSample, 1), name="Pos_y")

In [11]:
cost_weight = th.ScaleCostWeight(1.0, name = "CostWeight")
objective = toyCost(Pos_x, Pos_y, Range, BasePos_x, BasePos_y, cost_weight, name = "cost0")

In [12]:
test_jacobian_Cost(Pos_x, Pos_y, Range, BasePos_x, BasePos_y, cost_weight, name = "cost0")

torch.Size([100, 3])
torch.Size([100, 3])
checking Jacobian for Cost...
