# SG Test Data Processing Tool - v1

### Import libraries

In [37]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display
from scipy.signal import butter, filtfilt


### Sample Data

In [38]:
# region

# Data generation: synthetic stepped loading signal used as sample input.
sampling_rate = 200  # 200 Hz
total_duration = 100  # Total duration in seconds
num_steps = 5  # Number of steps in the static loading test
base_noise_level = 0.1  # Base noise level
num_channels = 3  # Number of channels

# Generate time data
time = np.arange(0, total_duration, 1/sampling_rate)

# Staircase of num_steps levels between 0 and 1, one value per time sample.
# Each sample is mapped to its step with integer arithmetic, so base_strain
# always has exactly len(time) entries, even when the sample count is not a
# multiple of num_steps, and no float is used as a repeat count.
step_levels = np.linspace(0, 1, num=num_steps)
step_of_sample = np.arange(len(time)) * num_steps // len(time)
base_strain = step_levels[step_of_sample]

# Create DataFrame with multiple channels
df = pd.DataFrame({'Time': time})
for channel in range(num_channels):
    # Noise level and amplitude grow with the channel number.
    noise_level = base_noise_level * (1 + 0.5 * channel)
    amplitude = 1 + 0.2 * channel
    channel_noise = np.random.normal(0, noise_level, size=base_strain.shape)
    channel_strain = amplitude * base_strain + channel_noise
    # Modulate with a cosine whose phase is shifted by pi/4 per channel.
    phase_shift = np.pi / 4 * channel
    channel_strain = channel_strain * np.cos(time + phase_shift)
    df[f'Strain_Channel_{channel + 1}'] = channel_strain
df.set_index('Time', inplace=True)

# endregion

### Select the CSV file of original test data

In [39]:
# Selecting the files through a dialog box
# region

# import sys
# from PyQt5.QtWidgets import QApplication, QFileDialog

# # Initialize the application
# app = QApplication(sys.argv)

# # File selection for raw test data of strain gauge rosettes
# file_path_measured, _ = QFileDialog().getOpenFileName(None, 'Open test data files', '', 'All Files (*);;CSV Files (*.csv)')

# # Check if a file was selected for test data
# if file_path_measured:
#     print("Selected test data:", file_path_measured)
# else:
#     print("No test data file selected. The program will quit...")
#     exit()

# endregion

### Parse the test data

In [40]:
# region

# if file_path_measured:

#     if "_raw_format" in file_path_measured:
#         data = pd.read_csv(file_path_measured)
#         data.drop([0,2,3,4,5,6], inplace=True)
#         data.columns=data.iloc[0]
#         new_columns = data.columns.tolist()
#         new_columns[1] = 'Time'
#         data.columns = new_columns
#         data.drop([1], inplace=True)
#         data.drop(data.columns[0], axis=1, inplace=True)
#         time = data['Time']
#         df = data.iloc[:, 1:].filter(regex='SG')
#         df.reset_index(drop=True, inplace=True)
#         time.reset_index(drop=True, inplace=True)
#         df
#         print("Selected test data from directory:   ", file_path_measured)
#     else:
#         print("""The input file was not recognised as raw format. Check whether it is in the correct directory and follows the file name convention, with the '_raw_format' suffix.
#               The program will proceed by assuming that the data is already in a clean format. """)
#         data = pd.read_csv(file_path_measured)
#         time = data['Time']
#         df = data.iloc[:, 1:].filter(regex='SG')
#         df.reset_index(drop=True, inplace=True)
#         time.reset_index(drop=True, inplace=True)
#         df

# endregion

In [41]:
df

Unnamed: 0_level_0,Strain_Channel_1,Strain_Channel_2,Strain_Channel_3
Time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0.000,0.093485,-0.021899,-1.124513e-17
0.005,0.063181,-0.140216,2.182869e-04
0.010,-0.074114,0.028591,-1.923511e-03
0.015,-0.032959,-0.038302,1.125406e-03
0.020,-0.021869,0.114608,5.926683e-03
...,...,...,...
99.975,0.940392,1.227673,6.741023e-01
99.980,0.751802,1.024726,6.971307e-01
99.985,0.797923,1.397362,7.252325e-01
99.990,0.965790,1.037332,7.719992e-01


