In [1]:
import os
import argparse
import torch
import pykitti
import numpy as np
import pypose as pp
from pypose.module import EKF,UKF,PF
from datetime import datetime
import torch.utils.data as Data
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from matplotlib.collections import PatchCollection
from torchvision.datasets.utils import download_and_extract_archive
import matplotlib.ticker as mtick
import torch, os
import numpy as np
import pypose as pp
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse
from matplotlib.legend_handler import HandlerLine2D

class KITTI_IMU(Data.Dataset):
    datalink = 'https://github.com/pypose/pypose/releases/download/v0.2.2/2011_09_26.zip'
    def __init__(self, root, dataname, drive, duration=10, step_size=1, mode='train', download=True):
        super().__init__()
        if download:
            download_and_extract_archive(self.datalink, root)
        self.duration = duration
        self.data = pykitti.raw(root, dataname, drive)
        self.seq_len = len(self.data.timestamps) - 1
        assert mode in ['evaluate', 'train',
                        'test'], "{} mode is not supported.".format(mode)
        self.gps = torch.tensor([[self.data.oxts[i].packet.lon,
                                   self.data.oxts[i].packet.lat,self.data.oxts[i].packet.alt]
                                   for i in range(self.seq_len)])
        self.input = torch.tensor([[self.data.oxts[i].packet.vf,self.data.oxts[i].packet.yaw]
                                   for i in range(self.seq_len)])

        self.dt = torch.tensor([datetime.timestamp(self.data.timestamps[i+1]) -
                               datetime.timestamp(self.data.timestamps[i])
                               for i in range(self.seq_len)])

        self.gyro = torch.tensor([[self.data.oxts[i].packet.wx,
                                   self.data.oxts[i].packet.wy,
                                   self.data.oxts[i].packet.wz]
                                   for i in range(self.seq_len)])

        self.acc = torch.tensor([[self.data.oxts[i].packet.ax,
                                  self.data.oxts[i].packet.ay,
                                  self.data.oxts[i].packet.az]
                                  for i in range(self.seq_len)])
        self.gt_rot = pp.euler2SO3(torch.tensor([[self.data.oxts[i].packet.roll,
                                                  self.data.oxts[i].packet.pitch,
                                                  self.data.oxts[i].packet.yaw]
                                                  for i in range(self.seq_len)]))
        self.gt_vel = self.gt_rot @ torch.tensor([[self.data.oxts[i].packet.vf,
                                                   self.data.oxts[i].packet.vl,
                                                   self.data.oxts[i].packet.vu]
                                                   for i in range(self.seq_len)])
        self.gt_pos = torch.tensor(
            np.array([self.data.oxts[i].T_w_imu[0:3, 3] for i in range(self.seq_len)]))

        start_frame = 0
        end_frame = self.seq_len
        if mode == 'train':
            end_frame = np.floor(self.seq_len * 0.5).astype(int)
        elif mode == 'test':
            start_frame = np.floor(self.seq_len * 0.5).astype(int)

        self.index_map = [i for i in range(
            0, end_frame - start_frame - self.duration, step_size)]

    def __len__(self):
        return len(self.index_map)

    def __getitem__(self, i):
        frame_id = self.index_map[i]

        end_frame_id = frame_id + self.duration
        return {
            'gps': self.gps[frame_id: end_frame_id],
            'input': self.input[frame_id: end_frame_id],
            'dt': self.dt[frame_id: end_frame_id],
            'acc': self.acc[frame_id: end_frame_id],
            'gyro': self.gyro[frame_id: end_frame_id],
            'gyro': self.gyro[frame_id: end_frame_id],
            'gt_pos': self.gt_pos[frame_id+1: end_frame_id+1],
            'gt_rot': self.gt_rot[frame_id+1: end_frame_id+1],
            'gt_vel': self.gt_vel[frame_id+1: end_frame_id+1],
            'init_pos': self.gt_pos[frame_id][None, ...],
            # TODO: the init rotation might be used in gravity compensation
            'init_rot': self.gt_rot[frame_id: end_frame_id],
            'init_vel': self.gt_vel[frame_id][None, ...],
        }

    def get_init_value(self):
        return {'pos': self.gt_pos[:1],
                'rot': self.gt_rot[:1],
                'vel': self.gt_vel[:1],
                'input': self.input[:1],
                'gps': self.gps}
