# Jumping Tensegrity Robot Motion Capture Data Analysis

Jonathan Green and Dario Bozinowski, 2024

## Inital Setup

Dependencies

In [64]:
import cv2
import pandas               as pd
import numpy                as np
import matplotlib.pyplot    as plt
import matplotlib.animation as animation
import ipywidgets           as widgets
import plotly.graph_objects as go

from mpl_toolkits.mplot3d          import Axes3D
from IPython.display               import display, Video
from sklearn.cluster               import DBSCAN
from sklearn.neighbors             import LocalOutlierFactor
from sklearn.ensemble              import IsolationForest
from scipy.signal                  import butter, filtfilt, savgol_filter
from moviepy.editor                import VideoFileClip, concatenate_videoclips, CompositeVideoClip, vfx
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip

Retrieve Motion capture data

In [2]:
# Path to the CSV file
file_path = './data/20240603_test5/T5_3A_thirdcomp.csv'

# Read the file and find the start of frame data
with open(file_path, 'r') as file:
    lines = file.readlines()


# Extract lines that begin with the word "frame"
frame_start_index = None
for index, line in enumerate(lines):
    if line.startswith('frame'):
        frame_start_index = index
        break

Extract and parse datapoints from file

In [3]:
all_positions = []  # This will store a list of positions for each frame
time_stamps   = []
numb_points   = []

for line in lines[frame_start_index:]:
    parts = line.split(',')
    if parts[0] != 'frame':
        continue
    
    frame_time = float(parts[2])
    num_points = int(parts[4])
    
    time_stamps.append(frame_time)
    numb_points.append(num_points)

    frame_positions = []  # Store positions for this frame only
    for i in range(num_points):
        base_index = 5 + i * 5
        x = float(parts[base_index])
        z = float(parts[base_index + 1])
        y = float(parts[base_index + 2])
        frame_positions.append((x, y, z))
    
    all_positions.append(frame_positions)  # Append list of tuples for each frame

pass

## Visualise unprocessed data

Visualise the data extracted from each frame

In [4]:

def update_plot(frame_index):
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    
    # Convert current frame positions to a NumPy array
    positions = np.array(all_positions[frame_index])
    
    if positions.size > 0:
        ax.scatter(
            positions[:, 0],
            positions[:, 1],
            positions[:, 2]
        )

    # Set axis limits
    ax.set_xlim((-1.5, 1.5))
    ax.set_ylim((-1.5, 1.5))
    ax.set_zlim((-2.0, 0.5))

    ax.set_xlabel('X Coordinate')
    ax.set_ylabel('Y Coordinate')
    ax.set_zlabel('Z Coordinate')
    ax.set_title(f'Interactive Plot of Marker Positions at Frame {frame_index}')
    plt.show()

# Slider to select the frame to display
frame_slider = widgets.IntSlider(
    min=0, 
    max=len(all_positions) - 1, 
    step=1, 
    value=0, 
    description='Select Frame:'
)

# Interactive widget
out = widgets.interactive_output(update_plot, {'frame_index': frame_slider})

# Display the interactive plot with slider
display(widgets.VBox([frame_slider, out]))

VBox(children=(IntSlider(value=0, description='Select Frame:', max=1693), Output()))

Visualise all datapoints in 3D space

In [30]:
def interactive_plot_all_points(all_positions: list):
    # Assuming all_positions is a list of lists where each sub-list contains tuples (x, y, z)
    # Flatten all_points for plotting
    all_points = [point for frame in all_positions for point in frame]
    all_points = np.array(all_points)  # Convert to a NumPy array for easier handling

    # Check if there are points to plot
    if all_points.size == 0:
        print("No points to plot.")
    else:
        # Create the 3D scatter plot
        fig = go.Figure(data=[go.Scatter3d(
            x=all_points[:, 0],
            y=all_points[:, 1],
            z=all_points[:, 2],
            mode='markers',
            marker=dict(
                size=2,
                color=np.linspace(0, 1, all_points.shape[0]),  # Colour markers by position in dataset
                colorscale='thermal'
            )
        )])

        # Update plot layout
        fig.update_layout(
            title='Scatter Plot of MoCap Data',
            margin=dict(l=0, r=0, b=0, t=0),
            scene=dict(
                xaxis_title='X Coordinate',
                yaxis_title='Y Coordinate',
                zaxis_title='Z Coordinate'
            ),
            # title='All Marker Positions Across All Frames'
        )

        # Show the plot
        fig.show()

