In [12]:
import os

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import windaq as wdq

from scipy.signal import filtfilt, butter, find_peaks
from tkinter import Tk, filedialog
from collections import defaultdict
from openpyxl import load_workbook
from openpyxl.drawing.image import Image

# Helper Functions
def select_folder_and_find_files(extension=".WDH"):
    """
    Prompts the user to select a folder and retrieves all files with the specified extension.
    """
    root = Tk()
    root.withdraw()  # Hide Tkinter root window
    folder_path = filedialog.askdirectory(title="Select a Folder")
    if not folder_path:
        print("No folder selected.")
        return []
    
    matching_files = []
    for root_dir, _, files in os.walk(folder_path):
        for file in files:
            if file.endswith(extension):
                matching_files.append(os.path.join(root_dir, file))
    return matching_files

def wdh_to_df(file):
    """
    Converts a Windaq file to a DataFrame with specific columns.
    """
    wfile = wdq.windaq(file)
    return pd.DataFrame({
        'time, s': wfile.time(),
        'speed, rpm': wfile.data(3),
        'torque, Nm': wfile.data(1),
        'temp, degF': wfile.data(2)
    })

def determine_test_start(df, speed_threshold=1):
    """
    Determines the start point of the test based on speed exceeding a threshold.
    """
    start_index = df[df['speed, rpm'] > speed_threshold].index.min()
    if pd.isna(start_index):
        print("No test start detected.")
        return None
    start_time = df.loc[start_index, 'time, s']
    print(f"Test start detected at time: {start_time} seconds")
    return start_time

def evaluate_torque_at_stages(df, torque_steps):
    """
    Calculates the average filtered torque for specified torque steps.

    Parameters:
    - df: DataFrame containing test data.
    - torque_steps: List of dictionaries with 'start_time' and 'duration'.

    Returns:
    - A dictionary with step names and corresponding average filtered torque.
    """
    results = {}
    for step in torque_steps:
        step_start_time = step['start_time']
        step_end_time = step_start_time + step['duration']
        
        # Filter data within the step time range
        step_data = df[(df['time, s'] >= step_start_time) & (df['time, s'] <= step_end_time)]
        
        # Calculate average of filtered torque
        avg_torque = step_data['torque, Nm (filtered)'].mean()
        results[step['name']] = avg_torque
        
        print(f"{step['name']} avg torque: {avg_torque:.3f} Nm")
    return results

def plot_filter_torque_stand_data(df, test_name, start_time=0, duration=0, torque_lim=(-1.0, 1.0), speed_lim=(0, 2750), temp_lim=(70, 200), time_lim=(0, 1120)):
    """
    Plots speed, torque, and filtered torque data with legends and minor gridlines.
    """

    if duration:
        stop_time = start_time + duration
    else:
        stop_time = 0
    # Filter the dataframe for the specified time range
    df_plot = df[(df['time, s'] >= start_time) & (df['time, s'] <= stop_time)] if stop_time else df

    # Create subplots
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(16, 12), sharex=True)
    fig.suptitle(test_name, fontsize=16, y=0.99)

    # Plot Speed
    ax1.plot(df_plot['time, s'], df_plot['speed, rpm'], label='Speed', color="red")
    ax1.set_ylim(speed_lim)
    ax1.set_xlim(time_lim)
    ax1.set_ylabel("Speed, rpm", color="red")
    ax1.grid(which='major', linestyle='-', linewidth=0.75, alpha=0.7)
    ax1.grid(which='minor', linestyle='--', linewidth=0.5, alpha=0.5)
    ax1.legend(loc="upper left")
    ax1.xaxis.set_minor_locator(ticker.AutoMinorLocator(5))
    ax1.yaxis.set_minor_locator(ticker.AutoMinorLocator(5))

    # Plot Torque
    ax2.plot(df_plot['time, s'], df_plot['torque, Nm'], label='Original Torque', color="red")
    ax2.plot(df_plot['time, s'], df_plot['torque, Nm (filtered)'], label='Filtered Torque', color="blue")
    ax2.set_ylim(torque_lim)
    ax2.set_xlim(time_lim)
    ax2.set_ylabel("Torque, Nm")
    ax2.set_xlabel("Time, sec")
    ax2.grid(which='major', linestyle='-', linewidth=0.75, alpha=0.7)
    ax2.grid(which='minor', linestyle='--', linewidth=0.5, alpha=0.5)
    ax2.legend(loc="upper left")
    ax2.xaxis.set_minor_locator(ticker.AutoMinorLocator(5))
    ax2.yaxis.set_minor_locator(ticker.AutoMinorLocator(5))

    # Make the gridline at y=0 fully black
    ax2.axhline(0.0, color='grey', linewidth=0.75, linestyle='-')  # Custom line at y=0

    # Add Temperature on Secondary Y-Axis
    ax_temp = ax2.twinx()
    ax_temp.plot(df_plot['time, s'], df_plot['temp, degF'], label='Temperature', color="green", alpha=0.3)
    ax_temp.set_ylim(70, 200)
    ax_temp.set_ylabel("Temperature, degF", color="green")
    ax_temp.legend(loc="upper right")

    # Final layout adjustments
    plt.tight_layout(rect=[0, 0, 1, 0.99])  # Adjust top margin to fit the title
    return plt

