# 2. Correlation Calculation

Notebook to calculate the correlation between the two sinusoids extracted in notebook 1 and perform statistical tests.

## 1. Import Libraries

In [None]:
import numpy as np
import skvideo
import cv2
from scipy.stats import multivariate_normal
import os
from scipy import signal, stats
import matplotlib.pyplot as plt
import pickle
import pandas as pd

## 2. Define Functions and Classes

In [None]:
# Container for the two sinusoid arrays of one recording:
# the laser point (top half) and the visualisation (bottom half)

class swing_rec:
    """Hold the two sinusoid arrays extracted from one recording.

    Attributes are set by the caller after construction:
    ``top_array`` is for the laser point in the top half and
    ``bottom_array`` is for the visualisation in the bottom half.
    """

    def __init__(self, name):
        self.name = name
        # Both arrays start as the placeholder 0 until real data is assigned
        self.top_array = self.bottom_array = 0

## 3. Calculate Correlations, Summary Statistics, and Plots

### 3.1. One Position Marker

### 3.1.1. Load Pickle File

In [None]:
# Path of the directory that holds the pickle files of swing_rec objects
path = ''

# Dictionary of the loaded swing_rec objects, keyed as in the pickle files
rb_1_dict = {}

for file in sorted(os.listdir(path)):

    # Only load the pickle files whose name contains '01.pkl'
    if '01.pkl' not in file:
        continue

    filepath = os.path.join(path, file)

    # NOTE: pickle.load can execute arbitrary code - only open trusted files.
    # The handle gets its own name so the loop variable `file` (the file
    # name) is not overwritten by the open file object.
    with open(filepath, 'rb') as handle:
        dicty = pickle.load(handle)

    # Merge the loaded objects; a later file overwrites an equal key
    rb_1_dict.update(dicty)

### 3.1.2. Calculate Correlations

In [None]:
# Dictionary of the estimated latency (in video frames) for each recording
rb_1_corrs_dict = {}

# Iterate through swing_rec objects in key order
for key, item in sorted(rb_1_dict.items()):

    # Set arrays
    top = item.top_array
    bottom = item.bottom_array

    # Cross-correlate the visualisation with the laser point
    corr = signal.correlate(bottom, top)

    # Lag (in samples) that belongs to each entry of corr
    lags = signal.correlation_lags(len(bottom), len(top))

    # Normalise correlation so that its maximum is 1
    corr = corr/np.max(corr)

    # Indices of the local maxima of the correlation
    cpeaks = signal.find_peaks(corr)[0]

    # Lags of those peaks, restricted to strictly positive lags
    peak_lags = lags[cpeaks]
    positive_peak_lags = peak_lags[peak_lags > 0]

    if positive_peak_lags.size == 0:
        raise ValueError(f"No correlation peak at a positive lag for {key!r}")

    # The first (smallest) positive peak lag is the latency in video frames
    latency_samps = positive_peak_lags[0]

    # Place in dict
    rb_1_corrs_dict[key] = latency_samps

    # Plot sinusoids with latency compensation to check
    fig = plt.figure(figsize=(12, 6), dpi=300)

    plt.plot(bottom, label='Visualisation')
    plt.plot(np.roll(top, latency_samps), label='Laser Point')

    plt.legend()

    plt.suptitle(key)
    # The value is a frame count, not seconds, so label it as such
    plt.title('Latency: ' + str(latency_samps) + ' frames')

### 3.1.3. Place in dataframe and calculate summary statistics for latency in samples

In [None]:
# First three '-'-separated fields of each key: RB, Cycle, Phase
keys_info = [key.split('-')[:3] for key in rb_1_corrs_dict.keys()]

# Create a DataFrame with one row per key
rb1_df = pd.DataFrame(keys_info, columns=['RB', 'Cycle', 'Phase'])

# Keep a single row per (RB, Cycle, Phase) combination
rb1_df = rb1_df.drop_duplicates()

# Repetition identifiers: last '-'-separated field of each key without the
# file extension. De-duplicating first means every "Rep<id>" column is built
# once instead of once per key.
rep_ids = sorted({key.split('-')[-1].split('.')[0] for key in rb_1_corrs_dict})

for last_part in rep_ids:
    column_name = f"Rep{last_part}"
    # Look the latency up under the reconstructed key; a missing recording yields None
    rb1_df[column_name] = rb1_df.apply(lambda row: rb_1_corrs_dict.get(f"{row['RB']}-{row['Cycle']}-{row['Phase']}-{last_part}.mp4"), axis=1)

# Sort columns alphabetically
rb1_df = rb1_df.sort_index(axis=1)

# Reset the index
rb1_df = rb1_df.reset_index(drop=True)

# Names of the repetition columns
rep_columns = [col for col in rb1_df.columns if col.startswith('Rep')]

# Row-wise summary statistics over the repetitions
rb1_df['Min'] = rb1_df[rep_columns].min(axis=1)
rb1_df['Mean'] = rb1_df[rep_columns].mean(axis=1)
rb1_df['Median'] = rb1_df[rep_columns].median(axis=1)
rb1_df['Max'] = rb1_df[rep_columns].max(axis=1)
rb1_df['StdDev'] = rb1_df[rep_columns].std(axis=1)

# Standard error of the mean: StdDev / sqrt(n), with n the number of
# repetitions actually present in the row (missing ones are not counted)
rb1_df['StdError'] = rb1_df['StdDev'] / np.sqrt(rb1_df[rep_columns].count(axis=1))

In [None]:
# Show rb1_df as this cell's output (the bare expression on the last line is rendered)
rb1_df

