In [3]:
import os
import pandas as pd
import csv
import glob
import numpy as np
import warnings
from zipfile import BadZipFile

# ------------------- Configuration Section -------------------
# Define a unified base folder. All other folders are defined relative to this.
BASE_FOLDER = "/Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting"

# Folder containing the raw TXT files
TXT_INPUT_FOLDER = os.path.join(BASE_FOLDER, "BBVJAPR16-TXT")

# Folder to store the intermediate XLSX files produced from TXT conversion
XLSX_INTERMEDIATE_FOLDER = os.path.join(BASE_FOLDER, "BBVJAPR16-INT")

# Folder to store the final processed XLSX files
PROCESSED_XLSX_FOLDER = os.path.join(BASE_FOLDER, "BBVJAPR16-PROCCESED")

# Full path (including filename) for the analysis report output
ANALYSIS_REPORT_FILE = os.path.join(BASE_FOLDER, "BBVJAPR16_analysis_report_file.xlsx")
# ---------------------------------------------------------------

# Ignore FutureWarnings for cleaner output
warnings.simplefilter(action='ignore', category=FutureWarning)

# ---------- Functions for TXT-to-XLSX conversion --------------
def detect_delimiter(txt_file_path):
    with open(txt_file_path, 'r') as file:
        first_line = file.readline()
        # Check common delimiters: comma, tab, or space
        if ',' in first_line:
            return ','
        elif '\t' in first_line:
            return '\t'
        elif ' ' in first_line:
            return ' '
        else:
            return None

def convert_txt_to_xlsx(txt_file_path, output_folder):
    try:
        # Read the header lines (assumed to be the first 5 lines)
        header_lines = []
        with open(txt_file_path, 'r') as file:
            for i in range(5):  # Adjust if needed
                header_lines.append(file.readline().strip())
        
        # Determine the delimiter
        delimiter = detect_delimiter(txt_file_path)
        if not delimiter:
            raise ValueError(f"Unknown delimiter in file: {txt_file_path}")

        # Read the data portion (skipping the header lines)
        df = pd.read_csv(txt_file_path, delimiter=delimiter, skiprows=len(header_lines),
                         engine="python", quoting=csv.QUOTE_NONE)
        
        # Prepare the output file path (change .txt to .xlsx)
        base_name = os.path.basename(txt_file_path).replace('.txt', '.xlsx')
        xlsx_file_path = os.path.join(output_folder, base_name)
        
        # Write both the header (as text) and the data to the Excel file
        with pd.ExcelWriter(xlsx_file_path, engine='openpyxl') as writer:
            # Write header lines to the top of the sheet
            header_df = pd.DataFrame(header_lines)
            header_df.to_excel(writer, sheet_name='Sheet1', index=False, header=False)
            # Write the main data starting below the header (offset by header lines + 2)
            df.to_excel(writer, sheet_name='Sheet1', index=False, startrow=len(header_lines) + 2)
        
        print(f"Converted: {txt_file_path} -> {xlsx_file_path}")
    except pd.errors.ParserError as e:
        print(f"ParserError while reading {txt_file_path}: {e}")
    except Exception as e:
        print(f"Error converting {txt_file_path}: {e}")

