In [1]:
import sys
sys.executable

'/ssd/anaconda-envs/gtsam/bin/python'

In [2]:
import numpy as np
import pandas as pd
import gtsam
import plotly.express as px
import plotly.graph_objects as go
from tqdm import tqdm

In [3]:
# choose dataset from 1 to 9
data_num = 1
# number robots is fixed in every dataset
n_robots = 5
# set sample time to process data
sample_time = 0.02
# set visualization timesteps skip
timesteps_per_frame = 100
# draw robots measurements flag
draw_measurements = True
# if measurements ratio is set to high, animation can take a very long time to show
measurements_ratio = 0.05

In [4]:
def textread(filepath):
    """
    read from MRCLAM data in which every line
    contains equal number of values seperated by
    some whitespaces.
    """
    return np.array(pd.read_csv(filepath, 
                       sep = "\s+|\t+|\s+\t+|\t+\s+",
                       header=None,
                       comment='#',
                       engine='python'))

root = F'../MRCLAM_Dataset{data_num}/'
print('Parsing Dataset...')

print('\nReading barcode numbers...')
barcodes = textread(root + 'Barcodes.dat')
barcodes = np.array(barcodes)
print('Barcodes: (Subject, Barcode)')

print('\nReading landmark groundtruth...')
landmark_groundtruth = textread(root + 'Landmark_Groundtruth.dat')
n_landmarks = landmark_groundtruth.shape[0]
print(F'Landmark Groundtruth({landmark_groundtruth.shape}): (Subject, x [m], y [m], x std-dev [m], y std-dev [m])')


robots_data = {}
print('\nReading all robots data...')
print('robot groundtruth data((N, 4)): (Time [s], x [m], y [m], orientation [rad])')
print('robot odometry data((N, 3)): (Time [s], forward velocity [m/s], angular velocity [rad/s])')
print('robot measurement data((N, 3)): (Time [s], Subject, range [m], bearing [rad])\n')
for i in range(1, n_robots + 1):
    print(F'Reading robot{i} groundtruth...')
    robots_data[F'robot{i}_groundtruth'] = textread(root + F'Robot{i}_Groundtruth.dat')
    print(F'Reading robot{i} odometry...')
    robots_data[F'robot{i}_odometry'] = textread(root + F'Robot{i}_Odometry.dat')
    print(F'Reading robot{i} measurements...')
    robots_data[F'robot{i}_measurements'] = textread(root + F'Robot{i}_Measurement.dat')
    print('\n')

print('Parsing Complete\n')

# get the start and end time for the whole dataset
min_time, max_time = float('inf'), float('-inf')
for i in range(1, n_robots + 1):
    min_time = min(min_time, robots_data[F'robot{i}_groundtruth'][0, 0])
    max_time = max(max_time, robots_data[F'robot{i}_groundtruth'][-1, 0])
for data in robots_data.values():
    data[:, 0] -= min_time