In [None]:
# Write rb1_df to CSV (relative path); index=False leaves out the row index.
# NOTE: rb1_df is reassigned by the seconds cell further down, so run this
# cell before that one for the file to hold frame counts.
rb1_df.to_csv('rb1_corr_frames.csv', index=False)

### 3.1.4. Place in dataframe and calculate summary statistics for latency in seconds

In [None]:
# First three '-'-separated fields of each key: RB, Cycle, Phase
keys_info = [key.split('-')[:3] for key in rb_1_corrs_dict.keys()]

# Create a DataFrame with one row per key
rb1_df = pd.DataFrame(keys_info, columns=['RB', 'Cycle', 'Phase'])

# Keep a single row per (RB, Cycle, Phase) combination
rb1_df = rb1_df.drop_duplicates()

# Repetition identifiers: last '-'-separated field of each key without the
# file extension. De-duplicating first means every "Rep<id>" column is built
# once instead of once per key.
rep_ids = sorted({key.split('-')[-1].split('.')[0] for key in rb_1_corrs_dict})

for last_part in rep_ids:
    column_name = f"Rep{last_part}"
    # Look the latency up under the reconstructed key; a missing recording yields None
    rb1_df[column_name] = rb1_df.apply(lambda row: rb_1_corrs_dict.get(f"{row['RB']}-{row['Cycle']}-{row['Phase']}-{last_part}.mp4"), axis=1)

# Sort columns alphabetically
rb1_df = rb1_df.sort_index(axis=1)

# Reset the index
rb1_df = rb1_df.reset_index(drop=True)

# Names of the repetition columns
rep_columns = [col for col in rb1_df.columns if col.startswith('Rep')]

# Convert frame counts to seconds. The columns are selected by name and
# replaced as a whole, so the result neither depends on column position nor
# writes floats into integer columns in place.
frames_per_second = 120  # NOTE(review): assumes 120 fps video - confirm
rb1_df[rep_columns] = rb1_df[rep_columns].multiply(1 / frames_per_second)

# Row-wise summary statistics over the repetitions
rb1_df['Min'] = rb1_df[rep_columns].min(axis=1)
rb1_df['Mean'] = rb1_df[rep_columns].mean(axis=1)
rb1_df['Median'] = rb1_df[rep_columns].median(axis=1)
rb1_df['Max'] = rb1_df[rep_columns].max(axis=1)
rb1_df['StdDev'] = rb1_df[rep_columns].std(axis=1)

# Standard error of the mean: sample StdDev (pandas default ddof=1) / sqrt(n),
# with n the number of repetitions actually present in the row
rb1_df['StdError'] = rb1_df['StdDev'] / np.sqrt(rb1_df[rep_columns].count(axis=1))

In [None]:
# Show rb1_df as this cell's output (the bare expression on the last line is rendered)
rb1_df

In [None]:
# Write rb1_df to CSV (relative path); index=False leaves out the row index
rb1_df.to_csv('rb1_corr_seconds.csv', index=False)

### 3.1.5. Create Plots

In [None]:
# Line plot of the row means, one point per "<Cycle>-P<Phase>" label
phase_labels = rb1_df['Cycle'].astype(str) + '-P' + rb1_df['Phase'].astype(str)

plt.figure(figsize=(20, 10))
plt.plot(phase_labels, rb1_df['Mean'], marker='o')
plt.title('Mean Latency Across Performance Phases')
plt.xlabel('Performance Phase')
plt.ylabel('Mean Latency (Seconds)')
# Rotate the tick labels so neighbouring ones do not overlap
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.grid()
plt.show()

In [None]:
# Box plot of the row means grouped by cycle (all phases pooled)

# Mean of the row means per cycle (not used by the plot below)
cycle_means = rb1_df.groupby('Cycle')['Mean'].mean()

# One box per cycle, in order of first appearance in the table
cycles = rb1_df['Cycle'].unique()
means_by_cycle = [rb1_df.loc[rb1_df['Cycle'] == cycle, 'Mean'] for cycle in cycles]

plt.figure(figsize=(6, 3.1))
# White boxes with black outlines
boxprops = dict(facecolor='white', color='black')
plt.boxplot(means_by_cycle, labels=cycles, showfliers=True,
            patch_artist=True, boxprops=boxprops)
plt.ylabel('Mean Latency (Seconds)')
plt.xlabel('Cycle')
plt.tight_layout()
plt.savefig('rb_1_cycle_means_box.png', dpi=150)
plt.show()

In [None]:
# Box plot of the row means grouped by phase (all cycles pooled)

# Mean of the row means per phase (not used by the plot below)
phase_means = rb1_df.groupby('Phase')['Mean'].mean()

# One box per phase, in order of first appearance in the table
phases = rb1_df['Phase'].unique()
means_by_phase = [rb1_df.loc[rb1_df['Phase'] == phase, 'Mean'] for phase in phases]

plt.figure(figsize=(6, 3.1))
# White boxes with black outlines
boxprops = dict(facecolor='white', color='black')
plt.boxplot(means_by_phase, labels=phases, showfliers=True,
            patch_artist=True, boxprops=boxprops)
plt.ylabel('Mean Latency (Seconds)')
plt.xlabel('Phase')
plt.tight_layout()
plt.savefig('rb_1_phase_means_box.png', dpi=150)
plt.show()

### 3.1.6. Print Summary Statistics

In [None]:
# Summary of the 'Mean' column over all rows, each value rounded to 3 decimals
row_means = rb1_df['Mean']
row_means_std = row_means.std()

whole_dataframe_summary = {
    'Min': round(row_means.min(), 3),
    'Max': round(row_means.max(), 3),
    'Mean': round(row_means.mean(), 3),
    'Median': round(row_means.median(), 3),
    'StdDev': round(row_means_std, 3),
    # Standard error uses the number of rows in the table as n
    'StdError': round(row_means_std / np.sqrt(len(rb1_df)), 3),
}