def plot_speed_vs_torque(sweep_data, output_path):
    """
    Plots combined speed vs filtered torque for sweep data.

    Parameters:
    - sweep_data: Combined DataFrame containing speed and filtered torque for all sweeps.
    - output_path: Path to save the plot.
    """
    plt.figure(figsize=(10, 6))
    plt.plot(sweep_data["speed, rpm"], sweep_data["torque, Nm (filtered)"], label="Combined Sweeps")
    plt.title("Speed vs Filtered Torque (Combined)")
    plt.xlabel("Speed (RPM)")
    plt.ylabel("Filtered Torque (Nm)")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()
    print(f"Combined Speed vs Torque plot saved to: {output_path}")

def plot_speed_vs_torque_multiple_tests(data_files, output_path, max_speed_view=2000):
    """
    Plots speed vs filtered torque for multiple tests on the same plot.
    Speeds are assumed to have proper directionality (positive for CW, negative for CCW).
    
    Parameters:
    - data_files: Dictionary of DataFrames with test names as keys.
    - output_path: Path to save the combined plot.
    """
    plt.figure(figsize=(12, 8))

    for test_name, df in data_files.items():
        plt.plot(df['speed, rpm'], df['torque, Nm (filtered)'], label=test_name)

    plt.xlim(-max_speed_view, max_speed_view)
    plt.title("Speed vs Filtered Torque for All Tests")
    plt.xlabel("Speed (RPM)")
    plt.ylabel("Filtered Torque (Nm)")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()
    print(f"Combined Speed vs Torque plot saved to: {output_path}")

def extract_and_combine_sweeps(df, test_steps):
    """
    Extracts all sweeps (CW and CCW) from the DataFrame and combines them.

    Parameters:
    - df: DataFrame containing the test data.
    - test_steps: List of test step definitions.
    - start_time: Time when the test starts.

    Returns:
    - A combined DataFrame containing speed and filtered torque for all sweeps.
    """
    combined_sweeps = pd.DataFrame()

    for step in test_steps:
        step_start_time = step['start_time']  # Align step times to detected start
        step_end_time = step_start_time + step['duration']
        print(f"Extracting {step['name']} from {step_start_time}s to {step_end_time}s")

        # Extract data for this sweep step
        sweep_data = df[(df['time, s'] >= step_start_time) & (df['time, s'] <= step_end_time)].copy()

        # Adjust speed for CCW sweeps
        if step['direction'] == "CCW":
            sweep_data["speed, rpm"] = -sweep_data["speed, rpm"]

        combined_sweeps = pd.concat([combined_sweeps, sweep_data], ignore_index=True)

    return combined_sweeps

