
Code by Runbei Cheng, Oct 2023,

for the Oxford NIL YoYo Intermittent Recovery Test (Level 1) workload study.

Python Versio: 3.9

Email: runbei.cheng@eng.ox.ac.uk

In [None]:
# Imports
# Run this before using one of the data loading examples below
import numpy as np
import pandas as pd
import math
import matplotlib.pyplot as plt
from scipy.io import wavfile

In [None]:
# Load metadata files
# Run this before using one of the data loading examples below
Rounds = np.array([['FirstRound',1], ['SecondRound',2]])
Metadata = dict()
for Round,Round_idx in Rounds:
    file_dir = f"./{Round}/Metadata_{Round_idx}.csv"
    df = pd.read_csv(file_dir, header = 0, index_col = 0, encoding='unicode_escape')
    Metadata[Round] = df
    del df

print('Info included in the metadata file:')
print(Metadata['FirstRound'].columns)
print('-------------------------------------------------')
print('Metadata for subset 1')
display(Metadata['FirstRound'])
print('-------------------------------------------------')
print('Metadata for subset 2')
display(Metadata['SecondRound'])

In [None]:
""" Functions for this script """
# function to remove zero value artifacts by extrapolation
def HR_zero_artifact_removal(hr,t):
    Zero_Elements = np.nonzero(hr==0)[0]
    Non_Zero_Elements = np.nonzero(hr)[0]
    for Zero_idx in Zero_Elements:
        Below = Non_Zero_Elements[Non_Zero_Elements<Zero_idx]
        Below_idx = []
        if Below.size>0:
            Below_idx = max(Below)

        Above = Non_Zero_Elements[Non_Zero_Elements>Zero_idx]
        Above_idx = []
        if Above.size>0:
            Above_idx = min(Above)

        if not Below_idx and Above_idx:
            hr[Zero_idx] = hr[Above_idx]
        elif not Above_idx and Below_idx:
            hr[Zero_idx] = hr[Below_idx]
        elif not Above_idx and not Below_idx:
            pass
        elif Above_idx and Below_idx:
            hr[Zero_idx] = round(hr[Below_idx] + (hr[Above_idx]-hr[Below_idx]) * (t[Zero_idx]-t[Below_idx]) / (t[Above_idx]-t[Below_idx]))
    return hr

# function to load the three types of data files in this dataset
def Load_Data(metadata, subset = 'FirstRound', data_type = 'COSMED', must_have_HR = 0, must_have_audio = 0, auto_crop_COSMED = 0, hr_artifact_removal = 0):
    # subset = 'FirstRound', set this value to load different subsets: 'FirstRound' or 'SecondRound'
    # data_type = 'COSMED', set this value to load different types of data : 'COSMED' or 'RPE' or 'Audio'
    # if must_have_HR = 1, only subjects with heart rate data will be loaded
    # if must_have_audio = 1, only subjects with audio data will be loaded
    # if auto_crop_COSMED = 1, load COSMED data collected during the duration of the yoyo test without any of the data before or after
    # if hr_artifact_removal = 1, remove zero value noise in heart rate data due to motion artifacts by extrapolation
    Subjects = np.array(list(metadata[subset].index))
    Subject_load_idx = np.ones(np.shape(Subjects))

    if data_type == 'Audio':
        must_have_audio = 1

    if must_have_HR:
        Subject_load_idx *= np.array(list(metadata[subset]['Heart Rate Included?']))
    if must_have_audio:
        Subject_load_idx *= np.array(list(metadata[subset]['Audio Included?']))
    
    Subjects_to_load = Subjects[Subject_load_idx.astype(bool)]
    data_out = dict()
    
    for Subject in Subjects_to_load:
        file_name = metadata[subset][f'{data_type} File Name'][Subject]
        print(f'Loading {file_name}')
        file_dir = f"./{subset}/{Subject}/{file_name}"
        if data_type == 'Audio':
            _, data = wavfile.read(file_dir)
        elif data_type == 'COSMED':
            if auto_crop_COSMED:
                # crop COSMED data down to just the YoYo test
                YoYo_Start = metadata[subset]['YoYo Test Start Index'][Subject]
                YoYo_End = metadata[subset]['YoYo Test End Index'][Subject]
                data = pd.read_csv(file_dir, header = [0,1])
                if hr_artifact_removal:
                    # remove zero value artifacts by extrapolation
                    hr = data.HR.bpm.to_numpy()
                    Time = data.t.s
                    hr = HR_zero_artifact_removal(hr,Time)
                    data.HR = hr

                data = data[(YoYo_Start-1):YoYo_End].reset_index() # -1 to convert to python indexing
                Time = data.t.s
                NewTime = Time - Time[0]
                data.t = NewTime
            else:
                data = pd.read_csv(file_dir, header = [0,1])

                if hr_artifact_removal:
                    # remove zero value artifacts by extrapolation
                    hr = data.HR.bpm.to_numpy()
                    Time = data.t.s
                    hr = HR_zero_artifact_removal(hr,Time)
                    data.HR = hr

        elif data_type == 'RPE':
            data = pd.read_csv(file_dir, header = 0)
        data_out[Subject] = data
    
    return data_out

