In [None]:
# FINAL UPDATED CODE
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from scipy.signal import resample  # Temporal resampling
from scipy.stats import zscore  # Z-Score normalization

# Leg length for every runthrough folder name. Entries are grouped by subject
# because all runs of one subject share the same leg length.
# (Units are not stated in this file.)
leg_lengths = {
    run_name: subject_leg_length
    for subject_leg_length, run_names in (
        (0.84, ("sub1_1_normal", "sub1_2_normal", "sub1_3_normal")),
        (0.94, ("sub2_1_normal", "sub2_2_normal")),
        (0.91, ("sub3_1_normal", "sub3_2_normal", "sub3_3_normal")),
        (0.89, ("sub4_1_normal", "sub4_2_normal", "sub4_3_normal")),
        (0.74, ("sub5_1_normal", "sub5_2_normal", "sub5_3_normal")),
        (0.74, (
            "sub6_2_abnormal_stance",
            "sub6_3_abnormal_stance",
            "sub6_4_abnormal_swing",
            "sub6_5_abnormal_swing",
            "sub6_6_abnormal_swing",
        )),
        (0.79, (
            "sub7_1_abnormal_stance",
            "sub7_2_abnormal_stance",
            "sub7_3_abnormal_stance",
        )),
        (0.74, (
            "sub8_1_abnormal_swing",
            "sub8_2_abnormal_swing",
            "sub8_3_abnormal_swing",
        )),
        (0.81, ("sub9_1_normal", "sub9_2_normal", "sub9_3_normal")),
        (0.74, ("sub10_2_normal", "sub10_3_normal")),
    )
    for run_name in run_names
}

# Define function to calculate phase duration based on row count (default index)


def calculate_phase_duration(phase_data):
    """Return the duration of a phase, measured as its number of rows.

    Args:
        phase_data: Any sized object; for a DataFrame ``len`` is the row count.

    Returns:
        int: ``len(phase_data)``.
    """
    row_count = len(phase_data)
    return row_count


def remove_outliers(phases_dict, threshold=1.5):
    """Drop phases whose duration is an IQR outlier, then equalize the counts.

    For "stance" and "swing" separately, a phase is kept when its duration
    (as given by ``calculate_phase_duration``) lies within
    ``[Q1 - threshold * IQR, Q3 + threshold * IQR]`` of that type's durations.
    Afterwards both lists are truncated to the length of the shorter one.

    Args:
        phases_dict: Mapping with "stance" and "swing" keys, each holding a
            list of phase tables. The mapping is updated in place.
        threshold: Multiplier applied to the IQR to build the fences.

    Returns:
        The same ``phases_dict`` object, filtered and trimmed.
    """
    for phase_type in ("stance", "swing"):
        phases = phases_dict[phase_type]
        if not phases:
            # np.percentile has no meaningful result for an empty array (an
            # error or NaN, depending on the NumPy version), so an empty
            # phase list is simply left empty.
            continue

        durations = np.array(
            [calculate_phase_duration(phase) for phase in phases])

        # Interquartile-range fences for this phase type.
        q1, q3 = np.percentile(durations, [25, 75])
        iqr = q3 - q1
        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr

        phases_dict[phase_type] = [
            phase for phase, duration in zip(phases, durations)
            if lower_bound <= duration <= upper_bound
        ]

    # Keep only the first min_count entries of each list so that stance and
    # swing end up with the same number of phases.
    min_count = min(len(phases_dict["stance"]), len(phases_dict["swing"]))
    phases_dict["stance"] = phases_dict["stance"][:min_count]
    phases_dict["swing"] = phases_dict["swing"][:min_count]

    return phases_dict