print("Summary Statistics for the Whole DataFrame:")
print(whole_dataframe_summary)

### 3.2. Five Position Markers

### 3.2.1. Load Pickle File

In [None]:
# Path of the directory that holds the pickle files of swing_rec objects
path = ''

# Dictionary of the loaded swing_rec objects, keyed as in the pickle files
rb_5_dict = {}

for file in sorted(os.listdir(path)):

    # Only load the pickle files whose name contains '05.pkl'
    if '05.pkl' not in file:
        continue

    filepath = os.path.join(path, file)

    # NOTE: pickle.load can execute arbitrary code - only open trusted files.
    # The handle gets its own name so the loop variable `file` (the file
    # name) is not overwritten by the open file object.
    with open(filepath, 'rb') as handle:
        dicty = pickle.load(handle)

    # Merge the loaded objects; a later file overwrites an equal key
    rb_5_dict.update(dicty)

### 3.2.2. Calculate Correlations

In [None]:
# Dictionary of the estimated latency (in video frames) for each recording
rb_5_corrs_dict = {}

# Iterate through swing_rec objects in key order
for key, item in sorted(rb_5_dict.items()):

    # Set arrays
    top = item.top_array
    bottom = item.bottom_array

    # Cross-correlate the visualisation with the laser point
    corr = signal.correlate(bottom, top)

    # Lag (in samples) that belongs to each entry of corr
    lags = signal.correlation_lags(len(bottom), len(top))

    # Normalise correlation so that its maximum is 1
    corr = corr/np.max(corr)

    # Indices of the local maxima of the correlation
    cpeaks = signal.find_peaks(corr)[0]

    # Lags of those peaks, restricted to strictly positive lags
    peak_lags = lags[cpeaks]
    positive_peak_lags = peak_lags[peak_lags > 0]

    if positive_peak_lags.size == 0:
        raise ValueError(f"No correlation peak at a positive lag for {key!r}")

    # The first (smallest) positive peak lag is the latency in video frames
    latency_samps = positive_peak_lags[0]

    # Place in dict
    rb_5_corrs_dict[key] = latency_samps

    # Plot sinusoids with latency compensation to check
    fig = plt.figure(figsize=(12, 6), dpi=300)

    plt.plot(bottom, label='Visualisation')
    plt.plot(np.roll(top, latency_samps), label='Laser Point')

    plt.legend()

    plt.suptitle(key)
    # The value is a frame count, not seconds, so label it as such
    plt.title('Latency: ' + str(latency_samps) + ' frames')

### 3.2.3. Place in dataframe and calculate summary statistics for latency in samples

In [None]:
# First three '-'-separated fields of each key: RB, Cycle, Phase
keys_info = [key.split('-')[:3] for key in rb_5_corrs_dict.keys()]

# Create a DataFrame with one row per key
rb5_df = pd.DataFrame(keys_info, columns=['RB', 'Cycle', 'Phase'])

# Keep a single row per (RB, Cycle, Phase) combination
rb5_df = rb5_df.drop_duplicates()

# Repetition identifiers: last '-'-separated field of each key without the
# file extension. De-duplicating first means every "Rep<id>" column is built
# once instead of once per key.
rep_ids = sorted({key.split('-')[-1].split('.')[0] for key in rb_5_corrs_dict})

for last_part in rep_ids:
    column_name = f"Rep{last_part}"
    # Look the latency up under the reconstructed key; a missing recording yields None
    rb5_df[column_name] = rb5_df.apply(lambda row: rb_5_corrs_dict.get(f"{row['RB']}-{row['Cycle']}-{row['Phase']}-{last_part}.mp4"), axis=1)

# Sort columns alphabetically
rb5_df = rb5_df.sort_index(axis=1)

# Reset the index
rb5_df = rb5_df.reset_index(drop=True)

# Names of the repetition columns
rep_columns = [col for col in rb5_df.columns if col.startswith('Rep')]

# Row-wise summary statistics over the repetitions
rb5_df['Min'] = rb5_df[rep_columns].min(axis=1)
rb5_df['Mean'] = rb5_df[rep_columns].mean(axis=1)
rb5_df['Median'] = rb5_df[rep_columns].median(axis=1)
rb5_df['Max'] = rb5_df[rep_columns].max(axis=1)
rb5_df['StdDev'] = rb5_df[rep_columns].std(axis=1)

# Standard error of the mean: StdDev / sqrt(n), with n the number of
# repetitions actually present in the row (missing ones are not counted)
rb5_df['StdError'] = rb5_df['StdDev'] / np.sqrt(rb5_df[rep_columns].count(axis=1))

In [None]:
# Show rb5_df as this cell's output (the bare expression on the last line is rendered)
rb5_df

In [None]:
# Write rb5_df to CSV (relative path); index=False leaves out the row index.
# NOTE: rb5_df is reassigned by the seconds cell further down, so run this
# cell before that one for the file to hold frame counts.
rb5_df.to_csv('rb5_corr_frames.csv', index=False)

### 3.2.4. Place in dataframe and calculate summary statistics for latency in seconds

In [None]:
# First three '-'-separated fields of each key: RB, Cycle, Phase
keys_info = [key.split('-')[:3] for key in rb_5_corrs_dict.keys()]

# Create a DataFrame with one row per key
rb5_df = pd.DataFrame(keys_info, columns=['RB', 'Cycle', 'Phase'])

# Keep a single row per (RB, Cycle, Phase) combination
rb5_df = rb5_df.drop_duplicates()