In [None]:
# Example: load subset 1 COSMED data
COSMED_1 = Load_Data(Metadata, subset = 'FirstRound', data_type = 'COSMED')

print('---------------------')
print(f'{len(list(COSMED_1.keys()))} subjects loaded')
print('---------------------')
COSMED_column_names = list(COSMED_1[list(COSMED_1.keys())[0]].columns)
print(f'list of COSMED variables ({len(COSMED_column_names)} in total):')
for i in range(math.ceil(len(COSMED_column_names)/5)): # split into multiple lines for readability
    idx_0 = i*5
    idx_1 = idx_0+5
    if idx_1>len(COSMED_column_names):
        print(COSMED_column_names[idx_0:])
    else:
        print(COSMED_column_names[idx_0:idx_1])
print('---------------------')
print("Here's a sample")
print(f'{list(COSMED_1.keys())[0]}:')
Sample_Subject_idx = 5 # change this value to view different samples: int < len(hr_1.keys())
display(COSMED_1[list(COSMED_1.keys())[Sample_Subject_idx]])


In [None]:
# Example: load subset 1 audio data
Audio_1 = Load_Data(Metadata, subset = 'FirstRound', data_type = 'Audio')
Fs = 8000 # the sampling freaquency of all the audio in this dataset is 8kHz
Sample_Subject_idx = 7 # change this value to view different samples: int < len(hr_1.keys())
Audio_Sample = Audio_1[list(Audio_1.keys())[Sample_Subject_idx]]
T_Sample = np.linspace(0,len(Audio_Sample)/Fs,num = len(Audio_Sample))
print('---------------------')
print(f'{len(list(Audio_1.keys()))} subjects loaded')
print('---------------------')
print("Here's a sample")
plt.plot(T_Sample[math.floor(325*len(Audio_Sample)/1000):math.floor(335*len(Audio_Sample)/1000)],
         Audio_Sample[math.floor(325*len(Audio_Sample)/1000):math.floor(335*len(Audio_Sample)/1000)])
plt.xlabel('Time (s)')
plt.ylabel('Power (AU)')
plt.title(list(Audio_1.keys())[0])
plt.show()


In [None]:
# Example: load subset 1 RPE data ONLY for subjects with heart rate data
RPE_1_hr = Load_Data(Metadata, subset = 'FirstRound', data_type = 'RPE', must_have_HR = 1)