interactive_plot_all_points(all_positions)

## Process Data

Remove outliers, which are erroneous datapoints from MoCap reflections

In [6]:

def IQR_outlier_detection(all_positions: list, time_stamps: list):
    # Detect outliers using the interquartile range

    # Initialize containers for the filtered points and outliers
    filtered_positions   = []
    outlier_positions    = []
    filtered_time_stamps = []
    
    # Process each frame for outlier detection
    for frame, time_stamp in zip(all_positions, time_stamps):
        if len(frame) > 0:
            points = np.array(frame)  # Convert frame to a numpy array
            
            # Calculate Q1, Q3, and IQR
            Q1 = np.percentile(points, 25, axis=0)
            Q3 = np.percentile(points, 75, axis=0)
            IQR = Q3 - Q1
            
            # Define lower and upper bounds for outliers
            lower_bound = Q1 - 3 * IQR
            upper_bound = Q3 + 3 * IQR
            
            # Determine inliers and outliers
            is_inlier = (points >= lower_bound) & (points <= upper_bound)
            inliers = points[np.all(is_inlier, axis=1)]
            outliers = points[~np.all(is_inlier, axis=1)]

            # Append the results to their respective lists
            if inliers.size > 0:
                filtered_positions.append(inliers.tolist())
                filtered_time_stamps.append(time_stamp)
            if outliers.size > 0:
                outlier_positions.append(outliers.tolist())

    return filtered_positions, outlier_positions, filtered_time_stamps


def LOF_outlier_detection(all_positions, time_stamps, n_neighbors=8, contamination='auto'):
    """
    Detects outliers in 3D positions using the Local Outlier Factor (LOF) algorithm for each time step.
    
    Parameters:
    - all_positions: List of lists, where each inner list contains tuples representing points in 3D space
    - time_stamps: List of timestamps corresponding to the points in all_positions
    - n_neighbors: Number of neighbors to use for LOF
    - contamination: The proportion of points to be considered as outliers
    
    Returns:
    - filtered_positions: List of lists of positions excluding outliers for each time step
    - outlier_positions: List of lists of positions identified as outliers for each time step
    - filtered_time_stamps: List of lists of timestamps excluding outliers for each time step
    """
    
    # Validate the input data
    if len(all_positions) != len(time_stamps):
        raise ValueError("The length of all_positions and time_stamps must be the same")
    
    for positions in all_positions:
        if not all(isinstance(pos, tuple) and len(pos) == 3 for pos in positions):
            raise ValueError("Each item in the inner lists of all_positions must be a tuple of three numerical values")
    
    # Initialize lists to hold the results
    filtered_positions   = []
    outlier_positions    = []
    filtered_time_stamps = []
    
    # Loop through each time step
    for i, positions in enumerate(all_positions):
        if len(positions) == 0:
            continue
        
        # Convert the list of positions to a NumPy array
        positions_array = np.array(positions)
        
        # Initialize the Local Outlier Factor model
        lof = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination)
        
        # Fit the model and predict outliers
        predictions = lof.fit_predict(positions_array)
        
        # Separate the inliers and outliers for the current time step
        current_filtered_positions = []
        current_outlier_positions = []
        current_filtered_time_stamps = []
        
        for j, pred in enumerate(predictions):
            if pred == 1:  # Inlier
                current_filtered_positions.append(positions[j])
                current_filtered_time_stamps.append(time_stamps[i])
            else:  # Outlier
                current_outlier_positions.append(positions[j])
        
        # Append the results for the current time step
        if current_filtered_positions:
            filtered_positions.append(current_filtered_positions)
            filtered_time_stamps.append(current_filtered_time_stamps)
        if current_outlier_positions:
            outlier_positions.append(current_outlier_positions)
    
    return filtered_positions, outlier_positions, filtered_time_stamps


