In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Function

import einops

import matplotlib.pyplot as plt

import casadi as ca

import numpy as np

torch_device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [2]:
def patchify(images, patch_size=5):
    """Splitting images into patches.
    Args:
        images: Input tensor with size (batch, channels, height, width)
    Returns:
        A batch of image patches with size (
          batch, (height / patch_size) * (width / patch_size), 
        channels * patch_size * patch_size)
    """
    return einops.rearrange(
        images,
        'b (h p1) (w p2) -> b (h w) (p1 p2)',
        p1=patch_size,
        p2=patch_size
    )

def unpatchify(patches, patch_size=5):
    """Combining patches into images.
    Args:
        patches: Input tensor with size (
        batch, (height / patch_size) * (width / patch_size), 
        channels * patch_size * patch_size)
    Returns:
        A batch of images with size (batch, channels, height, width)
    """
    return einops.rearrange(
        patches,
        'b (h w) (p1 p2) -> b (h p1) (w p2)',
        p1=patch_size,
        p2=patch_size,
        h=int(patches.shape[1] ** 0.5),
        w=int(patches.shape[1] ** 0.5),
    )


In [3]:
grids, commands = torch.load('data/robot_field_data.pt')

In [4]:
grid_patchified = patchify([grids[0]])
grid_patchified.shape

torch.Size([1, 400, 25])

## Define Transformer Encoder

In [5]:
class Transformer(nn.Module):
    """Transformer Encoder 
    Args:
        embedding_dim: dimension of embedding
        n_heads: number of attention heads
        n_layers: number of attention layers
        feedforward_dim: hidden dimension of MLP layer
    Returns:
        Transformer embedding of input
    """
    # TODO embedding_dim? -> size of Q,p ???
    def __init__(self, embedding_dim=256, n_heads=1, n_layers=3, feedforward_dim=64):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.n_layers = n_layers
        self.n_heads = n_heads
        self.feedforward_dim = feedforward_dim
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embedding_dim,
                nhead=self.n_heads,
                dim_feedforward=self.feedforward_dim,
                activation=F.gelu,
                batch_first=True,
                dropout=0.1,
            ),
            num_layers=n_layers,
        )

    def forward(self, x):
        return self.transformer(x)

## Define Optimization Problem

In [6]:
opt_N=20
opt_dt=0.1

opti = ca.Opti()

opt_x=opti.variable(3, opt_N+1)
opt_u=opti.variable(3, opt_N)

opt_x0 = opti.parameter(3)
opt_u0 = opti.parameter(3)

opt_u_des = opti.parameter(3)

opt_P = opti.parameter(6, 6)
opt_q = opti.parameter(6)

# stage cost
cost = 0
for i in range(opt_N):
  cost += 0.1*(ca.vertcat(opt_x[:, i], opt_u[:, i]).T @ opt_P.T @ opt_P @ ca.vertcat(opt_x[:, i], opt_u[:, i])
                   + opt_q.T @ ca.vertcat(opt_x[:, i], opt_u[:, i]))
  cost += 10*(opt_u_des - opt_u[:, i]).T @ (opt_u_des - opt_u[:, i])
  
opti.minimize(cost)

# system dynamics
for i in range(opt_N):
  opti.subject_to(opt_x[0, i+1] == opt_x[0, i] + opt_dt * (ca.cos(opt_x[2, i]) * opt_u[0, i] - ca.sin(opt_x[2, i]) * opt_u[1, i] + opt_u[2, i]) )
  opti.subject_to(opt_x[1, i+1] == opt_x[1, i] + opt_dt * (ca.sin(opt_x[2, i]) * opt_u[0, i] + ca.cos(opt_x[2, i]) * opt_u[1, i] + opt_u[2, i]))
  opti.subject_to(opt_x[2, i+1] == opt_x[2, i] + opt_dt * (opt_u[2, i]))
  
  # constraints on control rate
  # for i in range(N):
  #   opti.subject_to( u[:,i+1] - u[:,i] <= np.array([0.1, 0.1, 0.1]) )
  #   opti.subject_to( u[:,i+1] - u[:,i] >= -np.array([0.1, 0.1, 0.1]) )
  # opti.subject_to( u[:,0] - u0 <= np.array([0.1, 0.1, 0.1]) )
  # opti.subject_to( u[:,0] - u0 >= -np.array([0.1, 0.1, 0.1]) )
  
  # initial condition
  opti.subject_to(opt_x[:, 0] == opt_x0)
  
opti.solver('ipopt')

In [38]:
class CFTOC(Function):
  @staticmethod
  def forward(ctx, x0, u0, P, q, u_des):
    
    u_opt = []
    x_opt = []
    
    # loop through batch
    for i in range(P.shape[0]):
      opti.set_value(opt_x0, x0)
      opti.set_value(opt_u0, u0)
    
      opti.set_value(opt_u_des, u_des)

      opti.set_value(opt_P, P[i].detach().numpy())
      opti.set_value(opt_q, q[i].detach().numpy())

      sol = opti.solve()

      u_opt.append(sol.value(opt_u))        
      x_opt.append(sol.value(opt_x))        
    
    u_opt = torch.Tensor(u_opt)
    x_opt = torch.Tensor(x_opt)
    
    ctx.save_for_backward(u_opt)
    
    return x_opt, u_opt
    
  @staticmethod
  def backward(ctx, grad_output):
      result, = ctx.saved_tensors
      return grad_output * result

