Play para incluir os inputs

In [20]:
# By Edilson Borba 
    # borba.edi@gmail.com
        # If you need suport, send me an email. I'm happy to help

# Import necessary libraries
import os
import pandas as pd
from scipy.signal import find_peaks, savgol_filter
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta
import ipywidgets as widgets
from IPython.display import display
import asyncio
from ipyfilechooser import FileChooser

# Function to calculate the dominant frequency of the signal
def calculate_frequency(signal, Frequency):
    n = len(signal)
    yf = np.fft.fft(signal)
    xf = np.fft.fftfreq(n, 1 / Frequency)
    idx = np.argmax(np.abs(yf))
    frequency = np.abs(xf[idx])
    return frequency, xf, yf

# Function to calculate the median frequency of the signal
def calculate_median_frequency(yf, xf):
    power_spectrum = np.abs(yf)**2
    cumulative_power = np.cumsum(power_spectrum)
    total_power = cumulative_power[-1]
    median_idx = np.where(cumulative_power >= total_power / 2)[0][0]
    median_frequency = np.abs(xf[median_idx])
    return median_frequency

Cropping_Duration = 30  # Define the center cropping duration in seconds - For example, crop to a 30-second duration

Window_Duration = 5 # Time in seconds for each analysis window

# Additional Parameters
Min_Distance = 20 # Minimum distance between points 
Positive_Height = 0.2 # Minimum cut-off line for marking positive points 
Negative_Height = 0.2 # Minimum cut-off line for marking negative points

# Applied Filter (Savitzky-Golay - Schafer, R. W. (2011). What is a Savitzky-Golay filter?. IEEE Signal processing magazine, 28(4), 111-117)
Savi_Length = 7  # The length of the Savitzky-Golay filter window (Always odd numbers and greater than polyorder)
Savi_Order = 3 # The order of the Savitzky-Golay filter

# Function to create a file chooser widget
def create_file_chooser(description):
    label = widgets.Label(description)
    chooser = FileChooser()
    return widgets.VBox([label, chooser]), chooser

# Create input widgets
Name = widgets.Text(description='Name:')
Leg = widgets.Dropdown(options=['Right', 'Left'], description='Leg:')
Data_Used = widgets.Dropdown(options=['Gyro', 'Accel'], description='Data Used:')
file_input_container, file_input_chooser = create_file_chooser('Select Input File')
file_output_container, file_output_chooser = create_file_chooser('Select Output Path')

global Name
global Leg
global Data_Used

def on_button_click(button):
    global File_Input, File_Output  # Declare File_Input as global if it's used elsewhere outside this function
    selected_input_file = file_input_chooser.selected
    selected_output_file = file_output_chooser.selected
    if selected_input_file and selected_output_file:
        File_Input = selected_input_file  # Set File_Input as the selected input file
        File_Output = selected_output_file
        print(f"Name: {Name.value}")
        print(f"Leg: {Leg.value}")
        print(f"Data Used: {Data_Used.value}")
        print(f"Selected Input File: {selected_input_file}")
        print(f"Selected Output Path: {selected_output_file}")

        
        # Reading the .csv file using pandas
        File_Output_Name = f"{Name.value}.xlsx"
        File_Path_Full = os.path.join(selected_input_file)
        df = pd.read_csv(File_Path_Full)

        # Saving the Data Frame as an Excel file
        global File_Output_Path_Full
        File_Output_Path_Full = os.path.join(selected_output_file, File_Output_Name)
        df.to_excel(File_Output_Path_Full, index=False)
        print(f"Excel file saved at: {File_Output_Path_Full}")
        
        # Reading the saved Excel file
        global data
        data = pd.read_excel(File_Output_Path_Full)
        
        # Continue with the rest of the code after confirmation
        print("Continuing with the rest of the code...")
    else:
        print("Please select both input and output files")

# Create a button widget
display_button = widgets.Button(description="Confirm and Continue")
display_button.on_click(on_button_click)

# Group widgets into a single container
container = widgets.VBox([Name, Leg, Data_Used, file_input_container, file_output_container, display_button])