def imu_collate(data):
    gps = torch.stack([d['gps'] for d in data])
    input = torch.stack([d['input'] for d in data])
    acc = torch.stack([d['acc'] for d in data])
    gyro = torch.stack([d['gyro'] for d in data])

    gt_pos = torch.stack([d['gt_pos'] for d in data])
    gt_rot = torch.stack([d['gt_rot'] for d in data])
    gt_vel = torch.stack([d['gt_vel'] for d in data])

    init_pos = torch.stack([d['init_pos'] for d in data])
    init_rot = torch.stack([d['init_rot'] for d in data])
    init_vel = torch.stack([d['init_vel'] for d in data])

    dt = torch.stack([d['dt'] for d in data]).unsqueeze(-1)

    return {
        'dt': dt,
        'gps': gps,
        'input': input,
        'acc': acc,
        'gyro': gyro,

        'gt_pos': gt_pos,
        'gt_vel': gt_vel,
        'gt_rot': gt_rot,

        'init_pos': init_pos,
        'init_vel': init_vel,
        'init_rot': init_rot,
    }
def move_to(obj, device):
    if torch.is_tensor(obj):
        return obj.to(device)
    elif isinstance(obj, dict):
        res = {}
        for k, v in obj.items():
            res[k] = move_to(v, device)
        return res
    elif isinstance(obj, list):
        res = []
        for v in obj:
            res.append(move_to(v, device))
        return res
    else:
        raise TypeError("Invalid type for move_to", obj)
def plot_gaussian(ax, means, covs, color=None, sigma=3):
    ''' Set specific color to show edges, otherwise same with facecolor.'''
    ellipses = []
    for i in range(len(means)):
        eigvals, eigvecs = np.linalg.eig(covs[i])
        axis = np.sqrt(eigvals) * sigma
        slope = eigvecs[1][0] / eigvecs[1][1]
        angle = 180.0 * np.arctan(slope) / np.pi
        ellipses.append(Ellipse(means[i, 0:2], axis[0], axis[1], angle=angle))
    ax.add_collection(PatchCollection(ellipses, edgecolors=color, linewidth=1))
class Car(pp.module.NLS):

    def __init__(self):
        super().__init__()

    def state_transition(self, state, input, t=None):
        '''
        Don't add noise in this function, as it will be used for automatically
        linearizing the system by the parent class ``pp.module.NLS``.
        https://speakerdeck.com/motokimura/shi-jian-karumanhuiruta
        '''
        r = input[0] /  input[1]
        dtheta = input[1]*t
        x = state[..., 0] - r * state[..., 2].sin() + r * (state[..., 2] + dtheta).sin()
        y = state[..., 1] + r * state[..., 2].cos() - r * (state[..., 2] + dtheta).cos()
        theta = state[..., 2] + dtheta

        # theta = state[..., 2] + input[1]
        # x = state[..., 0] + input[..., 0] * theta.cos()
        # y = state[..., 1] + input[..., 0] * theta.sin()
        return torch.stack([x, y, theta], dim=-1)

    def observation(self, state, input, t=None):
        '''
        Don't add noise in this function, as it will be used for automatically
        linearizing the system by the parent class ``pp.module.NLS``.
        '''
        return  state
        # r = input[0] / input[1]
        # dtheta = input[1] * t
        # x = state[..., 0] - r * state[..., 2].sin() + r * (state[..., 2] + dtheta).sin()
        # y = state[..., 1] + r * state[..., 2].cos() - r * (state[..., 2] + dtheta).cos()
        # theta = state[..., 2] + dtheta
        #
        # # theta = state[..., 2] + input[1]
        # # x = state[..., 0] + input[..., 0] * theta.cos()
        # # y = state[..., 1] + input[..., 0] * theta.sin()
        # return torch.stack([x, y, theta], dim=-1)
