Open up data and seperate for different types, no time series because its developed further in the file

In [None]:
import os
import re
import pandas as pd
import numpy as np

# Helper function
def get_label(filename):
    match = re.match(r'^(als|hunt|control|park)', filename.lower())
    if match:
        return match.group(1)
    return 'unknown'

# Function to decode header file
def decode_header(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    
    header_dict = {}
    lines = [line.strip() for line in lines if line.strip() and not line.startswith('#')]
    if not lines:
        return header_dict

    first_line = lines[0]
    fields = first_line.split()
    if len(fields) >= 5:
        header_dict['data_file'] = fields[0]
        
        header_dict['channels'] = int(fields[1])
        
        header_dict['sampling_rate'] = float(fields[2])
        
        header_dict['calibration_factor'] = float(fields[3])
        
        header_dict['gain'] = int(fields[4])

        for i, val in enumerate(fields[5:], start=5):
            header_dict[f'field_{i}'] = val

    # Process subsequent lines in "key: value" format.
    for line in lines[1:]:
        if ':' in line:
            key, val = line.split(':', 1)
            header_dict[key.strip()] = val.strip()

    return header_dict

# Function to decode binary signal files (.let and .rit)
def decode_signal(file_path, dtype=np.int16):
    with open(file_path, 'rb') as f:
        data = f.read()
    # Convert the binary data into a NumPy array
    signal = np.frombuffer(data, dtype=dtype)
    df = pd.DataFrame({
        'sample_index': np.arange(len(signal)),
        'signal_value': signal
    })
    return df

folder_path = 'gait_data'

headers_list = []
left_signals = []
right_signals = []

# Process each file in the folder
for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    if not os.path.isfile(file_path):
        continue

    label = get_label(filename)
    
    if filename.endswith('.hea'):
        header_info = decode_header(file_path)
        header_info['filename'] = filename
        header_info['label'] = label
        headers_list.append(header_info)
    
    elif filename.endswith('.let'):
        df_left = decode_signal(file_path)
        df_left['filename'] = filename
        df_left['label'] = label
        df_left['side'] = 'left'
        left_signals.append(df_left)
    
    elif filename.endswith('.rit'):
        df_right = decode_signal(file_path)
        df_right['filename'] = filename
        df_right['label'] = label
        df_right['side'] = 'right'
        right_signals.append(df_right)


# Create final DataFrames for each file type
df_headers = pd.DataFrame(headers_list)
df_left_signals = pd.concat(left_signals, ignore_index=True) if left_signals else pd.DataFrame()
df_right_signals = pd.concat(right_signals, ignore_index=True) if right_signals else pd.DataFrame()


print("Decoded Header DataFrame:")
print(df_headers.head(), "\n")

print("Decoded Left Foot Signal DataFrame:")
print(df_left_signals.head(), "\n")

print("Decoded Right Foot Signal DataFrame:")
print(df_right_signals.head(), "\n")

Decoded Header DataFrame:
    filename label
0   als1.hea   als
1  als10.hea   als
2  als11.hea   als
3  als12.hea   als
4  als13.hea   als 

Decoded Left Foot Signal DataFrame:
   sample_index  signal_value  filename label  side
0             0         -8878  als1.let   als  left
1             1         21330  als1.let   als  left
2             2         21469  als1.let   als  left
3             3         -8877  als1.let   als  left
4             4         21331  als1.let   als  left 

Decoded Right Foot Signal DataFrame:
   sample_index  signal_value  filename label   side
0             0         -6144  als1.rit   als  right
1             1        -10794  als1.rit   als  right
2             2        -10514  als1.rit   als  right
3             3         -4394  als1.rit   als  right
4             4        -10537  als1.rit   als  right 



Develope timeseries files, and how they interact

In [7]:
import os
import pandas as pd

def extract_double_support(file_path):
    """
    Reads a .ts file and extracts the Double Support Time column.
    
    Assumes the .ts file is whitespace-delimited and that:
      - The file does not have a header row.
      - The 12th column (index 11) represents the Double Support Time.
    
    Parameters:
        file_path (str): Path to the .ts file.
        
    Returns:
        pd.DataFrame: A DataFrame with two columns:
            - 'Double Support Time': The double support time for each stride.
            - 'filename': The name of the file.
    """
    # Read the .ts file
    df = pd.read_csv(file_path, sep=r'\s+', header=None, engine='python')
    df.dropna(how='all', inplace=True)
    
    # Extract the double support time column
    double_support_time = df.iloc[:, 11]
    
    # Get the filename
    file_name = os.path.basename(file_path)

    label_name = re.match(r'^(als|hunt|control|park)', file_name.lower())
    
    # Create a new DataFrame with the desired two columns.
    result_df = pd.DataFrame({
        'Double Support Time': double_support_time,
        'filename': file_name,
        'label': label_name.group(1)
    })
    
    return result_df

# Dataframes of our variables
file_als = 'gait_data/als1.ts'
df_double_support_als = extract_double_support(file_als)
print(len(df_double_support_als))

file_hunt = 'gait_data/hunt1.ts'
df_double_support_hunt = extract_double_support(file_hunt)
print(len(df_double_support_hunt))

file_control = 'gait_data/control1.ts'
df_double_support_control = extract_double_support(file_control)
print(len(df_double_support_control))

file_park = 'gait_data/park1.ts'
df_double_support_park = extract_double_support(file_park)
print(len(df_double_support_park))

194
310
259
245


Combine all the singular time series df into one big combined timeseries df

In [8]:
import pandas as pd

# Determine the smallest number of rows among the four DataFrames
min_length = min(
    len(df_double_support_als),
    len(df_double_support_hunt),
    len(df_double_support_control),
    len(df_double_support_park)
)

# Trim each DataFrame to the minimum number of rows and reset their index
df_als_trim = df_double_support_als.iloc[:min_length].reset_index(drop=True)
df_hunt_trim = df_double_support_hunt.iloc[:min_length].reset_index(drop=True)
df_control_trim = df_double_support_control.iloc[:min_length].reset_index(drop=True)
df_park_trim = df_double_support_park.iloc[:min_length].reset_index(drop=True)

# Create a combined DataFrame with one column per variable.
df_combined = pd.DataFrame({
    'als': df_als_trim['Double Support Time'],
    'hunt': df_hunt_trim['Double Support Time'],
    'control': df_control_trim['Double Support Time'],
    'park': df_park_trim['Double Support Time']
})

# Display the combined DataFrame
df_combined


Unnamed: 0,als,hunt,control,park
0,0.4633,0.1900,0.3200,0.4333
1,0.4400,0.1800,0.3067,0.3767
2,0.4267,0.1933,0.2933,0.3800
3,0.5467,0.1867,0.3233,0.4133
4,0.5333,0.1933,0.2767,0.3800
...,...,...,...,...
189,0.5100,0.1967,0.3600,0.3500
190,0.5333,0.1833,0.3300,0.3000
191,0.5300,0.1967,0.3533,0.3067
192,0.5400,0.2067,0.3933,0.3367


In [9]:
import plotly.express as px
import pandas as pd
import numpy as np

# Combine the individual DataFrames for each group into one DataFrame.
df_all = pd.concat([
    df_double_support_als,
    df_double_support_hunt,
    df_double_support_control,
    df_double_support_park
], ignore_index=True)

# Filter out rows with Double Support Time above 1.
df_all = df_all[df_all['Double Support Time'] <= 1]

# Map each group to a numeric value for the x-axis.
group_mapping = {'als': 0, 'hunt': 1, 'control': 2, 'park': 3}
df_all['group_numeric'] = df_all['label'].map(group_mapping)

# Add a small horizontal jitter to avoid overlapping dots.
df_all['jitter'] = df_all['group_numeric'] + np.random.uniform(-0.1, 0.1, size=len(df_all))

# Create the scatter plot using Plotly Express.
fig = px.scatter(
    df_all,
    x='jitter',
    y='Double Support Time',
    color='label',
    opacity=0.7,
    title="Double Support Time per Group (Filtered: Values <= 1)",
    labels={'jitter': 'Group', 'Double Support Time': 'Double Support Time (s)'}
)

# Adjust the x-axis ticks to show group names.
fig.update_xaxes(
    tickmode='array',
    tickvals=[0, 1, 2, 3],
    ticktext=['als', 'hunt', 'control', 'park']
)

fig.show()


In [12]:
# convert to json
df_json = df_all.to_json(orient="records")
with open("data.json", "w") as f:
    f.write(df_json)

In [None]:
#### Get left and right strides for each group
import os
import pandas as pd

def extract_strides(file_path):
    """
    Reads a .ts file and extracts the Double Support Time column.
    
    Assumes the .ts file is whitespace-delimited and that:
      - The file does not have a header row.
      - Columns 1, 2, 3, 4 are for left stride, right stride, left swing, right swing respectively.
    
    Parameters:
        file_path (str): Path to the .ts file.
        
    Returns:
        pd.DataFrame: A DataFrame with two columns:
            - 'Double Support Time': The double support time for each stride.
            - 'filename': The name of the file.
    """
    # Read the .ts file
    df = pd.read_csv(file_path, sep=r'\s+', header=None, engine='python')
    df.dropna(how='all', inplace=True)
    
    # Extract the double support time column
    left_stride = df.iloc[:, 1]
    right_stride = df.iloc[:, 2]
    left_swing = df.iloc[:, 3]
    right_swing = df.iloc[:, 4]

    # Get the filename
    file_name = os.path.basename(file_path)

    label_name = re.match(r'^(als|hunt|control|park)', file_name.lower())
    
    # Create a new DataFrame with the desired two columns.
    result_df = pd.DataFrame({
        'left_stride': left_stride,
        'right_stride': right_stride,
        'left_swing': left_swing,
        'right_swing': right_swing,
        'filename': file_name,
        'label': label_name.group(1)
    })
    
    return result_df

# Dataframes of our variables
file_als = 'gait_data/als1.ts'
df_strides_als = extract_strides(file_als)
print(len(df_double_support_als))

file_hunt = 'gait_data/hunt1.ts'
df_strides_hunt = extract_strides(file_hunt)
print(len(df_double_support_hunt))

file_control = 'gait_data/control1.ts'
df_strides_control = extract_strides(file_control)
print(len(df_double_support_control))

file_park = 'gait_data/park1.ts'
df_strides_park = extract_strides(file_park)
print(len(df_double_support_park))

194
310
259
245


In [8]:
df_strides_als.head()

Unnamed: 0,left_stride,right_stride,left_swing,right_swing,filename,label
0,1.2833,1.3533,0.4067,0.4133,als1.ts,als
1,1.3233,1.2667,0.4833,0.4,als1.ts,als
2,1.3033,1.36,0.45,0.4267,als1.ts,als
3,1.4167,1.2833,0.5033,0.3667,als1.ts,als
4,1.2367,1.4067,0.3467,0.3567,als1.ts,als


In [10]:
# Determine the smallest number of rows among the four DataFrames
min_length = min(
    len(df_strides_als),
    len(df_strides_hunt),
    len(df_strides_control),
    len(df_strides_park)
)

# Trim each DataFrame to the minimum number of rows and reset their index
df_als_trim = df_strides_als.iloc[:min_length].reset_index(drop=True)
df_hunt_trim = df_strides_hunt.iloc[:min_length].reset_index(drop=True)
df_control_trim = df_strides_control.iloc[:min_length].reset_index(drop=True)
df_park_trim = df_strides_park.iloc[:min_length].reset_index(drop=True)



In [13]:
def save_to_json(df, filename):
    try:
        df_json = df.to_json(orient="records")
        with open(filename, "w") as f:
            f.write(df_json)
        print(f"File {filename} created successfully.")
    except Exception as e:
        print(f"Failed to create file {filename}: {e}")

# Save DataFrames to JSON files
save_to_json(df_strides_als, "als1.json")
save_to_json(df_strides_hunt, "hunt1.json")
save_to_json(df_strides_control, "control1.json")
save_to_json(df_strides_park, "park1.json")

File als1.json created successfully.
File hunt1.json created successfully.
File control1.json created successfully.
File park1.json created successfully.


So i was thinking as one interactive feature can be that when you click on a certain disease it will focus on it enlarge it and show on it the average for that group compared to average for control and average over all the variables

The next markdown is the summary/overview part of our prototype

We have decided to remain the same group for the final project after Project 3. Since we have already developed numerous ideas regarding the previous dataset, KoreanDB, we wanted to explore something new and more challenging. This led us to research various topics within a larger resource called PhysioNet. We were particularly drawn to this dataset because, at first glance, it presented many opportunities to develop new visualization skills.

So far, we have completed several key tasks. Firstly, we brainstormed ideas that interest us and could effectively convey the essence of our final project—telling a compelling story about how and why people with certain diseases struggle with gait abnormalities. Secondly, since this is a challenging dataset, we familiarized ourselves with the available tools, explored the different data types in the files, and gained a general understanding of how the variables interact. This step was moderately difficult because the dataset was formatted in a way that differed from the usual CSV files we are accustomed to, requiring us to revisit and refine our file manipulation skills.

We still need to finalize the exact direction of our project, which presents several challenges. One of the biggest difficulties is determining the most effective way to convey the physical disparities between different disease groups. An integral part of this challenge is incorporating animations on our website—whether through smooth transitions or fully interactive motion graphics. We have never created animations before, so we are unsure whether they will enhance or reduce the clarity of our visualization. While animations can improve engagement, they must serve the story—if they fail to do so, they become distracting rather than useful.

Last but not least, storytelling itself will be our biggest challenge. In past projects, we focused heavily on developing technical skills. However, for this project, we must go beyond the technical execution and focus on crafting a compelling narrative—because a truly great visualization is built on a strong story. Although we have put much thought into this aspect, we recognize that storytelling is what will make our visualization impactful, and it requires just as much effort as the technical implementation.

GOAL: Compute average walking speed using subject_description.txt

In [None]:
import pandas as pd
import numpy as np

def compute_avg_gait_speed(file_path, disease_name):
    try:
        df = pd.read_csv(file_path, sep='\t', engine='python')
    except:
        df = pd.read_csv(file_path, sep=r'\s+', engine='python')

    df.columns = ['Subject', 'Group', 'Age', 'Height', 'Weight', 'Gender', 'GaitSpeed', 'Severity']

    df['GaitSpeed'] = pd.to_numeric(df['GaitSpeed'], errors='coerce')

    if disease_name.lower() == "als":
        df_filtered = df[df['Subject'].str.contains("als", case=False, na=False)]
    else:
        df_filtered = df[df['Group'].str.contains(disease_name, case=False, na=False)]

    return df_filtered['GaitSpeed'].mean()

file_path = "gait_data/subject-description.txt"

avg_gait_speed_control = compute_avg_gait_speed(file_path, "control")
avg_gait_speed_hunt = compute_avg_gait_speed(file_path, "hunt")
avg_gait_speed_park = compute_avg_gait_speed(file_path, "park")
avg_gait_speed_als = compute_avg_gait_speed(file_path, "als")

print(f"Average Gait Speed for Control: {avg_gait_speed_control:.2f} m/s")
print(f"Average Gait Speed for Huntington's: {avg_gait_speed_hunt:.2f} m/s")
print(f"Average Gait Speed for Parkinson's: {avg_gait_speed_park:.2f} m/s")
print(f"Average Gait Speed for ALS: {avg_gait_speed_als:.2f} m/s")


Average Gait Speed for Control: 1.35 m/s
Average Gait Speed for Huntington's: 1.15 m/s
Average Gait Speed for Parkinson's: 1.00 m/s
Average Gait Speed for ALS: 1.05 m/s


GOAL: get positionality and time for each person

In [None]:
import pandas as pd
import numpy as np
import os
from scipy.interpolate import interp1d

# Helper function to open a .ts file and extract all columns, returning a DataFrame
def ts_file_extraction(ts_file):
    df = pd.read_csv('gait_data/' + ts_file, sep=r'\s+', header=None, engine='python')
    df.dropna(how='all', inplace=True)  # Remove empty rows
    df.columns = ['Time', 'Left_Stride_Time', 'Right_Stride_Time', 
                  'Left_Swing_Time', 'Right_Swing_Time', 
                  'Left_Stance_%', 'Right_Stance_%', 
                  'Left_Stance_Time', 'Right_Stance_Time', 
                  'Left_Support_%', 'Right_Support_%', 
                  'Double_Support_Time', 'Double_Support_%']
    return df

# Helper function to extract sampling frequency from header files
def extract_sampling_frequency(header_path):
    with open(header_path, 'r') as f:
        first_line = f.readline().strip().split()
        return float(first_line[2])

# Function to extract gait parameters from .ts and header files and return a DataFrame
def compute_gait_positions(hea_file, ts_file, avg_speed=1.2, step_lift=0.30, num_interpolated_points=100):
    # Extract sampling frequency from the header file
    sampling_freq = extract_sampling_frequency('gait_data/' + hea_file)

    # Load the .ts file using the helper function
    df = ts_file_extraction(ts_file)

    # Recalculate time based on the sample index and sampling frequency
    df['Time'] = df.index / sampling_freq

    # Compute estimated stride lengths (and clip them to realistic values, e.g., between 0.5 and 1.5 m)
    df['Left_Stride_Length'] = np.clip(df['Left_Stride_Time'] * avg_speed, 0.5, 1.5)
    df['Right_Stride_Length'] = np.clip(df['Right_Stride_Time'] * avg_speed, 0.5, 1.5)

    # Compute X-positions (forward movement) using cumulative sums
    df['X_Left'] = np.cumsum(df['Left_Stride_Length'])
    df['X_Right'] = np.cumsum(df['Right_Stride_Length'])

    # Compute Y-positions (alternating binary pattern: 0 for ground, step_lift for air)
    df['Y_Left'] = np.where(df.index % 2 == 0, 0, step_lift)
    df['Y_Right'] = np.where(df.index % 2 == 0, step_lift, 0)

    # Interpolate X positions to a common time vector
    original_time = df['Time']
    common_time = np.linspace(original_time.min(), original_time.max(), num=num_interpolated_points)
    
    interp_x_right = interp1d(original_time, df['X_Right'], kind='linear', fill_value="extrapolate")
    interp_x_left = interp1d(original_time, df['X_Left'], kind='linear', fill_value="extrapolate")
    
    new_X_right = interp_x_right(common_time)
    new_X_left = interp_x_left(common_time)
    
    # For Y positions, we generate a binary alternating pattern over the new time points,
    # rather than interpolating them (to preserve the stance/swing phase representation)
    new_Y_right = np.where(np.arange(num_interpolated_points) % 2 == 0, step_lift, 0)
    new_Y_left = np.where(np.arange(num_interpolated_points) % 2 == 0, 0, step_lift)
    
    # Create the final interpolated DataFrame
    interpolated_df = pd.DataFrame({
        'Time': common_time,
        'X_Right': new_X_right,
        'Y_Right': new_Y_right,
        'X_Left': new_X_left,
        'Y_Left': new_Y_left
    })

    return interpolated_df

# Example usage
exp_als_1_gait = compute_gait_positions('als1.hea', 'als1.ts')
print(exp_als_1_gait.head())

    


Unnamed: 0,Time,X_Right,Y_Right,X_Left,Y_Left
0,0.0,1.62396,0.3,1.53996,0.0
1,0.006498,4.693576,0.284848,4.612892,0.015152
2,0.012997,7.833491,0.269697,7.726057,0.030303
3,0.019495,10.779145,0.254545,10.70864,0.045455
4,0.025993,13.636395,0.239394,13.519587,0.060606
