In [39]:
import csv
import numpy as np
import os
import panel as pn
import plotly.graph_objects as go
from scipy.fft import fft, fftfreq
from scipy.signal import spectrogram

# Load the Plotly extension so Panel can render Plotly figures
pn.extension('plotly')

# File that is pre-selected in the file picker when the dashboard starts
manual_default = r"C:\Users\Louis\PycharmProjects\Data_collection_conda\3. Machine Data\Collected Data\2025-03-19 Test coupon 2.csv"

# Line color used for each channel, keyed by channel name
signal_color_map = dict(
    FZValue="green",
    CurrentFeedPosition="red",
    CurrentFeedTorque="yellow",
    CurrentRotationTorque="teal",
)

def read_sensor_data(file_path):
    """
    Read the four analysed channels from a sensor CSV file.

    The file may contain header lines and rows that start with a timestamp.
    Empty cells are dropped and the remaining cells are stripped before a row
    is interpreted, so the column positions below refer to non-empty cells.

    Extracted columns (1-based):
      - Column 2: FZValue
      - Column 3: CurrentFeedPosition
      - Column 4: CurrentFeedTorque
      - Column 6: CurrentRotationTorque

    Row handling:
      - Rows with at least 6 cells (timestamped or not): indices 1, 2, 3, 5.
      - Rows with exactly 4 cells and no ':' in the first cell: assumed to be
        the four channels already in the correct order.
      - Every other row, and any row whose selected cells are not all
        numeric (e.g. a header), is skipped.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A NumPy array of shape (4, n_rows) where each row is one signal, or
        an empty array (``size == 0``) when no usable row was found.

    Raises:
        OSError: If the file cannot be opened.
    """
    data = []
    # newline='' lets the csv module do its own line-ending handling
    with open(file_path, 'r', newline='') as file:
        sample = file.read(1024)
        file.seek(0)
        try:
            # Only accept real field separators; an unrestricted sniff may
            # settle on any character that happens to repeat consistently.
            delimiter = csv.Sniffer().sniff(sample, delimiters=",;\t").delimiter
        except csv.Error:
            # Sniffing failed: fall back to whichever separator dominates
            delimiter = ';' if sample.count(';') > sample.count(',') else ','

        for raw_row in csv.reader(file, delimiter=delimiter):
            # Remove empty strings and trim whitespace
            cells = [cell for cell in (item.strip() for item in raw_row) if cell]
            if not cells:
                continue

            if len(cells) >= 6:
                # Select columns 2,3,4,6 (indices 1,2,3,5)
                selected = (cells[1], cells[2], cells[3], cells[5])
            elif len(cells) == 4 and ':' not in cells[0]:
                # Non-timestamp row that is already in the correct order
                selected = cells
            else:
                continue

            try:
                data.append([float(value) for value in selected])
            except ValueError:
                # Skip rows that cannot be fully converted to floats
                continue

    if not data:
        return np.array([])
    return np.array(data).T  # Transpose so that each row is one signal

def generate_time_list(sampling_frequency, num_samples, start_time=0.0):
    """
    Build the list of sample times for an evenly sampled signal.

    Sample ``k`` is placed at ``start_time + k / sampling_frequency``.

    Args:
        sampling_frequency: Number of samples per unit of time (non-zero).
        num_samples: How many time values to generate.
        start_time: Time assigned to the first sample.

    Returns:
        A list of ``num_samples`` floats.
    """
    sample_period = 1.0 / sampling_frequency
    sample_indices = range(num_samples)
    return [start_time + k * sample_period for k in sample_indices]

# Named colors used by the header and the page template
colors = {'neutral space': '#e8e8e4', 'deep space': '#003247'}

# Axis grid styling shared by all plots
grid_settings = {
    "gridcolor": "black",
    "zerolinecolor": "black",
    "gridwidth": 0.1,
    "zerolinewidth": 0.1,
}

def ajuster_liste_puissance_de_2(lst):
    """
    Zero-pad a sequence at its end so that its length is a power of two.

    A sequence whose length is already a power of two is returned as a NumPy
    array without padding. An empty sequence is padded to two zeros.

    Args:
        lst: One-dimensional sequence of numbers.

    Returns:
        A NumPy array holding the input values followed by the padding zeros.
    """
    current_length = len(lst)
    # Smallest power of two that is >= current_length (2 for an empty input)
    target_length = 1 << (current_length - 1).bit_length()
    if current_length == target_length:
        return np.array(lst)
    missing = target_length - current_length
    return np.pad(lst, (0, missing), mode='constant')