# Temporal normalization for stance and swing phases separately
def temporal_normalization(phase_data, phase_type, stance_points=6, swing_points=4):
    """Resample one phase to a fixed number of rows along the time axis.

    Args:
        phase_data: DataFrame holding one phase; its ``.values`` array is
            resampled along axis 0 (rows).
        phase_type: "stance" selects ``stance_points``; every other value
            selects ``swing_points``.
        stance_points: Output row count for stance phases.
        swing_points: Output row count for all other phases.

    Returns:
        numpy.ndarray: Result of ``scipy.signal.resample``. The column labels
        of the input DataFrame are not carried over.
    """
    if phase_type == "stance":
        target_rows = stance_points
    else:
        target_rows = swing_points

    samples = phase_data.values
    return resample(samples, target_rows, axis=0)

# Amplitude normalization (the same per-column scaling is applied to stance and swing phases)


def amplitude_normalization(phase_data, method='zscore'):
    """Normalize each column of a phase independently (along axis 0).

    Args:
        phase_data: Numeric array-like; it is converted to a float ndarray.
        method: 'zscore' subtracts the column mean and divides by the
            population standard deviation (ddof=0); 'minmax' subtracts the
            column minimum and divides by the column range.

    Returns:
        numpy.ndarray: Normalized values with the same shape as the input.
        A column with zero spread (constant values) is returned as all
        zeros instead of the NaN/inf a division by zero would produce.

    Raises:
        ValueError: If ``method`` is neither 'zscore' nor 'minmax'.
    """
    if method not in ('zscore', 'minmax'):
        raise ValueError("Normalization method must be 'zscore' or 'minmax'.")

    values = np.asarray(phase_data, dtype=float)
    if method == 'zscore':
        offset = values.mean(axis=0)
        spread = values.std(axis=0)
    else:
        offset = values.min(axis=0)
        spread = values.max(axis=0) - offset

    # A constant column has zero spread; dividing by 1 instead leaves its
    # (already zero) centered values at 0 rather than turning them into NaN.
    safe_spread = np.where(spread == 0, 1.0, spread)
    return (values - offset) / safe_spread

# Spatial normalization (divides by the runthrough's leg length; identical for stance and swing phases)


def spatial_normalization(phase_data, runthrough_name, method='scale'):
    """Scale phase data by the leg length registered for a runthrough.

    Args:
        phase_data: Values to scale; anything that supports division by a
            float.
        runthrough_name: Key looked up in the module-level ``leg_lengths``.
        method: Only 'scale' is supported.

    Returns:
        ``phase_data / leg_length`` for the given runthrough.

    Raises:
        ValueError: If ``runthrough_name`` has no leg length, or (checked
            second) if ``method`` is not 'scale'.
    """
    divisor = leg_lengths.get(runthrough_name)
    if divisor is None:
        raise ValueError(
            f"Leg length not found for runthrough: {runthrough_name}")

    if method != 'scale':
        raise ValueError("Normalization method must be 'scale'.")

    return phase_data / divisor

# Validation: Plot the normalized phases to check consistency


def plot_normalized_phases(normalized_phases, phase_type):
    """Overlay all normalized phases of one type in a single figure.

    Args:
        normalized_phases: Iterable of array-likes, each passed to
            ``plt.plot`` as one labelled entry.
        phase_type: Phase name; capitalized for the legend labels and title.
    """
    plt.figure(figsize=(10, 6))
    display_name = phase_type.capitalize()
    for number, samples in enumerate(normalized_phases, start=1):
        plt.plot(samples, label=f"{display_name} Phase {number}")
    plt.legend()
    plt.title(f"Normalized {display_name} Phases")
    plt.show()

In [None]:
# Batch driver: for every runthrough folder under input_folder, load its
# stance/swing phase CSVs, remove duration outliers, normalize the phases and
# write the results to a folder of the same name under output_folder.
import re

# Folder containing all runthroughs
input_folder = r"C:\Users\diyav\.jupyter\Filtered Data"
# Folder to save normalized data
output_folder = r"C:\Users\diyav\.jupyter\Normalized Data"


