In [None]:
# Enable interactive plot
#@formatter:off
%matplotlib notebook
%matplotlib notebook
%load_ext autoreload
%autoreload 2
#@formatter:on
import numpy as np
from pathlib import Path
import pandas as pd
from matplotlib import pyplot as plt
from matplotlib import patches as ptch
import matplotlib.animation as animation
from matplotlib.font_manager import FontProperties
import os
import sys
import torch
from torch import nn
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.data import DataLoader
from torchsummary import summary
import logging
log = logging.getLogger()
from datetime import datetime
import gc
import cv2
from math import ceil
from scipy.signal import find_peaks, butter, filtfilt
from scipy.stats import linregress
from sklearn import linear_model
from IPython.display import display_html 

from metrics.Maokai import mpbrpe, mpkpe, pck, pmpkpe, bone_lengths, distance_ordinal_histogram
from loss.CompositionalLoss import parents

from datasets.RealSenseOptitrackDataset import RealSenseOptitrackDataset as Dataset
from datasets.RealSenseOptitrackDataset import marker_set

from scipy.stats import pearsonr

# Models

## Linear Regression Model

In [None]:
import models.LinearRegression.LinearRegression_pytorch as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_default
neural_network = NeuralNetwork.LinearRegression(hyper_params['input_size'], hyper_params['output_size'])

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters())

#Path for loading trained model
output_path = "models/LinearRegression/LinearRegression_default/2022-07-28_15-07-01"
model_name = 'model'

## Linear Regression Model without Rotation

In [None]:
import models.LinearRegression.LinearRegression_pytorch as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_no_rot
neural_network = NeuralNetwork.LinearRegression(hyper_params['input_size'], hyper_params['output_size'])

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters())

#Path for loading trained model
output_path = "models/LinearRegression/LinearRegression_no_rot/2022-09-23_16-40-33"
model_name = "model"

## Baseline NN

In [None]:
import models.BaselineNN.BaselineNN as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_default
neural_network = NeuralNetwork.BaselineNN(hyper_params['output_size'])

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/BaselineNN/BaselineNN_default/2022-09-30_12-51-28"
model_name = "model"

## Baseline NN without Rot

In [None]:
import models.BaselineNN.BaselineNN as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_no_rot
neural_network = NeuralNetwork.BaselineNN(hyper_params['output_size'])

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/BaselineNN/BaselineNN_no_rot/2022-09-30_12-39-16"
model_name = "model"

## Own Baseline CNN

In [None]:
import models.OwnBaselineCNN.OwnBaselineCNN as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_default
neural_network = NeuralNetwork.NeuralNetwork(hyper_params['output_size'])

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/OwnBaselineCNN/OwnBaselineCNN_default/2022-09-29_17-51-33"
model_name = "model_epoch_10"

## Own Baseline CNN no Rotation

In [None]:
import models.OwnBaselineCNN.OwnBaselineCNN as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_no_rot
neural_network = NeuralNetwork.NeuralNetwork(hyper_params['output_size'])

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/OwnBaselineCNN/OwnBaselineCNN_no_rot/2022-09-29_17-44-20"
model_name = "model"

## CNNModel LiChan2014 with AvgPool

