In [37]:
''' Set up for Jupyter Notebook to automatically reload modules before executing code - easier debugging and development '''
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [38]:
import matplotlib.pyplot as plt
import numpy as np
import spikeinterface.full as si
from MEA_Analysis.MEAProcessingLibrary import mea_processing_library as mea
import os


def plot_traces_with_spikes_per_well(
    h5_file_path: str,
    sorted_output_dir: str,
    analyzer_output_dir: str,
    well_id: str,
    start_time: float = 0.0,
    duration: float = 1.0,
    max_channels: int = 64,
):
    """
    Plot raw traces overlaid with spikes for a specific well.
    
    Args:
        h5_file_path (str): Path to the .h5 recording file.
        sorted_output_dir (str): Path to the sorter output base directory.
        analyzer_output_dir (str): Path to the analyzer output base directory.
        well_id (str): The well ID (e.g., 'A1') to plot.
        start_time (float): Start time (in seconds) for the trace.
        duration (float): Duration (in seconds) of the trace window.
        max_channels (int): Max number of channels to display.
    """
    # --- Load recording dictionary ---
    _, recordings, _, _ = mea.load_recordings(h5_file_path)
    if well_id not in recordings:
        print(f"⚠️ Well {well_id} not found in recording.")
        return
    recording = recordings[well_id]
    segment = recording[0]  # This is typically a single segment object

    # --- Load SortingAnalyzer ---
    sa_path = os.path.join(analyzer_output_dir, well_id)
    if not os.path.exists(sa_path):
        print(f"⚠️ Analyzer directory for well {well_id} not found at: {sa_path}")
        return
    sa = si.load_sorting_analyzer(sa_path)
    sorting = sa.sorting
    sampling_rate = segment.get_sampling_frequency()

    # --- Define frame range ---
    start_frame = int(start_time * sampling_rate)
    end_frame = int((start_time + duration) * sampling_rate)
    times = np.arange(start_frame, end_frame) / sampling_rate

    # --- Load traces ---
    traces = segment.get_traces(start_frame=start_frame, end_frame=end_frame)
    num_channels = min(traces.shape[1], max_channels)
    traces = traces[:, :num_channels]

    # --- Get spike trains and best channels ---
    unit_ids = sorting.get_unit_ids()
    unit_spike_trains = {
        unit: sorting.get_unit_spike_train(unit, start_frame=start_frame, end_frame=end_frame)
        for unit in unit_ids
    }
    best_channels = {
        unit: sa.get_best_channels(unit)[0] for unit in unit_ids
    }

    # --- Plot ---
    plt.figure(figsize=(14, max(6, num_channels // 2)))
    offset = 0
    yticks = []

    for ch in range(num_channels):
        trace = traces[:, ch]
        plt.plot(times, trace + offset, color='black', linewidth=0.5)

        for unit_id, spike_frames in unit_spike_trains.items():
            if best_channels[unit_id] == ch:
                spike_times = spike_frames / sampling_rate
                spike_amps = np.interp(spike_frames - start_frame, np.arange(len(trace)), trace) + offset
                plt.scatter(spike_times, spike_amps, s=10, label=f'Unit {unit_id}' if ch == 0 else "")

        yticks.append(offset)
        offset += np.ptp(trace) * 1.2

    plt.xlabel("Time (s)")
    plt.yticks(yticks, [f"Ch {i}" for i in range(num_channels)])
    plt.title(f"Raw Traces with Spikes for Well {well_id}")
    if unit_ids:
        plt.legend(loc='upper right', fontsize='small')
    plt.tight_layout()
    plt.show()


In [39]:
REC_PATH = '/global/homes/a/adammwea/pscratch/z_raw_data/irc_maxone_desktop/media/harddrive8tb/CDKL5-R59X_MaxOnePlus_T1_05202025_PS/CDKL5-R59X_MaxOnePlus_T1_05202025_PS/250620/P002779/Network/000030/data.raw.h5'
SORT_PATH = '/global/homes/a/adammwea/pscratch/z_analyzed_data/irc_maxone_desktop/media/harddrive8tb/CDKL5-R59X_MaxOnePlus_T1_05202025_PS/CDKL5-R59X_MaxOnePlus_T1_05202025_PS/250620/P002779/Network/000030/sorted/well000/sorter_output'
ANALYZER_PATH = '/global/homes/a/adammwea/pscratch/z_analyzed_data/irc_maxone_desktop/media/harddrive8tb/CDKL5-R59X_MaxOnePlus_T1_05202025_PS/CDKL5-R59X_MaxOnePlus_T1_05202025_PS/250620/P002779/Network/000030/analyzer/'

In [41]:
plot_traces_with_spikes_per_well(
    # h5_file_path="/data/run123.h5",
    # sorted_output_dir="/data/sorted_output",
    # analyzer_output_dir="/data/analyzer_output",
    h5_file_path=REC_PATH,
    sorted_output_dir=SORT_PATH,
    analyzer_output_dir=ANALYZER_PATH,
    well_id="well000",
    start_time=5.0,
    duration=2.0,
    max_channels=48
)


2025-07-15 12:40:30,546 - INFO - Extracting recording details from h5 directories: - mea_processing_library.extract_recording_details
2025-07-15 12:40:30,548 - INFO - Scan Type: Network - mea_processing_library.load_recordings
2025-07-15 12:40:30,578 - INFO - MaxTwo Detected. - mea_processing_library.load_recordings
2025-07-15 12:40:30,596 - DEBUG - Stream ID: well000, Recording: rec0000 loaded. - mea_processing_library.load_recordings


**********
Maxwell file format is based on HDF5.
The internal compression requires a custom plugin!!!
This is a big pain for the end user.
You, as a end user, should ask Maxwell company to change this.
Please visit this page and install the missing decompression libraries:
https://share.mxwbio.com/d/4742248b2e674a85be97/
Then, link the decompression library by setting the `HDF5_PLUGIN_PATH` to your
installation location, e.g. via
os.environ['HDF5_PLUGIN_PATH'] = '/path/to/custom/hdf5/plugin/'

Alternatively, you can use the auto_install_maxwell_hdf5_compression_plugin() below
function that do it automagically.

**********


OSError: Can't synchronously read data (can't open directory (/usr/local/hdf5/lib/plugin). Please verify its existence)