def _natural_sort_key(name):
    """Sort key that orders digit runs numerically ('phase_2' < 'phase_10')."""
    # re.split with a capturing group alternates text and digit chunks, so
    # the same position always holds the same type and keys stay comparable.
    return [int(chunk) if chunk.isdigit() else chunk.lower()
            for chunk in re.split(r"(\d+)", name)]


# Create output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting makes the phase
# numbering of the output files (and the trimming done by remove_outliers)
# reproducible between runs.
for runthrough in sorted(os.listdir(input_folder), key=_natural_sort_key):
    runthrough_path = os.path.join(input_folder, runthrough)
    if not os.path.isdir(runthrough_path):
        continue

    print(f"Processing {runthrough}...")

    # Create a folder for the runthrough in the output directory
    runthrough_output_folder = os.path.join(output_folder, runthrough)
    os.makedirs(runthrough_output_folder, exist_ok=True)

    # Load the stance and swing phase data; the phase type is taken from the
    # file name, and CSVs matching neither type are not read at all.
    phases_dict = {"stance": [], "swing": []}
    for phase_file in sorted(os.listdir(runthrough_path), key=_natural_sort_key):
        if not phase_file.endswith('.csv'):
            continue
        lowered_name = phase_file.lower()
        if "stance" in lowered_name:
            phase_type = "stance"
        elif "swing" in lowered_name:
            phase_type = "swing"
        else:
            continue
        phase_path = os.path.join(runthrough_path, phase_file)
        phases_dict[phase_type].append(pd.read_csv(phase_path))

    # Without at least one phase of each type nothing would be saved (the
    # lists are trimmed to equal length), so skip the folder up front.
    if not phases_dict["stance"] or not phases_dict["swing"]:
        print(f"Skipping {runthrough}: no stance/swing phase pairs found")
        continue

    # Step 1: Outlier removal
    filtered_phases_dict = remove_outliers(phases_dict)

    # Step 2: Temporal normalization
    normalized_phases_dict = {}
    for phase_type in ("stance", "swing"):
        normalized_phases_dict[phase_type] = [
            temporal_normalization(phase_data, phase_type)
            for phase_data in filtered_phases_dict[phase_type]
        ]

    # Step 3: Amplitude normalization
    for phase_type in ("stance", "swing"):
        normalized_phases_dict[phase_type] = [
            amplitude_normalization(phase_data, method='zscore')
            for phase_data in normalized_phases_dict[phase_type]
        ]

    # Step 4: Spatial normalization
    # NOTE(review): step 3 z-scores every column, so this divides z-scores by
    # the leg length - confirm that this ordering is intended.
    for phase_type in ("stance", "swing"):
        normalized_phases_dict[phase_type] = [
            spatial_normalization(phase_data, runthrough, method='scale')
            for phase_data in normalized_phases_dict[phase_type]
        ]

    # Step 5: Save the normalized data to new CSV files
    for phase_type, normalized_phases in normalized_phases_dict.items():
        for phase_number, phase_data in enumerate(normalized_phases, start=1):
            output_file_path = os.path.join(
                runthrough_output_folder,
                f"normalized_{phase_type}_phase_{phase_number}.csv")
            pd.DataFrame(phase_data).to_csv(output_file_path, index=False)
            print(
                f"Saved normalized {phase_type} data for phase {phase_number} in {runthrough}")

    # Step 6: Validation (plot the normalized phases for visual check)
    for phase_type in ("stance", "swing"):
        plot_normalized_phases(
            normalized_phases_dict[phase_type], phase_type)

print(
    f"All phases have been processed and normalized. Output saved to: {output_folder}")

In [None]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
from scipy.signal import resample  # Temporal resampling
from scipy.stats import zscore  # Z-Score normalization