In [None]:
import models.CNNModel_LiChan2014_AvgPool.CNNModel_LiChan2014_AvgPool as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_default
neural_network = NeuralNetwork.NeuralNetwork(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = 'models/CNNModel_LiChan2014_AvgPool/CNNModel_LiChan2014_AvgPool_default/2022-07-28_17-24-43'
model_name = "model_epoch_25"

## CNNModel LiChan2014 with AvgPool no Rotation

In [None]:
import models.CNNModel_LiChan2014_AvgPool.CNNModel_LiChan2014_AvgPool as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_no_rot
neural_network = NeuralNetwork.NeuralNetwork(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = 'models/CNNModel_LiChan2014_AvgPool/CNNModel_LiChan2014_AvgPool_no_rot/2022-10-06_21-13-50'
model_name = "model"

## CNNModel LiChan2014 with AvgPool no Rotation low LR

In [None]:
import models.CNNModel_LiChan2014_AvgPool.CNNModel_LiChan2014_AvgPool as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_no_rot_low_lr
neural_network = NeuralNetwork.NeuralNetwork(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = 'models/CNNModel_LiChan2014_AvgPool/CNNModel_LiChan2014_AvgPool_no_rot_low_lr/2022-10-10_12-04-08'
model_name = "model"

## CNNModel LiChan2014

In [None]:
import models.CNNModel_LiChan2014.CNNModel_LiChan2014 as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_default
neural_network = NeuralNetwork.NeuralNetwork(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/CNNModel_LiChan2014/CNNModel_LiChan2014_default/2022-07-29_11-20-17"
model_name = "model_epoch_10"

## CNNModel LiChan2014 no Rotation with Shuffle

In [None]:
import models.CNNModel_LiChan2014.CNNModel_LiChan2014 as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_no_rot_shuffle
neural_network = NeuralNetwork.NeuralNetwork(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/CNNModel_LiChan2014/CNNModel_LiChan2014_no_rotation_shuffle/2022-08-02_12-47-35"
model_name = "model"

## ResNet50 SunShangetAl with L1 Loss

In [None]:
import models.ResNet50_SunShangetAl.ResNet50_SunShangetAl as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_l1_loss
neural_network = NeuralNetwork.create_nn(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"], 
                                      momentum=hyper_params["momentum"], weight_decay=hyper_params["weight_decay"])

#Path for loading trained model
output_path = "models/ResNet50_SunShangetAl/ResNet50_SunShangetAl_l1_loss/2022-10-24_16-55-31"
model_name = "model"

## ResNet50 SunShangetAl with L1 Loss no Rotation

In [None]:
import models.ResNet50_SunShangetAl.ResNet50_SunShangetAl as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_l1_loss_no_rot
neural_network = NeuralNetwork.create_nn(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"], )

#Path for loading trained model
output_path = "models/ResNet50_SunShangetAl/ResNet50_SunShangetAl_l1_loss_no_rot/2022-10-06_21-43-33"
model_name = "model"

## ResNet50 SunShangetAl with MSE Loss no Rot

In [None]:
import models.ResNet50_SunShangetAl.ResNet50_SunShangetAl as NeuralNetwork

hyper_params = NeuralNetwork.hyper_params_mse_loss_no_rot
neural_network = NeuralNetwork.create_nn(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params["loss_function"]
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = "models/ResNet50_SunShangetAl/ResNet50_SunShangetAl_mse_loss_no_rot/2022-10-06_21-41-15"
model_name = "model"

## ResNet50 SunShangetAl with Compositional Loss

In [None]:
import models.ResNet50_SunShangetAl.ResNet50_SunShangetAl as NeuralNetwork
import loss.CompositionalLoss as compositional_loss

hyper_params = NeuralNetwork.hyper_params_compositional_loss
neural_network = NeuralNetwork.create_nn(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params['loss_function'].CompositionalLoss()
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"], 
                                      momentum=hyper_params["momentum"], weight_decay=hyper_params["weight_decay"])

#Path for loading trained model
output_path = "models/ResNet50_SunShangetAl/ResNet50_SunShangetAl_compositional_loss/2022-10-21_14-16-52"
model_name = "model_epoch_5"

## ResNet50 SunShangetAl with Compositional Loss and custom LR

In [None]:
import models.ResNet50_SunShangetAl.ResNet50_SunShangetAl as NeuralNetwork
import loss.CompositionalLoss as compositional_loss

hyper_params = NeuralNetwork.hyper_params_compositional_loss_custom_lr
neural_network = NeuralNetwork.create_nn(hyper_params)

# Define the loss function and optimizer
loss_function = hyper_params['loss_function'].CompositionalLoss()
optimizer = hyper_params["optimizer"](neural_network.parameters(), lr=hyper_params["learning_rate"])

#Path for loading trained model
output_path = ""
model_name = "model"

# Model summary

In [None]:
summary(neural_network, (1, 480, 848))

## Create Datasets

In [None]:
drop_rotation = not hyper_params["use_rotation_data"]
normalize = hyper_params["normalize"]

print("\rLoading data... (1/6)", end='')
session_1_data = Dataset(
    csv_file=f'../recordings/2022-04-20/session_1/optitrack/session_1.csv',
    root_dir='../recordings/2022-04-20/session_1/realsense/npy',
    drop_rotation=drop_rotation,
    normalize=normalize
)

print("\rLoading data... (2/6)", end='')
session_2_data = Dataset(
    csv_file=f'../recordings/2022-04-20/session_2/optitrack/session_2.csv',
    root_dir='../recordings/2022-04-20/session_2/realsense/npy',
    drop_rotation=drop_rotation,
    normalize=normalize
)

print("\rLoading data... (3/6)", end='')
session_3_data = Dataset(
    csv_file=f'../recordings/2022-04-20/session_3/optitrack/session_3.csv',
    root_dir='../recordings/2022-04-20/session_3/realsense/npy',
    drop_rotation=drop_rotation,
    normalize=normalize
)

print("\rLoading data... (4/6)", end='')
session_4_data = Dataset(
    csv_file=f'../recordings/2022-04-20/session_4/optitrack/session_4.csv',
    root_dir='../recordings/2022-04-20/session_4/realsense/npy',
    drop_rotation=drop_rotation,
    normalize=normalize
)

print("\rLoading data... (5/6)", end='')
session_5_data = Dataset(
    csv_file=f'../recordings/2022-04-20/session_5/optitrack/session_5.csv',
    root_dir='../recordings/2022-04-20/session_5/realsense/npy',
    drop_rotation=drop_rotation,
    normalize=normalize
)

print("\rLoading data... (6/6)", end='')
session_6_data = Dataset(
    csv_file=f'../recordings/2022-04-20/session_6/optitrack/session_6.csv',
    root_dir='../recordings/2022-04-20/session_6/realsense/npy',
    drop_rotation=drop_rotation,
    normalize=normalize
)

print("\rLoading data... Done.")

In [None]:
BATCH_SIZE = hyper_params["batch_size"]
NUM_WORKERS = 20
print("Composing Datasets... ", end='')

train_data = torch.utils.data.ConcatDataset([session_1_data, session_2_data, session_3_data, session_4_data, session_5_data])
test_data = session_6_data
all_data = torch.utils.data.ConcatDataset([session_1_data, session_2_data, session_3_data, session_4_data, session_5_data, session_6_data])

train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=hyper_params["shuffle"],
                                               num_workers=NUM_WORKERS, pin_memory=True)

test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False,
                                              num_workers=NUM_WORKERS, pin_memory=True)

all_dataloader = torch.utils.data.DataLoader(all_data, batch_size=BATCH_SIZE, shuffle=False,
                                              num_workers=NUM_WORKERS, pin_memory=True)
all_dataloader_no_batch = torch.utils.data.DataLoader(all_data, batch_size=1, shuffle=False,
                                              num_workers=NUM_WORKERS, pin_memory=True)
session_1_dataloader = torch.utils.data.DataLoader(session_1_data, batch_size=BATCH_SIZE, shuffle=False,
                                              num_workers=NUM_WORKERS, pin_memory=True)
print("Done.")

# Training

In [None]:
# clear gpu memory
torch.cuda.empty_cache()

# Move to GPU
neural_network.cuda()

# track statistics
train_loss = []
val_loss = []

# storage and logging stuff
start_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
output_path = f"models/{NeuralNetwork.name}/{hyper_params['name']}/{start_time}"
os.makedirs(output_path)

# logging
if log.hasHandlers():
    log.handlers.clear()
start_time = datetime.now().strftime('%Y_%m_%d_%H.%M.%S')
log.addHandler(logging.FileHandler(f"{output_path}/training.log"))
log.warning(hyper_params)
log.addHandler(logging.StreamHandler(sys.stdout))
log.setLevel('WARN')
log.warning(f"\nStart training of model: {hyper_params['name']}")

# Write loss to file
file = open(f"{output_path}/loss.csv", "w+", buffering=1)
file.write(f",{str(hyper_params['loss_function'])},{str(hyper_params['loss_function'])}\n")
file.write("epoch,train_loss,val_loss\n")

# Learning rate decay
scheduler = MultiStepLR(optimizer, milestones=hyper_params['lr_decay_milestones'], gamma=hyper_params['lr_decay_gamma'])

# Run the training loop
for epoch in range(1, hyper_params["epochs"] + 1):
    
    # Print epoch
    log.warning(f'\nStarting epoch {epoch}/{hyper_params["epochs"]}')
    file.write(f"{epoch},")

    # Set current loss value
    total_train_loss = 0
    total_val_loss = 0

    # Training
    neural_network.train()
    for i, data in enumerate(train_dataloader, 1):

        # Get and prepare inputs
        inputs = data['realsense']
        targets = data['optitrack']
        inputs, targets = inputs.float().cuda(), targets.float().cuda()

        # Zero the gradients
        optimizer.zero_grad()

        # Perform forward pass
        outputs = neural_network(inputs)

        # Compute loss
        loss = loss_function(outputs, targets)
#         print(f'loss: {loss.item()}')

        # Perform backward pass
        loss.backward()

        # Perform optimization
        optimizer.step()

        # Print statistics
        total_train_loss += loss.item()
        if i % 100 == 0:
            log.warning(f'    Train loss after mini-batch {i:5d}: {total_train_loss / i:.3f}')
    total_train_loss = total_train_loss / i
    train_loss.append(total_train_loss)
    file.write(f"{total_train_loss},")

    # Validation
    neural_network.eval()
    for i, data in enumerate(test_dataloader, 1):
            with torch.no_grad():

                # Get and prepare inputs
                inputs = data['realsense']
                targets = data['optitrack']
                inputs, targets = inputs.float().cuda(), targets.float().cuda()

                # Perform forward pass
                outputs = neural_network(inputs)
                
                # Compute loss
                loss = loss_function(outputs, targets)
                
                total_val_loss += loss.item()
    
    total_val_loss = total_val_loss / i
    val_loss.append(total_val_loss)
    file.write(f"{total_val_loss}\n")

    log.warning(f"Epoch: {epoch}/{hyper_params['epochs']}, Train Loss: {total_train_loss:.8f}, Val Loss: {total_val_loss:.8f}")

    # Save intermediate model
    if epoch % 5 == 0:
        torch.save(neural_network.state_dict(), f"{output_path}/model_epoch_{epoch}.pth")
        log.warning(f'Model {output_path}/model_epoch_{epoch}.pth saved!')
    
    #LR decay
    scheduler.step()
    

# Process is complete.
log.warning('Training process has finished.')
file.close()

# Save model
log.warning('Saving model... ')
torch.save(neural_network.state_dict(), f"{output_path}/model.pth")
log.warning(f"Saved: {output_path}/model.pth")

In [None]:
log.warning('Saving model... ')
torch.save(neural_network.state_dict(), f"{output_path}/model.pth")
log.warning(f"Saved: {output_path}/model.pth")

## Load model from disk

In [None]:
print(f"From: {output_path}/{model_name}.pth")
print('Loading model... ', end='')
neural_network.cuda()
neural_network.load_state_dict(torch.load(f"{output_path}/{model_name}.pth"))
print(f"{hyper_params['name']} loaded!")

## Show training and validation loss

In [None]:
def read_loss_csv(output_path: str, train_name="Trainingsfehler", val_name="Validierungsfehler", 
                  val_trend=True, normalize=False):
    loss_df = pd.read_csv(f"{output_path}/loss.csv", skiprows=1, index_col='epoch')
    loss_df.columns = [train_name, val_name]
    loss_df.index.name = 'Epoche'
    if normalize:
        min_df = loss_df.min().min()
        max_df = loss_df.max().max()
        loss_df = (loss_df - min_df) / (max_df - min_df)
#         loss_df= (loss_df - loss_df.mean()) / loss_df.std()
    if val_trend:
        epochs = loss_df.index.size
        regr = linear_model.LinearRegression()
        x = np.arange(1, epochs+1).reshape(epochs,1)
        y = loss_df["Validierungsfehler"].values.reshape(epochs,1)
        regr.fit(x, y)
        predict = regr.predict(x)
        loss_df['Trend des Validierungsfehlers'] = predict
    return loss_df

In [None]:
(read_loss_csv(output_path, normalize=False, val_trend=False) / 63).plot(
    ylabel='Compositional Loss\npro Schlüsselpunkt',
    title='Trainings- und Validierungsfehler\nModell 6d)')


In [None]:
ax = (read_loss_csv("models/CNNModel_LiChan2014_AvgPool/CNNModel_LiChan2014_AvgPool_default/2022-07-28_17-24-43", val_trend=False, 
                       train_name="5a) Trainingsfehler", 
                       val_name="5a) Validierungsfehler") / 126).plot()
ax = (read_loss_csv("models/CNNModel_LiChan2014_AvgPool/CNNModel_LiChan2014_AvgPool_no_rot/2022-10-06_21-13-50", val_trend=False, 
                       train_name="5b) Trainingsfehler", 
                       val_name="5b) Validierungsfehler") / 63).plot(ax=ax, linestyle='dotted')
ax = (read_loss_csv("models/CNNModel_LiChan2014_AvgPool/CNNModel_LiChan2014_AvgPool_no_rot_low_lr/2022-10-10_12-04-08", val_trend=False, 
                          train_name="5c) Trainingsfehler",
                          val_name="5c) Validierungsfehler") / 63).plot(ax=ax, linestyle='dashdot',
                                                                title='Trainings- und Validierungsfehler',
                                                                xlabel='Epochen', 
                                                                ylabel='Mittlere Quadratische Abweichung \npro Schlüsselpunkt')


plt.legend(loc='best')


In [None]:
fig, axes = plt.subplots(nrows=3,ncols=1,figsize=(9,14))
mse_no_rot[0:25].plot(ax = axes[0], title="Mittlere Quadratische Abweichung")
l1_no_rot[0:25].plot(ax = axes[1])
l1_joints.plot(ax = axes[1], title="L1-Norm")
comp_loss.plot(ax = axes[2], title="Compositional Loss")
plt.legend(loc='center right')
axes[0].legend(loc='center right')
fig.tight_layout(pad=2.0)
plt.show()


## Predict on test set

In [None]:
pelvis_prediction = np.load('models/CNN_Pelvis_Location/CNN_Pelvis_Location_default/2022-10-19_12-51-20/prediction.npy')
pelvis_prediction_tiled = np.tile(pelvis_prediction, 21)


In [None]:
prediction = []
neural_network.eval()
total_mpkpe = 0
total_pmpkpe = 0
total_mpbrpe = 0
total_pck = 0
total_bone_lengths = None
total_bone_lengths_gt = None

dataloader = test_dataloader

compute_metrics = False

for i, data in enumerate(dataloader, 0):
        with torch.no_grad():
            
            # Get and prepare inputs
            inputs = data['realsense']
            targets = data['optitrack']
            inputs, targets = inputs.float().cuda(), targets.float().cuda()

            # Perform forward pass
            outputs = neural_network(inputs)
            
            if hyper_params["normalize"]:
                outputs = test_data.denormalize_cuda(outputs)
                targets = test_data.denormalize_cuda(targets)
            
            # outputs of net are relative to parent bone/joint
            # compose skeleton
#             outputs = compose_output(outputs)
            # place outputs in correct position in space
#             outputs += pelvis_prediction_tiled_tensor[i * dataloader.batch_size : (i + 1) * dataloader.batch_size]
            
            prediction.append(outputs.cpu().numpy())
            
            # Compute evaluation metrics
            if compute_metrics:
                total_pck += pck(outputs, targets)
                total_pmpkpe += pmpkpe(outputs, targets, has_rot_data=hyper_params["use_rotation_data"])
                total_bone_lengths = bone_lengths(outputs, hyper_params["use_rotation_data"],
                                                  total_bone_lengths, 'joints')

#                 total_bone_lengths = bone_lengths(outputs, total_bone_lengths, 'bones')

                if hyper_params["use_rotation_data"]:
                    total_mpbrpe += mpbrpe(outputs, targets)
                else:
                    total_mpkpe += mpkpe(outputs, targets)

            # Show progress
            print(f"\rProgress: {i/len(dataloader)*100:.1f}%   ", end='')

prediction = np.concatenate(prediction)
print("\rProgress: Done!  ")

### Show Metric results

In [None]:
if hyper_params["use_rotation_data"]: 
    print(f"MPKPE: {np.round(total_mpbrpe[0].cpu().numpy() / i, 2):.02f}")
    print(f"MPBRE: {np.round(total_mpbrpe[1].cpu().numpy() / i, 2):.02f}")
else:
    print(f"MPKPE: {np.round(total_mpkpe.cpu().numpy() / i, 2):.02f}")

print(f"PCK: {np.round(total_pck.cpu().numpy() / i, 2):.02f}")
print(f"PMPKPE: {np.round(total_pmpkpe.cpu().numpy() / i, 2):.02f}")
    

# print('\nPrediction:')
std_bones = []
for bone_name, bone_std in zip(parents.keys(), total_bone_lengths):
    std_bones.append(np.std(bone_std))
#     print(f"  {bone_name} Std.: {np.std(bone_std)}")
print(f'MBS: {np.round(np.mean(std_bones), 2):.02f}')

## Calculate Ground truth Bone Stability

In [None]:
total_bone_lengths_gt = None
for i, data in enumerate(all_dataloader, 0):
        with torch.no_grad():
            
            # Get and prepare inputs
            targets = data['optitrack'].float().cuda()
            
            # Compute bone stability
            total_bone_lengths_gt = bone_lengths(targets, hyper_params["use_rotation_data"],
                                                 total_bone_lengths_gt, 'joints')
            
            # Show progress
            if i % 50 == 0 :
                print(f"\rProgress: {i/len(all_data)*100*all_dataloader.batch_size:.1f}%   ", end='')
print("\rProgress: Done!")

In [None]:
print('For All Bones:')
std_bones_gt = []
for bone_name, bone_std in zip(parents.keys(), total_bone_lengths_gt): 
    std_bones_gt.append(np.std(bone_std))
    print(f"  {bone_name} Std.: {np.round(np.std(bone_std), 2):.2f}")
print(f'  Mean Std.: {np.round(np.mean(std_bones_gt), 2):.2f}')

In [None]:
print('For Lower Bones only:')
std_bones_gt = []
for bone_name, bone_std in zip(parents.keys(), total_bone_lengths_gt): 
    if bone_name not in ['LFoot', 'LToe', 'RFoot', 'RToe', 'LShin', 'LThigh', 'RShin', 'RThigh', 'Hip']:
        continue
    std_bones_gt.append(np.std(bone_std))
    print(f"  {bone_name} Std.: {np.round(np.std(bone_std), 2):.2f}")
print(f'  Mean Std.: {np.round(np.mean(std_bones_gt), 2):.2f}')

## Calc Realsense background Noise

In [None]:
number_patches = 10
patch_size = 5
patch_long = 550

step_size = ceil(480/(number_patches+1))
patch_coordinates = list(range(step_size, 480, step_size))
patches = np.ndarray((len(all_dataloader_no_batch), number_patches))
holes_percentages = np.ndarray(len(all_dataloader_no_batch))

for i, data in enumerate(all_dataloader_no_batch, 0):
        with torch.no_grad():
            
            # Get and prepare inputs
            inputs = data['realsense'].numpy().squeeze(axis=1).squeeze(axis=0)
                                    
            # Compute patches   
            patches[i] = [inputs[x:x+patch_size, patch_long:patch_long+patch_size].mean() for x in patch_coordinates]

            # Compute Holes percentage
            holes_percentages[i] = (inputs.size - np.count_nonzero(inputs)) / inputs.size * 100
                              
            # Show depth image
#             if i == 440:
# #             if i in excerpts:
#                 depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(inputs, alpha=0.03), cv2.COLORMAP_JET)
#                 fig, ax = plt.subplots()
#                 ax.imshow(depth_colormap)
#                 rects = [ptch.Rectangle((patch_long, x), patch_size, patch_size, linewidth=1, edgecolor='none', facecolor='black', label=f'Patch: {i}') for i, x in enumerate(patch_coordinates)]
#                 for rect in rects:
#                     ax.add_patch(rect)
#                 plt.show()
# #                 plt.savefig(f'matplotvideo/frame_{i:05d}.png')
#                 break

            
            # Show progress
            if i % 50 == 0 :
                print(f"\rProgress: {i/len(all_data)*100*all_dataloader_no_batch.batch_size:.1f}%   ", end='')
print("\rProgress: Done!  ")

In [None]:
np.round(holes_percentages.std(),3)

In [None]:
no_testperson_frames = list(range(440,680)) + list(range(900, 1140)) + list(range(40170, 40370)) + list(range(41100, 41300))\
+ list(range(100550, 100770)) + list(range(101745, 101950))  + list(range(151580, 152000))

plt.plot(no_testperson_frames, patches[no_testperson_frames, 0])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 1])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 2])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 3])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 4])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 5])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 6])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 7])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 8])
plt.plot(no_testperson_frames, patches[no_testperson_frames, 9])