# Repetition identifiers: last '-'-separated field of each key without the
# file extension. De-duplicating first means every "Rep<id>" column is built
# once instead of once per key.
rep_ids = sorted({key.split('-')[-1].split('.')[0] for key in rb_5_corrs_dict})

for last_part in rep_ids:
    column_name = f"Rep{last_part}"
    # Look the latency up under the reconstructed key; a missing recording yields None
    rb5_df[column_name] = rb5_df.apply(lambda row: rb_5_corrs_dict.get(f"{row['RB']}-{row['Cycle']}-{row['Phase']}-{last_part}.mp4"), axis=1)

# Sort columns alphabetically
rb5_df = rb5_df.sort_index(axis=1)

# Reset the index
rb5_df = rb5_df.reset_index(drop=True)

# Names of the repetition columns
rep_columns = [col for col in rb5_df.columns if col.startswith('Rep')]

# Convert frame counts to seconds. The columns are selected by name and
# replaced as a whole, so the result neither depends on column position nor
# writes floats into integer columns in place.
frames_per_second = 120  # NOTE(review): assumes 120 fps video - confirm
rb5_df[rep_columns] = rb5_df[rep_columns].multiply(1 / frames_per_second)

# Row-wise summary statistics over the repetitions
rb5_df['Min'] = rb5_df[rep_columns].min(axis=1)
rb5_df['Mean'] = rb5_df[rep_columns].mean(axis=1)
rb5_df['Median'] = rb5_df[rep_columns].median(axis=1)
rb5_df['Max'] = rb5_df[rep_columns].max(axis=1)
rb5_df['StdDev'] = rb5_df[rep_columns].std(axis=1)

# Standard error of the mean: sample StdDev (pandas default ddof=1) / sqrt(n),
# with n the number of repetitions actually present in the row
rb5_df['StdError'] = rb5_df['StdDev'] / np.sqrt(rb5_df[rep_columns].count(axis=1))

In [None]:
# Show rb5_df as this cell's output (the bare expression on the last line is rendered)
rb5_df

In [None]:
# Write rb5_df to CSV (relative path); index=False leaves out the row index
rb5_df.to_csv('rb5_corr_seconds.csv', index=False)

### 3.2.5. Create Plots

In [None]:
# Line plot of the row means, one point per "<Cycle>-P<Phase>" label
phase_labels = rb5_df['Cycle'].astype(str) + '-P' + rb5_df['Phase'].astype(str)

plt.figure(figsize=(20, 10))
plt.plot(phase_labels, rb5_df['Mean'], marker='o')
plt.title('Mean Latency Across Performance Phases')
plt.xlabel('Performance Phase')
plt.ylabel('Mean Latency (Seconds)')
# Rotate the tick labels so neighbouring ones do not overlap
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.grid()
plt.show()

In [None]:
# Box plot of the row means grouped by cycle (all phases pooled)

# Mean of the row means per cycle (not used by the plot below)
cycle_means = rb5_df.groupby('Cycle')['Mean'].mean()

# One box per cycle, in order of first appearance in the table
cycles = rb5_df['Cycle'].unique()
means_by_cycle = [rb5_df.loc[rb5_df['Cycle'] == cycle, 'Mean'] for cycle in cycles]

plt.figure(figsize=(6, 3.1))
# White boxes with black outlines
boxprops = dict(facecolor='white', color='black')
plt.boxplot(means_by_cycle, labels=cycles, showfliers=True,
            patch_artist=True, boxprops=boxprops)
plt.ylabel('Mean Latency (Seconds)')
plt.xlabel('Cycle')
plt.tight_layout()
plt.savefig('rb_5_cycle_means_box.png', dpi=150)
plt.show()

In [None]:
# Box plot of the row means grouped by phase (all cycles pooled)

# Mean of the row means per phase (not used by the plot below)
phase_means = rb5_df.groupby('Phase')['Mean'].mean()

# One box per phase, in order of first appearance in the table
phases = rb5_df['Phase'].unique()
means_by_phase = [rb5_df.loc[rb5_df['Phase'] == phase, 'Mean'] for phase in phases]

plt.figure(figsize=(6, 3.1))
# White boxes with black outlines
boxprops = dict(facecolor='white', color='black')
plt.boxplot(means_by_phase, labels=phases, showfliers=True,
            patch_artist=True, boxprops=boxprops)
plt.ylabel('Mean Latency (Seconds)')
plt.xlabel('Phase')
plt.tight_layout()
plt.savefig('rb_5_phase_means_box.png', dpi=150)
plt.show()

### 3.2.6. Print Summary Statistics

In [None]:
# Summary of the 'Mean' column over all rows, each value rounded to 3 decimals
row_means = rb5_df['Mean']
row_means_std = row_means.std()

whole_dataframe_summary = {
    'Min': round(row_means.min(), 3),
    'Max': round(row_means.max(), 3),
    'Mean': round(row_means.mean(), 3),
    'Median': round(row_means.median(), 3),
    'StdDev': round(row_means_std, 3),
    # Standard error uses the number of rows in the table as n
    'StdError': round(row_means_std / np.sqrt(len(rb5_df)), 3),
}

print("Summary Statistics for the Whole DataFrame:")
print(whole_dataframe_summary)

### 3.3. Ten Position Markers

### 3.3.1. Load Pickle File

In [None]:
# Path of the directory that holds the pickle files of swing_rec objects
path = ''

# Dictionary of the loaded swing_rec objects, keyed as in the pickle files
rb_10_dict = {}