print('---------------------')
print(f'{len(list(RPE_1_hr.keys()))} subjects loaded')
print('---------------------')
print('list of RPE sheet variables:')
RPE_column_names = list(RPE_1_hr[list(RPE_1_hr.keys())[0]].columns)
print(RPE_column_names)
print('---------------------')
print("Here's a sample")
print(f'{list(RPE_1_hr.keys())[0]}:')
Sample_Subject_idx = 5 # change this value to view different samples: int < len(hr_1.keys())
display(RPE_1_hr[list(RPE_1_hr.keys())[Sample_Subject_idx]])

In [None]:
# Example: load subset 1 COSMED heart rate data 
COSMED_1_hr = Load_Data(Metadata, subset = 'FirstRound', data_type = 'COSMED', must_have_HR = 1, auto_crop_COSMED = 1)

# extracting just HR and time form the loaded COSMED tables
Subjects = np.array(list(COSMED_1_hr.keys()))
hr_1 = dict()
for Subject in Subjects:
    Time = COSMED_1_hr[Subject]['t'].to_numpy()
    hr = COSMED_1_hr[Subject]['HR'].to_numpy()
    hr_1[Subject] = pd.DataFrame(np.hstack((Time,hr)), columns = ['t','HR'])


print('---------------------')
print(f'{len(list(hr_1.keys()))} subjects loaded')
print('---------------------')
print("Here's a sample")
print(f'{list(hr_1.keys())[0]}:')

Sample_Subject_idx = 0 # change this value to view different samples: int < len(hr_1.keys())

display(hr_1[list(hr_1.keys())[Sample_Subject_idx]])

# plot heart rate with RPE
fig, ax1 = plt.subplots()
color = 'tab:red'
ax1.set_xlabel('time (s)')
ax1.set_ylabel('RPE', color=color)
ax1.plot(RPE_1_hr[list(RPE_1_hr.keys())[Sample_Subject_idx]].Time , RPE_1_hr[list(RPE_1_hr.keys())[Sample_Subject_idx]].RPE, color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Heart Rate', color=color)
ax2.plot(hr_1[list(hr_1.keys())[Sample_Subject_idx]].t, hr_1[list(hr_1.keys())[Sample_Subject_idx]].HR, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()

In [None]:
# Example: load subset 1 COSMED heart rate data with heart rate data artifact removal
# As can be seen in the last example, the heart rate data captured by Polar H10 can sometimes have '0 value' noise due to motion artifacts 
# Let's try removing that via extrapolation, to do so, simply set hr_artifact_removal = 1

COSMED_1_hr = Load_Data(Metadata, subset = 'FirstRound', data_type = 'COSMED', must_have_HR = 1, auto_crop_COSMED = 1, hr_artifact_removal = 1)

# extracting just HR and time form the loaded COSMED tables
Subjects = np.array(list(COSMED_1_hr.keys()))
hr_1 = dict()
for Subject in Subjects:
    Time = COSMED_1_hr[Subject]['t'].to_numpy()
    hr = COSMED_1_hr[Subject]['HR'].to_numpy()
    hr_1[Subject] = pd.DataFrame(np.hstack((Time,hr)), columns = ['t','HR'])


print('---------------------')
print(f'{len(list(hr_1.keys()))} subjects loaded')
print('---------------------')
print("Here's a sample")
print(f'{list(hr_1.keys())[0]}:')

Sample_Subject_idx = 0 # change this value to view different samples: int < len(hr_1.keys())

display(hr_1[list(hr_1.keys())[Sample_Subject_idx]])

# plot heart rate with RPE
fig, ax1 = plt.subplots()
color = 'tab:red'
ax1.set_xlabel('time (s)')
ax1.set_ylabel('RPE', color=color)
ax1.plot(RPE_1_hr[list(RPE_1_hr.keys())[Sample_Subject_idx]].Time , RPE_1_hr[list(RPE_1_hr.keys())[Sample_Subject_idx]].RPE, color=color)
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Heart Rate', color=color)
ax2.plot(hr_1[list(hr_1.keys())[Sample_Subject_idx]].t, hr_1[list(hr_1.keys())[Sample_Subject_idx]].HR, color=color)
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()