start=150000
end=start+3000
x = list(range(start, end))
# plt.plot(x, patches[start:end, 0])
# plt.plot(x, patches[start:end, 1])
# plt.plot(x, patches[start:end, 2])
# plt.plot(x, patches[start:end, 3])
# plt.plot(x, patches[start:end, 4])
# plt.plot(x, patches[start:end, 5])
# plt.plot(x, patches[start:end, 6])
# plt.plot(x, patches[start:end, 7])
# plt.plot(x, patches[start:end, 8])
# plt.plot(x, patches[start:end, 9])
plt.show()

In [None]:
patches_avg_distances = patches[no_testperson_frames].mean(axis=0)
patches_std = patches[no_testperson_frames].std(axis=0)

plt.scatter(patches_avg_distances, patches_std)
for i in range(number_patches):
    plt.annotate(i, (patches_avg_distances[i], patches_std[i]))

plt.ylabel('Standardabweichung')
plt.xlabel('Entfernung in mm')
plt.grid()
plt.show()

# Visualize as Graph

In [None]:
start_frame = 0
FPS = 15
length = FPS*105

plt.rcParams['figure.dpi'] = 300
plt.rcParams['savefig.dpi'] = 300

In [None]:
plt.rcParams['figure.dpi'] = 90
plt.rcParams['savefig.dpi'] = 90