for file in sorted(os.listdir(path)):

    # Only load the pickle files whose name contains '10.pkl'
    if '10.pkl' not in file:
        continue

    filepath = os.path.join(path, file)

    # NOTE: pickle.load can execute arbitrary code - only open trusted files.
    # The handle gets its own name so the loop variable `file` (the file
    # name) is not overwritten by the open file object.
    with open(filepath, 'rb') as handle:
        dicty = pickle.load(handle)

    # Merge the loaded objects; a later file overwrites an equal key
    rb_10_dict.update(dicty)

### 3.3.2. Calculate Correlations

In [None]:
# Dictionary of the estimated latency (in video frames) for each recording
rb_10_corrs_dict = {}

# Iterate through swing_rec objects in key order
for key, item in sorted(rb_10_dict.items()):

    # Set arrays
    top = item.top_array
    bottom = item.bottom_array

    # Cross-correlate the visualisation with the laser point
    corr = signal.correlate(bottom, top)

    # Lag (in samples) that belongs to each entry of corr
    lags = signal.correlation_lags(len(bottom), len(top))

    # Normalise correlation so that its maximum is 1
    corr = corr/np.max(corr)

    # Indices of the local maxima of the correlation
    cpeaks = signal.find_peaks(corr)[0]

    # Lags of those peaks, restricted to strictly positive lags
    peak_lags = lags[cpeaks]
    positive_peak_lags = peak_lags[peak_lags > 0]

    if positive_peak_lags.size == 0:
        raise ValueError(f"No correlation peak at a positive lag for {key!r}")

    # The first (smallest) positive peak lag is the latency in video frames
    latency_samps = positive_peak_lags[0]

    # Place in dict
    rb_10_corrs_dict[key] = latency_samps

    # Plot sinusoids with latency compensation to check
    fig = plt.figure(figsize=(12, 6), dpi=300)

    plt.plot(bottom, label='Visualisation')
    plt.plot(np.roll(top, latency_samps), label='Laser Point')

    plt.legend()

    plt.suptitle(key)
    # The value is a frame count, not seconds, so label it as such
    plt.title('Latency: ' + str(latency_samps) + ' frames')

### 3.3.3. Place in dataframe and calculate summary statistics for latency in samples

In [None]:
# First three '-'-separated fields of each key: RB, Cycle, Phase
keys_info = [key.split('-')[:3] for key in rb_10_corrs_dict.keys()]

# Create a DataFrame with one row per key
rb10_df = pd.DataFrame(keys_info, columns=['RB', 'Cycle', 'Phase'])

# Keep a single row per (RB, Cycle, Phase) combination
rb10_df = rb10_df.drop_duplicates()

# Repetition identifiers: last '-'-separated field of each key without the
# file extension. De-duplicating first means every "Rep<id>" column is built
# once instead of once per key.
rep_ids = sorted({key.split('-')[-1].split('.')[0] for key in rb_10_corrs_dict})

for last_part in rep_ids:
    column_name = f"Rep{last_part}"
    # Look the latency up under the reconstructed key; a missing recording yields None
    rb10_df[column_name] = rb10_df.apply(lambda row: rb_10_corrs_dict.get(f"{row['RB']}-{row['Cycle']}-{row['Phase']}-{last_part}.mp4"), axis=1)

# Sort columns alphabetically
rb10_df = rb10_df.sort_index(axis=1)

# Reset the index
rb10_df = rb10_df.reset_index(drop=True)

# Names of the repetition columns
rep_columns = [col for col in rb10_df.columns if col.startswith('Rep')]

# Row-wise summary statistics over the repetitions
rb10_df['Min'] = rb10_df[rep_columns].min(axis=1)
rb10_df['Mean'] = rb10_df[rep_columns].mean(axis=1)
rb10_df['Median'] = rb10_df[rep_columns].median(axis=1)
rb10_df['Max'] = rb10_df[rep_columns].max(axis=1)
rb10_df['StdDev'] = rb10_df[rep_columns].std(axis=1)

# Standard error of the mean: StdDev / sqrt(n), with n the number of
# repetitions actually present in the row (missing ones are not counted)
rb10_df['StdError'] = rb10_df['StdDev'] / np.sqrt(rb10_df[rep_columns].count(axis=1))

In [None]:
# Show rb10_df as this cell's output (the bare expression on the last line is rendered)
rb10_df

In [None]:
# Write rb10_df to CSV (relative path); index=False leaves out the row index.
# NOTE: rb10_df is reassigned by the seconds cell further down, so run this
# cell before that one for the file to hold frame counts.
rb10_df.to_csv('rb10_corr_frames.csv', index=False)

### 3.3.4. Place in dataframe and calculate summary statistics for latency in seconds

In [None]:
# First three '-'-separated fields of each key: RB, Cycle, Phase
keys_info = [key.split('-')[:3] for key in rb_10_corrs_dict.keys()]

# Create a DataFrame with one row per key
rb10_df = pd.DataFrame(keys_info, columns=['RB', 'Cycle', 'Phase'])

# Keep a single row per (RB, Cycle, Phase) combination
rb10_df = rb10_df.drop_duplicates()

# Repetition identifiers: last '-'-separated field of each key without the
# file extension. De-duplicating first means every "Rep<id>" column is built
# once instead of once per key.
rep_ids = sorted({key.split('-')[-1].split('.')[0] for key in rb_10_corrs_dict})

for last_part in rep_ids:
    column_name = f"Rep{last_part}"
    # Look the latency up under the reconstructed key; a missing recording yields None
    rb10_df[column_name] = rb10_df.apply(lambda row: rb_10_corrs_dict.get(f"{row['RB']}-{row['Cycle']}-{row['Phase']}-{last_part}.mp4"), axis=1)

# Sort columns alphabetically
rb10_df = rb10_df.sort_index(axis=1)

# Reset the index
rb10_df = rb10_df.reset_index(drop=True)