# Leg length for every runthrough folder name. Entries are grouped by subject
# because all runs of one subject share the same leg length.
# (Units are not stated in this file.)
leg_lengths = {
    run_name: subject_leg_length
    for subject_leg_length, run_names in (
        (0.84, ("sub1_1_normal", "sub1_2_normal", "sub1_3_normal")),
        (0.94, ("sub2_1_normal", "sub2_2_normal")),
        (0.91, ("sub3_1_normal", "sub3_2_normal", "sub3_3_normal")),
        (0.89, ("sub4_1_normal", "sub4_2_normal", "sub4_3_normal")),
        (0.74, ("sub5_1_normal", "sub5_2_normal", "sub5_3_normal")),
        (0.74, (
            "sub6_2_abnormal_stance",
            "sub6_3_abnormal_stance",
            "sub6_4_abnormal_swing",
            "sub6_5_abnormal_swing",
            "sub6_6_abnormal_swing",
        )),
        (0.79, (
            "sub7_1_abnormal_stance",
            "sub7_2_abnormal_stance",
            "sub7_3_abnormal_stance",
        )),
        (0.74, (
            "sub8_1_abnormal_swing",
            "sub8_2_abnormal_swing",
            "sub8_3_abnormal_swing",
        )),
        (0.81, ("sub9_1_normal", "sub9_2_normal", "sub9_3_normal")),
        (0.74, ("sub10_2_normal", "sub10_3_normal")),
    )
    for run_name in run_names
}

# Define function to calculate phase duration based on row count (default index)


def calculate_phase_duration(phase_data):
    """Return the duration of a phase, measured as its number of rows.

    Args:
        phase_data: Any sized object; for a DataFrame ``len`` is the row count.

    Returns:
        int: ``len(phase_data)``.
    """
    row_count = len(phase_data)
    return row_count


def remove_outliers(phases_dict, threshold=1.5):
    """Drop phases whose duration is an IQR outlier, then equalize the counts.

    For "stance" and "swing" separately, a phase is kept when its duration
    (as given by ``calculate_phase_duration``) lies within
    ``[Q1 - threshold * IQR, Q3 + threshold * IQR]`` of that type's durations.
    Afterwards both lists are truncated to the length of the shorter one.

    Args:
        phases_dict: Mapping with "stance" and "swing" keys, each holding a
            list of phase tables. The mapping is updated in place.
        threshold: Multiplier applied to the IQR to build the fences.

    Returns:
        The same ``phases_dict`` object, filtered and trimmed.
    """
    for phase_type in ("stance", "swing"):
        phases = phases_dict[phase_type]
        if not phases:
            # np.percentile has no meaningful result for an empty array (an
            # error or NaN, depending on the NumPy version), so an empty
            # phase list is simply left empty.
            continue

        durations = np.array(
            [calculate_phase_duration(phase) for phase in phases])

        # Interquartile-range fences for this phase type.
        q1, q3 = np.percentile(durations, [25, 75])
        iqr = q3 - q1
        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr

        phases_dict[phase_type] = [
            phase for phase, duration in zip(phases, durations)
            if lower_bound <= duration <= upper_bound
        ]

    # Keep only the first min_count entries of each list so that stance and
    # swing end up with the same number of phases.
    min_count = min(len(phases_dict["stance"]), len(phases_dict["swing"]))
    phases_dict["stance"] = phases_dict["stance"][:min_count]
    phases_dict["swing"] = phases_dict["swing"][:min_count]

    return phases_dict


# Temporal normalization for stance and swing phases separately
def temporal_normalization(phase_data, phase_type, stance_points=6, swing_points=4):
    """Resample one phase to a fixed number of rows along the time axis.

    Args:
        phase_data: DataFrame holding one phase; its ``.values`` array is
            resampled along axis 0 (rows).
        phase_type: "stance" selects ``stance_points``; every other value
            selects ``swing_points``.
        stance_points: Output row count for stance phases.
        swing_points: Output row count for all other phases.

    Returns:
        numpy.ndarray: Result of ``scipy.signal.resample``. The column labels
        of the input DataFrame are not carried over.
    """
    if phase_type == "stance":
        target_rows = stance_points
    else:
        target_rows = swing_points

    samples = phase_data.values
    return resample(samples, target_rows, axis=0)