In [None]:
# SetUp 3D Plots
def setup_3d_scatter(fig: plt.Figure, position: int, data: pd.DataFrame, title_prefix='3D Test'):
    ax = fig.add_subplot(position, projection='3d')
    ax.set_xlabel('X in cm')
    ax.set_ylabel('Y in cm')
    ax.set_zlabel('Z in cm')

    all_x, all_y, all_z = get_bone_coordinates(data)
    all_x = all_x[np.logical_not(np.isnan(all_x))]
    all_y = all_y[np.logical_not(np.isnan(all_y))]
    all_z = all_z[np.logical_not(np.isnan(all_z))]

    ax.set_xlim3d(all_x.min() - 10, all_x.max() + 10)
    ax.set_ylim3d(all_y.min() - 10, all_y.max() + 10)
    ax.set_zlim3d(all_z.min() - 10, all_z.max() + 10)

    ax.set_xlim3d(-200, 200)
    ax.set_ylim3d(-200, 200)
    ax.set_zlim3d(0, 200)

#     title = ax.set_title(title_prefix)
    title = None

    xs, ys, zs = get_bone_coordinates(data.iloc[[0]])
    graph = ax.scatter(xs, ys, zs, c='tab:blue', label="Model 6b)")

    length = len(all_x)

    return graph, data, title, length, title_prefix


def get_bone_coordinates(df: pd.DataFrame):
    xs = np.concatenate([df[x].Position.Z.values for x in marker_set])
    ys = np.concatenate([df[y].Position.X.values for y in marker_set])
    zs = np.concatenate([df[z].Position.Y.values for z in marker_set])
    return xs, ys, zs


def update(num):
#     update_graph(num, data_left, graph_left, title_left, title_prefix_left)
    update_graph(num, data_right, graph_right, title_right, title_prefix_right)


def update_graph(num, data, graph, title, title_prefix: str):
    num += start_frame
    data = data.iloc[[num]]
    graph._offsets3d = get_bone_coordinates(data)
#     title.set_text(f'{title_prefix}, time={num}')

# Prepare data
left_graph_data = session_1_data.optitrack_data
if hyper_params["normalize"]:
    left_graph_data = test_data.denormalize(left_graph_data)

right_graph_data = pd.DataFrame(prediction)
right_graph_data.columns = test_data.optitrack_data.columns


# draw 3D animation of model
fig = plt.figure()

# graph_left, data_left, title_left, length, title_prefix_left = setup_3d_scatter(fig, 121,
#                                                                                 left_graph_data, "Grundwahrheit")
graph_right, data_right, title_right, _, title_prefix_right = setup_3d_scatter(fig, 111, 
                                                                               right_graph_data, "Vorhersage")

plt.legend(loc='upper left')
anim = matplotlib.animation.FuncAnimation(fig, update, length, interval=1000/FPS, blit=False)