# Names of the repetition columns
rep_columns = [col for col in rb10_df.columns if col.startswith('Rep')]

# Convert frame counts to seconds. The columns are selected by name and
# replaced as a whole, so the result neither depends on column position nor
# writes floats into integer columns in place.
frames_per_second = 120  # NOTE(review): assumes 120 fps video - confirm
rb10_df[rep_columns] = rb10_df[rep_columns].multiply(1 / frames_per_second)

# Row-wise summary statistics over the repetitions
rb10_df['Min'] = rb10_df[rep_columns].min(axis=1)
rb10_df['Mean'] = rb10_df[rep_columns].mean(axis=1)
rb10_df['Median'] = rb10_df[rep_columns].median(axis=1)
rb10_df['Max'] = rb10_df[rep_columns].max(axis=1)
rb10_df['StdDev'] = rb10_df[rep_columns].std(axis=1)

# Standard error of the mean: sample StdDev (pandas default ddof=1) / sqrt(n),
# with n the number of repetitions actually present in the row
rb10_df['StdError'] = rb10_df['StdDev'] / np.sqrt(rb10_df[rep_columns].count(axis=1))

In [None]:
# Show rb10_df as this cell's output (the bare expression on the last line is rendered)
rb10_df

In [None]:
# Write rb10_df to CSV (relative path); index=False leaves out the row index
rb10_df.to_csv('rb10_corr_seconds.csv', index=False)

### 3.3.5. Create Plots

In [None]:
# Line plot of the row means, one point per "<Cycle>-P<Phase>" label
phase_labels = rb10_df['Cycle'].astype(str) + '-P' + rb10_df['Phase'].astype(str)

plt.figure(figsize=(20, 10))
plt.plot(phase_labels, rb10_df['Mean'], marker='o')
plt.title('Mean Latency Across Performance Phases')
plt.xlabel('Performance Phase')
plt.ylabel('Mean Latency (Seconds)')
# Rotate the tick labels so neighbouring ones do not overlap
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.grid()
plt.show()

In [None]:
# Box plot of the row means grouped by cycle (all phases pooled)

# Mean of the row means per cycle (not used by the plot below)
cycle_means = rb10_df.groupby('Cycle')['Mean'].mean()

# One box per cycle, in order of first appearance in the table
cycles = rb10_df['Cycle'].unique()
means_by_cycle = [rb10_df.loc[rb10_df['Cycle'] == cycle, 'Mean'] for cycle in cycles]

plt.figure(figsize=(6, 3.1))
# White boxes with black outlines
boxprops = dict(facecolor='white', color='black')
plt.boxplot(means_by_cycle, labels=cycles, showfliers=True,
            patch_artist=True, boxprops=boxprops)
plt.ylabel('Mean Latency (Seconds)')
plt.xlabel('Cycle')
plt.tight_layout()
plt.savefig('rb_10_cycle_means_box.png', dpi=150)
plt.show()

In [None]:
# Box plot of the row means grouped by phase (all cycles pooled)

# Mean of the row means per phase (not used by the plot below)
phase_means = rb10_df.groupby('Phase')['Mean'].mean()

# One box per phase, in order of first appearance in the table
phases = rb10_df['Phase'].unique()
means_by_phase = [rb10_df.loc[rb10_df['Phase'] == phase, 'Mean'] for phase in phases]

plt.figure(figsize=(6, 3.1))
# White boxes with black outlines
boxprops = dict(facecolor='white', color='black')
plt.boxplot(means_by_phase, labels=phases, showfliers=True,
            patch_artist=True, boxprops=boxprops)
plt.ylabel('Mean Latency (Seconds)')
plt.xlabel('Phase')
plt.tight_layout()
plt.savefig('rb_10_phase_means_box.png', dpi=150)
plt.show()

### 3.3.6. Print Summary Statistics

In [None]:
# Summary of the 'Mean' column over all rows, each value rounded to 3 decimals
row_means = rb10_df['Mean']
row_means_std = row_means.std()

whole_dataframe_summary = {
    'Min': round(row_means.min(), 3),
    'Max': round(row_means.max(), 3),
    'Mean': round(row_means.mean(), 3),
    'Median': round(row_means.median(), 3),
    'StdDev': round(row_means_std, 3),
    # Standard error uses the number of rows in the table as n
    'StdError': round(row_means_std / np.sqrt(len(rb10_df)), 3),
}

print("Summary Statistics for the Whole DataFrame:")
print(whole_dataframe_summary)

## 4. Create summary plots

### 4.1. Box Plot of latency mean between position markers across all cycles/phases

In [None]:
# Tables to compare and the tick label shown for each of them
labels = [1, 5, 10]
dfs = [rb1_df, rb5_df, rb10_df]

# One box per table, built from its 'Mean' column and placed at x = 0, 1, 2
plt.figure(figsize=(6, 3.1))
boxprops = dict(facecolor='white', color='black')
for position, (marker_count, table) in enumerate(zip(labels, dfs)):
    plt.boxplot(table['Mean'], positions=[position], labels=[str(marker_count)],
                showfliers=True, patch_artist=True, boxprops=boxprops)

plt.ylabel('Latency (Seconds)')
plt.xlabel('Position Markers')
plt.tight_layout()
plt.savefig('position_markers_means_box.png', dpi=150)
plt.show()

### 4.2. Plot of latency across performance

In [None]:
# Overlay the row means of the three tables, one line per marker count
plt.figure(figsize=(20, 10))
for marker_count, table in ((1, rb1_df), (5, rb5_df), (10, rb10_df)):
    # x labels have the form "<Cycle>-P<Phase>"
    phase_labels = table['Cycle'].astype(str) + '-P' + table['Phase'].astype(str)
    plt.plot(phase_labels, table['Mean'], marker='o',
             label=f'Position Marker: {marker_count}')

