# Setup

In [None]:
!pip install plotly==5.10



In [None]:
import os
from google.colab import drive
from pathlib import Path
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.signal import butter, filtfilt, find_peaks

In [None]:
import plotly.io as pio
# Make 'colab' the default Plotly renderer for every figure shown in this notebook.
pio.renderers.default = 'colab'

# Mount Drive

In [None]:
# Mount Google Drive and point data_dir at the sample-data folder inside it.
mount_point = '/gdrive'
drive.mount(mount_point)

data_dir = os.path.join(mount_point, 'MyDrive', 'Adom-Tech', 'Received From AdOM', 'sample_data')

# List every regular .txt file found (recursively) under the data folder.
for file in Path(data_dir).rglob('*.txt'):
    if not file.is_file():
        continue
    print(file)

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/readme.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/1/(Run)spectra_09-46-14-250.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/1/(Run)spectra_09-46-14-250_BestFit.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/2/(Run)spectra_09-46-17-302.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/2/(Run)spectra_09-46-17-302_BestFit.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/3/(Run)spectra_09-46-18-153.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/3/(Run)spectra_09-46-18-153_BestFit.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/4/(Run)spectra_09-46-21-692.txt
/gdrive/MyDrive/Adom-Tech/Received From AdOM/sample_data/good_fit/4/(Run)spectra_09-46-21-692_BestFit.txt
/

# Load Signals

In [None]:
def load_txt_file(file_path):
    """
    Parse a two-column spectral text file into a DataFrame.

    Blank lines and lines that do not consist of exactly two whitespace-separated
    tokens are ignored. A line starting with '>>>>>Begin Spectral Data<<<<<'
    marks the start of the data section; files without that marker are treated
    as data from the first numeric row onwards. Once data has started, the first
    two-token line that is not numeric ends the parse.

    Args:
        file_path (str or Path): Path to the text file.

    Returns:
        pd.DataFrame: DataFrame with two columns ['Wavelength', 'Intensity'].
    """
    begin_marker = '>>>>>Begin Spectral Data<<<<<'
    rows = []
    in_data = False

    with open(file_path, 'r') as handle:
        for raw_line in handle:
            text = raw_line.strip()

            # Nothing to parse on an empty line.
            if not text:
                continue

            # Explicit start-of-data marker (used by the non-best-fit files).
            if text.startswith(begin_marker):
                in_data = True
                continue

            # Only two-token lines can be (wavelength, intensity) rows.
            tokens = text.split()
            if len(tokens) != 2:
                continue

            try:
                row = (float(tokens[0]), float(tokens[1]))
            except ValueError:
                # Non-numeric pair: a header line before the data section,
                # or the end of the data section once it has begun.
                if in_data:
                    break
                continue

            rows.append(row)
            # Files without a marker start their data at the first numeric row.
            in_data = True

    return pd.DataFrame({
        "Wavelength": [wavelength for wavelength, _ in rows],
        "Intensity": [intensity for _, intensity in rows],
    })


In [None]:
def load_patient_signals(data_dir, patient):
    """
    Read the measured spectrum and its best-fit curve for one patient.

    The patient folder is ``<data_dir>/<fit type>/<patient number>``. The first
    regular file found (recursively) whose name matches ``*BestFit.txt`` is used
    as the best-fit file; the spectra file is expected in the same folder under
    the same name with ``_BestFit`` removed.

    Args:
        data_dir (str): The base directory containing the patient data.
        patient (tuple): ``(fit_type, patient_number)``, where the fit type is
            'good_fit' or 'bad_fit' and the patient number is a string.

    Returns:
        tuple: ``(spectra_df, best_fit_df)``, each produced by ``load_txt_file``.

    Raises:
        FileNotFoundError: If no best-fit file is found under the patient
            folder, or the matching spectra file does not exist.
    """
    patient_dir = os.path.join(data_dir, patient[0], patient[1])

    # First matching regular file, or None when the search comes up empty.
    best_fit_path = next(
        (
            candidate
            for candidate in Path(patient_dir).rglob('*BestFit.txt')
            if candidate.is_file()
        ),
        None,
    )
    if best_fit_path is None:
        raise FileNotFoundError(f"No BestFit file found for patient {patient} in {patient_dir}")

    # The raw spectra file sits next to the best-fit file, minus the suffix.
    spectra_name = best_fit_path.name.replace('_BestFit', '')
    spectra_path = best_fit_path.with_name(spectra_name)
    if not spectra_path.is_file():
        raise FileNotFoundError(f"Spectra file not found for patient {patient} at {spectra_path}")

    return load_txt_file(spectra_path), load_txt_file(best_fit_path)