f = f"./model6b-pred_{start_frame}-{start_frame+length}_{FPS}fps.mp4" 
writervideo = matplotlib.animation.FFMpegWriter(fps=FPS) 
anim.save(f, writer=writervideo)



## Visualize in one Graph

In [None]:
# SetUp 3D Plots
def get_limit(data: pd.DataFrame):
    all_x, all_y, all_z = get_bone_coordinates(data)
    all_x = all_x[np.logical_not(np.isnan(all_x))]
    all_y = all_y[np.logical_not(np.isnan(all_y))]
    all_z = all_z[np.logical_not(np.isnan(all_z))]
    return all_x.min(), all_x.max(), all_y.min(), all_y.max(), all_z.min(), all_z.max()

def get_limits(data1: pd.DataFrame, data2: pd.DataFrame):
    xmin1, xmax1, ymin1, ymax1, zmin1, zmax1 = get_limit(data1)
    xmin2, xmax2, ymin2, ymax2, zmin2, zmax2 = get_limit(data2)
    return min(xmin1, xmin2), max(xmax1, xmax2), min(ymin1, ymin2), max(ymax1, ymax2), min(zmin1, zmin2), max(zmax1, zmax2) 

def setup_3d_scatter(fig: plt.Figure, data1: pd.DataFrame, data2: pd.DataFrame,
                     title_prefix: str):
    ax = fig.add_subplot(111, projection='3d')
    ax.set_xlabel('X in cm')
    ax.set_ylabel('Y in cm')
    ax.set_zlabel('Z in cm')

    xmin, xmax, ymin, ymax, zmin, zmax = get_limits(data1, data2)  
    ax.set_xlim3d(xmin - 10, xmax + 10)
    ax.set_ylim3d(ymin - 10, ymax + 10)
    ax.set_zlim3d(zmin - 10, zmax + 10)

    ax.set_xlim3d(-200, 200)
    ax.set_ylim3d(-200, 200)
    ax.set_zlim3d(0, 200)

    if title_prefix:
        title = ax.set_title(title_prefix)

    xs1, ys1, zs1 = get_bone_coordinates(data1.iloc[[0]])
    xs2, ys2, zs2 = get_bone_coordinates(data2.iloc[[0]])
    graph1 = ax.scatter(xs1, ys1, zs1, c='tab:orange', label='Grundwahrheit')
    graph2 = ax.scatter(xs2, ys2, zs2, c='tab:blue', label='Model 6b)')
    
    return graph1, graph2

def get_bone_coordinates(df: pd.DataFrame):
    xs = np.concatenate([df[x].Position.Z.values for x in marker_set])
    ys = np.concatenate([df[y].Position.X.values for y in marker_set])
    zs = np.concatenate([df[z].Position.Y.values for z in marker_set])
    return xs, ys, zs

def update(num):
    num += start_frame
    update_graph(num, red_data, graph1)
    update_graph(num, blue_data, graph2)


def update_graph(num, data, graph):
    data = data.iloc[[num]]
    graph._offsets3d = get_bone_coordinates(data)
    
# Prepare data
red_data = test_data.optitrack_data
if hyper_params["normalize"]:
    red_data = test_data.denormalize(red_data)

blue_data = pd.DataFrame(prediction)
blue_data.columns = test_data.optitrack_data.columns
# blue_data += np.tile(pelvis_prediction, 21)

# draw 3D animation of model
fig = plt.figure()

graph1, graph2 = setup_3d_scatter(fig, red_data, blue_data, None)
plt.legend(loc='upper left')

# length = len(red_data)

anim = matplotlib.animation.FuncAnimation(fig, update, length, interval=1000/FPS, blit=False)

# f = f"./model6b-pred+gt_{start_frame}-{start_frame+length}_{FPS}fps.mp4" 
# writervideo = matplotlib.animation.FFMpegWriter(fps=FPS) 
# anim.save(f, writer=writervideo)


## Show Realsense Pictures

In [None]:
images = []
fig, ax = plt.subplots()
plt.axis('off')
for i in range(start_frame, start_frame+length):
    image = test_data[i]['realsense'].numpy().squeeze()
    image = np.flipud(image)
    depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(image, alpha=0.03), cv2.COLORMAP_JET)
    
    images.append([ax.imshow(depth_colormap,animated=True)])
    
    # Show progress
    print(f"\rProgress: {i/length*100:.1f}%   ", end='')

In [None]:
plt.rcParams['figure.dpi'] = 300 /3
plt.rcParams['savefig.dpi'] = 300 /3

anim = matplotlib.animation.ArtistAnimation(fig, images, interval=1000/FPS, blit=False)
# plt.show()

f = f"./realsense_{start_frame}-{start_frame+length}_{FPS}fps.mp4" 
writervideo = matplotlib.animation.FFMpegWriter(fps=FPS) 
anim.save(f, writer=writervideo)

## Mean Position of Ground Truth

In [None]:
mean_df = pd.DataFrame(columns=session_2_data.optitrack_data.columns)
# mean_df.columns = test_data.optitrack_data.columns
mean_df.at[0] = test_data.optitrack_data.mean()
mean_df = mean_df.loc[mean_df.index.repeat(test_data.optitrack_data.index.size)]
mean_df.index = np.arange(test_data.optitrack_data.index.size)
mean_df = mean_df.astype(float)
left_graph_data = mean_df

# Calculate Stride Length

In [None]:
df = pd.DataFrame(prediction)
df.columns = test_data.optitrack_data.columns
df_gt = test_data.optitrack_data

frm=start_frame
to=frm+length*1.2
df = df.loc[frm:to]
df_gt = df_gt.loc[frm:to]

In [None]:
# df = pd.DataFrame(prediction)
# df.columns = test_data.optitrack_data.columns
# df_gt = test_data.optitrack_data

# frm=start_frame
# to=frm+length
# df = df.loc[frm:to]
# df_gt = df_gt.loc[frm:to]

# Low Pass Butterworth Filter
def low_pass_filter(data, cutoff=4, sample_rate=30.0, order=2):
    normal_cutoff = cutoff / (0.5 * sample_rate)
    # Get the filter coefficients 
    b, a = butter(order, normal_cutoff, btype='lowpass', analog=False)
    y = filtfilt(b, a, data)
    return y


def _add_velocity_and_speed(df: pd.DataFrame):
    df = df.sort_index(axis=1)
    for joint_name in set(df.columns.get_level_values(0)):  # für alle Gelenke Speed berechnen
        velocity = df[joint_name, 'Position'].diff()
        velocity = velocity.interpolate(axis=0, limit_direction='both')
        speed = np.linalg.norm(velocity[['X', 'Y', 'Z']].values, axis=1, ord=2)
        speed = low_pass_filter(speed)

        df[joint_name, 'Velocity', 'X'] = velocity['X']
        df[joint_name, 'Velocity', 'Y'] = velocity['Y']
        df[joint_name, 'Velocity', 'Z'] = velocity['Z']
        df[joint_name, 'Speed', 'XYZ'] = speed
        df = df.sort_index(axis=1)
    return df


def _find_valley_indices(df, distance, height=None, threshold=None, prominence=0.2):
    return find_peaks(-(df).to_numpy().flatten(), height, threshold, distance, prominence)[0]