def batch_convert_txt_to_xlsx(input_folder, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    for file_name in os.listdir(input_folder):
        if file_name.endswith('.txt'):
            txt_file_path = os.path.join(input_folder, file_name)
            convert_txt_to_xlsx(txt_file_path, output_folder)

# ------------- Functions for Processing XLSX Files --------------
def validate_excel_file(file_path):
    try:
        pd.read_excel(file_path, nrows=1, engine='openpyxl')
        return True
    except (BadZipFile, Exception):
        return False

def find_threshold_crossing(df, start_index, threshold, direction, filename):
    # Depending on direction, reverse the slice or not
    if direction == "backward":
        df_slice = df.loc[:start_index].iloc[::-1]
    else:
        df_slice = df.loc[start_index:]
    
    prev_value = df_slice.iloc[0]['Total_GRF']
    crossing_count = 0
    for index, row in df_slice.iterrows():
        current_value = row['Total_GRF']
        if (prev_value < threshold and current_value >= threshold) or \
           (prev_value > threshold and current_value <= threshold):
            crossing_count += 1
            if crossing_count == 3:
                return index
        prev_value = current_value
    if crossing_count == 2:
        return index
    return None

def process_files(input_folder, output_folder):
    os.makedirs(output_folder, exist_ok=True)
    excel_files = glob.glob(os.path.join(input_folder, '*.xlsx'))
    processed_count = 0
    error_count = 0
    
    for file_path in excel_files:
        filename = os.path.basename(file_path)
        
        if not validate_excel_file(file_path):
            print(f"Skipping corrupted file: {filename}")
            error_count += 1
            continue
            
        try:
            # Read the converted file; note the header is in row 8 (index 7)
            df = pd.read_excel(file_path, header=7, engine='openpyxl')
            
            # Check for required columns in the data (as available in the TXT file)
            if "Sample #" not in df.columns or "Total_GRF" not in df.columns or "PeakJumpHeight" not in df.columns:
                print(f"Missing required columns in {filename}")
                error_count += 1
                continue
            
            # Initialize new columns
            df["Total_GRF"] = df["Total_GRF"].abs()
            df["VEM1"] = 0
            df["VEM2"] = 0
            df["VEM3"] = 0
            df["VEM4"] = 0
            df["VEM5"] = 0
            df["Power"] = 0
            df["Time"] = df["Sample #"] / 238.095  # Convert sample number to time
            
            # Mark VEM2 (takeoff) and VEM3 (landing)
            takeoff_idx = df[df["Total_GRF"] < 20].index.min()
            if pd.notna(takeoff_idx):
                df.at[takeoff_idx, "VEM2"] = 1
                weight_acceptance_df = df.loc[takeoff_idx + 1:]
                weight_acceptance_idx = weight_acceptance_df[weight_acceptance_df["Total_GRF"] > 20].index.min()
                if pd.notna(weight_acceptance_idx):
                    df.at[weight_acceptance_idx, "VEM3"] = 1
            
            # Calculate a baseline and threshold (95% of the baseline)
            baseline = df['Total_GRF'].iloc[:10].mean()
            threshold = baseline * 0.95

            # Find VEM1 (backward crossing) relative to VEM2
            vem2_indices = df[df['VEM2'] == 1].index
            if not vem2_indices.empty:
                vem1_index = find_threshold_crossing(df, vem2_indices[0], threshold, "backward", filename)
                if vem1_index is not None:
                    df.loc[vem1_index, 'VEM1'] = 1

            # Find VEM4 (forward crossing) relative to VEM3
            vem3_indices = df[df['VEM3'] == 1].index
            if not vem3_indices.empty:
                vem4_index = find_threshold_crossing(df, vem3_indices[0], threshold, "forward", filename)
                if vem4_index is not None:
                    df.loc[vem4_index, 'VEM4'] = 1

            # Calculate Power and mark VEM5 at the maximum power point (between VEM1 and VEM2)
            vem1_idx = df[df['VEM1'] == 1].index
            vem2_idx = df[df['VEM2'] == 1].index
            if not vem1_idx.empty and not vem2_idx.empty:
                vem1_idx = vem1_idx[0]
                vem2_idx = vem2_idx[0]
                delta_t = df.loc[vem2_idx, 'Time'] - df.loc[vem1_idx, 'Time']
                work = df['Total_GRF'] * df['PeakJumpHeight']
                df.loc[vem1_idx:vem2_idx, 'Power'] = work[vem1_idx:vem2_idx] / delta_t
                max_power_idx = df.loc[vem1_idx:vem2_idx, 'Power'].idxmax()
                df.at[max_power_idx, 'VEM5'] = 1

            # Save the processed file by adding a suffix before the file extension.
            base_name, ext = os.path.splitext(filename)
            output_file_name = f"{base_name}_proc{ext}"
            output_file_path = os.path.join(output_folder, output_file_name)
            df.to_excel(output_file_path, index=False, engine='openpyxl')
            processed_count += 1
        except Exception as e:
            print(f"Error processing {filename}: {e}")
            error_count += 1
            continue

    print(f"\nProcessing Complete:")
    print(f"Files processed: {processed_count}")
    print(f"Files with errors: {error_count}")

# -------- Function for Force Data Analysis --------
def analyze_force_data(file_path):
    """
    Reads a processed XLSX file and calculates the normalized GRF values
    and COM velocity at the VEM5 event.
    """
    # For final processed files, we assume a normal Excel file (no extra header rows)
    df = pd.read_excel(file_path, engine='openpyxl')
    
    # Ensure required columns exist
    required_columns = ['VEM1', 'VEM2', 'VEM3', 'VEM4', 'VEM5', 'Total_GRF', 'COM_Velocity']
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Column {col} not found in {file_path}")
    
    # Assume the body weight is given by the absolute value of Total_GRF in the first row
    body_weight = abs(df.loc[0, 'Total_GRF'])
    
    # Ensure that the necessary VEM markers are present
    if df[df['VEM1'] == 1].empty or df[df['VEM2'] == 1].empty:
        raise ValueError(f"Missing VEM1 or VEM2 markers in {file_path}")
    if df[df['VEM3'] == 1].empty or df[df['VEM4'] == 1].empty:
        raise ValueError(f"Missing VEM3 or VEM4 markers in {file_path}")
    
    # Determine phase indices
    start_idx_1 = df[df['VEM1'] == 1].index[0]  # Phase 1 start
    end_idx_1 = df[df['VEM2'] == 1].index[0]      # Phase 1 end
    start_idx_2 = df[df['VEM3'] == 1].index[0]      # Phase 2 start
    end_idx_2 = df[df['VEM4'] == 1].index[0]        # Phase 2 end

    # Get the peak (most negative) Total_GRF during each phase
    peak_grf_1 = df['Total_GRF'][start_idx_1:end_idx_1 + 1].max()
    peak_grf_2 = df['Total_GRF'][start_idx_2:end_idx_2 + 1].max()

    # Normalize the peak GRF values by body weight
    normalized_peak_grf_1 = abs(peak_grf_1 / body_weight)
    normalized_peak_grf_2 = abs(peak_grf_2 / body_weight)
    avg_normalized_peak_force_1 = normalized_peak_grf_1  # (a single value)
    percent_difference_1 = 0  # With a single value, this is set to zero
    percent_difference_2 = 0

    # Get GRF and COM velocity at VEM5
    if df[df['VEM5'] == 1].empty:
        raise ValueError(f"Missing VEM5 marker in {file_path}")
    vem5_idx = df[df['VEM5'] == 1].index[0]
    vem5_grf = abs(df.loc[vem5_idx, 'Total_GRF'])
    vem5_com_velocity = df.loc[vem5_idx, 'COM_Velocity']
    normalized_vem5_grf = abs(vem5_grf / body_weight)

    return (avg_normalized_peak_force_1, normalized_peak_grf_1, normalized_peak_grf_2,
            percent_difference_1, percent_difference_2, vem5_grf, vem5_com_velocity, normalized_vem5_grf)

def run_analysis(processed_folder, report_file):
    """
    Loops over all processed XLSX files, analyzes the force data,
    and writes the combined results to an Excel report.
    """
    results = []
    # Loop through all XLSX files in the processed folder
    processed_files = glob.glob(os.path.join(processed_folder, '*.xlsx'))
    for file_path in processed_files:
        try:
            analysis_result = analyze_force_data(file_path)
            # Extract participant initials from the filename (first 3 characters)
            participant_initials = os.path.basename(file_path)[:3]
            results.append([participant_initials] + list(analysis_result))
        except Exception as e:
            print(f"Error processing file {os.path.basename(file_path)}: {e}")
    
    # Define column names for the report
    results_df = pd.DataFrame(results, columns=[
        'Participant', 
        'Avg Normalized Peak Force Phase 1', 
        'Normalized Peak GRF Phase 1', 'Normalized Peak GRF Phase 2', 
        'Percent Difference Phase 1 (%)', 'Percent Difference Phase 2 (%)',
        'GRF at VEM5', 'COM Velocity at VEM5', 'Normalized GRF at VEM5'
    ])
    
    # Ensure the directory for the report exists
    os.makedirs(os.path.dirname(report_file), exist_ok=True)
    
    # Write the results to an Excel file with two sheets:
    # one for individual trials and one for grouped (averaged) results
    with pd.ExcelWriter(report_file, engine='openpyxl') as writer:
        results_df.to_excel(writer, sheet_name='Individual Trials', index=False)
        grouped_results = results_df.groupby('Participant').mean().reset_index()
        grouped_results.to_excel(writer, sheet_name='Averaged by Participant', index=False)
    
    print(f"\nAnalysis report generated and saved to {report_file}")

# ----------------- Combined Pipeline Runner -----------------
def run_pipeline():
    print("Stage 1: Converting TXT files to XLSX...")
    batch_convert_txt_to_xlsx(TXT_INPUT_FOLDER, XLSX_INTERMEDIATE_FOLDER)
    
    print("\nStage 2: Processing XLSX files...")
    process_files(XLSX_INTERMEDIATE_FOLDER, PROCESSED_XLSX_FOLDER)
    
    print("\nStage 3: Running Force Data Analysis on Processed Files...")
    run_analysis(PROCESSED_XLSX_FOLDER, ANALYSIS_REPORT_FILE)
    
    print("\nComplete pipeline finished!")

if __name__ == "__main__":
    run_pipeline()


Stage 1: Converting TXT files to XLSX...
Converted: /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-TXT/CAV_4-16-25_VJ_T2 - PowerVJ.txt -> /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-INT/CAV_4-16-25_VJ_T2 - PowerVJ.xlsx
Converted: /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-TXT/JTH_4-16-25_VJ_T1 - PowerVJ.txt -> /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-INT/JTH_4-16-25_VJ_T1 - PowerVJ.xlsx
Converted: /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-TXT/LRB_4-16-25_VJ_T2 - PowerVJ.txt -> /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-INT/LRB_4-16-25_VJ_T2 - PowerVJ.xlsx
Converted: /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-TXT/JTH_4-16-25_VJ_T2 - PowerVJ.txt -> /Users/brenthokeness/Desktop/BiomechData/Fall2024Baseball-VJTesting/BBVJAPR16-INT/JTH_4-16-25_VJ_T2 - 