def IF_outlier_detection(all_positions, time_stamps, n_estimators=100, contamination=0.1):
    """
    Detects outliers in 3D positions using the Isolation Forest algorithm for each time step.
    
    Parameters:
    - all_positions: List of lists, where each inner list contains tuples representing points in 3D space
    - time_stamps: List of timestamps corresponding to the points in all_positions
    - n_estimators: Number of base estimators in the Isolation Forest ensemble
    - contamination: The proportion of points to be considered as outliers
    
    Returns:
    - filtered_positions: List of lists of positions excluding outliers for each time step
    - outlier_positions: List of lists of positions identified as outliers for each time step
    - filtered_time_stamps: List of lists of timestamps excluding outliers for each time step
    """
    
    # Validate the input data
    if len(all_positions) != len(time_stamps):
        raise ValueError("The length of all_positions and time_stamps must be the same")
    
    for positions in all_positions:
        if not all(isinstance(pos, tuple) and len(pos) == 3 for pos in positions):
            raise ValueError("Each item in the inner lists of all_positions must be a tuple of three numerical values")
    
    # Initialize lists to hold the results
    filtered_positions = []
    outlier_positions = []
    filtered_time_stamps = []
    
    # Loop through each time step
    for i, positions in enumerate(all_positions):
        if len(positions) == 0:
            continue
        
        # Convert the list of positions to a NumPy array
        positions_array = np.array(positions)
        
        # Initialize the Isolation Forest model
        iforest = IsolationForest(n_estimators=n_estimators, contamination=contamination, random_state=42)
        
        # Fit the model and predict outliers
        predictions = iforest.fit_predict(positions_array)
        
        # Separate the inliers and outliers for the current time step
        current_filtered_positions = []
        current_outlier_positions = []
        current_filtered_time_stamps = []
        
        for j, pred in enumerate(predictions):
            if pred == 1:  # Inlier
                current_filtered_positions.append(positions[j])
                current_filtered_time_stamps.append(time_stamps[i])
            else:  # Outlier
                current_outlier_positions.append(positions[j])
        
        # Append the results for the current time step
        if current_filtered_positions:
            filtered_positions.append(current_filtered_positions)
            filtered_time_stamps.append(current_filtered_time_stamps)
        if current_outlier_positions:
            outlier_positions.append(current_outlier_positions)
    
    return filtered_positions, outlier_positions, filtered_time_stamps

Average datapoints in each frame to determine geometric centroid \
Note: We assume that the geometric centroid is reasonably close to the centre of mass

In [7]:

filtered_avg_positions = []
# Assume 'all_positions' is a list of numpy arrays, where each array contains points for a frame
for frame in all_positions:
    if len(frame) > 0:  # Ensure there are points in the frame
        points           = np.array(frame)
        average_position = np.mean(points, axis=0)  # Compute the mean along each column (x, y, z)
        filtered_avg_positions.append(average_position.tolist())
    else:
        filtered_avg_positions.append([np.nan, np.nan, np.nan])  # Append NaNs if frame is empty


Filter averaged data to generate and smooth a trajectory

In [8]:
# Butterworth low-pass filter
def butter_lowpass_filter(data):
    fs     = 120  # Sample rate, Hz (adjust based on your data sampling rate)
    cutoff = 50   # Desired cutoff frequency of the filter, Hz    
    order  = 5   # Butterworth order

    nyq           = 0.5 * fs  # Nyquist Frequency
    normal_cutoff = cutoff / nyq
    b, a          = butter(order, normal_cutoff, btype='low', analog=False)
    y             = filtfilt(b, a, data, axis=0)
    return y