# Signal Processing Functions
def analyze_fft(df, column, sampling_rate, start_time=None, stop_time=None, sort_by_magnitude=False):
    """
    Performs FFT analysis on a specified column of the dataframe.
    """
    # Filter by time range
    if start_time or stop_time:
        df = df[(df['time, s'] >= (start_time if start_time else df['time, s'].min())) & 
                (df['time, s'] <= (stop_time if stop_time else df['time, s'].max()))]

    # Perform FFT
    data = df[column] - df[column].mean()  # Remove DC offset
    fft_result = np.fft.fft(data)
    fft_magnitude = np.abs(fft_result)[:len(data) // 2]
    fft_freqs = np.fft.fftfreq(len(data), d=1/sampling_rate)[:len(data) // 2]

    # Find peaks in the FFT magnitude data
    peaks, _ = find_peaks(fft_magnitude, height=1)  # Adjust height parameter as needed

    # Sort peaks by magnitude
    peak_magnitudes = fft_magnitude[peaks]
    sorted_indices = np.argsort(peak_magnitudes)[::-1]  # Sort indices by magnitude descending
    sorted_peaks = peaks[sorted_indices]

    # Print the frequencies and magnitudes of the identified peaks
    print("Identified peaks (sorted by magnitude):")
    for peak in sorted_peaks:
        if 34 < fft_freqs[peak] < 37:  # Only consider frequencies above 5 Hz
            print(f"Frequency: {fft_freqs[peak]:.2f} Hz, Magnitude: {fft_magnitude[peak]:.2f}")

    return fft_freqs, fft_magnitude

def lowpass_filter(data, sampling_rate, cutoff_freq, order=4):
    """
    Applies a low-pass filter to the data.
    """
    nyquist = 0.5 * sampling_rate
    normal_cutoff = cutoff_freq / nyquist
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return filtfilt(b, a, data)

def plot_fft(df, column, sampling_rate, test_name, start_time=None, stop_time=None, sort_by_magnitude=False):
    """
    Plots the FFT of a specified column of the dataframe within a given time range.
    """
    fft_freqs, fft_magnitude = analyze_fft(df, column, sampling_rate, start_time, stop_time, sort_by_magnitude)
    
    plt.figure(figsize=(10, 6))
    plt.plot(fft_freqs, fft_magnitude)
    plt.title(f"{test_name} - FFT of {column}")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude")
    plt.grid(True)
    plt.show()

def save_torque_averages_to_excel_old(all_torque_averages, file_path):
    """
    Saves torque averages from all tests to an Excel file.

    Parameters:
    - all_torque_averages: Dictionary of test names with torque averages for steps.
    - output_path: Path to save the Excel file.
    """
    # Convert the nested dictionary into a structured DataFrame
    rows = []
    for test_name, step_averages in all_torque_averages.items():
        for step_name, avg_torque in step_averages.items():
            rows.append({"Test Name": test_name, "Step Name": step_name, "Avg Torque (Nm)": avg_torque})
    
    df = pd.DataFrame(rows)
    df.to_excel(file_path, index=False, engine="xlsxwriter")
    print(f"Torque averages saved to Excel file: {file_path}")

def save_torque_averages_to_excel(all_torque_averages, file_path):
    """
    Saves torque averages from all tests to an Excel file and plots the data.

    Parameters:
    - all_torque_averages: Dictionary of test names with torque averages for steps.
    - output_path: Path to save the Excel file.
    """
    # Convert the nested dictionary into a structured DataFrame
    rows = []
    for test_name, step_averages in all_torque_averages.items():
        for step_name, avg_torque in step_averages.items():
            rows.append({"Test Name": test_name, "Step Name": step_name, "Avg Torque (Nm)": avg_torque})
    
    df = pd.DataFrame(rows)
    df.to_excel(file_path, index=False, engine="xlsxwriter")
    print(f"Torque averages saved to Excel file: {file_path}")

    # Plot the torque averages using matplotlib
    plt.figure(figsize=(12, 8))
    step_names = df['Step Name'].unique()
    test_names = df['Test Name'].unique()
    bar_width = 0.8 / len(test_names)
    index = np.arange(len(step_names))

    for i, test_name in enumerate(test_names):
        subset = df[df['Test Name'] == test_name]
        plt.bar(index + i * bar_width, subset['Avg Torque (Nm)'].abs(), bar_width, label=test_name)

    plt.xlabel('Step Name')
    plt.ylabel('Avg Torque (Nm)')
    plt.title('Torque Averages')
    plt.xticks(index + bar_width * (len(test_names) - 1) / 2, step_names)
    plt.legend(title='Test Name')

    # Add major gridlines
    plt.grid(which='major', linestyle='-', linewidth='0.5', color='grey')

    plot_path = file_path.replace('.xlsx', '.png')
    plt.savefig(plot_path)
    plt.close()
    print(f"Torque averages plot saved to: {plot_path}")

    # Insert the plot into the Excel sheet
    workbook = load_workbook(file_path)
    sheet = workbook.active
    img = Image(plot_path)
    sheet.add_image(img, 'E1')  # Adjust the cell location as needed
    workbook.save(file_path)
    print(f"Plot inserted into Excel file: {file_path}")


In [15]:
files = select_folder_and_find_files()
for file in files:
    print(f"Processing file: {file}")
    df = wdh_to_df(file)

    # Filter the torque data
    df['torque, Nm (filtered)'] = lowpass_filter(df['torque, Nm'], sampling_rate=1000, cutoff_freq=10)

    # Determine test start time
    test_start_time = determine_test_start(df)
    if test_start_time is None:
        continue  # Skip this file if no test start is detected

    # drop all data before the test start time and re-zero the time column
    df = df[df['time, s'] >= test_start_time].reset_index(drop=True)
    df['time, s'] = df['time, s'] - df['time, s'].iloc[0]

    # plot the filtered torque data
    test_name = os.path.basename(file).replace('.WDH', '')
    plot = plot_filter_torque_stand_data(df, test_name, start_time=0, duration=0, torque_lim=(0, 1.0), speed_lim=(0, 2750), temp_lim=(70, 200), time_lim=(0, 1200))
    plot_path = os.path.join(os.path.dirname(file), f"{test_name}_filtered_torque.png")
    plot.savefig(plot_path)
    plot.close()

    df.to_csv(os.path.join(os.path.dirname(file), f"{test_name}_filtered_torque.csv"), index=False)
    print(f"Filtered torque data saved to: {os.path.join(os.path.dirname(file), f'{test_name}_filtered_torque.csv')}")

Processing file: D:/Dana Torque\Sample 2 Dry-Wet.WDH
Test start detected at time: 4.01 seconds
Filtered torque data saved to: D:/Dana Torque\Sample 2 Dry-Wet_filtered_torque.csv
Processing file: D:/Dana Torque\Sample 3 Dry-wet.WDH
Test start detected at time: 2.87 seconds
Filtered torque data saved to: D:/Dana Torque\Sample 3 Dry-wet_filtered_torque.csv
Processing file: D:/Dana Torque\Sample 4 Dry-wet.WDH
Test start detected at time: 10.77 seconds
Filtered torque data saved to: D:/Dana Torque\Sample 4 Dry-wet_filtered_torque.csv
Processing file: D:/Dana Torque\Sample 5 Dry-wet.WDH
Test start detected at time: 2.1 seconds
Filtered torque data saved to: D:/Dana Torque\Sample 5 Dry-wet_filtered_torque.csv
Processing file: D:/Dana Torque\Sample 6 Dry-wet.WDH
Test start detected at time: 4.11 seconds
Filtered torque data saved to: D:/Dana Torque\Sample 6 Dry-wet_filtered_torque.csv