def calc_standing_foot_positions(df, distance=20):
    df = _add_velocity_and_speed(df)
    
    results = pd.DataFrame(columns=['i', 'pos', 'vec', 'stride_len'], index=['LFoot', 'LToe', 'RFoot', 'RToe'])
    for foot, toe in [('LFoot', 'LToe'), ('RFoot', 'RToe')]:
                
        valley_indices       = _find_valley_indices(df[foot, 'Speed'], distance=distance)
        
        results.at[foot, 'i']   = valley_indices
        results.at[foot, 'pos'] = df.iloc[valley_indices].loc[:, (foot, 'Position')]
        results.at[foot, 'vec'] = df.iloc[valley_indices][foot]['Position'].diff()
        results.at[foot, 'stride_len'] = np.linalg.norm(results['vec'][foot][['X', 'Y', 'Z']].values, axis=1, ord=2)[1:]
        
        results.at[toe, 'i']   = valley_indices
        results.at[toe, 'pos'] = df.iloc[valley_indices].loc[:, (toe, 'Position')]
        results.at[toe, 'vec'] = df.iloc[valley_indices][toe]['Position'].diff()
        results.at[toe, 'stride_len'] = np.linalg.norm(results['vec'][toe][['X', 'Y', 'Z']].values, axis=1, ord=2)[1:]   
    return results, df


prediction_foots, df_valleys    = calc_standing_foot_positions(df)
gt_foots        , df_gt_valleys = calc_standing_foot_positions(df_gt)

## Visualize Valleys

In [None]:
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.plot(df_gt_valleys['LFoot', 'Speed'] * 0.3, '-x', markevery=gt_foots['i']['LFoot'], label="Grundwahrheit")
ax.plot(df_valleys['LFoot', 'Speed'] *0.3, '-x', markevery=prediction_foots['i']['LFoot'], label="Modell 6b)")
plt.xlabel("Datenpunkt")
plt.ylabel("Geschwindigkeit in m/s")
plt.title("Geschwindigkeit des linken Fußes\n von Modell 6b) und Grundwahrheit im Verlgleich")
plt.legend()
plt.show()

## Animated Valley Graph

In [None]:
np.arange(0, 5) / 30

In [None]:
speed_gt = df_gt_valleys['LFoot', 'Speed'] * 0.3
speed_pred = df_valleys['LFoot', 'Speed'] *0.3
valleys_gt = gt_foots['i']['LFoot']
valleys_pred = prediction_foots['i']['LFoot']

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
line_gt,   = ax.plot([], [], marker='X', markevery=[], label="Grundwahrheit", c='tab:orange')
line_pred, = ax.plot([], [], marker='X', markevery=[], label="Modell 6b)", c='tab:blue')

plt.xlabel("Datenpunkt")
plt.ylabel("Geschwindigkeit in m/s")
plt.title("Geschwindigkeit des linken Fußes")
plt.legend(loc='upper left')

frames_to_show = 60*5
right_margin=30

ax.set_ylim(-0.3, 4.0)
ax.set_xlim(-5, frames_to_show+right_margin)

def animate(slice_end):
    slice_start = slice_end-frames_to_show
    for line, speed, valleys in [(line_gt, speed_gt, valleys_gt), (line_pred, speed_pred, valleys_pred)]:
        if (slice_start < 0):
            x_data = np.arange(0, slice_end)
            y_data = speed.iloc[0:slice_end].to_numpy().squeeze()
        else:
            x_data = np.arange(slice_start, slice_end)
            y_data = speed.iloc[slice_start:slice_end].to_numpy().squeeze()
            ax.set_xlim(slice_start, slice_end+right_margin)

        marker_in_slice = [x - slice_end for x in valleys if x in x_data]   
        line.set_data(x_data, y_data)
        line.set_markevery(marker_in_slice)


anim = animation.FuncAnimation(fig, animate, frames=length, interval=100/FPS, blit=False)

# f = f"./valley_graph_{start_frame}-{start_frame+length}_{FPS}fps.mp4" 
# writervideo = matplotlib.animation.FFMpegWriter(fps=FPS) 
# anim.save(f, writer=writervideo)

plt.show()


## Correlation MPKPE and F-Score

In [None]:
mpkpe =        np.asarray([25.20, 25.03, 28.94, 25.00, 23.61, 19.03, 20.24, 17.62, 21.34, 32.21])
f_score =      np.asarray([10.41, 10.63,  7.12, 10.30, 10.64, 14.53, 11.86, 17.71, 14.79,  8.51])
step_percent = np.asarray([123.81, 126.15, 106.81, 106.59, 121.89, 121.54, 122.85, 116.04, 120.78, 119.48])

x = f_score
y = mpkpe
# y = step_percent
res = linregress(x, y)

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.scatter(x, y)
ax.plot(x, res.intercept + res.slope*x, 'r', label='fitted line')
plt.xlabel('f-Score')
plt.ylabel('MPKPE')
plt.show()

corr, _ = pearsonr(x, y)
print('Pearsons correlation: %.3f' % corr)

## Stride length distribution as histogram and histogram distance from ground truth

In [None]:
bins = 80

hist1_data = gt_foots['stride_len']['LFoot']
hist2_data = prediction_foots['stride_len']['LFoot']

# hist_range = (min(hist1_data.min(), hist2_data.min()), max(hist1_data.max(), hist2_data.max()))
hist_range = (0, 400)
gt_counts, gt_bins = np.histogram(gt_foots['stride_len']['LFoot'], bins=bins, range=hist_range)
pred_counts, pred_bins = np.histogram(prediction_foots['stride_len']['LFoot'], bins=bins, range=hist_range)

distance = distance_ordinal_histogram(gt_counts, pred_counts, bins)


print(f"Histogramm distance: {distance}")
print(f"Auflösung Histogramm: {np.round(gt_bins[1]-gt_bins[0], 2)} cm")

plt.figure()
plt.stairs(gt_counts, gt_bins, label="Grundwahrheit")
plt.stairs(pred_counts, pred_bins, label="Modell 5c)")
plt.xlabel("Schrittlänge in cm")
plt.ylabel("Anzahl der Schritte")
plt.legend()
plt.show()

## Print Stride length

In [None]:
for bone in ['LFoot', 'LToe', 'RFoot', 'RToe']:
    print(f"{bone}:")
#     print(f"  Ground Truth Stride length: \n{gt_foots.at[bone, 'stride_len']}")
    print(f"  Predicted Stride length: \n{prediction_foots.at[bone, 'stride_len']}")

## Calc Stride length Metrics

In [None]:
def add_stride_length_metrics(df: pd.DataFrame) -> pd.DataFrame:
    metrics = ['step amount', 
               'stride length mean', 
               'stride length median', 
               'stride length std', 
               'travelled distance'
              ]
    
    df[metrics[0]] = df['stride_len'].apply(len)
    df[metrics[1]] = df['stride_len'].apply(np.mean)
    df[metrics[2]] = df['stride_len'].apply(np.median)
    df[metrics[3]] = df['stride_len'].apply(np.std)
    df[metrics[4]] = df['stride_len'].apply(sum)
    df.at['__all__'] = df[metrics].mean()
    return df, metrics

gt_foots, stride_len_metrics = add_stride_length_metrics(gt_foots)
prediction_foots, _ = add_stride_length_metrics(prediction_foots)

print("Ground Truth:")
print(gt_foots[stride_len_metrics].round(3))
print("\nPrediction:")
print(prediction_foots[stride_len_metrics].round(3))