# Savitzky-Golay filter
def SG_filter(filtered_avg_positions, window_length=5, polyorder=2):
    """
    Applies the Savitzky-Golay filter to smooth a list of 3D points.
    
    Parameters:
    - filtered_avg_positions: List of tuples representing 3D points [(x1, y1, z1), (x2, y2, z2), ...]
    - window_length: The length of the filter window (i.e., the number of coefficients). Must be a positive odd integer.
    - polyorder: The order of the polynomial used to fit the samples. Must be less than window_length.
    
    Returns:
    - smoothed_trajectory: List of tuples representing smoothed 3D points
    """
    
    if len(filtered_avg_positions) < window_length:
        raise ValueError("The length of filtered_avg_positions must be at least as long as the window_length.")
    
    # Convert the positions to a NumPy array for processing
    positions_array = np.array(filtered_avg_positions)
    
    # Apply Savitzky-Golay filter to each dimension (x, y, z) separately
    smoothed_x = savgol_filter(positions_array[:, 0], window_length, polyorder)
    smoothed_y = savgol_filter(positions_array[:, 1], window_length, polyorder)
    smoothed_z = savgol_filter(positions_array[:, 2], window_length, polyorder)
    
    # Combine the smoothed coordinates back into a list of tuples
    smoothed_trajectory = list(zip(smoothed_x, smoothed_y, smoothed_z))
    
    return smoothed_trajectory


## Visualise Processed Data

Create HTML visualisation of outliers detected and removed

In [28]:
def plot_outliers(filtered_positions: list, outlier_positions: list, plot_type: str='html', filname: str='out/outlier_detection.html'):
    # Assuming 'filtered_positions' and 'outlier_positions' contain lists of 3D coordinates for each frame
    # Flatten the lists of lists into single lists for Plotly plotting
    filtered_points = np.vstack(filtered_positions)  # Stacking all filtered points
    outlier_points  = np.vstack(outlier_positions)   # Stacking all outlier points

    # Create a Plotly graph object for the interactive plot
    fig = go.Figure()

    # Add the filtered points in blue
    fig.add_trace(go.Scatter3d(
        x=filtered_points[:, 0],
        y=filtered_points[:, 1],
        z=filtered_points[:, 2],
        mode='markers',
        marker=dict(
            size=3,
            color='blue',  # Use blue to indicate filtered points
            opacity=0.8
        ),
        name='Filtered Points'
    ))

    # Add the outliers in red
    fig.add_trace(go.Scatter3d(
        x=outlier_points[:, 0],
        y=outlier_points[:, 1],
        z=outlier_points[:, 2],
        mode='markers',
        marker=dict(
            size=3,
            color='red',  # Use red to indicate outliers
            opacity=0.8
        ),
        name='Outliers'
    ))

    # Update the layout to create a visually appealing and informative plot
    fig.update_layout(
        title='Scatter Plot of outliers removed from MoCap Data',
        scene=dict(
            xaxis_title='X Coordinates',
            yaxis_title='Y Coordinates',
            zaxis_title='Z Coordinates'
        ),
        legend_title_text='Point Type'
    )

    # Output the plot
    if plot_type == 'html':
        fig.write_html(filename)
    else:
        fig.show()


# Select which outlier detection method to use

# filtered_positions, outlier_positions, filtered_time_stamps = IQR_outlier_detection(all_positions, time_stamps)
filtered_positions, outlier_positions, filtered_time_stamps = LOF_outlier_detection(all_positions, time_stamps, n_neighbors=9)
# filtered_positions, outlier_positions, filtered_time_stamps = IF_outlier_detection(all_positions, time_stamps, n_estimators=30)

# Show outliers detected and removed
plot_outliers(filtered_positions, outlier_positions, plot_type='inline')

# Show filtered data
interactive_plot_all_points(filtered_positions)

Create HTML visualisation of trajectory of centroid

In [31]:

def plot_trajectory(positions: list, start_time, end_time, plot_type: str='html', filename: str='out/trajectory.html'):

    # Slice out relevant section of time
    positions = positions[int(len(positions)*start_time/time_stamps[-1]):int(len(positions)*end_time/time_stamps[-1])]

    # Convert list of average positions to a NumPy array for easier handling
    positions = np.array(positions)

    # Create a Plotly figure
    fig = go.Figure(data=[go.Scatter3d(
        x=positions[:, 0],
        y=positions[:, 1],
        z=positions[:, 2],
        mode='lines+markers',

        text=[f'x: {round(xp,4)}, y: {round(yp,4)}, z: {round(zp,4)}, t: {t}, n of points: {np}' for np, xp, yp, zp, t in zip(numb_points, positions[:, 0], positions[:, 1], positions[:, 2], filtered_time_stamps)],
        hoverinfo='text',
        marker=dict(
            size=5,
            color=np.linspace(0, end_time - start_time, len(positions)),  # Gradient color based on time
            colorscale='thermal',
            showscale=True,
            colorbar=dict(
                title='Time (seconds)'  # Add title to the colorbar
            )
        ),

        line=dict(
            color='#1f77b4',  # Consistent color for the line
            width=2
        )
    )])


    # Update plot layout for better visualization
    fig.update_layout(
        title='Trajectory of Average Positions Over Time',
        scene=dict(
            xaxis_title='X Coordinates',
            yaxis_title='Y Coordinates',
            zaxis_title='Z Coordinates'
        ),
        
    )

    # Show the interactive plot
    if plot_type == 'html':
        fig.write_html(filename)
    else:
        fig.show()


# Plot trajectory before and after trajectory smoothing

# smoothed_avg_positions = butter_lowpass_filter(filtered_avg_positions)
smoothed_avg_positions = SG_filter(filtered_avg_positions, window_length=15, polyorder=3)

# Slice out region of interest
start_time = 7.5
end_time   = 11.0

# Create plots
plot_trajectory(filtered_avg_positions, start_time, end_time, plot_type='inline')
plot_trajectory(smoothed_avg_positions, start_time, end_time, plot_type='inline')

## Define Video Creation Helper Functions

Take trajectory data and create an animated plot of the jump height

In [70]:
def create_jump_animation(heights, filename, fps=30, duration=10):
    """
    Creates a video of a line plot being drawn from a list of heights.
    
    Parameters:
    - heights: List of floats representing the y-values of the line plot.
    - filename: Name of the output video file.
    - fps: Frames per second for the video.
    - duration: Duration of the video in seconds.
    """
    # Calculate the total number of frames needed
    total_frames = int(fps * duration)

    # Set style
    plt.style.use("seaborn-v0_8")
    colors = plt.rcParams['axes.prop_cycle'].by_key()['color']

    # Create plot
    fig, ax = plt.subplots()
    line, = ax.plot([], [], lw=2, color=colors[2]) # colors[0] is blue, colors[1] is green, colors[2] is red
    ax.set_xlim(0, len(heights) - 1)
    ax.set_ylim(min(heights), max(heights))
    ax.set_title("Motion Capture Jump Height")
    ax.set_xlabel("Frame index (at 120fps)")
    ax.set_ylabel("Height (m)")
    
    # Initialize the data of the line plot
    def init():
        line.set_data([], [])
        return line,

    # Update the line plot for each frame
    def update(frame):
        # Calculate the current progress through the heights list
        current_idx = int(frame / total_frames * len(heights))
        x = list(range(current_idx + 1))
        y = heights[:current_idx + 1]
        line.set_data(x, y)
        return line,

    # Create the animation
    anim = animation.FuncAnimation(fig, update, frames=total_frames, init_func=init, blit=True)

    # Save the animation using FFMpegWriter for mp4
    writer = animation.FFMpegWriter(fps=fps)
    anim.save(filename, writer=writer)
    
    plt.close(fig)


Stitch the video footage to the animated jump plot