### Plot & Modify Data

In [42]:
# region

# Copy the original DataFrame for plotting, so the unprocessed data stays
# available as the reference that every processing step starts from.
original_df = df.copy()
# Cache of the latest downsampling result; stays None until update_plot
# assigns it, and export_data reads it when writing the CSV.
downsampled_df_copy = None

# Function definitions
def find_divisors(num):
    """Return all positive divisors of ``num`` in ascending order.

    Args:
        num: Integer to factor by trial division over ``1..num``.

    Returns:
        List of the integers in ``1..num`` that divide ``num`` without
        remainder; empty when ``num`` is smaller than 1.
    """
    divisors = []
    for candidate in range(1, num + 1):
        remainder = num % candidate
        if not remainder:
            divisors.append(candidate)
    return divisors

def apply_butterworth_filter(data, cutoff, order, sampling_rate):
    """Low-pass filter ``data`` with a digital Butterworth filter.

    The filter is designed with ``butter`` and applied with ``filtfilt``.

    Args:
        data: Sequence of samples to filter.
        cutoff: Cutoff frequency, in the same unit as ``sampling_rate``.
        order: Order passed to ``butter``.
        sampling_rate: Sampling rate of ``data``.

    Returns:
        The filtered samples as returned by ``filtfilt``.
    """
    # butter expects the cutoff as a fraction of the Nyquist frequency.
    relative_cutoff = cutoff / (0.5 * sampling_rate)
    numerator, denominator = butter(order, relative_cutoff, btype='low', analog=False)
    return filtfilt(numerator, denominator, data)

def downsample_data(df, original_rate, new_rate):
    """Reduce the sampling rate of ``df`` by keeping every n-th row.

    Args:
        df: DataFrame to thin out; its index is carried over unchanged for
            the rows that are kept.
        original_rate: Sampling rate of ``df``.
        new_rate: Target sampling rate, in the same unit as ``original_rate``.

    Returns:
        ``df`` itself (not a copy) when ``new_rate >= original_rate``;
        otherwise a copy of every ``int(original_rate / new_rate)``-th row.

    Raises:
        ValueError: If ``new_rate`` is not positive.
    """
    if new_rate <= 0:
        raise ValueError(f'new_rate must be positive, got {new_rate}')
    if new_rate >= original_rate:
        return df
    step_size = int(original_rate / new_rate)
    # Keep the index that comes with the slice: each retained row keeps its own
    # index value. Re-spacing the index up to the last original index value
    # would stretch the axis, because that last row is usually not kept.
    # NOTE: rows are dropped without low-pass filtering beforehand.
    return df.iloc[::step_size, :].copy()

def apply_time_offset(df, offset):
    """Drop the rows before ``offset`` and restart the index at zero.

    Args:
        df: DataFrame with a numeric index.
        offset: Index value at which the returned data should begin.

    Returns:
        A copy of ``df`` starting at the row whose index value is closest to
        ``offset``, with that row's index value subtracted from every index entry.
    """
    # Position of the row whose index value lies nearest to the requested offset.
    distance_to_offset = np.abs(df.index - offset)
    start_position = distance_to_offset.argmin()

    trimmed = df.iloc[start_position:].copy()
    first_index_value = trimmed.index[0]
    trimmed.index = trimmed.index - first_index_value
    return trimmed

def export_data(b):
    """Export the processed data to ``exported_data.csv``.

    Click handler of the export button. Reads the current widget values and,
    when at least one processing step is active (a sampling rate different
    from ``sampling_rate``, the Butterworth filter, or a non-zero time
    offset), applies filter and offset to the cached downsampled data and
    writes the result to CSV. A status message is printed in every case.

    Args:
        b: The clicked button; not used, but required by ``on_click``.
    """
    new_rate = sampling_rate_dropdown.value
    apply_filter = apply_filter_checkbox.value
    apply_offset = apply_time_offset_checkbox.value
    offset_time = time_offset_input.value

    # Check if any processing is applied
    is_processing_applied = (new_rate != sampling_rate) or apply_filter or (apply_offset and offset_time != 0)

    # downsampled_df_copy is only read here, so no ``global`` declaration is needed.
    if downsampled_df_copy is None or not is_processing_applied:
        print('No modified data to export.')
        return

    processed_df = downsampled_df_copy.copy()

    # Apply filter if needed
    if apply_filter:
        filter_cutoff = filter_cutoff_input.value
        filter_order = filter_order_input.value
        try:
            for col in processed_df.columns:
                processed_df[col] = apply_butterworth_filter(processed_df[col], filter_cutoff, filter_order, new_rate)
        except ValueError as err:
            # butter/filtfilt reject e.g. a cutoff at or above half the
            # selected rate; abort rather than write partially filtered data.
            print(f'Export aborted: Butterworth filter could not be applied ({err})')
            return

    # Apply time offset if needed
    if apply_offset and offset_time != 0:
        processed_df = apply_time_offset(processed_df, offset_time)

    # Export to CSV
    filename = 'exported_data.csv'
    processed_df.to_csv(filename)
    print(f'Data exported as {filename}')