def corr(signal_ampl, fs_local):
    """
    Compute the raw (unnormalised) autocorrelation of a mean-removed signal.

    Args:
        signal_ampl: One-dimensional sequence of samples.
        fs_local: Sampling frequency used to convert sample lags to time.

    Returns:
        A tuple ``(lag_times, autocorr_values)`` where ``lag_times`` covers the
        lags ``-(N - 1) .. N - 1`` divided by ``fs_local`` and
        ``autocorr_values`` is the full correlation of the centered signal
        with itself.
    """
    samples = np.asarray(signal_ampl)
    n_samples = len(samples)
    centered = samples - samples.mean()
    sample_lags = np.arange(1 - n_samples, n_samples)
    autocorrelation = np.correlate(centered, centered, mode='full')
    return sample_lags / fs_local, autocorrelation

def psd(signal_ampl, fs_local):
    """
    Estimate the power spectrum of a signal at its positive frequencies.

    The signal is first zero-padded to a power-of-two length with
    ``ajuster_liste_puissance_de_2``; the power is ``|FFT|^2 / N`` where
    ``N`` is the padded length.

    Args:
        signal_ampl: One-dimensional sequence of samples.
        fs_local: Sampling frequency of the signal.

    Returns:
        A tuple ``(frequencies, power)`` restricted to frequencies > 0.
    """
    padded = ajuster_liste_puissance_de_2(np.array(signal_ampl))
    n_points = len(padded)
    spectrum = np.fft.fft(padded)
    power = np.abs(spectrum) ** 2 / n_points
    frequencies = np.fft.fftfreq(n_points, d=1/fs_local)
    keep = frequencies > 0  # drop the DC bin and the negative half
    return frequencies[keep], power[keep]

def FFT_signal(signal_ampl, fs_local, limited_harmonique_number=100000):
    """
    Compute the FFT magnitude of a signal at its positive frequencies.

    Args:
        signal_ampl: One-dimensional sequence of samples.
        fs_local: Sampling frequency of the signal.
        limited_harmonique_number: Accepted for compatibility with existing
            callers; it is currently not used by the computation.

    Returns:
        A tuple ``(frequencies, magnitudes)`` restricted to frequencies > 0.
    """
    samples = np.asarray(signal_ampl)
    spectrum = np.fft.fft(samples)
    frequencies = np.fft.fftfreq(len(samples), d=1/fs_local)
    keep = frequencies > 0  # drop the DC bin and the negative half
    magnitudes = np.abs(spectrum[keep])
    return frequencies[keep], magnitudes

def plot_spectrogram(signal, fs_local, axis_label=""):
    """
    Build a Plotly heatmap of a signal's spectrogram in decibels.

    Args:
        signal: One-dimensional sequence of samples.
        fs_local: Sampling frequency passed to ``scipy.signal.spectrogram``.
        axis_label: Text appended to the figure title.

    Returns:
        A ``plotly.graph_objects.Figure`` with time on the x axis, frequency
        on the y axis and ``10 * log10(Sxx + 1e-10)`` as the color value.
    """
    frequencies, segment_times, power = spectrogram(signal, fs=fs_local)
    # The small offset keeps log10 finite where the power is exactly zero
    power_db = 10 * np.log10(power + 1e-10)

    heatmap = go.Heatmap(
        x=segment_times,
        y=frequencies,
        z=power_db,
        colorscale='Viridis',
        colorbar=dict(title="dB"),
    )
    figure = go.Figure(data=heatmap)

    layout_options = {
        "title": f"Spectrogram {axis_label}",
        "xaxis_title": "Time (sec)",
        "yaxis_title": "Frequency (Hz)",
        "paper_bgcolor": "white",
        "plot_bgcolor": "white",
        "xaxis": grid_settings,
        "yaxis": grid_settings,
    }
    figure.update_layout(**layout_options)
    return figure

def _apply_plot_layout(fig, title, xaxis_title, yaxis_title="Amplitude"):
    """Apply the dashboard's shared white-background layout to *fig* in place."""
    fig.update_layout(
        title=title,
        xaxis_title=xaxis_title,
        yaxis_title=yaxis_title,
        paper_bgcolor="white",
        plot_bgcolor="white",
        xaxis=grid_settings,
        yaxis=grid_settings,
    )