# Display the container
display(container)

VBox(children=(Text(value='', description='Name:'), Dropdown(description='Leg:', options=('Right', 'Left'), va…

Play para realizar a análise

In [None]:
# Convert data to numpy array
Data = np.array(data)

# Extract columns
Time = Data[:,0]
Accelerometer = Data[:,12]
Gyroscope = Data[:,14]

# Convert the inverted Accelerometer data from g to m/s^2
Accelerometer = Accelerometer * -9.81
offset_value = Accelerometer[0]
Accelerometer = Accelerometer - offset_value

# List to store processed times
processed_times = []

# Iterate over each element in the Time list
for t in Time:
    # Split the string using "T" as separator and take the second part (after "T")
    processed_time = t.split("T")[1]
    # Remove the time zone offset (-03:00)
    processed_time = processed_time.split("-")[0]
    # Add the processed time to the list
    processed_times.append(processed_time)

# Convert the processed times into datetime objects
datetime_times = [datetime.strptime(time, "%H:%M:%S.%f") for time in processed_times]

# Calculate the time differences between each consecutive pair of times
differences = [datetime_times[i+1] - datetime_times[i] for i in range(len(datetime_times)-1)]

# Calculate the mean of the time differences in seconds
total_difference = sum(differences, timedelta())
mean_difference = total_difference.total_seconds() / len(differences)

# Calculate frequency based on mean time difference
Time_Diff = mean_difference
Frequency = 1 / Time_Diff

# Apply the Savitzky-Golay filter to the accelerometer data
Accelerometer_filtered = savgol_filter(Accelerometer, Savi_Length, Savi_Order)

# Apply the Savitzky-Golay filter to the gyroscope data
Gyro_filtered = savgol_filter(Gyroscope, Savi_Length, Savi_Order)

if Data_Used == 'Accel':
    Filtered_Data = Accelerometer_filtered
else:
    Filtered_Data = Gyro_filtered
    
if Data_Used == 'Accel':
    Positive_Height *= 10
    Negative_Height *= 10
else:
    pass

peaks, _ = find_peaks(Filtered_Data, height=Positive_Height, distance=Min_Distance)  # Limit the positive threshold
neg_peaks, _ = find_peaks(-Filtered_Data, height=Negative_Height, distance=Min_Distance) # Limit the negative threshold

plt.figure(figsize=(13, 5))
plt.plot(Filtered_Data, label='Signal')
plt.axhline(y=0, color='black', linewidth=0.6)
plt.plot(peaks, Filtered_Data[peaks], 'gx', label='Peaks')
plt.plot(neg_peaks, Filtered_Data[neg_peaks], 'rx', label='Troughs')

zero_crossings = np.where(np.diff(np.sign(Filtered_Data)))[0]

onset_times = []
completion_times = []
zero_after_peak_times = []
ascent_times = []
descent_times = []
cycle_durations = []
transition_times = []

# Find peaks
for peak in peaks:
    # Find the onset time (last zero crossing before the peak)
    zero_before = zero_crossings[zero_crossings < peak]
    if zero_before.size > 0:
        onset_time = zero_before[-1]
        onset_times.append(onset_time)
        plt.plot(onset_time, 0, 'o', color='green', label='Onset', markersize=5)

        # Find the next trough after the peak
        troughs_after_peak = neg_peaks[neg_peaks > peak]
        if troughs_after_peak.size > 0:
            trough = troughs_after_peak[0]
            # Find the completion time (first zero crossing after the trough)
            zero_after = zero_crossings[zero_crossings > trough]
            if zero_after.size > 0:
                completion_time = zero_after[0]
                completion_times.append(completion_time)
                plt.plot(completion_time, 0, 'o', color='red', label='Completion', markersize=5)

    # Find the first zero crossing after the peak
    zero_after_peak = zero_crossings[zero_crossings > peak]
    if zero_after_peak.size > 0:
        first_zero_after_peak = zero_after_peak[0]
        zero_after_peak_times.append(first_zero_after_peak)
        plt.plot(first_zero_after_peak, 0, 'o', color='black', label='Intermediate', markersize=5)

for i, peak in enumerate(peaks):
    plt.text(peak, Filtered_Data[peak] + (max(Filtered_Data) - min(Filtered_Data)) * 0.03, str(i + 1),
            color='black', fontsize=9, ha='center')

peak_count = len(peaks)
print(f"Number total of cycles: {peak_count}")

if onset_times:
    first_onset_time = onset_times[0]
    cutoff_index = first_onset_time + int(Cropping_Duration * Frequency)
    Filtered_Data = Filtered_Data[first_onset_time:cutoff_index]

peaks, _ = find_peaks(Filtered_Data, height=Positive_Height, distance=Min_Distance)  # Limit the positive threshold
neg_peaks, _ = find_peaks(-Filtered_Data, height=Negative_Height, distance=Min_Distance) # Limit the negative threshold

plt.figure(figsize=(13, 5))
plt.plot(Filtered_Data, label='Signal')
plt.axhline(y=0, color='black', linewidth=0.6)
plt.plot(peaks, Filtered_Data[peaks], 'gx', label='Peaks')
plt.plot(neg_peaks, Filtered_Data[neg_peaks], 'rx', label='Troughs')

zero_crossings = np.where(np.diff(np.sign(Filtered_Data)))[0]

onset_times = []
completion_times = []
zero_after_peak_times = []
ascent_times = []
descent_times = []
cycle_durations = []
transition_times = []

# Find peaks
for peak in peaks:
    # Find the onset time (last zero crossing before the peak)
    zero_before = zero_crossings[zero_crossings < peak]
    if zero_before.size > 0:
        onset_time = zero_before[-1]
        onset_times.append(onset_time)
        plt.plot(onset_time, 0, 'o', color='green', label='Onset', markersize=5)

        # Find the next trough after the peak
        troughs_after_peak = neg_peaks[neg_peaks > peak]
        if troughs_after_peak.size > 0:
            trough = troughs_after_peak[0]
            # Find the completion time (first zero crossing after the trough)
            zero_after = zero_crossings[zero_crossings > trough]
            if zero_after.size > 0:
                completion_time = zero_after[0]
                completion_times.append(completion_time)
                plt.plot(completion_time, 0, 'o', color='red', label='Completion', markersize=5)

    # Find the first zero crossing after the peak
    zero_after_peak = zero_crossings[zero_crossings > peak]
    if zero_after_peak.size > 0:
        first_zero_after_peak = zero_after_peak[0]
        zero_after_peak_times.append(first_zero_after_peak)
        plt.plot(first_zero_after_peak, 0, 'o', color='black', label='Intermediate', markersize=5)

for i, peak in enumerate(peaks):
    plt.text(peak, Filtered_Data[peak] + (max(Filtered_Data) - min(Filtered_Data)) * 0.03, str(i + 1),
            color='black', fontsize=9, ha='center')

# Remove duplicate legend entries
handles, labels = plt.gca().get_legend_handles_labels()
by_label = dict(zip(labels, handles))
plt.legend(by_label.values(), by_label.keys(), loc='upper left', bbox_to_anchor=(0.989, 1), frameon=False)

# Calculate ascent and descent times
for i in range(min(len(onset_times), len(zero_after_peak_times), len(completion_times))):
    ascent_time = (zero_after_peak_times[i] - onset_times[i]) / Frequency
    descent_time = (completion_times[i] - zero_after_peak_times[i]) / Frequency
    ascent_times.append(ascent_time)
    descent_times.append(descent_time)

# Calculate the average ascent and descent times
avg_ascent_time = sum(ascent_times) / len(ascent_times) if ascent_times else 0
avg_descent_time = sum(descent_times) / len(descent_times) if descent_times else 0

# Calculate the average time difference between each onset and the subsequent completion
if len(onset_times) > 0 and len(completion_times) > 0:
    time_differences = [completion_times[i] - onset_times[i] for i in
                        range(min(len(onset_times), len(completion_times)))]

    # Filter out any negative differences
    time_differences = [td for td in time_differences if td > 0]
    if time_differences:
        average_time_difference = sum(time_differences) / len(time_differences)
        average_time_difference_seconds = average_time_difference / Frequency
        #print(f"Average cycle duration: {average_time_difference_seconds:.2f}s")
        cycle_durations = [td / Frequency for td in time_differences]
    else:
        average_time_difference_seconds = 0

# Calculate the transition times starting from the second cycle
for i in range(1, len(completion_times)):
    transition_time = (onset_times[i] - completion_times[i - 1]) / Frequency
    transition_times.append(transition_time)

# Calculate the average transition time
avg_transition_time = sum(transition_times) / len(transition_times) if transition_times else 0

# Calculate the standard deviation and CV for cycle durations
avg_cycle_duration = average_time_difference_seconds
std_cycle_duration = np.std(cycle_durations) if cycle_durations else 0
cv_cycle_duration = std_cycle_duration / avg_cycle_duration * 100 if avg_cycle_duration else 0

# Calculate the standard deviation for ascent, descent, and transition times
std_ascent_time = np.std(ascent_times) if ascent_times else 0
std_descent_time = np.std(descent_times) if descent_times else 0
std_transition_time = np.std(transition_times) if transition_times else 0

# Calculate the coefficient of variation (CV) for ascent, descent, and transition times
cv_ascent_time = std_ascent_time / avg_ascent_time * 100 if avg_ascent_time else 0
cv_descent_time = std_descent_time / avg_descent_time * 100 if avg_descent_time else 0
cv_transition_time = std_transition_time / avg_transition_time * 100 if avg_transition_time else 0

avg_peak_value = sum(Filtered_Data[peaks]) / len(peaks) if len(peaks) > 0 else 0
avg_trough_value = sum(Filtered_Data[neg_peaks]) / len(neg_peaks) if len(neg_peaks) > 0 else 0
peak_count = len(peaks)
trough_count = len(neg_peaks)

# Calculate the standard deviation and CV for peak and trough accelerations
std_peak_value = np.std(Filtered_Data[peaks]) if len(peaks) > 0 else 0
std_trough_value = np.std(Filtered_Data[neg_peaks]) if len(neg_peaks) > 0 else 0
cv_peak_value = std_peak_value / avg_peak_value * 100 if avg_peak_value else 0
cv_trough_value = std_trough_value / avg_trough_value * 100 if avg_trough_value else 0

plt.title(f'\nMean Values: Peak {avg_peak_value:.3f} | Trough: {avg_trough_value:.3f} | Cycle: {average_time_difference_seconds:.3f}s | Ascent: {avg_ascent_time:.3f}s | Descent: {avg_descent_time:.3f}s | Transition: {avg_transition_time:.3f}s')

plt.xlabel('')
plt.ylabel('')
plt.show()
print(f"Number of cycles: {peak_count}")

# Determine the number of samples per window
Samples_Window = int(Window_Duration * Frequency)

# Calculate the number of windows
Num_Windows = len(Filtered_Data) // Samples_Window

# Lists to store median amplitude and frequency for each window
median_amplitudes = []
median_frequencies = []

# Lists to store peak frequency and amplitude for each window
peak_frequencies = []
peak_amplitudes = []

# Figure for subplots
fig, axs = plt.subplots(Num_Windows, 2, figsize=(12, 2 * Num_Windows))

# Fourier analysis in 5-second windows
for i in range(Num_Windows):
    start_idx = i * Samples_Window
    end_idx = start_idx + Samples_Window
    window_signal = Filtered_Data[start_idx:end_idx]
    
    # Calculate time for the current window
    t = np.linspace(0, Window_Duration, Samples_Window, endpoint=False)
    
    # Calculate the dominant frequency of the signal
    calculated_freq, xf, yf = calculate_frequency(window_signal, Frequency)
    
    # Find the index of the peak frequency
    peak_idx = np.argmax(np.abs(yf))
    peak_frequency = xf[peak_idx]
    peak_amplitude = np.abs(yf)[peak_idx]
    if peak_frequency < 0:
        peak_frequency = peak_frequency *-1
    else:
        pass

    # Add peak frequency and amplitude to the lists
    peak_frequencies.append(peak_frequency)
    peak_amplitudes.append(peak_amplitude)
    
    # Calculate the median frequency of the signal
    median_frequency = calculate_median_frequency(yf, xf)
    median_frequencies.append(median_frequency)
    
    # Calculate the median amplitude of the signal
    median_amplitude = np.median(window_signal)
    median_amplitudes.append(median_amplitude)
    
    # Plot of the signal in the time domain
    plt.subplots_adjust(hspace = 0.8, wspace = 0.2)
    
    axs[i, 0].plot(t, window_signal)
    axs[i, 0].set_title(f'Signal in Time Domain - Window {i+1}')
    axs[i, 0].set_xlabel('Time [s]')
    axs[i, 0].set_ylabel('Amplitude')
    
    # Plot of the signal in the frequency domain
    axs[i, 1].plot(xf, np.abs(yf))
    axs[i, 1].axvline(peak_frequency, color='r', linestyle='--')  # Add a vertical line at the peak frequency
    axs[i, 1].set_title(f'Signal in Frequency Domain - Window {i+1}')
    axs[i, 1].set_xlabel('Frequency [Hz]')
    axs[i, 1].set_ylabel('Magnitude')
    axs[i, 1].set_xlim(0, Frequency / 2)

fig, ax1 = plt.subplots(figsize=(10, 5))

color = 'tab:blue'
ax1.set_xlabel('Window')
ax1.set_ylabel('Peak Frequency (Hz)', color=color)
ax1.plot(range(1, Num_Windows + 1), peak_frequencies, color=color, marker='o')
ax1.tick_params(axis='y', labelcolor=color)

ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis

color = 'tab:red'
ax2.set_ylabel('Peak Amplitude', color=color)  # we already handled the x-label with ax1
ax2.plot(range(1, Num_Windows + 1), peak_amplitudes, color=color, marker='o')
ax2.tick_params(axis='y', labelcolor=color)

fig.tight_layout()  # otherwise the right y-label is slightly clipped
plt.title('Peak Frequencies and Amplitudes per Window')
plt.grid(True)
plt.show()

# Defining the DataFrame with the data
df_Export_Data = pd.DataFrame({
    'Name': [Name.value],
    'Leg': [Leg.value],
    'Frequency': [round(Frequency, 3)],
    'Delta Time': [round(Time_Diff, 3)],
    'Used Data': [Data_Used.value],
    'Number of Cycles': [peak_count],
    'Transition Time': [round(avg_transition_time, 3)],
    'Cycle Duration': [round(avg_cycle_duration, 3)],
    'SD Cycle Duration': [round(std_cycle_duration, 3)],
    'Ascent Time': [round(avg_ascent_time, 3)],
    'SD Ascent Time': [round(std_ascent_time, 3)],
    'Descent Time': [round(avg_descent_time, 3)],
    'SD Descent Time': [round(std_descent_time, 3)],
    'Peak': [round(avg_peak_value, 3)],
    'SD Peak': [round(std_peak_value, 3)],
    'Trough': [round(avg_trough_value, 3)],
    'SD Trough': [round(std_trough_value, 3)],
    **{f'Median Amplitude {i}': round(amp, 3) for i, amp in enumerate(median_amplitudes, 1)},
    **{f'Median Frequency {i}': round(freq, 3) for i, freq in enumerate(median_frequencies, 1)},
    **{f'Peak Frequency {i}': round(freq, 3) for i, freq in enumerate(peak_frequencies, 1)},
    **{f'Peak Amplitude {i}': round(amp, 3) for i, amp in enumerate(peak_amplitudes, 1)}
})

file_name = f"{Name.value}_{Leg.value}_{Data_Used.value}.xlsx"
file_output_path_full = os.path.join(File_Output, file_name)

# Saving the DataFrame as an XLSX file
df_Export_Data.to_excel(file_output_path_full, index=False)

print(f'Exportado para {file_output_path_full}')