parser = argparse.ArgumentParser(description='IMU Preintegration')
parser.add_argument("--device",
                    type=str,
                    default='cpu',
                    help="cuda or cpu")
parser.add_argument("--batch-size",
                    type=int,
                    default=1,
                    help="batch size, only support 1 now") #why?
parser.add_argument("--step-size",
                    type=int,
                    default=1,
                    help="the size of the integration for one interval")
parser.add_argument("--save",
                    type=str,
                    default='dataset/save/',
                    help="location of png files to save")
parser.add_argument("--dataroot",
                    type=str,
                    default='dataset/',
                    help="dataset location downloaded")
parser.add_argument("--dataname",
                    type=str,
                    default='2011_09_26',
                    help="dataset name")
parser.add_argument("--datadrive",
                    nargs='+',
                    type=str,
                    default=["0001", "0002", "0005", "0009", "0011",
                             "0013", "0014", "0015", "0017", "0018",
                             "0019", "0020", "0022"],
                    help="data sequences")
parser.add_argument('--plot3d',
                    dest='plot3d',
                    action='store_true',
                    help="plot in 3D space, default: False")
parser.set_defaults(plot3d=False)
args, unknown = parser.parse_known_args()
print(args)
os.makedirs(os.path.join(args.save), exist_ok=True)
torch.set_default_tensor_type(torch.DoubleTensor)

In [None]:
#EKF RESULT
N=3
for drive in args.datadrive:

    # Step 1: Define dataloader using the ``KITTI_IMU`` class we defined above
    dataset = KITTI_IMU(args.dataroot,
                        args.dataname,
                        drive,
                        duration=args.step_size,
                        step_size=args.step_size,
                        mode='evaluate')
    loader = Data.DataLoader(dataset=dataset,
                             batch_size=args.batch_size,
                             collate_fn=imu_collate,
                             shuffle=False)
    # Step 2: Get the initial position, rotation and velocity, all 0 here
    init = dataset.get_init_value()
    poses, poses_gt = [init['pos']], [init['pos']]
    T = len(loader)
    q, r, p = 0.2, 0.2, 5
    P = torch.eye(N, device=args.device).repeat( len(loader), 1, 1) * p ** 2  # estimation covariance
    Q = torch.eye(N, device=args.device) * q ** 2  # covariance of transition
    R = torch.eye(N, device=args.device) * r ** 2  # covariance of observation
    est = torch.randn(T, N, device=args.device) * p
    obs = torch.zeros(T, N, device=args.device)
    state_ = torch.zeros(T, N, device=args.device)  # true states
    car = Car()
    filter = EKF(car, Q, R).to(args.device)
    poses, poses_gt = [init['pos']], [init['pos']]
    est[0] = init['pos'].reshape(-1,N)
    for i, data in enumerate(loader):
        if i == T-1:
          break
        data = move_to(data, args.device)
        dt = data['dt']
        states = data['gt_pos'].reshape(-1,N)
        input = data['input'].reshape(-1)
        poses_gt.append(data['gt_pos'][..., -1, :].cpu())
        state_[i + 1], obs[i] = car(states , input)  # model measurement
        est[i + 1], P[i + 1] = filter(est[i], obs[i] , input, P[i],t = dt)
    poses_gt = torch.cat(poses_gt).numpy()
    state_ = state_.numpy()
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(est[:,0],est[:,1])
    ax.plot(poses_gt[:,0],poses_gt[:,1])
    plt.title("PyPose EKF")
    plt.legend(["PyPose", "Ground Truth"])
    ax.set_xlabel('X [m]')
    ax.set_ylabel('Y [m]')
    ax.grid();
    plt.show()

