In [2]:
#import packages and set global variables
import numpy as np
import pathlib
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats

# Global configuration shared by the cells below
# Root folder that is scanned for per-mouse recording directories
PARENT_PATH = r"G:\Files_from_npx_bonzai_pc\raw_data"
# Label for each channel of the analog recording, in column order
channel_labels = [
    'Camera_trigger',
    'Audio_output',
    'Photodiode',
    'Probe_sync',
    'Microphone',
]
# Number of interleaved channels in the analog binary file (one per label)
N_CHANNELS = len(channel_labels)
# Default signal threshold
threshold = 0.5

In [27]:
# Load all recorded signals for one mouse: analog channels, block timers and camera metadata

def get_all_data(mouse_id):
    """
    Load the analog recording and its companion CSV files for one mouse.

    The first directory directly under PARENT_PATH whose name contains
    ``mouseid-<mouse_id>`` is searched recursively for a ``*.bin`` file,
    ``block_timers.csv`` and ``cam_metadata.csv``. When several entries match
    a pattern, the first one encountered is used.

    Parameters:
    - mouse_id (int or str): Unique identifier for the mouse.

    Returns:
    - tuple: ``(binary_channels, block_timers_data, cam_metadata_data, mouse_directory)`` where
        * binary_channels is the dict produced by ``assign_channels_to_dict`` from the
          binary file reshaped to ``(n_samples, N_CHANNELS)``,
        * block_timers_data is ``block_timers.csv`` as a DataFrame,
        * cam_metadata_data is columns 0, 1, 2 and 4 of ``cam_metadata.csv`` as a DataFrame
          with columns 'frame number', 'zero', 'timestamp_in_tick' and 'stage_id',
        * mouse_directory is the pathlib.Path of the matched directory.

    Raises:
    - ValueError: If no directory is found for the given mouse ID, or (from the reshape)
      if the number of values in the binary file is not a multiple of N_CHANNELS.
    - FileNotFoundError: If the matched directory lacks one of the expected files.
    """
    directory = pathlib.Path(PARENT_PATH)
    # Only directories qualify: a stray file whose name contains the ID cannot be searched.
    matching_directories = [
        folder
        for folder in directory.iterdir()
        if folder.is_dir() and f"mouseid-{mouse_id}" in folder.name
    ]

    if not matching_directories:
        raise ValueError(f"No directory found for mouse ID: {mouse_id}")

    mouse_directory = matching_directories[0]

    # Read the analog binary file.
    # No dtype is passed, so np.fromfile uses its default (float64)
    # -- TODO confirm this matches how the file was written.
    analogy_bin_path = _first_match(mouse_directory, '*.bin')
    analogy_bin_data = np.fromfile(str(analogy_bin_path))
    # One row per sample, one column per channel; -1 lets numpy derive the row count
    # and still raises ValueError when the size is not a multiple of N_CHANNELS.
    reshaped_analogy_data = analogy_bin_data.reshape(-1, N_CHANNELS)
    # Assign channel labels to the columns
    binary_channels = assign_channels_to_dict(reshaped_analogy_data)

    # Read block_timers.csv
    block_timers_path = _first_match(mouse_directory, 'block_timers.csv')
    block_timers_data = pd.read_csv(block_timers_path)

    # Read cam_metadata.csv: the first row is skipped and fixed column names are applied.
    # Per the original note, 'timestamp_in_tick' is in ticks,
    # where 1 tick = 1/125000 milliseconds, i.e. 8 nanoseconds.
    cam_metadata_path = _first_match(mouse_directory, 'cam_metadata.csv')
    cam_metadata_data = pd.read_csv(cam_metadata_path, usecols=[0, 1, 2, 4],
                                    header=None,
                                    names=['frame number', 'zero', 'timestamp_in_tick', 'stage_id'],
                                    skiprows=1)

    return binary_channels, block_timers_data, cam_metadata_data, mouse_directory


def _first_match(root, pattern):
    """Return the first path under ``root`` (recursive) matching ``pattern``, or raise FileNotFoundError."""
    found = next(root.rglob(pattern), None)
    if found is None:
        raise FileNotFoundError(f"No file matching '{pattern}' found under {root}")
    return found