def update_dashboard(file_name, fs=26):
    """
    Build the full analysis dashboard for one CSV file.

    The file is read with ``read_sensor_data`` and the four channels are shown
    as a time-domain plot, an FFT plot, an autocorrelation plot and one
    spectrogram per channel.

    Args:
        file_name: Path of the CSV file to analyse.
        fs: Sampling frequency used for the time axis and all spectral
            computations. Defaults to 26; adjust it to the real acquisition
            rate if it differs.

    Returns:
        A ``pn.Column`` containing all plots, or a ``pn.pane.Markdown`` error
        message when the file cannot be read or contains no usable data.
    """
    try:
        data_arrays = read_sensor_data(file_name)
    except (OSError, UnicodeDecodeError) as exc:
        # The selector can offer a file that is missing or unreadable;
        # report that in the page instead of failing the bound callback.
        return pn.pane.Markdown(
            f"**Error: Could not read the selected file.**\n\n{exc}"
        )
    if data_arrays.size == 0:
        return pn.pane.Markdown("**Error: No data found in the selected file.**")

    # Rows of data_arrays, in the order produced by read_sensor_data
    channel_names = (
        "FZValue",
        "CurrentFeedPosition",
        "CurrentFeedTorque",
        "CurrentRotationTorque",
    )
    channels = list(zip(channel_names, data_arrays))
    n_samples = data_arrays.shape[1]
    time = generate_time_list(fs, n_samples)

    # --- Time-Domain Plot ---
    fig_time = go.Figure()
    for label, signal in channels:
        fig_time.add_trace(go.Scatter(
            x=time, y=signal, mode="lines", name=label,
            line=dict(color=signal_color_map[label])
        ))
    _apply_plot_layout(fig_time, "Time Domain Signals", "Time (sec)")

    # --- FFT Plot ---
    fig_fft = go.Figure()
    for label, signal in channels:
        freq, ampl = FFT_signal(signal, fs)
        fig_fft.add_trace(go.Scatter(
            x=freq, y=ampl, mode="lines", name=f"FFT {label}",
            line=dict(color=signal_color_map[label])
        ))
    _apply_plot_layout(fig_fft, "FFT of Signals", "Frequency (Hz)")

    # --- Autocorrelation Plot ---
    fig_corr = go.Figure()
    for label, signal in channels:
        lag, corr_val = corr(signal, fs)
        fig_corr.add_trace(go.Scatter(
            x=lag, y=corr_val, mode="lines", name=f"Autocorr {label}",
            line=dict(color=signal_color_map[label])
        ))
    _apply_plot_layout(fig_corr, "Autocorrelation of Signals", "Time (sec)")

    # --- Spectrogram Plots (one per channel, laid out two per row) ---
    spectrogram_panes = [
        pn.pane.Plotly(
            plot_spectrogram(signal, fs, axis_label=label),
            sizing_mode="stretch_width",
        )
        for label, signal in channels
    ]

    # Layout the dashboard
    dashboard = pn.Column(
        pn.pane.Markdown("## Time Domain Plot"),
        pn.pane.Plotly(fig_time, sizing_mode="stretch_width"),
        pn.pane.Markdown("## FFT Plot"),
        pn.pane.Plotly(fig_fft, sizing_mode="stretch_width"),
        pn.pane.Markdown("## Autocorrelation Plot"),
        pn.pane.Plotly(fig_corr, sizing_mode="stretch_width"),
        pn.pane.Markdown("## Spectrograms"),
        pn.Row(*spectrogram_panes[:2]),
        pn.Row(*spectrogram_panes[2:]),
        sizing_mode="stretch_width",
        styles={'background': "white"}
    )
    return dashboard

# Directory scanned for selectable .csv files
data_dir = r"C:\Users\Louis\PycharmProjects\Data_collection_conda\3. Machine Data\Collected Data"
csv_files = sorted(
    os.path.join(data_dir, entry)
    for entry in os.listdir(data_dir)
    if entry.lower().endswith('.csv')
)

# Keep the manual default selectable even when the scan did not list it
if manual_default not in csv_files:
    csv_files.insert(0, manual_default)
default_file = manual_default

# The dashboard is rebuilt whenever another file is picked in the selector
file_selector = pn.widgets.Select(
    name="Select a CSV file",
    options=csv_files,
    value=default_file,
)
dashboard_panel = pn.bind(update_dashboard, file_name=file_selector)

# Page header: logo on the left, styled title next to it
header_logo = pn.pane.PNG('ESA_LOGO.png', width=200)
header_title = pn.pane.Markdown(
    "# ESA Analyzer Dashboard",
    align='center',
    styles={
        'margin-top': '30px',
        'background': colors['deep space'],
        'color': colors['neutral space'],
        'text-align': 'center',
        'width': '90%',
        'font-size': '25px',
        'border-radius': '10px',
    },
    sizing_mode='stretch_width',
)
header = pn.Row(
    header_logo,
    header_title,
    align='center',
    styles={'background': colors['deep space']},
    sizing_mode='stretch_width',
)

# Main page body: header, file picker, then the reactive dashboard
layout = pn.Column(
    header,
    pn.Row(file_selector, sizing_mode="stretch_width"),
    dashboard_panel,
    sizing_mode='stretch_width',
    styles={'background': "white"},
)

template = pn.template.MaterialTemplate(
    title="ESA Analyzer Dashboard",
    header_background=colors['deep space'],
    main=[layout],
)

# Start the Panel server for the assembled template
pn.serve(template)


Launching server at http://localhost:55574


<panel.io.server.Server at 0x234c220e850>