# Amplitude normalization (the same per-column scaling is applied to stance and swing phases)


def amplitude_normalization(phase_data, method='zscore'):
    """Normalize each column of a phase independently (along axis 0).

    Args:
        phase_data: Numeric array-like; it is converted to a float ndarray.
        method: 'zscore' subtracts the column mean and divides by the
            population standard deviation (ddof=0); 'minmax' subtracts the
            column minimum and divides by the column range.

    Returns:
        numpy.ndarray: Normalized values with the same shape as the input.
        A column with zero spread (constant values) is returned as all
        zeros instead of the NaN/inf a division by zero would produce.

    Raises:
        ValueError: If ``method`` is neither 'zscore' nor 'minmax'.
    """
    if method not in ('zscore', 'minmax'):
        raise ValueError("Normalization method must be 'zscore' or 'minmax'.")

    values = np.asarray(phase_data, dtype=float)
    if method == 'zscore':
        offset = values.mean(axis=0)
        spread = values.std(axis=0)
    else:
        offset = values.min(axis=0)
        spread = values.max(axis=0) - offset

    # A constant column has zero spread; dividing by 1 instead leaves its
    # (already zero) centered values at 0 rather than turning them into NaN.
    safe_spread = np.where(spread == 0, 1.0, spread)
    return (values - offset) / safe_spread

# Spatial normalization (divides by the runthrough's leg length; identical for stance and swing phases)


def spatial_normalization(phase_data, runthrough_name, method='scale'):
    """Scale phase data by the leg length registered for a runthrough.

    Args:
        phase_data: Values to scale; anything that supports division by a
            float.
        runthrough_name: Key looked up in the module-level ``leg_lengths``.
        method: Only 'scale' is supported.

    Returns:
        ``phase_data / leg_length`` for the given runthrough.

    Raises:
        ValueError: If ``runthrough_name`` has no leg length, or (checked
            second) if ``method`` is not 'scale'.
    """
    divisor = leg_lengths.get(runthrough_name)
    if divisor is None:
        raise ValueError(
            f"Leg length not found for runthrough: {runthrough_name}")

    if method != 'scale':
        raise ValueError("Normalization method must be 'scale'.")

    return phase_data / divisor

# Validation: Plot the normalized phases to check consistency


def plot_normalized_phases(normalized_phases, phase_type):
    """Overlay all normalized phases of one type in a single figure.

    Args:
        normalized_phases: Iterable of array-likes, each passed to
            ``plt.plot`` as one labelled entry.
        phase_type: Phase name; capitalized for the legend labels and title.
    """
    plt.figure(figsize=(10, 6))
    display_name = phase_type.capitalize()
    for number, samples in enumerate(normalized_phases, start=1):
        plt.plot(samples, label=f"{display_name} Phase {number}")
    plt.legend()
    plt.title(f"Normalized {display_name} Phases")
    plt.show()


# Batch driver: for every runthrough folder under input_folder, load its
# stance/swing phase CSVs, remove duration outliers, normalize the phases and
# write the results (with the original column names) to a folder of the same
# name under output_folder.
import re

# Folder containing all runthroughs
input_folder = r"C:\Users\diyav\.jupyter\Filtered Data"
# Folder to save normalized data
output_folder = r"C:\Users\diyav\.jupyter\Normalized Data"


def _natural_sort_key(name):
    """Sort key that orders digit runs numerically ('phase_2' < 'phase_10')."""
    # re.split with a capturing group alternates text and digit chunks, so
    # the same position always holds the same type and keys stay comparable.
    return [int(chunk) if chunk.isdigit() else chunk.lower()
            for chunk in re.split(r"(\d+)", name)]