def assign_channels_to_dict(bonsai_binary_data):
    """
    Label each channel of a multi-channel recording.

    Iterating over the transposed array yields one entry per column; each entry
    is paired, in order, with the corresponding name in the module-level
    ``channel_labels``. Pairing stops at the shorter of the two sequences.

    Parameters:
    - bonsai_binary_data (numpy.ndarray): 2D array with one column per channel.

    Returns:
    - dict: Mapping of channel label to that channel's data.
    """
    per_channel_columns = bonsai_binary_data.T
    return dict(zip(channel_labels, per_channel_columns))

In [4]:
# Load every recording for the selected mouse, then keep the audio output channel at hand.
mouse_id = "1119623"
(
    binary_channels,
    block_timers_data,
    cam_metadata,
    mouse_directory,
) = get_all_data(mouse_id)
audio_data = binary_channels['Audio_output']

In [5]:
def get_clustered_audio_stimulus_timing(data: np.ndarray, sampling_rate: int, threshold: float, min_interval: float) -> tuple[np.ndarray, np.ndarray]:
    """
    Identify the onsets and durations of audio stimuli from a signal, clustering close events as one.

    A sample counts as "on" when it is strictly greater than ``threshold``. An onset is
    the first "on" sample of a run and an offset is the first sample after the run, so
    a duration covers every "on" sample of the event. Consecutive events are merged
    when the gap between one offset and the next onset is shorter than ``min_interval``.

    :param data: Audio signal data as a 1-D numpy array (anything ``np.asarray`` accepts).
    :param sampling_rate: The sampling rate of the audio data (in Hz).
    :param threshold: Threshold value to determine stimulus 'on' state.
    :param min_interval: Minimum interval in seconds to separate distinct audio stimuli.
    :return: A tuple of two numpy arrays: clustered onset times and durations, both in seconds.
             Both arrays are empty when ``data`` is empty or never exceeds ``threshold``.
    """
    data = np.asarray(data)
    if data.size == 0:
        # Nothing to analyse; indexing the first/last sample below would fail.
        return np.array([]), np.array([])

    # Detecting stimulus presence
    stimulus_present = data > threshold
    changes = np.diff(stimulus_present.astype(int))
    # np.diff is one element shorter than its input, so +1 maps back to the sample
    # index at which the new state begins.
    onset_indices = np.where(changes == 1)[0] + 1
    offset_indices = np.where(changes == -1)[0] + 1

    # Handling signal start or end with ongoing stimulus
    if stimulus_present[0]:
        onset_indices = np.insert(onset_indices, 0, 0)
    if stimulus_present[-1]:
        # Offsets are exclusive (first sample after the run), so a stimulus still on
        # at the end of the recording ends at len(data), not len(data) - 1.
        offset_indices = np.append(offset_indices, len(data))

    # Convert indices to time
    onset_times = onset_indices / sampling_rate
    offset_times = offset_indices / sampling_rate

    if onset_times.size == 0:
        # Signal never exceeded the threshold.
        return np.array([]), np.array([])

    # Clustering closely spaced events: an event starts a new cluster unless the
    # silence since the previous event's offset is shorter than min_interval.
    gaps = onset_times[1:] - offset_times[:-1]
    starts_new_cluster = ~(gaps < min_interval)
    # The first event always opens a cluster; the last event always closes one.
    clustered_onsets = onset_times[np.concatenate(([True], starts_new_cluster))]
    clustered_offsets = offset_times[np.concatenate((starts_new_cluster, [True]))]

    # Calculating durations
    durations = clustered_offsets - clustered_onsets

    return clustered_onsets, durations

In [6]:
# Detect audio events on the audio output channel: samples above 0.1 count as "on",
# and events separated by less than 0.5 s are merged into one.
clustered_onset_times, clustered_durations = get_clustered_audio_stimulus_timing(
    audio_data,
    sampling_rate=30000,
    threshold=0.1,
    min_interval=0.5,
)

# Preview up to the first ten events as an (onsets, durations) pair
clustered_onset_times[:10], clustered_durations[:10]

(array([ 795.95023333, 3088.2766    , 3668.90103333, 7563.58256667]),
 array([ 2.10136667, 10.00063333, 10.00066667,  0.72116667]))

In [1]:
# Persist the clustered onsets and durations as .npy files in the current working directory.
# This cell needs `np` and both arrays to exist already, so run the import and
# clustering cells first (in a fresh kernel it fails with a NameError).
np.save('clustered_onset_times.npy', arr=clustered_onset_times)
np.save('clustered_durations.npy', arr=clustered_durations)

NameError: name 'np' is not defined