In [None]:
import os
import h5py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pyarrow as pa
import pyarrow.feather as feather
from pycircstat2 import Circular

def load_dicts_from_h5(filename):
    d = {}
    with h5py.File(filename, 'r') as f:
        for key in f.keys():
            dataset = f[key]
            if dataset.dtype.char == 'S':  # String data
                d[key] = dataset[()].decode('utf-8') if isinstance(dataset[()], bytes) else dataset[()]
            else:  # Numeric data
                d[key] = dataset[()]
    return d

def save_dicts_to_h5(dicts, filename):
    with h5py.File(filename, 'w') as f:
        for i, d in enumerate(dicts):
            group = f.create_group(f'dict_{i}')
            
            for key, value in d.items():
                if isinstance(value, np.ndarray):
                    group.create_dataset(key, data=value)
                else:
                    # Store strings as fixed-length or variable-length
                    group.create_dataset(key, data=value)

main_dir = os.getcwd()
data_dir = os.path.join(main_dir, '..', 'localdata')

moths = [
    "2025-02-25_1",
    "2025-02-25",
    "2025-03-11",
    "2025-03-12_1",
    "2025-03-20",
    "2025-03-21"
]
fsamp = 30000

In [None]:
ref_muscle = 'ldlm'
wb_duration_thresholds = np.array([50, 70]) # ms ldlm

rows = []
# Loop over moths
for moth in moths:
    # Load each moth's data
    base_name = os.path.join(data_dir, moth)
    data_file = f"{base_name}_data.npz"
    labels_file = f"{base_name}_labels.npz"
    spikes = np.load(os.path.join(data_file))
    labels = {}
    if os.path.exists(labels_file):
        labels_data = np.load(labels_file)
        labels = {unit: labels_data[unit].item() for unit in labels_data.files}
    units = spikes.files  # List of unit labels
    neurons, neuron_quality, muscle_labels = [], [], []
    for unit in units:
        if unit.isnumeric():  # Numeric labels are neurons
            label = labels.get(unit, None)
            neurons.append(unit)
            neuron_quality.append(label)
        else:  # Alphabetic labels are muscles
            muscle_labels.append(unit)
    neurons = np.array(neurons)
    neuron_quality = np.array(neuron_quality)

    # Extract DLM phase, put all neuron spikes on that phase
    diffvec = np.diff(spikes[ref_muscle]) / fsamp * 1000 # units of ms
    mask = np.logical_and(diffvec > wb_duration_thresholds[0], diffvec < wb_duration_thresholds[1])
    start_inds = spikes[ref_muscle][np.where(np.hstack([False, mask]))[0]]
    wblen = np.hstack([np.diff(start_inds), 10000])

    # For each neuron, assign spikes to wingstrokes
    max_duration_thresh_samples = wb_duration_thresholds[1] / 1000 * fsamp
    phase_dict, wblen_dict = {}, {}
    for neur in neurons:
        wb_assign = np.searchsorted(start_inds, spikes[neur], side='right') - 1
        # Spikes which are in valid wingbeats (not before first wb or after last, not too long of a wingbeat)
        mask = np.logical_and(wb_assign >= 0, wb_assign <= len(start_inds))
        # New spike vector that's spike index - wingbeat start index / wingbeat length
        phase = (spikes[neur][mask] - start_inds[wb_assign[mask]]) / wblen[wb_assign[mask]]
        length_mask = wblen[wb_assign[mask]] < max_duration_thresh_samples
        phase = phase[length_mask]
        phase_dict[neur] = phase
        wblen_dict[neur] = wblen[wb_assign[mask]][length_mask] / fsamp * 1000

    # Loop over neurons
    for neur in neurons:
        # Too few spikes for good stats
        if len(phase_dict[neur]) < 100:
            rows.append({
                "moth": moth, 
                "neuron": int(neur),
                "r_sharpness": 0,
                "kappa_global": 0,
                "r_rayleigh": 0,
                "s_deviation": 0,
                "mean": 0,
                "mean_pvalue": 0,
                "median": 0,
                "n_clusters": 0,
                "mu": [0],
                "kappa": [0],
            })
        else:
            # Use mixture of von mises distributions and BIC to determine how many preferred directions each neuron has
            c = Circular(phase_dict[neur] * 2 * np.pi, unit="radian", n_clusters_max=7)
            # For each, save the following:
            rows.append({
                "moth": moth, 
                "neuron": int(neur),
                "r_sharpness": c.r, # r (resultant vector length, sharpness)
                "kappa_global": c.kappa, # kappa (concentration param) for all data
                "r_rayleigh": c.R, # R (Rayleigh's R stat)
                "s_deviation": c.s, # s (angular deviation, measure of dispersion)
                "mean": c.mean, # mean angle
                "mean_pvalue": c.mean_test_result.pval, # p value of mean angle from Rayleigh test
                "median": c.median,
                "n_clusters": c.mixture_opt.n_clusters, # Optimal number of clusters from mixture of Von Mises fitting + BIC
                "mu": [a['mu'] for a in c.mixture_opt.params_], # mean parameter for each von mises distribution
                "kappa": [a['kappa'] for a in c.mixture_opt.params_], # kappa parameter for each von mises distribution
            })