def create_initial_plot(df):
    """Build the figure holding an original and a processed trace per column.

    Traces are added pairwise for each column of ``df``: first the original
    trace (created hidden), then the processed trace. Both start out with the
    same data.

    Args:
        df: DataFrame whose index supplies the x values and whose columns
            supply the y values.

    Returns:
        The ``go.Figure`` with two traces per column and axis titles set.
    """
    figure = go.Figure()
    x_values = df.index
    for col in df.columns:
        original_trace = go.Scatter(x=x_values, y=df[col], mode='lines', name=f'{col}', visible=False)
        processed_trace = go.Scatter(x=x_values, y=df[col], mode='lines', name=f'{col} Processed ')
        figure.add_trace(original_trace)
        figure.add_trace(processed_trace)
    figure.update_layout(title='Data Plot', xaxis_title='Time (s)', yaxis_title='Values')
    return figure

def toggle_original_data(b):
    """Flip the visibility of every original-data trace in ``plot_widget``.

    The traces at even positions are toggled, one per column of
    ``original_df``.

    Args:
        b: The clicked button; not used, but required by ``on_click``.
    """
    even_positions = range(0, 2 * len(original_df.columns), 2)
    for position in even_positions:
        trace = plot_widget.data[position]
        trace.visible = not trace.visible

# Interactive widgets setup
# The selectable rates are the divisors of the original sampling rate.
divisors = find_divisors(sampling_rate)

# Downsampling Tab
sampling_rate_dropdown = widgets.Dropdown(
    options=divisors,
    value=sampling_rate,
    description='New Rate (Hz):',
    style={'description_width': 'initial'},
)
toggle_original_data_button = widgets.Button(
    description='Toggle Original Data',
    button_style='info',
    tooltip='Click to show/hide original data',
)
downsampling_tab = widgets.VBox([sampling_rate_dropdown, toggle_original_data_button])

# Butterworth Filter Tab
filter_cutoff_input = widgets.FloatText(
    value=3,
    description='Cutoff Frequency (Hz):',
    style={'description_width': 'initial'},
)
filter_order_input = widgets.IntText(
    value=2,
    description='Filter Order:',
    style={'description_width': 'initial'},
)
apply_filter_checkbox = widgets.Checkbox(value=False, description='Apply Butterworth Filter')
butterworth_tab = widgets.VBox([filter_cutoff_input, filter_order_input, apply_filter_checkbox])

# Time Offset Tab
time_offset_input = widgets.FloatText(
    value=0,
    description='Time Offset (s):',
    style={'description_width': 'initial'},
)
apply_time_offset_checkbox = widgets.Checkbox(value=False, description='Apply Time Offset')
time_offset_tab = widgets.VBox([time_offset_input, apply_time_offset_checkbox])

# Export Tab
export_button = widgets.Button(
    description='Export as CSV',
    button_style='success',
    tooltip='Click to export data',
)
export_tab = widgets.VBox([export_button])

# Tab widget: all four pages are registered at once and then titled in order
tab = widgets.Tab(children=[downsampling_tab, butterworth_tab, time_offset_tab, export_tab])
for _tab_position, _tab_title in enumerate(
    ('Downsampling', 'Butterworth Filter', 'Time Offset', 'Export Data')
):
    tab.set_title(_tab_position, _tab_title)