In [61]:
def stitch_videos_side_by_side(jump_vid: str, plot_vid: str, output_file: str, start_time: float, end_time: float, offset: float):

    # Load the two videos
    jump_clip = VideoFileClip(jump_vid).subclip(start_time + offset, end_time + offset)
    plot_clip = VideoFileClip(plot_vid)
    
    # Resize the mov clip to match the height of the gif clip, preserving aspect ratio
    # gif_height = plot_clip.h
    # jump_clip = jump_clip.resize(height=gif_height)
    plot_clip = plot_clip.resize(height=jump_clip.h)
    
    # If the gif is shorter, make it as long as the mov file by looping it
    if plot_clip.duration < jump_clip.duration:
        plot_clip = plot_clip.loop(duration=jump_clip.duration)
    # Or if the gif is longer, cut it to the length of the mov file
    elif plot_clip.duration > jump_clip.duration:
        plot_clip = plot_clip.subclip(0, jump_clip.duration)

    # Calculate the total width of the final video
    total_width = jump_clip.w + plot_clip.w
    final_size = (total_width, jump_clip.h)

    # Create a CompositeVideoClip with the two clips side-by-side
    final_clip = CompositeVideoClip([
        jump_clip.set_position(("left", "center")), 
        plot_clip.set_position((jump_clip.w, "center"))
    ], size=final_size)
    
    # Write the result to a file
    final_clip.write_videofile(output_file, codec='libx264')

Take 240fps stitched video and slow down to 30fps to create slow motion footage and animation of jump

In [62]:
def create_slow_motion_video(input_file, output_file, target_fps=30):
    """
    Creates a slow-motion video from a high frame rate video by reducing the frame rate.

    Parameters:
    - input_file: Path to the input high frame rate video file (e.g., 240 fps).
    - output_file: Path to save the output slow-motion video file.
    - target_fps: The target frame rate for the slow-motion video (default is 30 fps).
    """
    # Load the high frame rate video
    clip = VideoFileClip(input_file)

    # Calculate the slow motion factor
    original_fps = clip.fps
    slow_motion_factor = original_fps / target_fps

    # Adjust the video speed to create slow motion
    slow_motion_clip = clip.fx(vfx.speedx, factor=1/slow_motion_factor)

    # Set the final frame rate
    slow_motion_clip = slow_motion_clip.set_fps(target_fps)

    # Write the result to a file
    slow_motion_clip.write_videofile(output_file, codec='libx264')

## Produce Videos

Render and export relevant videos (caution: slow, requires FFMPEG install)

In [66]:
start_time = 7.5
end_time   = 10.0
offset     = -0.22 # This accounts for a time shift between the mocap data and the video data
heights    = [z for (x, y, z) in smoothed_avg_positions if not np.isnan(z)]
heights    = heights[int(start_time*120):int(end_time*120)]

create_jump_animation(heights, 'out/videos/animated_height_plot.mp4', fps=240, duration=(end_time - start_time))

stitch_videos_side_by_side('data/20240603_test5/T5_3A_thirdcomp.mp4', 
                           'out/videos/animated_height_plot.mp4', 
                           'out/videos/stitched_jump.mp4',
                           start_time,
                           end_time,
                           offset)

create_slow_motion_video('out/videos/stitched_jump.mp4', 'out/videos/slomo_stitched_jump.mp4')

Moviepy - Building video out/videos/stitched_jump.mp4.
MoviePy - Writing audio in stitched_jumpTEMP_MPY_wvf_snd.mp3


                                                       

MoviePy - Done.
Moviepy - Writing video out/videos/stitched_jump.mp4



                                                              

Moviepy - Done !
Moviepy - video ready out/videos/stitched_jump.mp4
Moviepy - Building video out/videos/slomo_stitched_jump.mp4.
MoviePy - Writing audio in slomo_stitched_jumpTEMP_MPY_wvf_snd.mp3


                                                                    

MoviePy - Done.
Moviepy - Writing video out/videos/slomo_stitched_jump.mp4



                                                              

Moviepy - Done !
Moviepy - video ready out/videos/slomo_stitched_jump.mp4


Display final slow motion stitched video

In [68]:
Video('out/videos/slomo_stitched_jump.mp4')