total_time = max_time - min_time
timesteps = int(total_time // sample_time + 1)
print(F'start time: {min_time} [s]\nend time: {max_time} [s]\ntotal time: {total_time} [s]')
print(F'sampling time is {sample_time} [s] ({1 / sample_time} [Hz])')
print(F'Discretize to {timesteps} timesteps')

for data_name in robots_data:
    print(F'Processing {data_name}')
    data = robots_data[data_name]
    if 'groundtruth' in data_name or 'odometry' in data_name:
        # linear interpolation for groundtruth and odometry data
        rows, cols = data.shape
        sampling_data = np.zeros((timesteps, cols))
        k, i = 0, 0
        for k in range(timesteps):
            t = k * sample_time
            sampling_data[k, 0] = t
            while i < rows - 1 and data[i, 0] <= t:
                i += 1
            if i == 0 or i == rows - 1:
                if 'groundtruth' in data_name:
                    sampling_data[k, 1:] = data[i, 1:]
            else:
                p = (t - data[i - 1, 0]) / (data[i, 0] - data[i - 1, 0])
                for c in range(1, cols):
                    sampling_data[k, c] = p * (data[i, c] - data[i - 1, c]) + data[i - 1, c]
        robots_data[data_name] = sampling_data
    else:
        # for measurements, just round the measurement time
        # to nearest sampling time, and keep other data untouched
        data[:, 0] = np.round(data[:, 0] / sample_time) * sample_time
print("Discretize Complete\n")

max_num_measurements = 1
for i in range(1, n_robots + 1):
    measurements_time = robots_data[F"robot{i}_measurements"][:, 0]
    count = 1
    for j in range(1, len(measurements_time)):
        if measurements_time[j] == measurements_time[j - 1]:
            count += 1
        else:
            max_num_measurements = max(max_num_measurements, count)
            count = 1
print(F"maxmium number of measurements per timestep is {max_num_measurements}")

Parsing Dataset...

Reading barcode numbers...
Barcodes: (Subject, Barcode)

Reading landmark groundtruth...
Landmark Groundtruth((15, 5)): (Subject, x [m], y [m], x std-dev [m], y std-dev [m])

Reading all robots data...
robot groundtruth data((N, 4)): (Time [s], x [m], y [m], orientation [rad])
robot odometry data((N, 3)): (Time [s], forward velocity [m/s], angular velocity [rad/s])
robot measurement data((N, 3)): (Time [s], Subject, range [m], bearing [rad])

Reading robot1 groundtruth...
Reading robot1 odometry...
Reading robot1 measurements...


Reading robot2 groundtruth...
Reading robot2 odometry...
Reading robot2 measurements...


Reading robot3 groundtruth...
Reading robot3 odometry...
Reading robot3 measurements...


Reading robot4 groundtruth...
Reading robot4 odometry...
Reading robot4 measurements...


Reading robot5 groundtruth...
Reading robot5 odometry...
Reading robot5 measurements...


Parsing Complete

start time: 1248272263.325 [s]
end time: 1248273763.319 [s]
total

In [5]:
groundtruth_data = {}
groundtruth_data["landmark_groundtruth"] = landmark_groundtruth
for i in range(1, n_robots + 1):
    groundtruth_data[F"robot{i}_groundtruth"] = robots_data[F"robot{i}_groundtruth"]

In [6]:
class simulation:
    def __init__(self, groundtruth_data, seed=0):
        """
        including:
            - landmark_groundtruth
                2D numpy array, every row contains (Subject, x [m], y [m], x std-dev [m], y std-dev [m])
            - robot1_groundtruth, ..., robot5_groundtruth
                2D numpy array, every row contains (Time [s], x [m], y [m], orientation [rad])
        data: dictionary containing groundtruth information of MRCLAM dataset, here we use self-generated
        odometry and measurements in order to have better control of the noise
        """
        self.data = groundtruth_data
        self.steps = self.data["robot1_groundtruth"].shape[0]
        self.seed = seed
        self.random_state = np.random.RandomState(self.seed)
        
        self.max_range = 4.0
        self.max_bearing = np.pi / 3.0
        
#         self.sigma_gps_x = 0
#         self.sigma_gps_y = 0
#         self.sigma_gps_theta = 0
        
#         self.sigma_odom_x = 0
#         self.sigma_odom_y = 0
#         self.sigma_odom_theta = 0
        
#         self.sigma_bearing = 0
#         self.sigma_range = 0
        
        self.sigma_gps_x = 0.1
        self.sigma_gps_y = 0.1
        self.sigma_gps_theta = np.pi / 18
        
        self.sigma_odom_x = 0.01
        self.sigma_odom_y = 0.01
        self.sigma_odom_theta = np.pi / 180
        
        self.sigma_bearing = np.pi / 180
        self.sigma_range = 0.01
    
    def step(self):
        """
        return:
            dict of (robot numbering -> (current_pose, odometry, measurements))
                current_pose: pose used to simulate GPS
                odometry: odometry between two poses (initial pose is returned for the first step)
                observations: dict of (numbering (robot or landmark) -> (bearing, range))
        """
        for i in range(self.steps):
            info = {}
            for j in range(1, n_robots + 1):
                groundtruth = self.data[F"robot{j}_groundtruth"]
                current_pose = gtsam.Pose2(groundtruth[i, 1:4])
                
                dx = self.random_state.normal(0.0, self.sigma_gps_x)
                dy = self.random_state.normal(0.0, self.sigma_gps_y)
                dt = self.random_state.normal(0.0, self.sigma_gps_theta)
                gps_pose = current_pose.compose(gtsam.Pose2(dx, dy, dt))
                
                if i == 0:
                    odom = current_pose
                else:
                    prev_pose = gtsam.Pose2(groundtruth[i - 1, 1:4])
                    odom = prev_pose.between(current_pose)
                dx = self.random_state.normal(0.0, self.sigma_odom_x)
                dy = self.random_state.normal(0.0, self.sigma_odom_y)
                dt = self.random_state.normal(0.0, self.sigma_odom_theta)
                odom = odom.compose(gtsam.Pose2(dx, dy, dt))
                
                obs = {}
                for k in range(1, 1 + n_robots + n_landmarks):
                    if k == j:
                        continue
                    elif k < 1 + n_robots:
                        point = gtsam.Point2(self.data[F"robot{k}_groundtruth"][i, 1:3])
                    else:
                        point = gtsam.Point2(self.data["landmark_groundtruth"][k - n_robots - 1, 1:3])
                    bearing = current_pose.bearing(point).theta()
                    distance = current_pose.range(point)
                    if 0 < distance < self.max_range and abs(bearing) < self.max_bearing:
                        bearing += self.random_state.normal(0.0, self.sigma_bearing)
                        distance += self.random_state.normal(0.0, self.sigma_range)
                        obs[k] = bearing, distance
                
                info[j] = (gps_pose, odom, obs)
            yield info

In [7]:
def X(i, t):
    """
    robot pose symbol
    """
    assert 1 <= i <= n_robots, "robot numbering is out of range"
    name = ['_', 'A', 'B', 'C', 'D', 'E']
    return gtsam.symbol(name[i], t)

def x(i, t):
    """
    robot location symbol
    """
    assert 1 <= i <= n_robots, "robot numbering is out of range"
    name = ['_', 'a', 'b', 'c', 'd', 'e']
    return gtsam.symbol(name[i], t)

def L(j):
    assert n_robots + 1 <= j <= n_landmarks + n_robots + 1, "landmark numbering is out of range"
    return gtsam.symbol('L', j)

def localization(sim, num_timesteps=300, gps_skip=50):
    isam = gtsam.ISAM2()
    new_factors = gtsam.NonlinearFactorGraph()
    initial_estimate = gtsam.Values()
    observed = set()
    with tqdm(total=num_timesteps) as pbar:
        for t, info in enumerate(sim.step()):
            if t > num_timesteps:
                break
            
            for k in info:
                gps_pose, odom, _ = info[k]
                if t % gps_skip == 0:
                    gps_noise = gtsam.noiseModel.Diagonal.Sigmas(np.array(
                        [sim.sigma_gps_x, sim.sigma_gps_y, sim.sigma_gps_theta]))
                    gps_factor = gtsam.PriorFactorPose2(X(k, t), gps_pose, gps_noise)
                    new_factors.add(gps_factor)
                        
                odom_noise = gtsam.noiseModel.Diagonal.Sigmas(np.array(
                    [sim.sigma_odom_x, sim.sigma_odom_y, sim.sigma_odom_theta]))
                if t == 0:
                    odom_factor = gtsam.PriorFactorPose2(X(k, t), odom, odom_noise)
                    new_factors.add(odom_factor)
                    initial_estimate.insert(X(k, t), odom)
                else:
                    odom_factor = gtsam.BetweenFactorPose2(X(k, t - 1), X(k, t), odom, odom_noise)
                    new_factors.add(odom_factor)
                    estimated_pose = isam.calculateEstimatePose2(X(k, t - 1))
                    initial_estimate.insert(X(k, t), estimated_pose.compose(odom))
                
                isam.update(new_factors, initial_estimate)
                new_factors.resize(0)
                initial_estimate.clear()

            for k in info:
                _, _, obs = info[k]
                for j, (bearing, distance) in obs.items():
                    br_noise = gtsam.noiseModel.Diagonal.Sigmas(np.array(
                        [sim.sigma_bearing, sim.sigma_range]))
                    if j < 1 + n_robots:
                        perfect_noise = gtsam.noiseModel.Diagonal.Sigmas(np.array([0.0, 0.0]))
                        self_factor = gtsam.BearingRangeFactor2D(
                            X(j, t), x(j, t), gtsam.Rot2(0), 0, perfect_noise)
                        br_factor = gtsam.BearingRangeFactor2D(
                            X(k, t), x(j, t), gtsam.Rot2(bearing), distance, br_noise)
                        new_factors.add(self_factor)
                        new_factors.add(br_factor)
                        if (j, t) not in observed:
                            pose = isam.calculateEstimatePose2(X(j, t))
                            point = gtsam.Point2(np.array([pose.x(), pose.y()]))
                            initial_estimate.insert(x(j, t), point)
                            observed.add((j, t))
                    else:
                        br_factor = gtsam.BearingRangeFactor2D(
                            X(k, t), L(j), gtsam.Rot2(bearing), distance, br_noise)
                        new_factors.add(br_factor)
                        if j not in observed:
                            initial_estimate.insert(L(j), 
                                gtsam.Point2(sim.data["landmark_groundtruth"][j - 1 - n_robots, 1:3]))
                            observed.add(j)
                            
                    isam.update(new_factors, initial_estimate)
                    new_factors.resize(0)
                    initial_estimate.clear()
            
            pbar.update(1)
            
    estimated_trajectory = {}
    for i in range(1, n_robots + 1):
        trajectory = np.zeros((sim.steps, 4))
        for t in range(sim.steps):
            if t > num_timesteps:
                break
            pose = isam.calculateEstimatePose2(X(i, t))
            trajectory[t] = [t * sample_time, pose.x(), pose.y(), pose.theta()]
        estimated_trajectory[F"robot{i}_estimation"] = trajectory
    return estimated_trajectory

In [8]:
import time
sim = simulation(groundtruth_data, seed=int(time.time()))

In [None]:
num_timesteps = 4500
estimated_trajectory = localization(sim, num_timesteps, gps_skip=50)

 14%|█▍        | 652/4500 [01:08<27:11,  2.36it/s]

In [None]:
# make an empty figure
fig_dict = {
    "data": [],
    "layout": {},
    "frames": []
}

# fill in most of layout
fig_dict["layout"]["xaxis"] = {"range": [-2, 6], "title": "x [m]"}
fig_dict["layout"]["yaxis"] = {"range": [-6, 7], "title": "y [m]"}
fig_dict["layout"]["hovermode"] = "closest"
fig_dict["layout"]["updatemenus"] = [
    {
        "buttons": [
            {
                "args": [
                         None,
                         {
                             "frame": {"duration": 50, "redraw": False},
                             "fromcurrent": True,
                             "transition": {"duration": 0, "easing": "quadratic-in-out"}
                         }
                        ],
                "label": "Play",
                "method": "animate"
            },
            {
                "args": [
                         [None],
                         {
                             "frame": {"duration": 0, "redraw": False},
                             "mode": "immediate",
                             "transition": {"duration": 0}
                         }
                        ],
                "label": "Pause",
                "method": "animate"
            }
        ],
        "direction": "left",
        "pad": {"r": 10, "t": 87},
        "showactive": False,
        "type": "buttons",
        "x": 0.1,
        "xanchor": "right",
        "y": 0,
        "yanchor": "top"
    }
]

sliders_dict = {
    "active": 0,
    "yanchor": "top",
    "xanchor": "left",
    "currentvalue": {
        "font": {"size": 20},
        "prefix": F"Time(1 unit for {timesteps_per_frame * sample_time} second):",
        "visible": True,
        "xanchor": "right"
    },
    "transition": {"duration": 300, "easing": "cubic-in-out"},
    "pad": {"b": 10, "t": 50},
    "len": 0.9,
    "x": 0.1,
    "y": 0,
    "steps": []
}

In [None]:
colors = px.colors.qualitative.Plotly

def robot_pose(i, k):
    poses = robots_data[F'robot{i}_groundtruth']
    x, y, theta = poses[k, 1:]
    location = {
        "x": [x],
        "y": [y],
        "mode": "markers",
        "marker": {"size": 13, "color": colors[i], "line": {"color": "black", "width": 2}},
        "name": F"robot{i} groundtruth"
    }
    orientation_length = 0.3
    orientation = {
        "x": [x, x + orientation_length * np.cos(theta)],
        "y": [y, y + orientation_length * np.sin(theta)],
        "mode": "lines",
        "line": {"color": "black", "width": 2.5},
        "showlegend": False
    }
    return [location, orientation] 

def robot_estimated_pose(i, k):
    poses = estimated_trajectory[F'robot{i}_estimation']
    x, y, theta = poses[k, 1:]
    location = {
        "x": [x],
        "y": [y],
        "mode": "markers",
        "opacity": 0.5,
        "marker": {"size": 13, "color": colors[i], "line": {"color": "black", "width": 2}},
        "name": F"robot{i} estimation"
    }
    orientation_length = 0.3
    orientation = {
        "x": [x, x + orientation_length * np.cos(theta)],
        "y": [y, y + orientation_length * np.sin(theta)],
        "mode": "lines",
        "opacity": 0.5,
        "line": {"color": "black", "width": 2.5},
        "showlegend": False
    }
    return [location, orientation] 

def robot_measurement(i, k):
    measurements = robots_data[F'robot{i}_measurements']
    poses = robots_data[F'robot{i}_groundtruth']
    x, y, theta = poses[k, 1:4]
    xs, ys = [], []
    for offset in range(int(timesteps_per_frame * measurements_ratio)):
        measurment_x, measurment_y = [], []
        time = (k - offset) * sample_time
        measurements_time = measurements[:, 0]
        index = np.searchsorted(measurements_time, time)
        while index < len(measurements_time) and measurements[index, 0] == time:
            distance, bearing = measurements[index, 2:4]
            measurment_x.extend([x, x + distance * np.cos(bearing + theta), None])
            measurment_y.extend([y, y + distance * np.sin(bearing + theta), None])
            index += 1
        remain = max_num_measurements * 3 - len(measurment_x)
        if remain > 0:
            measurment_x.extend([None for _ in range(remain)])
            measurment_y.extend([None for _ in range(remain)])
        xs.extend(measurment_x)
        ys.extend(measurment_y)
    return {
        "x": xs,
        "y": ys,
        "mode": "lines",
        "line": {"color": colors[i], "width": 1.5},
        "showlegend": False
    }

# robots initial location
for i in range(1, n_robots + 1):
    fig_dict["data"].extend(robot_pose(i, 0))
    fig_dict["data"].extend(robot_estimated_pose(i, 0))
    if draw_measurements:
        fig_dict["data"].append(robot_measurement(i, 0))

landmarks = dict(
    mode='markers',
    x=landmark_groundtruth[:, 1],
    y=landmark_groundtruth[:, 2],
    marker=dict(
        color='LightSkyBlue',
        size=8,
        line=dict(
            color='MediumPurple',
            width=2
        )
    ),
    name='Landmarks'
)

fig_dict["data"].append(landmarks)

timesteps_per_frame = 10
num_frames = num_timesteps // timesteps_per_frame
for k in range(num_frames):
    time = k * timesteps_per_frame * sample_time
    frame = {"data": [], "name": str(k)}
    for i in range(1, n_robots + 1):
        frame["data"].extend(robot_pose(i, k * timesteps_per_frame))
        frame["data"].extend(robot_estimated_pose(i, k * timesteps_per_frame))
        if draw_measurements:
            frame["data"].append(robot_measurement(i, k * timesteps_per_frame))
    frame["data"].append(landmarks)
        
    fig_dict["frames"].append(frame)
    slider_step = {
        "args": [
            [k],
            {
                "frame": {"duration": 20, "redraw": False},
                "mode": "immediate",
                "transition": {"duration": 0}
            }
        ],
        "label": k,
        "method": "animate"
    }
    sliders_dict["steps"].append(slider_step)
fig_dict["layout"]["sliders"] = [sliders_dict]

fig = go.Figure(fig_dict)
fig.update_layout(
    width = 700,
    height = 700,
    title = "Multi Robots Localization Visulization"
)
fig.update_yaxes(
    scaleanchor = "x",
    scaleratio = 1,
)
fig.show()

In [None]:
a = np.zeros([5, 3])
a[1] = 1, 2, 3
print(a)

In [None]:
import time
time.time()