# Plotting setup
initial_plot = create_initial_plot(original_df)
plot_widget = go.FigureWidget(initial_plot)

# Update plot function
def update_plot(change):
    """Recompute the processed data from the widget state and redraw the plot.

    Observer callback for all processing widgets. Downsampling is redone only
    when no cached result exists or the sampling-rate dropdown triggered the
    call; the filter and the time offset are re-applied on every call.

    Args:
        change: Change notification passed by ``observe``; only its
            ``'owner'`` entry is read.
    """
    global downsampled_df_copy
    new_rate = sampling_rate_dropdown.value
    apply_filter = apply_filter_checkbox.value
    filter_cutoff = filter_cutoff_input.value
    filter_order = filter_order_input.value
    apply_offset = apply_time_offset_checkbox.value
    offset_time = time_offset_input.value

    # Identity check: the question is whether the dropdown itself fired.
    if downsampled_df_copy is None or change['owner'] is sampling_rate_dropdown:
        downsampled_df_copy = downsample_data(original_df, sampling_rate, new_rate)

    processed_df = downsampled_df_copy
    if apply_filter:
        try:
            # Filter a copy so the cached downsampled data stays unfiltered.
            filtered_df = downsampled_df_copy.copy()
            for col in filtered_df.columns:
                filtered_df[col] = apply_butterworth_filter(filtered_df[col], filter_cutoff, filter_order, new_rate)
        except ValueError as err:
            # butter/filtfilt reject e.g. a cutoff at or above half the selected
            # rate; report it and plot the unfiltered data instead of raising
            # out of the observer.
            print(f'Butterworth filter not applied: {err}')
        else:
            processed_df = filtered_df

    if apply_offset:
        offset_original_df = apply_time_offset(original_df, offset_time)
        offset_processed_df = apply_time_offset(processed_df, offset_time)
    else:
        offset_original_df = original_df
        offset_processed_df = processed_df

    # Traces alternate original/processed per column (see create_initial_plot).
    with plot_widget.batch_update():
        for i, col in enumerate(original_df.columns):
            plot_widget.data[i * 2].x = offset_original_df.index
            plot_widget.data[i * 2].y = offset_original_df[col]
            plot_widget.data[i * 2 + 1].x = offset_processed_df.index
            plot_widget.data[i * 2 + 1].y = offset_processed_df[col]

# Observers and event handling
# Every processing control redraws the plot whenever its value changes.
for _control in (
    sampling_rate_dropdown,
    apply_filter_checkbox,
    filter_cutoff_input,
    filter_order_input,
    apply_time_offset_checkbox,
    time_offset_input,
):
    _control.observe(update_plot, names='value')

# Buttons trigger their handlers on click.
toggle_original_data_button.on_click(toggle_original_data)
export_button.on_click(export_data)

# Display the widgets and the plot
display(tab, plot_widget)

# endregion

Tab(children=(VBox(children=(Dropdown(description='New Rate (Hz):', index=11, options=(1, 2, 4, 5, 8, 10, 20, …

FigureWidget({
    'data': [{'mode': 'lines',
              'name': 'Strain_Channel_1',
              'type': 'scatter',
              'uid': '6935eba7-a39f-47b8-a298-142ba6a05119',
              'visible': False,
              'x': array([0.0000e+00, 5.0000e-03, 1.0000e-02, ..., 9.9985e+01, 9.9990e+01,
                          9.9995e+01]),
              'y': array([ 0.09348509,  0.063181  , -0.07411405, ...,  0.79792325,  0.96579027,
                           0.85225297])},
             {'mode': 'lines',
              'name': 'Strain_Channel_1 Processed ',
              'type': 'scatter',
              'uid': 'a90ff1cd-ef7a-4765-b2cb-75ed773d68a6',
              'x': array([0.0000e+00, 5.0000e-03, 1.0000e-02, ..., 9.9985e+01, 9.9990e+01,
                          9.9995e+01]),
              'y': array([ 0.09348509,  0.063181  , -0.07411405, ...,  0.79792325,  0.96579027,
                           0.85225297])},
             {'mode': 'lines',
              'name': 'Strain_Channel_2

No modified data to export.
No modified data to export.
Data exported as exported_data.csv
No modified data to export.
No modified data to export.
Data exported as exported_data.csv