# Detrend

In [None]:
def detrend_signal(df, cutoff_frequency, filter_order=3):
    """
    Detrends a signal using a zero-phase high-pass Butterworth filter.

    The rows are sorted by 'Wavelength', the sampling frequency is taken as the
    reciprocal of the mean wavelength spacing, and the filtered 'Intensity' is
    stored in a new 'Detrended' column. The input DataFrame is left untouched.

    Parameters:
    df (DataFrame): Input DataFrame with 'Wavelength' and 'Intensity' columns.
    cutoff_frequency (float): Cutoff of the high-pass filter, in cycles per unit
        of 'Wavelength' (the same units as 1 / wavelength spacing). It must lie
        strictly between 0 and the Nyquist frequency (half the sampling frequency).
    filter_order (int): Order of the Butterworth filter (default=3).

    Returns:
    DataFrame: Sorted copy of the input with an additional 'Detrended' column.

    Raises:
    ValueError: If fewer than two samples are given, if the mean wavelength
        spacing is not a positive number, or if scipy rejects the normalized
        cutoff or the signal length.
    """
    # Ensure the DataFrame is sorted by Wavelength (sort_values returns a copy)
    df = df.sort_values(by="Wavelength").reset_index(drop=True)

    if len(df) < 2:
        raise ValueError(
            "detrend_signal needs at least two samples to estimate the wavelength spacing"
        )

    # Sampling frequency from the mean Wavelength spacing.
    # NOTE(review): this treats the wavelength grid as uniformly spaced — confirm
    # that the spacing is close enough to constant for the data being filtered.
    sampling_interval = df['Wavelength'].diff().mean()
    if not sampling_interval > 0:
        # Written as "not > 0" so that NaN spacing is rejected along with zero.
        raise ValueError(
            f"Mean wavelength spacing must be positive, got {sampling_interval}"
        )
    sampling_frequency = 1 / sampling_interval

    # Normalize the cutoff to the Nyquist frequency, as butter expects
    nyquist_freq = 0.5 * sampling_frequency
    normal_cutoff = cutoff_frequency / nyquist_freq

    # High-pass Butterworth filter design
    b, a = butter(filter_order, normal_cutoff, btype='high', analog=False)

    # Forward-backward filtering so the result has no phase shift
    detrended_intensity = filtfilt(b, a, df['Intensity'].to_numpy())

    # Add detrended data to DataFrame
    df['Detrended'] = detrended_intensity

    return df


# Peak Detection

In [None]:
def detect_peaks(df, column, prominence=0.005):
    """
    Return the rows of ``df`` at which ``column`` has a local maximum.

    Args:
        df (pd.DataFrame): Signal table.
        column (str): Name of the column searched for peaks.
        prominence (float): Prominence requirement passed to ``find_peaks``.

    Returns:
        pd.DataFrame: The peak rows (all columns), re-indexed from 0.
    """
    # find_peaks returns (indices, properties); only the positions are needed.
    positions = find_peaks(df[column], prominence=prominence)[0]
    selected = df.iloc[positions]
    return selected.reset_index(drop=True)