# Create output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)

# os.listdir returns entries in arbitrary order; sorting makes the phase
# numbering of the output files (and the trimming done by remove_outliers)
# reproducible between runs.
for runthrough in sorted(os.listdir(input_folder), key=_natural_sort_key):
    runthrough_path = os.path.join(input_folder, runthrough)
    if not os.path.isdir(runthrough_path):
        continue

    print(f"Processing {runthrough}...")

    # Create a folder for the runthrough in the output directory
    runthrough_output_folder = os.path.join(output_folder, runthrough)
    os.makedirs(runthrough_output_folder, exist_ok=True)

    # Load the stance and swing phase data; the phase type is taken from the
    # file name, and CSVs matching neither type are not read at all.
    phases_dict = {"stance": [], "swing": []}
    for phase_file in sorted(os.listdir(runthrough_path), key=_natural_sort_key):
        if not phase_file.endswith('.csv'):
            continue
        lowered_name = phase_file.lower()
        if "stance" in lowered_name:
            phase_type = "stance"
        elif "swing" in lowered_name:
            phase_type = "swing"
        else:
            continue
        phase_path = os.path.join(runthrough_path, phase_file)
        phases_dict[phase_type].append(pd.read_csv(phase_path))

    # Without at least one phase of each type nothing would be saved (the
    # lists are trimmed to equal length), so skip the folder up front.
    if not phases_dict["stance"] or not phases_dict["swing"]:
        print(f"Skipping {runthrough}: no stance/swing phase pairs found")
        continue

    # Step 1: Outlier removal
    filtered_phases_dict = remove_outliers(phases_dict)

    # Remember each surviving phase's own column names. The normalization
    # steps return plain arrays, and a single shared "last file read" set of
    # names can be stale or belong to a file with different columns.
    phase_columns = {
        phase_type: [phase_data.columns
                     for phase_data in filtered_phases_dict[phase_type]]
        for phase_type in ("stance", "swing")
    }

    # Step 2: Temporal normalization
    normalized_phases_dict = {}
    for phase_type in ("stance", "swing"):
        normalized_phases_dict[phase_type] = [
            temporal_normalization(phase_data, phase_type)
            for phase_data in filtered_phases_dict[phase_type]
        ]

    # Step 3: Amplitude normalization
    for phase_type in ("stance", "swing"):
        normalized_phases_dict[phase_type] = [
            amplitude_normalization(phase_data, method='zscore')
            for phase_data in normalized_phases_dict[phase_type]
        ]

    # Step 4: Spatial normalization
    # NOTE(review): step 3 z-scores every column, so this divides z-scores by
    # the leg length - confirm that this ordering is intended.
    for phase_type in ("stance", "swing"):
        normalized_phases_dict[phase_type] = [
            spatial_normalization(phase_data, runthrough, method='scale')
            for phase_data in normalized_phases_dict[phase_type]
        ]

    # Step 5: Save the normalized data to new CSV files
    for phase_type, normalized_phases in normalized_phases_dict.items():
        labelled_phases = zip(normalized_phases, phase_columns[phase_type])
        for phase_number, (phase_data, columns) in enumerate(labelled_phases, start=1):
            # Re-apply this phase's original column names after transformation
            normalized_phase_df = pd.DataFrame(phase_data, columns=columns)
            output_file_path = os.path.join(
                runthrough_output_folder,
                f"normalized_{phase_type}_phase_{phase_number}.csv")
            normalized_phase_df.to_csv(output_file_path, index=False)
            print(
                f"Saved normalized {phase_type} data for phase {phase_number} in {runthrough}")

    # Step 6: Validation (plot the normalized phases for visual check)
    for phase_type in ("stance", "swing"):
        plot_normalized_phases(
            normalized_phases_dict[phase_type], phase_type)

print(
    f"All phases have been processed and normalized. Output saved to: {output_folder}")