In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# import 
import os
import numpy as np
import pandas as pd
import seaborn as sns
from fitter import Fitter, get_common_distributions, get_distributions
import matplotlib.pyplot as plt
from uncertainty_motion_prediction.dataloader import Dataloader
from uncertainty_motion_prediction.evaluators import DistanceErrorEvaluator
from uncertainty_motion_prediction.predictor import (
    HMMMultinomialFirstOrder, HMMLatentSegmentsExtractor, KMeansOutcome, HMMLatentSegmentsPredictor
)
from uncertainty_motion_prediction.predictor import HMMContinuousEmissionsPredictor
from hmmlearn import hmm

from uncertainty_motion_prediction.visualizer import SamplingVisualizer, visualize_covariance_to_axis


SyntaxError: invalid syntax (hmm_multinomial.py, line 191)

In [None]:
#from uncertainty_motion_prediction.predictor.hmm_continuous_emissions import normalise_segment_batch
from uncertainty_motion_prediction.predictor import normalise_segment_batch


# Load the Dataset and Overview of the Data

Load and Data and conduct some brief analysis on the pattern of the normalised segment.

In [None]:
#Switch the dataset
'''
0: eth-univ
1: eth-hotel
2: ucy-zara1
3: ucy-zara2
4: ucy-univ3
'''
dataset_no = 0

In [None]:
dataset=Dataloader();
traj_dataset = dataset.load(dataset_no)

In [None]:
segment_length = 4
num_segment = 4# Hidden Markov Model with Gaussian emissions (to model trajectory observations)

#Start by instantiating a HMM with continuous Gaussian emission distributions.
trajlets_for_hmm_learning = traj_dataset.get_trajlets(
    length =num_segment * segment_length*0.4, overlap=4, to_numpy=True
)
print(trajlets_for_hmm_learning.shape)

In [None]:
trajlets_shape = trajlets_for_hmm_learning.shape
normalised = normalise_segment_batch(trajlets_for_hmm_learning[:, :, 0:4], segment_length)
sequence_lengths = np.array([num_segment for i in range(trajlets_shape[0])])



In [None]:
#take a look at the data
df = pd.DataFrame(normalised)
df.describe()

In [None]:
sns.set_style('white')
sns.set_context("paper", font_scale = 2)
sns.displot(data=df, x=3, kind="hist", bins = 100, aspect = 1.5)


In [None]:
# Pair-wise Scatter Plots
pp = sns.pairplot(df, size=1.8, aspect=1.8,
                  plot_kws=dict(edgecolor="k", linewidth=0.5),
                  diag_kind="kde", diag_kws=dict(shade=True))

fig = pp.fig 
fig.subplots_adjust(top=0.93, wspace=0.3)
t = fig.suptitle('Wine Attributes Pairwise Plots', fontsize=14)


# Hidden Markov Model with Gaussian emissions (to model trajectory observations)

Start by instantiating a HMM with continuous Gaussian emission distributions.

In [None]:
if dataset_no == 0:
    # ETC Uni
    n_components = 10
    covariance_type = "full"
    n_iter = 50
elif dataset_no == 1:
    # ETC Hotel
    n_components = 4
    covariance_type = "full"
    n_iter = 50
elif dataset_no == 2:
    # ETC Hotel
    n_components = 4
    covariance_type = "full"
    n_iter = 50
elif dataset_no == 3:
    # ETC Hotel
    n_components = 4
    covariance_type = "full"
    n_iter = 50
elif dataset_no == 4:
    # ucy-univ3
    n_components = 10
    covariance_type = "full"
    n_iter = 80
elif dataset_no == 5:
    # ucy-univ3
    n_components = 4
    covariance_type = "full"
    n_iter = 80


model = hmm.GaussianHMM(n_components=n_components, covariance_type=covariance_type, 
                        init_params='stmc', n_iter=n_iter, verbose=True)

Preprocess the trajectories by normalising them (without the adjustment on Rotation). Then we estimate the model parameters (transition matrix and Gaussian emission distribution parameters).

In [None]:
var_param = 1
covar = np.array ([[1, 0., 0., 0., 0., 0., 0., 0.],
                  [0., 1, 1, 1, 0., 0., 0., 0.],
                   [0., 1, 1, 1, 0., 0., 0., 0.],
                   [0., 1, 1, 1, 0., 0., 0., 0.],
                    [0., 0, 0, 0, 1, 0., 0., 0.],
                    [0., 0, 0, 0, 0, 1, 1, 1],
                    [0., 0, 0, 0, 0, 1, 1, 1],
                    [0., 0, 0, 0, 0, 1, 1, 1]])



covars = var_param* np.tile(covar, (n_components, 1, 1))


In [None]:
#model.covars_ = covars

In [None]:
trajlets_shape = trajlets_for_hmm_learning.shape
normalised = normalise_segment_batch(trajlets_for_hmm_learning[:, :, 0:4], segment_length)
sequence_lengths = np.array([num_segment for i in range(trajlets_shape[0])])

model.fit(normalised[:,np.r_[1:4,5:8]], lengths=sequence_lengths)

In [None]:
model.covars_

In [None]:
import pickle
with open('gaussian_hmm_%s.pkl'%dataset_no, 'wb') as file:
    pickle.dump(model, file)


In [None]:
model.transmat_

## Deploy Trained Model

Set up to test the model.

In [None]:
import pickle

model = None
with open('gaussian_hmm_%s.pkl'%dataset_no, 'rb') as file:
    model = pickle.load(file)


In [None]:
segment_length = 4
num_segment = 4
num_history_segment = 2
num_future_segment = num_segment - num_history_segment
predictor = HMMContinuousEmissionsPredictor(hmm = model,N_future_segment=num_future_segment)

In [None]:
traj_length = num_segment * (segment_length - 1) + 1
history_traj_length = num_history_segment * (segment_length - 1) + 1
future_traj_length = num_future_segment * (segment_length - 1) + 1

trajlets_for_testing = traj_dataset.get_trajlets(
    length=traj_length * 0.4 ,
    overlap=1,
    to_numpy=True
)
print(trajlets_for_testing.shape)


In [None]:
traj = trajlets_for_testing[10]


In [None]:
traj.shape

In [None]:
traj[0:history_traj_length, :].shape[0]

In [None]:
predictor._seg_len

In [None]:
traj = trajlets_for_testing[10]
print(traj[:, 0:2])
predicted = predictor.predict(traj[0:history_traj_length, :])
print(predicted)

Compute the ADE and FDE of the model.

In [None]:
future_traj_length

In [None]:
evaluator = DistanceErrorEvaluator(N_future=future_traj_length, N_history=history_traj_length)
evaluator.evaluate(predictor, trajlets_for_testing)
evaluator.hist()
res_state = evaluator.statistic()

In [None]:
import matplotlib.pyplot as plt

viz = SamplingVisualizer(N_future=4, N_history=history_traj_length, sample_size=2)
fig, axs = plt.subplots(5, 1, figsize=(10, 20))

for ax in axs:
    idx = np.random.randint(0, trajlets_for_testing.shape[0])
    traj = trajlets_for_testing[idx, :, :]
    viz.visualize_to_axis(ax, predictor, traj)
    ax.set_title(f"Index: {idx} ADE: {result['ade'][idx]:.2f} FDE: {result['fde'][idx]:.2f}")
#     ax.set_aspect(1)

In [None]:
history_traj_length