def detect_valleys(df, column, prominence=0.005):
    """
    Return the rows of ``df`` at which ``column`` has a local minimum.

    Valleys are found as the peaks of the negated column.

    Args:
        df (pd.DataFrame): Signal table.
        column (str): Name of the column searched for valleys.
        prominence (float): Prominence requirement passed to ``find_peaks``.

    Returns:
        pd.DataFrame: The valley rows (all columns), re-indexed from 0.
    """
    inverted = -df[column]
    # find_peaks returns (indices, properties); only the positions are needed.
    positions = find_peaks(inverted, prominence=prominence)[0]
    selected = df.iloc[positions]
    return selected.reset_index(drop=True)

# Plot Signals

In [None]:
import ipywidgets as widgets
from IPython.display import display

# Every selectable entry has the form '<fit type>_<patient number>_<column>'.
fit_types = ['good_fit', 'bad_fit']
patient_numbers = range(1, 11)
plot_columns = ['Intensity', 'Detrended']

patient_dropdown = widgets.Dropdown(
    options=[
        f'{fit}_{i}_{col}'
        for fit in fit_types
        for i in patient_numbers
        for col in plot_columns
    ],
    value='good_fit_1_Detrended',
    description='Patient:',
)

# Output area shown underneath the dropdown.
plot_output = widgets.Output()

display(patient_dropdown, plot_output)