df = pd.DataFrame(rows)
table = pa.Table.from_pandas(df)
feather.write_feather(table, os.path.join(data_dir, "circular_stats.arrow"))

Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>10000), it will take a while to find the median.
Or set `kwargs_median={'method': None}` to skip.
Sample size is large (n>

In [None]:
ref_muscle = 'ldlm'
wb_duration_thresholds = np.array([50, 70]) # ms ldlm

rows = []
# Loop over moths
for moth in moths:
    # Load each moth's data
    base_name = os.path.join(data_dir, moth)
    data_file = f"{base_name}_data.npz"
    labels_file = f"{base_name}_labels.npz"
    spikes = np.load(os.path.join(data_file))
    labels = {}
    if os.path.exists(labels_file):
        labels_data = np.load(labels_file)
        labels = {unit: labels_data[unit].item() for unit in labels_data.files}
    units = spikes.files  # List of unit labels
    neurons, neuron_quality, muscle_labels = [], [], []
    for unit in units:
        if unit.isnumeric():  # Numeric labels are neurons
            label = labels.get(unit, None)
            neurons.append(unit)
            neuron_quality.append(label)
        else:  # Alphabetic labels are muscles
            muscle_labels.append(unit)
    neurons = np.array(neurons)
    neuron_quality = np.array(neuron_quality)

    # Extract DLM phase, put all neuron spikes on that phase
    diffvec = np.diff(spikes[ref_muscle]) / fsamp * 1000 # units of ms
    mask = np.logical_and(diffvec > wb_duration_thresholds[0], diffvec < wb_duration_thresholds[1])
    start_inds = spikes[ref_muscle][np.where(np.hstack([False, mask]))[0]]
    wblen = np.hstack([np.diff(start_inds), 10000])

    # For each neuron, assign spikes to wingstrokes
    max_duration_thresh_samples = wb_duration_thresholds[1] / 1000 * fsamp
    phase_dict, wblen_dict = {}, {}
    for neur in neurons:
        wb_assign = np.searchsorted(start_inds, spikes[neur], side='right') - 1
        # Spikes which are in valid wingbeats (not before first wb or after last, not too long of a wingbeat)
        mask = np.logical_and(wb_assign >= 0, wb_assign <= len(start_inds))
        # New spike vector that's spike index - wingbeat start index / wingbeat length
        phase = (spikes[neur][mask] - start_inds[wb_assign[mask]]) / wblen[wb_assign[mask]]
        length_mask = wblen[wb_assign[mask]] < max_duration_thresh_samples
        phase = phase[length_mask]
        phase_dict[neur] = phase
        wblen_dict[neur] = wblen[wb_assign[mask]][length_mask] / fsamp * 1000

    # Loop over neurons
    for neur in neurons:
        # Too few spikes for good stats
        if len(phase_dict[neur]) < 100:
            rows.append({
                "moth": moth, 
                "neuron": int(neur),
                "r_sharpness": 0,
                "kappa_global": 0,
                "r_rayleigh": 0,
                "s_deviation": 0,
                "mean": 0,
                "mean_pvalue": 0,
                "median": 0,
                "n_clusters": 0,
                "mu": [0],
                "kappa": [0],
            })
        else:
            # Use mixture of von mises distributions and BIC to determine how many preferred directions each neuron has
            c = Circular(phase_dict[neur] * 2 * np.pi, unit="radian", n_clusters_max=7)
            # For each, save the following:
            rows.append({
                "moth": moth, 
                "neuron": int(neur),
                "r_sharpness": c.r, # r (resultant vector length, sharpness)
                "kappa_global": c.kappa, # kappa (concentration param) for all data
                "r_rayleigh": c.R, # R (Rayleigh's R stat)
                "s_deviation": c.s, # s (angular deviation, measure of dispersion)
                "mean": c.mean, # mean angle
                "mean_pvalue": c.mean_test_result.pval, # p value of mean angle from Rayleigh test
                "median": c.median,
                "n_clusters": c.mixture_opt.n_clusters, # Optimal number of clusters from mixture of Von Mises fitting + BIC
                "mu": [a['mu'] for a in c.mixture_opt.params_], # mean parameter for each von mises distribution
                "kappa": [a['kappa'] for a in c.mixture_opt.params_], # kappa parameter for each von mises distribution
            })

df = pd.DataFrame(rows)
table = pa.Table.from_pandas(df)
feather.write_feather(table, os.path.join(data_dir, "circular_stats.arrow"))