print(f"\nSchrittprozent: {np.round(100 * prediction_foots.at['__all__', 'step amount'] / gt_foots.at['__all__', 'step amount'], 2)}")
print(f"Mittlere Schrittlängendifferenz: {np.round(prediction_foots.at['__all__', 'stride length mean'] - gt_foots.at['__all__', 'stride length mean'], 2)}")
print(f"Mittlere Differenz der Schrittlängenstandardabweichung: {np.round(prediction_foots.at['__all__', 'stride length std'] - gt_foots.at['__all__', 'stride length std'], 2)}")
print(f"Verhältnis Strecke: {np.round(prediction_foots.at['__all__', 'travelled distance'] / gt_foots.at['__all__', 'travelled distance'], 2)}")


In [None]:
msld_sampled=np.array([-12.92, -9.64, -3.39, -22.61, -2.8, -0.32, -6.79, -9.13, -1.03, -6.53])
np.abs(msld_sampled).mean().round(2)

## Confusion Matrix

In [None]:
def calc_confusion_matrix(ground_truth_valleys: np.ndarray, predicted_valleys: np.ndarray, pad_size=3, normalize=True):
    
    predict = set(predicted_valleys)
    true_positive = []
    true_negative = [] # list of tuples: [(gt_step_index, prediction_step_index), (...)]
    false_positive = []
    false_negative = []
    confusion_matrix = pd.DataFrame(columns=['step', 'no step', '__all__'],
                                      index=['step', 'no step', '__all__'])
    confusion_matrix.index.name = 'actual'
    confusion_matrix.columns.name = 'predicted'

    for last_step_index, cur_step_index in zip(np.insert(ground_truth_valleys, 0, 0), ground_truth_valleys):
        pad_start = cur_step_index - pad_size
        pad_end = cur_step_index + pad_size
        step_pad = set(range(pad_start, pad_end))

        # false negative or true positive (and false positive)
        steps = predict.intersection(step_pad)
        if steps == set():
            false_negative.append(cur_step_index)
        elif len(steps) == 1:
            step, = steps
            true_positive.append((cur_step_index, step))
        else:
            step_list = list(steps)
            true_positive.append((cur_step_index, step_list[0]))
            false_positive += step_list[1:]

        # true negative or false positive
        if last_step_index != 0:
            last_step_index += pad_size
        no_step = set(range(last_step_index, pad_start))
        if predict.isdisjoint(no_step):
            true_negative.append(no_step)
        else:
            false_positive += list(predict.intersection(no_step))
    
   #confusion_matrix.at['actual', 'predcition']
    confusion_matrix.at[   'step',    'step'] = len(true_positive)
    confusion_matrix.at[   'step', 'no step'] = len(false_negative)
    confusion_matrix.at['no step',    'step'] = len(false_positive)
    confusion_matrix.at['no step', 'no step'] = len(true_negative)
    confusion_matrix.at['__all__'] = confusion_matrix.sum(0)
    confusion_matrix['__all__'] = confusion_matrix.sum(1)
    confusion_matrix = confusion_matrix.astype(int)
    if normalize:
        confusion_matrix /= confusion_matrix.at['__all__','__all__']
    return confusion_matrix
        
pad_size=3
confusion_matrix_lfoot = calc_confusion_matrix(gt_foots['i']['LFoot'], prediction_foots['i']['LFoot'], pad_size)
confusion_matrix_rfoot = calc_confusion_matrix(gt_foots['i']['RFoot'], prediction_foots['i']['RFoot'], pad_size)
confusion_matrix_ltoe = calc_confusion_matrix(gt_foots['i']['LToe'], prediction_foots['i']['LToe'], pad_size)
confusion_matrix_rtoe = calc_confusion_matrix(gt_foots['i']['RToe'], prediction_foots['i']['RToe'], pad_size)

confusion_matrix_mean = (confusion_matrix_lfoot + confusion_matrix_rfoot + 
                         confusion_matrix_ltoe + confusion_matrix_rtoe) / 4

df1_styler = confusion_matrix_lfoot.style.set_table_attributes("style='display:inline'").set_caption('Left Foot')
df2_styler = confusion_matrix_rfoot.style.set_table_attributes("style='display:inline'").set_caption('Right Foot')
df3_styler = confusion_matrix_ltoe.style.set_table_attributes("style='display:inline'").set_caption('Left Toe')
df4_styler = confusion_matrix_rtoe.style.set_table_attributes("style='display:inline'").set_caption('Right Toe')

df5_styler = confusion_matrix_mean.style.set_table_attributes("style='display:inline'").set_caption('Mean')

space = "\xa0" * 10
display_html(df3_styler._repr_html_() + space + df4_styler._repr_html_(), raw=True)
display_html(df1_styler._repr_html_() + space + df2_styler._repr_html_(), raw=True)

display_html(df5_styler._repr_html_(), raw=True)

TP = confusion_matrix_mean.at['step', 'step']
FP = confusion_matrix_mean.at['no step', 'step']
TN = confusion_matrix_mean.at['no step', 'no step']
FN = confusion_matrix_mean.at['step', 'no step']
accuracy = (TP + TN) / (TP + TN + FP + FN)
precision = (TP) / (TP + FP)
recall = (TP) / (TP + FN)
f_score = (precision * recall) / (precision + recall)

print(f"Accuracy in %:  {np.round(accuracy*100, 2)}")
print(f"Precision in %: {np.round(precision*100, 2)}")
print(f"Recall in %:    {np.round(recall*100, 2)}")
print(f"F-Score in %:   {np.round(f_score*100, 2)}")


## Calc Confusion Matrix with Random Foot Indicies

In [None]:
sample_size = 10000
total_accuracy  = [] 
total_precision = []
total_recall    = []
total_f_score   = []
for _ in range(sample_size):
    random_foots = np.sort(np.random.randint(low=1, high=len(df_gt), size=int(gt_foots['step amount']['__all__'])))
    confusion_matrix_random = calc_confusion_matrix(gt_foots['i']['LFoot'], random_foots)

    TP = confusion_matrix_random.at['step', 'step']
    FP = confusion_matrix_random.at['no step', 'step']
    TN = confusion_matrix_random.at['no step', 'no step']
    FN = confusion_matrix_random.at['step', 'no step']
    accuracy = (TP + TN) / (TP + TN + FP + FN)
    precision = (TP) / (TP + FP)
    recall = (TP) / (TP + FN)
    f_score = (precision * recall) / (precision + recall)
    total_accuracy.append(accuracy)
    total_precision.append(precision)
    total_recall.append(recall) 
    total_f_score.append(f_score) 
    
print(f"Accuracy in %:  {np.round(np.mean(total_accuracy)*100, 2)}")
print(f"Precision in %: {np.round(np.mean(total_precision)*100, 2)}")
print(f"Recall in %:    {np.round(np.mean(total_recall)*100, 2)}")
print(f"F-Score in %:   {np.round(np.mean(total_f_score)*100, 2)}")

## Visualize Footsteps

In [None]:
def draw_foot_arrows(foot_data, ax, left_color, right_color):
    for f, t, color in [('LFoot', 'LToe', left_color), ('RFoot', 'RToe', right_color)]:
        for (i, foot), (_, toe) in zip(foot_data['pos'][f].iterrows(), foot_data['pos'][t].iterrows()):
            x = foot['X']
            y = foot['Z']
            d_x = toe['X'] - x
            d_y = toe['Z'] - y
            ax.arrow(x, y, d_x, d_y, color=color, head_width=3)
        
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
# fig.suptitle('Einige erkannte Schritte der Grundwahrheit')
fig.suptitle('Modell 6b)')
ax.set(xlabel='X in cm', ylabel='Y in cm')