plt.title('Mean Latency Across Performance Phases')
plt.xlabel('Performance Phase')
plt.ylabel('Mean Latency (Seconds)')
plt.xticks(rotation=45, ha='right')
plt.legend()
plt.tight_layout()
plt.grid()
plt.show()

## 5. Statistical Tests

### 5.1. Tests across number of position markers

In [None]:
def _rep_values(df):
    """Return all non-NaN values of df's "Rep..." columns as a flat float array."""
    # Use the table's own repetition columns rather than a `rep_columns`
    # global left behind by whichever table cell happened to run last
    own_rep_columns = [col for col in df.columns if col.startswith('Rep')]
    # dtype=float turns missing entries (None) into NaN so np.isnan can be applied
    values = df[own_rep_columns].to_numpy(dtype=float).flatten()
    # Remove nans for safety
    return values[~np.isnan(values)]


# Format dataframes as flat np.arrays, one per number of position markers
rb1_data = _rep_values(rb1_df)
rb5_data = _rep_values(rb5_df)
rb10_data = _rep_values(rb10_df)

### 5.1.1. Levene's Test

In [None]:
# Levene's test across the three marker conditions, centred on each group's mean
stat, p = stats.levene(rb1_data, rb5_data, rb10_data, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.1.2. ANOVA

In [None]:
# Function for Welch's ANOVA
# From https://github.com/scipy/scipy/issues/11122#issuecomment-587964214

def welch_anova_np(*args, var_equal=False):
    """Welch's one-way ANOVA (does not assume equal group variances).

    Translated from R's ``oneway.test``:
    https://svn.r-project.org/R/trunk/src/library/stats/R/oneway.test.R

    Parameters
    ----------
    *args : array_like
        One 1-D sample per group; at least two groups are required.
    var_equal : bool, optional
        Accepted for signature compatibility but ignored: the
        unequal-variance (Welch) statistic is always computed.

    Returns
    -------
    f : float
        Welch F statistic.
    prob : float
        Upper-tail probability of ``f`` under an F distribution with
        ``k - 1`` numerator and ``1 / (3 * tmp)`` denominator degrees of freedom.

    Raises
    ------
    ValueError
        If fewer than two groups are given.
    """
    samples = [np.asarray(arg, dtype=float) for arg in args]
    k = len(samples)
    if k < 2:
        raise ValueError("welch_anova_np needs at least two groups")

    # Per-group size, mean, sample variance (ddof=1) and weight n / s^2
    ni = np.array([len(sample) for sample in samples])
    mi = np.array([np.mean(sample) for sample in samples])
    vi = np.array([np.var(sample, ddof=1) for sample in samples])
    wi = ni / vi
    w_total = np.sum(wi)

    tmp = np.sum((1 - wi / w_total) ** 2 / (ni - 1))
    tmp /= (k ** 2 - 1)

    # Degrees of freedom between groups and (approximate) within groups
    dfbn = k - 1
    dfwn = 1 / (3 * tmp)

    # Weighted grand mean and Welch F statistic
    m = np.sum(mi * wi) / w_total
    f = np.sum(wi * (mi - m) ** 2) / (dfbn * (1 + 2 * (dfbn - 1) * tmp))

    # stats.f.sf takes (x, dfn, dfd); the source snippet used
    # scipy.special.fdtrc, whose argument order is (dfn, dfd, x)
    prob = stats.f.sf(f, dfbn, dfwn)
    return f, prob

In [None]:
# Welch's ANOVA across the three marker conditions
stat, p = welch_anova_np(rb1_data, rb5_data, rb10_data)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.2. Tests across cycles regardless of phase

### 5.2.1. One Position Marker

In [None]:
# One empty list per cycle label 'c1' .. 'c7'
cycle_dict = {f'c{number}': [] for number in range(1, 8)}

# Append every "Rep" value of each rb1_df row to the list of that row's cycle
for _, row in rb1_df.iterrows():
    bucket = cycle_dict[row['Cycle']]
    bucket.extend(value for col, value in row.items() if 'Rep' in col)

In [None]:
# Turn each cycle's list into a flat np.array with the nans removed
cycle_arrays = []
for label in ('c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7'):
    values = np.array(cycle_dict[label]).flatten()
    cycle_arrays.append(values[~np.isnan(values)])

c1, c2, c3, c4, c5, c6, c7 = cycle_arrays

### 5.2.1.1. Levene's Test

In [None]:
# Levene's test across the seven cycle groups, centred on each group's mean
stat, p = stats.levene(c1, c2, c3, c4, c5, c6, c7, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.2.1.2. ANOVA

In [None]:
# Welch's ANOVA across the seven cycle groups
stat, p = welch_anova_np(c1, c2, c3, c4, c5, c6, c7)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.2.2. Five Position Markers

In [None]:
# One empty list per cycle label 'c1' .. 'c7'
cycle_dict = {f'c{number}': [] for number in range(1, 8)}

# Append every "Rep" value of each rb5_df row to the list of that row's cycle
for _, row in rb5_df.iterrows():
    bucket = cycle_dict[row['Cycle']]
    bucket.extend(value for col, value in row.items() if 'Rep' in col)

In [None]:
# Turn each cycle's list into a flat np.array with the nans removed
cycle_arrays = []
for label in ('c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7'):
    values = np.array(cycle_dict[label]).flatten()
    cycle_arrays.append(values[~np.isnan(values)])

c1, c2, c3, c4, c5, c6, c7 = cycle_arrays

### 5.2.2.1. Levene's Test

In [None]:
# Levene's test across the seven cycle groups, centred on each group's mean
stat, p = stats.levene(c1, c2, c3, c4, c5, c6, c7, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.2.2.2. ANOVA

In [None]:
# Welch's ANOVA across the seven cycle groups
stat, p = welch_anova_np(c1, c2, c3, c4, c5, c6, c7)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.2.3. Ten Position Markers

In [None]:
# One empty list per cycle label 'c1' .. 'c7'
cycle_dict = {f'c{number}': [] for number in range(1, 8)}

# Append every "Rep" value of each rb10_df row to the list of that row's cycle
for _, row in rb10_df.iterrows():
    bucket = cycle_dict[row['Cycle']]
    bucket.extend(value for col, value in row.items() if 'Rep' in col)

In [None]:
# Turn each cycle's list into a flat np.array with the nans removed
cycle_arrays = []
for label in ('c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7'):
    values = np.array(cycle_dict[label]).flatten()
    cycle_arrays.append(values[~np.isnan(values)])

c1, c2, c3, c4, c5, c6, c7 = cycle_arrays

### 5.2.3.1. Levene's Test

In [None]:
# Levene's test across the seven cycle groups, centred on each group's mean
stat, p = stats.levene(c1, c2, c3, c4, c5, c6, c7, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.2.3.2. ANOVA

In [None]:
# Welch's ANOVA across the seven cycle groups
stat, p = welch_anova_np(c1, c2, c3, c4, c5, c6, c7)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.3. Tests across phases regardless of cycle

### 5.3.1. One Position Marker

In [None]:
# One empty list per phase label 'p1' .. 'p9'
phase_dict = {f'p{number}': [] for number in range(1, 10)}

# Append every "Rep" value of each rb1_df row to the list of that row's phase
for _, row in rb1_df.iterrows():
    bucket = phase_dict[row['Phase']]
    bucket.extend(value for col, value in row.items() if 'Rep' in col)

In [None]:
# Turn each phase's list into a flat np.array with the nans removed
phase_arrays = []
for label in ('p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7', 'p8', 'p9'):
    values = np.array(phase_dict[label]).flatten()
    phase_arrays.append(values[~np.isnan(values)])

p1, p2, p3, p4, p5, p6, p7, p8, p9 = phase_arrays

### 5.3.1.1. Levene's Test

In [None]:
# Levene's test across the nine phase groups, centred on each group's mean
stat, p = stats.levene(p1, p2, p3, p4, p5, p6, p7, p8, p9, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.3.1.2. ANOVA

In [None]:
# Welch's ANOVA across the nine phase groups
stat, p = welch_anova_np(p1, p2, p3, p4, p5, p6, p7, p8, p9)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.3.2. Five Position Markers

In [None]:
# One empty list per phase label 'p1' .. 'p9'
phase_dict = {f'p{number}': [] for number in range(1, 10)}

# Append every "Rep" value of each rb5_df row to the list of that row's phase
for _, row in rb5_df.iterrows():
    bucket = phase_dict[row['Phase']]
    bucket.extend(value for col, value in row.items() if 'Rep' in col)

In [None]:
# Turn each phase's list into a flat np.array with the nans removed
phase_arrays = []
for label in ('p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7', 'p8', 'p9'):
    values = np.array(phase_dict[label]).flatten()
    phase_arrays.append(values[~np.isnan(values)])

p1, p2, p3, p4, p5, p6, p7, p8, p9 = phase_arrays

### 5.3.2.1. Levene's Test

In [None]:
# Levene's test across the nine phase groups, centred on each group's mean
stat, p = stats.levene(p1, p2, p3, p4, p5, p6, p7, p8, p9, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.3.2.2. ANOVA

In [None]:
# Classic one-way ANOVA across the nine phase groups.
# NOTE(review): most other ANOVA cells in this notebook call welch_anova_np,
# whereas stats.f_oneway assumes equal group variances - confirm that this
# choice is intended (e.g. based on the Levene result for these groups).
stat, p = stats.f_oneway(p1, p2, p3, p4, p5, p6, p7, p8, p9)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.3.3. Ten Position Markers

In [None]:
# One empty list per phase label 'p1' .. 'p9'
phase_dict = {f'p{number}': [] for number in range(1, 10)}

# Append every "Rep" value of each rb10_df row to the list of that row's phase
for _, row in rb10_df.iterrows():
    bucket = phase_dict[row['Phase']]
    bucket.extend(value for col, value in row.items() if 'Rep' in col)

In [None]:
# Turn each phase's list into a flat np.array with the nans removed
phase_arrays = []
for label in ('p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7', 'p8', 'p9'):
    values = np.array(phase_dict[label]).flatten()
    phase_arrays.append(values[~np.isnan(values)])

p1, p2, p3, p4, p5, p6, p7, p8, p9 = phase_arrays

### 5.3.3.1. Levene's Test

In [None]:
# Levene's test across the nine phase groups, centred on each group's mean
stat, p = stats.levene(p1, p2, p3, p4, p5, p6, p7, p8, p9, center='mean')

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Levene Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)

### 5.3.3.2. ANOVA

In [None]:
# Classic one-way ANOVA across the nine phase groups.
# NOTE(review): most other ANOVA cells in this notebook call welch_anova_np,
# whereas stats.f_oneway assumes equal group variances - confirm that this
# choice is intended (e.g. based on the Levene result for these groups).
stat, p = stats.f_oneway(p1, p2, p3, p4, p5, p6, p7, p8, p9)

# Round both values to 3 decimals for reporting
statistic_rounded, p_value_rounded = round(stat, 3), round(p, 3)

print("Statistic:", statistic_rounded)
print("P-value:", p_value_rounded)