In [None]:
#EKF RESULT
N=3
for drive in args.datadrive:

    # Step 1: Define dataloader using the ``KITTI_IMU`` class we defined above
    dataset = KITTI_IMU(args.dataroot,
                        args.dataname,
                        drive,
                        duration=args.step_size,
                        step_size=args.step_size,
                        mode='evaluate')
    loader = Data.DataLoader(dataset=dataset,
                             batch_size=args.batch_size,
                             collate_fn=imu_collate,
                             shuffle=False)
    # Step 2: Get the initial position, rotation and velocity, all 0 here
    init = dataset.get_init_value()
    poses, poses_gt = [init['pos']], [init['pos']]
    T = len(loader)
    q, r, p = 0.2, 0.2, 5
    P = torch.eye(N, device=args.device).repeat( len(loader), 1, 1) * p ** 2  # estimation covariance
    Q = torch.eye(N, device=args.device) * q ** 2  # covariance of transition
    R = torch.eye(N, device=args.device) * r ** 2  # covariance of observation
    est = torch.randn(T, N, device=args.device) * p
    obs = torch.zeros(T, N, device=args.device)
    state_ = torch.zeros(T, N, device=args.device)  # true states
    car = Car()
    filter = UKF(car, Q, R).to(args.device)
    poses, poses_gt = [init['pos']], [init['pos']]
    est[0] = init['pos'].reshape(-1,N)
    for i, data in enumerate(loader):
        if i == T-1:
          break
        data = move_to(data, args.device)
        dt = data['dt']
        states = data['gt_pos'].reshape(-1,N)
        input = data['input'].reshape(-1)
        poses_gt.append(data['gt_pos'][..., -1, :].cpu())
        state_[i + 1], obs[i] = car(states , input)  # model measurement
        est[i + 1], P[i + 1] = filter(est[i], obs[i] , input, P[i],t = dt)
    poses_gt = torch.cat(poses_gt).numpy()
    state_ = state_.numpy()
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(est[:,0],est[:,1])
    ax.plot(poses_gt[:,0],poses_gt[:,1])
    plt.title("PyPose UKF")
    plt.legend(["PyPose", "Ground Truth"])
    ax.set_xlabel('X [m]')
    ax.set_ylabel('Y [m]')
    ax.grid();
    plt.show()

In [None]:
#EKF RESULT
N=3
for drive in args.datadrive:

    # Step 1: Define dataloader using the ``KITTI_IMU`` class we defined above
    dataset = KITTI_IMU(args.dataroot,
                        args.dataname,
                        drive,
                        duration=args.step_size,
                        step_size=args.step_size,
                        mode='evaluate')
    loader = Data.DataLoader(dataset=dataset,
                             batch_size=args.batch_size,
                             collate_fn=imu_collate,
                             shuffle=False)
    # Step 2: Get the initial position, rotation and velocity, all 0 here
    init = dataset.get_init_value()
    poses, poses_gt = [init['pos']], [init['pos']]
    T = len(loader)
    q, r, p = 0.2, 0.2, 5
    P = torch.eye(N, device=args.device).repeat( len(loader), 1, 1) * p ** 2  # estimation covariance
    Q = torch.eye(N, device=args.device) * q ** 2  # covariance of transition
    R = torch.eye(N, device=args.device) * r ** 2  # covariance of observation
    est = torch.randn(T, N, device=args.device) * p
    obs = torch.zeros(T, N, device=args.device)
    state_ = torch.zeros(T, N, device=args.device)  # true states
    car = Car()
    filter = PF(car, Q, R).to(args.device)
    poses, poses_gt = [init['pos']], [init['pos']]
    est[0] = init['pos'].reshape(-1,N)
    for i, data in enumerate(loader):
        if i == T-1:
          break
        data = move_to(data, args.device)
        dt = data['dt']
        states = data['gt_pos'].reshape(-1,N)
        input = data['input'].reshape(-1)
        poses_gt.append(data['gt_pos'][..., -1, :].cpu())
        state_[i + 1], obs[i] = car(states , input)  # model measurement
        est[i + 1], P[i + 1] = filter(est[i], obs[i] , input, P[i],t = dt)
    poses_gt = torch.cat(poses_gt).numpy()
    state_ = state_.numpy()
    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(est[:,0],est[:,1])
    ax.plot(poses_gt[:,0],poses_gt[:,1])
    plt.title("PyPose PF")
    plt.legend(["PyPose", "Ground Truth"])
    ax.set_xlabel('X [m]')
    ax.set_ylabel('Y [m]')
    ax.grid();
    plt.show()