ax.set_aspect('equal', 'box')

draw_foot_arrows(gt_foots,         ax, 'lightsteelblue', 'bisque')
draw_foot_arrows(prediction_foots, ax, 'tab:blue', 'tab:orange')


## Animate Footsteps

In [None]:
def draw_foot_arrows(num, foot_data, ax, left_color, right_color):
    num -= frames_to_show
    for f, t, color in [('LFoot', 'LToe', left_color), ('RFoot', 'RToe', right_color)]:
        foot_iter = foot_data.pos[f].loc[foot_data.pos[f].index.isin(range(num, num+frames_to_show))].iterrows()
        toe_iter  = foot_data.pos[t].loc[foot_data.pos[t].index.isin(range(num, num+frames_to_show))].iterrows()
        for (i, foot), (_, toe) in zip(foot_iter, toe_iter):
            x = foot['X']
            y = foot['Z']
            d_x = toe['X'] - x
            d_y = toe['Z'] - y
            ax.arrow(x, y, d_x, d_y, color=color, head_width=3)
        
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
# fig.suptitle('Einige erkannte Schritte der Grundwahrheit')
# fig.suptitle('Modell 6b)')
ax.set(xlabel='X in cm', ylabel='Y in cm')

ax.set_xlim(-250,250)
ax.set_ylim(-250,250)

legend_elements = [Line2D([0], [0], color='tab:orange', lw=4, label='Links Grundw.'),
                   Line2D([0], [0], color='tab:blue', lw=4, label='Links Model'),
                   Line2D([0], [0], color='bisque', lw=4, label='Rechts Grundw.'),
                   Line2D([0], [0], color='lightsteelblue', lw=4, label='Rechts Model'),
                   ]

def animate(num):  
    ax.clear()
    ax.set_xlim(-250,250)
    ax.set_ylim(-250,250)
    ax.set(xlabel='X in cm', ylabel='Y in cm')
    ax.legend(handles=legend_elements, loc='lower left')
    draw_foot_arrows(num, gt_foots,         ax, 'tab:orange', 'bisque')
    draw_foot_arrows(num, prediction_foots, ax, 'tab:blue', 'lightsteelblue')

anim = animation.FuncAnimation(fig, animate, frames=length, interval=1000/FPS, blit=False)

f = f"./foot_position_graph_{start_frame}-{start_frame+length}_{FPS}fps.mp4" 
writervideo = animation.FFMpegWriter(fps=FPS) 
anim.save(f, writer=writervideo)

# plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt


data = [[ 66386, 174296,  75131, 577908,  32015],
        [ 58230, 381139,  78045,  99308, 160454],
        [ 89135,  80552, 152558, 497981, 603535],
        [ 78415,  81858, 150656, 193263,  69638],
        [139361, 331509, 343164, 781380,  52269]]

columns = ('Freeze', 'Wind', 'Flood', 'Quake', 'Hail')
rows = ['%d year' % x for x in (100, 50, 20, 10, 5)]

values = np.arange(0, 2500, 500)
value_increment = 1000

# Get some pastel shades for the colors
colors = plt.cm.BuPu(np.linspace(0, 0.5, len(rows)))
n_rows = len(data)

index = np.arange(len(columns)) + 0.3
bar_width = 0.4

# Initialize the vertical-offset for the stacked bar chart.
y_offset = np.zeros(len(columns))

# Plot bars and create text labels for the table
cell_text = []
for row in range(n_rows):
    plt.bar(index, data[row], bar_width, bottom=y_offset, color=colors[row])
    y_offset = y_offset + data[row]
    cell_text.append(['%1.1f' % (x / 1000.0) for x in y_offset])
# Reverse colors and text labels to display the last value at the top.
colors = colors[::-1]
cell_text.reverse()

# Add a table at the bottom of the axes
the_table = plt.table(cellText=cell_text,
                      rowLabels=rows,
                      rowColours=colors,
                      colLabels=columns,
                      loc='center')

# Adjust layout to make room for the table:
plt.subplots_adjust(left=0.2, bottom=0.2)

plt.ylabel("Loss in ${0}'s".format(value_increment))
plt.yticks(values * value_increment, ['%d' % val for val in values])
plt.xticks([])
plt.title('Loss by Disaster')

plt.show()


In [None]:
def set_text_row(table, text_array, column_index=0):
    if text_array.shape == (): text_array = np.expand_dims(text_array, 0)
    text_array = text_array[::-1]
    for i in range(min(text_array.size, 5)):
        table.get_celld()[(i+1, column_index)].get_text().set_text(text_array[i]) 

left_foot_gt_strides = pd.DataFrame(
    np.concatenate((np.array([0]), gt_foots.stride_len.LFoot)), 
    index=gt_foots.i.LFoot, 
    columns=["stride_len"]
)

left_foot_pred_strides = pd.DataFrame(
    np.concatenate((np.array([0]), prediction_foots.stride_len.LFoot)), 
    index=prediction_foots.i.LFoot, 
    columns=["stride_len"]
)

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.axis('off')

columns = ('Grundw.', 'Modell')

# cell_colours =np.array(['tab:orange', 'tab:blue'])

table = ax.table(cellColours=np.array(['white']).repeat(10).reshape(5,2),
                 colColours=['tab:orange', 'tab:blue'],
                 colLabels=columns,
                 loc='center')
for (row, col), cell in table.get_celld().items():
    if (row == 0):
        cell.set_text_props(fontproperties=FontProperties(weight='bold'))
    if (row > 1):
        cell.get_text().set_color('lightgrey')
            
table.set_fontsize(34)
table.scale(1, 4)


def animate(num):
    set_text_row(table, left_foot_gt_strides.loc[num-frames_to_show:num].to_numpy().squeeze().round(2), 0)
    set_text_row(table, left_foot_pred_strides.loc[num-frames_to_show:num].to_numpy().squeeze().round(2), 1)


# for num in range(0, length):
#     set_text_row(table, left_foot_gt_strides.loc[num-frames_to_show:num].to_numpy().squeeze().round(2), 0)




anim = animation.FuncAnimation(fig, animate, frames=length, interval=1000/FPS, blit=False)

f = f"./stride_length_table_{start_frame}-{start_frame+length}_{FPS}fps.mp4" 
writervideo = animation.FFMpegWriter(fps=FPS) 
anim.save(f, writer=writervideo)

# plt.show()

## Calculate optimal distance for Stride length Calculation
Depending on representation (either bones/rotation or joints) the stride length calculation calculates different steps for the ground truth. This script iterates over the distance parameter to get the value for which the calculated ground truth steps are equal.

In [None]:
distances = []
max_distance = 100
for distance in range(1, max_distance):
    foot_pos, _ = calc_standing_foot_positions(test_data.optitrack_data, distance=distance)
    distances.append(foot_pos['stride_len'].apply(len).mean())
    print(f"\rProgress: {distance/max_distance*100:.1f}%   ", end='')

print("\rProgress: 100.0%   ")

In [None]:
plt.plot(list(range(1, max_distance)), distances)
plt.title("Erkannte Schritte über Mindestabstand")
plt.xlabel("Mindestabstand")
plt.ylabel("Erkannte Schritte")
plt.show()