In [40]:
# Use it by calling the apply method:
_, u_opt = CFTOC.apply(np.array([0, 0, 0]), 
                       np.array([0, 0, 0]), 
                       torch.zeros((2,6,6)), 
                       torch.zeros((2,6)), 
                       np.array([0.4, 0, 0]))

print(u_opt)

This is Ipopt version 3.14.11, running with linear solver MUMPS 5.4.1.

Number of nonzeros in equality constraint Jacobian...:      360
Number of nonzeros in inequality constraint Jacobian.:        0
Number of nonzeros in Lagrangian Hessian.............:      420

Total number of variables............................:      123
                     variables with only lower bounds:        0
                variables with lower and upper bounds:        0
                     variables with only upper bounds:        0
Total number of equality constraints.................:      120
Total number of inequality constraints...............:        0
        inequality constraints with only lower bounds:        0
   inequality constraints with lower and upper bounds:        0
        inequality constraints with only upper bounds:        0

iter    objective    inf_pr   inf_du lg(mu)  ||d||  lg(rg) alpha_du alpha_pr  ls
   0  3.2000000e+01 0.00e+00 8.00e+00  -1.0 0.00e+00    -  0.00e+00 0.00e+00 

In [37]:
class MPCTransformer(nn.Module):
    """MPC transformer
    Args:
        TODO
        embedding_dim: dimension of embedding
        patch_size: image patch size
        num_patches: number of image patches
    Returns:
        TODO
    """
    def __init__(self, embedding_dim=256, patch_size=5, num_patches=20):
        super().__init__()
        
        self.patch_size = patch_size
        self.num_patches = num_patches
        
        self.embedding_dim = embedding_dim

        self.transformer = Transformer(embedding_dim)
        
        self.position_encoding = nn.Parameter(
            torch.randn(1, num_patches * num_patches, embedding_dim) * 0.02
        )
        
        self.patch_projection = nn.Linear(patch_size * patch_size, embedding_dim)
        
        self.output_head = nn.Sequential(
            nn.LayerNorm(embedding_dim), 
            nn.Linear(embedding_dim, 6*6+6)  # TODO P,q
        )
        
        self.cftoc = CFTOC()

    def forward(self, images):
        """ 
        (1) Splitting images into fixed-size patches; 
        (2) Linearly embed each image patch, prepend CLS token; 
        (3) Add position embeddings;
        (4) Feed the resulting sequence of vectors to Transformer encoder.
        (5) Extract the embeddings corresponding to the CLS token.
        (6) Apply output head to the embeddings to obtain the logits
        """
        patches = patchify(images, self.patch_size)
        
        patch_embeddings = self.patch_projection(patches)
        
        embeddings = patch_embeddings + self.position_encoding
        
        transformer_embeddings = self.transformer(embeddings)
        transformer_embeddings = transformer_embeddings[:, 0, :]
        
        output = self.output_head(transformer_embeddings)
        
        # split and reshape to get P and q for CFTOC cost function
        P, q = torch.split(output, [36, 6], dim=1)
        P = P.view(P.shape[0], 6, 6)
        
        
        x0 = np.array([0, 0, 0])
        u0 = np.array([0, 0, 0])
        u_des = np.array([0.4, 0, 0])
        
        P = P
        q = q

        _, u_opt = CFTOC.apply(x0, u0, P, q, u_des)

        return u_opt

In [41]:
model = MPCTransformer()

test_tensor = torch.tensor(grids[0], dtype=torch.float32).unsqueeze(0)

print(model(test_tensor))

This is Ipopt version 3.14.11, running with linear solver MUMPS 5.4.1.

Number of nonzeros in equality constraint Jacobian...:      360
Number of nonzeros in inequality constraint Jacobian.:        0
Number of nonzeros in Lagrangian Hessian.............:      420

Total number of variables............................:      123
                     variables with only lower bounds:        0
                variables with lower and upper bounds:        0
                     variables with only upper bounds:        0
Total number of equality constraints.................:      120
Total number of inequality constraints...............:        0
        inequality constraints with only lower bounds:        0
   inequality constraints with lower and upper bounds:        0
        inequality constraints with only upper bounds:        0

iter    objective    inf_pr   inf_du lg(mu)  ||d||  lg(rg) alpha_du alpha_pr  ls
   0  3.2000000e+01 0.00e+00 7.98e+00  -1.0 0.00e+00    -  0.00e+00 0.00e+00 

  test_tensor = torch.tensor(grids[0], dtype=torch.float32).unsqueeze(0)