Dropdown(description='Patient:', index=1, options=('good_fit_1_Intensity', 'good_fit_1_Detrended', 'good_fit_2…

Output()

In [None]:
def render_plot(patient_id):
    """
    Render the spectra / best-fit overlay for one dropdown selection.

    ``patient_id`` has the form ``'<fit type>_<patient number>_<column>'``
    (e.g. ``'good_fit_1_Detrended'``). Both signals are loaded and detrended,
    peaks and valleys are detected in the selected column, and the resulting
    figure is shown inside the ``plot_output`` widget, replacing whatever it
    displayed before.

    Args:
        patient_id (str): Selection string; split from the right on '_' into
            the fit type, the patient number and the column to plot.
    """
    # Split from the right so the underscore inside 'good_fit'/'bad_fit' survives.
    unpacked = tuple(patient_id.rsplit('_', 2))
    patient = unpacked[0:2]
    y_column = unpacked[2]
    x_column = 'Wavelength'

    spectra_df, best_fit_df = load_patient_signals(data_dir, patient)

    cutoff_frequency = 0.008
    filter_order = 3  # default of 3
    peak_prominence = 0.0001

    spectra_df = detrend_signal(spectra_df, cutoff_frequency, filter_order)
    best_fit_df = detrend_signal(best_fit_df, cutoff_frequency, filter_order)

    spectra_peaks = detect_peaks(spectra_df, y_column, peak_prominence)
    best_fit_peaks = detect_peaks(best_fit_df, y_column, peak_prominence)

    spectra_valleys = detect_valleys(spectra_df, y_column, peak_prominence)
    best_fit_valleys = detect_valleys(best_fit_df, y_column, peak_prominence)

    # (legend name, data, line style) for the two curves.
    line_traces = [
        ('Spectra', spectra_df, dict(color='blue')),
        ('Best Fit', best_fit_df, dict(dash='dot', color='red')),
    ]
    # (legend name, data, marker color) for the detected extrema. Each trace
    # gets its own legend name so spectra and best-fit markers can be told
    # apart in the legend and in the unified hover box.
    marker_traces = [
        ('Spectra Peaks', spectra_peaks, 'blue'),
        ('Best Fit Peaks', best_fit_peaks, 'red'),
        ('Spectra Valleys', spectra_valleys, 'cyan'),
        ('Best Fit Valleys', best_fit_valleys, 'pink'),
    ]

    with plot_output:
        # wait=True keeps the old figure until the new one is ready to display.
        plot_output.clear_output(wait=True)

        print(f"Rendering plot for patient: {patient}")  # Optional debug print

        fig = go.Figure()

        # Spectra and best-fit curves
        for name, frame, line_style in line_traces:
            fig.add_trace(go.Scatter(
                x=frame[x_column],
                y=frame[y_column],
                mode='lines',
                name=name,
                line=line_style
            ))

        # Detected peaks and valleys of both curves
        for name, frame, color in marker_traces:
            fig.add_trace(go.Scatter(
                x=frame[x_column],
                y=frame[y_column],
                mode='markers',
                name=name,
                marker=dict(size=8, symbol='circle', color=color)
            ))

        # Update layout for better interactivity; the y-axis is labelled with
        # the column actually plotted ('Intensity' or 'Detrended').
        fig.update_layout(
            height=600,
            width=900,
            title=f"Spectra and Best Fit Data Overlay for {patient}",
            xaxis_title="Wavelength",
            yaxis_title=y_column,
            hovermode='x unified'
        )

        fig.show()

# Draw the plot once for the dropdown's starting selection.
render_plot(patient_dropdown.value)


def _on_patient_change(change):
    """Re-render the plot for the newly selected dropdown value."""
    render_plot(change['new'])


# Re-draw whenever the dropdown's value changes.
patient_dropdown.observe(_on_patient_change, names='value')

# [DEPRECATED] Non-Widget Plotly Plotting

In [None]:
# Patient to plot, given as (fit type, patient number).
patient = ('good_fit', '10')

In [None]:
# Load, detrend and plot the spectra / best-fit signals of `patient` without widgets.
spectra_df, best_fit_df = load_patient_signals(data_dir, patient)

cutoff_frequency = 0.008
filter_order = 3  # default of 3
peak_prominence = 0.0001

x_column = 'Wavelength'
# detrend_signal stores its result in the 'Detrended' column.
y_column = 'Detrended'

spectra_df = detrend_signal(spectra_df, cutoff_frequency, filter_order)
best_fit_df = detrend_signal(best_fit_df, cutoff_frequency, filter_order)

# Detect peaks
spectra_peaks = detect_peaks(spectra_df, y_column, peak_prominence)
best_fit_peaks = detect_peaks(best_fit_df, y_column, peak_prominence)

# Detect valleys
spectra_valleys = detect_valleys(spectra_df, y_column, peak_prominence)
best_fit_valleys = detect_valleys(best_fit_df, y_column, peak_prominence)

# Create interactive plot
fig = go.Figure()

# Spectra and best-fit curves: (legend name, data, line style)
for trace_name, trace_df, line_style in [
    ('Spectra', spectra_df, dict(color='blue')),
    ('Best Fit', best_fit_df, dict(dash='dot', color='red')),
]:
    fig.add_trace(go.Scatter(
        x=trace_df[x_column],
        y=trace_df[y_column],
        mode='lines',
        name=trace_name,
        line=line_style
    ))

# Detected peaks and valleys: (legend name, data, marker color). Each trace
# gets its own legend name so spectra and best-fit markers can be told apart.
for trace_name, trace_df, marker_color in [
    ('Spectra Peaks', spectra_peaks, 'blue'),
    ('Best Fit Peaks', best_fit_peaks, 'red'),
    ('Spectra Valleys', spectra_valleys, 'cyan'),
    ('Best Fit Valleys', best_fit_valleys, 'pink'),
]:
    fig.add_trace(go.Scatter(
        x=trace_df[x_column],
        y=trace_df[y_column],
        mode='markers',
        name=trace_name,
        marker=dict(size=8, symbol='circle', color=marker_color)
    ))

# Update layout for better interactivity; the y-axis is labelled with the
# column actually plotted.
fig.update_layout(
    height=600,
    width=900,
    title=f"Spectra and Best Fit Data Overlay for {patient}",
    xaxis_title="Wavelength",
    yaxis_title=y_column,
    hovermode='x unified'
)

fig.show()