### AIND Ephys Pipeline Stats

This notebook processes all ephys sessions with sorted data in the AIND pipeline, counts the number of probes and units per session, and summarizes the results.

In [None]:
import os
import re
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import seaborn as sns

from aind_data_access_api.document_db import MetadataDbClient


from utils import (
    get_all_ecephys_derived, 
    get_duration_from_session,
    get_unique_sessions_with_dates,
    parse_curation_notes
)

plt.rcParams['pdf.fonttype'] = 42

In [None]:
results_folder = Path("../results")
figures_folder = results_folder / "pipeline"
figures_folder.mkdir(parents=True, exist_ok=True)

In [None]:
# Constants for database connection
API_GATEWAY_HOST = os.environ.get("API_GATEWAY_HOST", "api.allenneuraldynamics.org")
DATABASE = os.environ.get("DATABASE", "metadata_index")
COLLECTION = os.environ.get("COLLECTION", "data_assets")

# Initialize the client
client = MetadataDbClient(
    host=API_GATEWAY_HOST,
    database=DATABASE,
    collection=COLLECTION,
)

In [None]:
pipeline_deployment_date = datetime(2024, 2, 20)
ks4_deployment_date = datetime(2024, 11, 10)
up_to_date = datetime(2025, 8, 1)

In [None]:
all_derived = get_all_ecephys_derived(client)

In [None]:
sorted_assets = [d for d in all_derived if "sorted" in d["data_description"]["name"]]
sorted_asset_with_session_metadata = [d for d in sorted_assets if d["session"] is not None]

print(f"Total sorted assets: {len(sorted_assets)} - with session metadata: {len(sorted_asset_with_session_metadata)}")

In [None]:
# Get unique sessions
unique_sessions_df = get_unique_sessions_with_dates(sorted_assets)
unique_sessions_df['created_date'] = pd.to_datetime(unique_sessions_df['created_date'])
print(f"Total unique sessions: {len(unique_sessions_df)}")
print(f"Date range: {unique_sessions_df['created_date'].min()} to {unique_sessions_df['created_date'].max()}")


# Only keep sessions after pipeline deployment
unique_sessions_df = unique_sessions_df[unique_sessions_df['created_date'] >= pipeline_deployment_date]
unique_sessions_df = unique_sessions_df[unique_sessions_df['created_date'] <= up_to_date]
print(f"\nUnique sessions after pipeline deployment: {len(unique_sessions_df)}")
print(f"Date range: {unique_sessions_df['created_date'].min()} to {unique_sessions_df['created_date'].max()}")

In [None]:
first_asset = unique_sessions_df.loc[unique_sessions_df['created_date'].idxmin()]
print(f"First asset with new pipeline:\n{first_asset['full_name']} created on {first_asset['created_date']}")

In [None]:
sorted_assets_entries = []
no_processing = 0
no_processing_pipeline = 0
no_data_processes = 0
no_curation = 0

for index, row in unique_sessions_df.iterrows():
    sorted_asset = row["entry"]
    # create a dict for each entry with:
    # - number of probes
    # - number of streams
    # - sorter
    # - curation output
    processing = sorted_asset.get("processing", {})
    if not processing:
        no_processing += 1
        no_processing_pipeline += 1
        no_data_processes += 1
        no_curation += 1
        continue
    processing_pipeline = processing.get("processing_pipeline", {})
    if not processing_pipeline:
        no_processing_pipeline += 1
        no_data_processes += 1
        no_curation += 1
        continue

    data_processes = processing_pipeline.get("data_processes", [])
    # if an entry of data_processes is a list, we have to flatten it
    flattened_data_processes = []
    for dp in data_processes:
        if isinstance(dp, list):
            flattened_data_processes.extend(dp)
        else:
            flattened_data_processes.append(dp)
    data_processes = flattened_data_processes

    # get unique stream names and probe names using regex (from Spike sorting data_process)
    curation_processes = [
        dp for dp in data_processes if "curation" in dp["name"]
    ]
    if len(curation_processes) == 0:
        no_curation += 1
        continue
    spike_sorting_processes = [
        dp for dp in data_processes if "Spike sorting" in dp["name"]
    ]
    if len(spike_sorting_processes) == 0:
        no_curation += 1
        print(f"No spike sorting process for {sorted_asset['data_description']['name']} (# curation: {len(curation_processes)})")
        continue
    recording_names = [
        dp["parameters"]["recording_name"] for dp in curation_processes 
        if "recording_name" in dp["parameters"]
    ]
    num_streams = len(set(recording_names))
    probe_names = []
    for recording_name in recording_names:
        match = re.search(r'\.Probe([A-Z]|\d+)(?:-AP)?[_.]', recording_name)
        if match:
            probe_name = f"Probe{match.group(1)}"
            probe_names.append(probe_name)
    probe_names = sorted(list(set(probe_names)))
    num_probes = len(probe_names)
    sorter_name = spike_sorting_processes[0]["parameters"].get("sorter_name", "kilosort2_5")
    curation_outputs = {}
    for dp in curation_processes:
        # curation_output = dp["outputs"]
        curation_notes = dp.get("notes", "")
        curation_output = parse_curation_notes(curation_notes)
            
        for k, v in curation_output.items():
            if k not in curation_outputs:
                curation_outputs[k] = v
            else:
                curation_outputs[k] += v

    entry_dict = {
        "name": sorted_asset["data_description"]["name"],
        "num_probes": num_probes,
        "num_streams": num_streams,
        "sorter": sorter_name,
        "created_date": row["created_date"],
    }
    for k in curation_outputs:
        entry_dict[k] = curation_outputs[k]

    sorted_assets_entries.append(entry_dict)

sessions_df = pd.DataFrame(sorted_assets_entries)

sessions_df = sessions_df.query(
    "sorter in ['kilosort2_5', 'kilosort4']"
)
sessions_df['created_date'] = pd.to_datetime(sessions_df['created_date'])
sessions_df = sessions_df.sort_values('created_date')


print(f"Total number of assets: {len(unique_sessions_df)}")
print(f"\tNumber of sorted assets without processing info: {no_processing}")
print(f"\tNumber of sorted assets without processing pipeline info: {no_processing_pipeline}")
print(f"\tNumber of sorted assets without no curation: {no_curation}")
print(f"\tNumber of sorted assets to analyze: {len(sessions_df)}")

In [None]:
# Create weekly aggregation
sessions_df['week'] = sessions_df['created_date'].dt.to_period('W')
weekly_counts = sessions_df.groupby('week').size().reset_index(name='count')
weekly_counts['week_start'] = weekly_counts['week'].dt.start_time

# Create cumulative count (post-pipeline only)
sessions_df_sorted = sessions_df.sort_values('created_date').reset_index(drop=True)
sessions_df_sorted['cumulative_count'] = range(1, len(sessions_df_sorted) + 1)

print(f"Pipeline weekly data points: {len(weekly_counts)}")
print(f"Date range for pipeline data: {weekly_counts['week_start'].min()} to {weekly_counts['week_start'].max()}")
print(f"Pipeline sessions: {len(sessions_df)}")
print(f"\nWeekly stats (pipeline only):")
print(f"  Mean sessions per week: {weekly_counts['count'].mean():.1f}")
print(f"  Max sessions in a week: {weekly_counts['count'].max()}")
print(f"  Min sessions in a week: {weekly_counts['count'].min()}")

In [None]:
sessions_df_sorted.columns

In [None]:
# Calculate cumulative probe counts over time
sessions_df_sorted['cumulative_probes'] = sessions_df_sorted['num_probes'].cumsum()

# Calculate cumulative unit statistics over time
sessions_df_sorted['cumulative_total_units'] = sessions_df_sorted['total_units'].cumsum()
sessions_df_sorted['cumulative_passing_qc'] = sessions_df_sorted['passing_qc'].cumsum()
sessions_df_sorted['cumulative_failing_qc'] = sessions_df_sorted['failing_qc'].cumsum()
sessions_df_sorted['cumulative_noise_units'] = sessions_df_sorted['noise_units'].cumsum()
sessions_df_sorted['cumulative_neural_units'] = sessions_df_sorted['neural_units'].cumsum()
sessions_df_sorted['cumulative_sua_units'] = sessions_df_sorted['sua_units'].cumsum()
sessions_df_sorted['cumulative_mua_units'] = sessions_df_sorted['mua_units'].cumsum()

print(f"Total cumulative probes: {sessions_df_sorted['cumulative_probes'].iloc[-1]:,.0f}")
print(f"Total cumulative units: {sessions_df_sorted['cumulative_total_units'].iloc[-1]:,.0f}")
print(f"Units passing QC: {sessions_df_sorted['cumulative_passing_qc'].iloc[-1]:,.0f}")
print(f"Units labeled as noise: {sessions_df_sorted['cumulative_noise_units'].iloc[-1]:,.0f}")
print(f"Units labeled as neural: {sessions_df_sorted['cumulative_neural_units'].iloc[-1]:,.0f}")
print(f"Units labeled as SUA: {sessions_df_sorted['cumulative_sua_units'].iloc[-1]:,.0f}")
print(f"Units labeled as MUA: {sessions_df_sorted['cumulative_mua_units'].iloc[-1]:,.0f}")
print("\n\nCumulative Rates:")
print(f"QC pass rate: {sessions_df_sorted['cumulative_passing_qc'].iloc[-1] / sessions_df_sorted['cumulative_total_units'].iloc[-1] * 100:.1f}%")
print(f"Neural unit rate: {sessions_df_sorted['cumulative_neural_units'].iloc[-1] / sessions_df_sorted['cumulative_total_units'].iloc[-1] * 100:.1f}%")
print(f"SUA rate: {sessions_df_sorted['cumulative_sua_units'].iloc[-1] / sessions_df_sorted['cumulative_total_units'].iloc[-1] * 100:.1f}%")
print(f"MUA rate: {sessions_df_sorted['cumulative_mua_units'].iloc[-1] / sessions_df_sorted['cumulative_total_units'].iloc[-1] * 100:.1f}%")

### Session plots

In [None]:
# Create color-coded weekly sessions plot by sorter
fig_counts, ax = plt.subplots(figsize=(15, 5))

# Define colors for each sorter
sorter_colors = {
    'kilosort2_5': 'C3',      # Blue
    'kilosort4': 'C4',        # Red
}

# Create weekly aggregation by sorter
sessions_df_sorted['week'] = sessions_df_sorted['created_date'].dt.to_period('W')
weekly_sorter_counts = sessions_df_sorted.groupby(['week', 'sorter']).size().unstack(fill_value=0)
weekly_sorter_counts['week_start'] = weekly_sorter_counts.index.to_timestamp()

# Create stacked bar chart
bottom = None
sorter_handles = []

for sorter in ['kilosort2_5', 'kilosort4']:
    if sorter in weekly_sorter_counts.columns:
        bars = ax.bar(weekly_sorter_counts['week_start'], 
                     weekly_sorter_counts[sorter],
                     bottom=bottom, 
                     width=6, 
                     alpha=0.8, 
                     color=sorter_colors[sorter],
                     label=sorter.capitalize(),
                     edgecolor='white', 
                     linewidth=0.5)
        sorter_handles.append(bars)
        
        if bottom is None:
            bottom = weekly_sorter_counts[sorter]
        else:
            bottom += weekly_sorter_counts[sorter]

# Customize the plot
ax.set_xlabel('Week Starting Date', fontsize=12)
ax.set_ylabel('Number of Sessions Processed', fontsize=12)

# Format x-axis
ax.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
ax.xaxis.set_minor_locator(mdates.MonthLocator())

# Rotate x-axis labels
plt.xticks(rotation=45, ha='right')

# Add grid and legend
ax.grid(True, alpha=0.3, axis='y')
ax.set_axisbelow(True)
ax.legend(loc='upper left', fontsize=11)


# Add statistics
total_sessions = len(sessions_df_sorted)
ks25_count = len(sessions_df_sorted[sessions_df_sorted['sorter'] == 'kilosort2_5'])
ks4_count = len(sessions_df_sorted[sessions_df_sorted['sorter'] == 'kilosort4'])

stats_text = f"""Session Statistics by Sorter:
• Kilosort2.5: {ks25_count:,} sessions ({ks25_count/total_sessions*100:.1f}%)
• Kilosort4: {ks4_count:,} sessions ({ks4_count/total_sessions*100:.1f}%)
• Total: {total_sessions:,} sessions
• KS4 deployment: June 11, 2024"""
print(stats_text)

sns.despine(fig_counts)

plt.tight_layout()
plt.show()

fig_counts.savefig(figures_folder / "session_counts.pdf")

In [None]:
# Create color-coded cumulative sessions plot by sorter (excluding spykingcircus2)
fig_cumulative, axs = plt.subplots(figsize=(15, 5), ncols=3)

ax = axs[0]
# Calculate cumulative counts for each sorter
for sorter in ['kilosort2_5', 'kilosort4']:
    sorter_data = sessions_df_sorted[sessions_df_sorted['sorter'] == sorter].copy()
    if len(sorter_data) > 0:
        sorter_data = sorter_data.reset_index(drop=True)
        sorter_data['cumulative_count'] = range(1, len(sorter_data) + 1)
        
        # Plot cumulative line for this sorter
        ax.plot(sorter_data['created_date'], sorter_data['cumulative_count'], 
                linewidth=3, color=sorter_colors[sorter], alpha=0.8, 
                label={sorter.capitalize()})
        final_count = len(sorter_data)
        final_date = sorter_data['created_date'].max()
        final_date = final_date - timedelta(days=70)
        
        ax.annotate(f'{final_count:,}', 
                    xy=(final_date, final_count), xytext=(10, 10),
                    textcoords='offset points', fontsize=10, fontweight='bold',
                    bbox=dict(boxstyle='round', facecolor=sorter_colors[sorter], alpha=0.3),
                    color=sorter_colors[sorter])

# Customize the plot
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Cumulative Number of Sessions', fontsize=12)

# Format x-axis
ax.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
ax.xaxis.set_minor_locator(mdates.MonthLocator())

# Add grid and legend
ax.grid(True, alpha=0.3)
ax.set_axisbelow(True)
ax.legend(loc='upper left', fontsize=11)


# Add statistics
ks25_sessions = len(sessions_df_sorted[sessions_df_sorted['sorter'] == 'kilosort2_5'])
ks4_sessions = len(sessions_df_sorted[sessions_df_sorted['sorter'] == 'kilosort4'])
ks4_days = (sessions_df_sorted['created_date'].max() - ks4_deployment_date).days
ks25_rate = ks4_sessions / ks4_days if ks4_days > 0 else 0
ks4_rate = ks4_sessions / ks4_days if ks4_days > 0 else 0

session_stats_text = f"""Session Statistics:
• KS2.5 total: {ks25_sessions:,} sessions
• KS4 total: {ks4_sessions:,} sessions
• KS4 daily rate: {ks4_rate:.1f} sessions/day
• KS4 adoption: {ks4_sessions/(ks25_sessions+ks4_sessions)*100:.1f}% of total"""
print(session_stats_text)

# Plot cumulative probe line
ax = axs[1]
ax.plot(sessions_df_sorted['created_date'], sessions_df_sorted['cumulative_probes'], 
        linewidth=2.5, color='darkgreen', alpha=0.8, label='Cumulative Probes')

# Customize the plot
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Cumulative Number of Probes', fontsize=12)

# Format x-axis
ax.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
ax.xaxis.set_minor_locator(mdates.MonthLocator())

# Add final count annotation
final_probes = sessions_df_sorted['cumulative_probes'].iloc[-1]
final_date = sessions_df_sorted['created_date'].iloc[-1] - timedelta(90)
ax.annotate(f'{final_probes:,}', 
            xy=(final_date, final_probes), xytext=(10, 10),
            textcoords='offset points', fontsize=10, fontweight='bold',
            bbox=dict(boxstyle='round', facecolor='green', alpha=0.3),
            color="darkgreen")

# Add grid
ax.grid(True, alpha=0.3)
ax.set_axisbelow(True)

# Add statistics
avg_probes_per_session = sessions_df_sorted['num_probes'].mean()
probe_stats_text = f"""\n\nProbe Statistics:
• Total probes processed: {final_probes:,}
• Sessions processed: {len(sessions_df_sorted):,}
• Average probes per session: {avg_probes_per_session:.1f}
• Processing period: {(final_date - pipeline_deployment_date).days} days"""
print(probe_stats_text)

# Cumulative units
ax = axs[2]
line1 = ax.plot(sessions_df_sorted['created_date'], sessions_df_sorted['cumulative_total_units'], 
                linewidth=3, color='darkblue', alpha=0.8, label='All')

# Add final count annotation
final_total = int(sessions_df_sorted['cumulative_total_units'].iloc[-1])
final_date = sessions_df_sorted['created_date'].iloc[-1] - timedelta(90)
ax.annotate(f'{final_total:,}', 
            xy=(final_date, final_total), xytext=(10, 10),
            textcoords='offset points', fontsize=10, fontweight='bold',
            bbox=dict(boxstyle='round', facecolor='blue', alpha=0.3),
            color="darkblue")

# Overlay: Neural units
line2 = ax.plot(sessions_df_sorted['created_date'], sessions_df_sorted['cumulative_neural_units'], 
                linewidth=2, color='darkorange', alpha=0.8, label='Neural')

final_neural = int(sessions_df_sorted['cumulative_neural_units'].iloc[-1])
final_date = sessions_df_sorted['created_date'].iloc[-1] - timedelta(90)
ax.annotate(f'{final_neural:,}', 
            xy=(final_date, final_neural), xytext=(10, 10),
            textcoords='offset points', fontsize=10, fontweight='bold',
            bbox=dict(boxstyle='round', facecolor='orange', alpha=0.3),
            color="darkorange")

# Overlay: Passing QC
line3 = ax.plot(sessions_df_sorted['created_date'], sessions_df_sorted['cumulative_passing_qc'], 
                linewidth=2, color='darkgreen', alpha=0.8, label='Passing QC')

final_passing = int(sessions_df_sorted['cumulative_passing_qc'].iloc[-1])
final_date = sessions_df_sorted['created_date'].iloc[-1] - timedelta(90)
ax.annotate(f'{final_passing:,}', 
            xy=(final_date, final_passing), xytext=(10, 10),
            textcoords='offset points', fontsize=10, fontweight='bold',
            bbox=dict(boxstyle='round', facecolor='green', alpha=0.3),
            color="darkgreen")


# Customize the plot
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Cumulative Number of Units (M)', fontsize=12)

# Format x-axis
ax.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
ax.xaxis.set_minor_locator(mdates.MonthLocator())

# Add grid
ax.grid(True, alpha=0.3)
ax.set_axisbelow(True)

# Add legend
ax.legend(loc='upper left', fontsize=11)

# Final statistics
final_total = sessions_df_sorted['cumulative_total_units'].iloc[-1]
final_passing = sessions_df_sorted['cumulative_passing_qc'].iloc[-1]
final_failing = sessions_df_sorted['cumulative_failing_qc'].iloc[-1]
final_noise = sessions_df_sorted['cumulative_noise_units'].iloc[-1]
final_neural = sessions_df_sorted['cumulative_neural_units'].iloc[-1]
final_date = sessions_df_sorted['created_date'].iloc[-1]

# Calculate percentages
qc_pass_rate = (final_passing / final_total) * 100
qc_fail_rate = (final_failing / final_total) * 100
noise_rate = (final_noise / final_total) * 100
neural_rate = (final_neural / final_total) * 100

# Add comprehensive statistics
units_stats_text = f"""\n\nUnit Processing Statistics:
• Total units processed: {final_total:,.0f}
• Units passing QC: {final_passing:,.0f} ({qc_pass_rate:.1f}%)
• Units failing QC: {final_failing:,.0f} ({qc_fail_rate:.1f}%)
• Noise units: {final_noise:,.0f} ({noise_rate:.1f}%)
• Neural units: {final_neural:,.0f} ({neural_rate:.1f}%)
• Processing period: {(final_date - pipeline_deployment_date).days} days
• Average units per session: {final_total/len(sessions_df_sorted):,.0f}"""
print(units_stats_text)

ax.set_yticklabels([f"{t:.1f}" for t in np.arange(0, 2, 0.2)])

for i, ax in enumerate(axs):
    # Rotate x-axis labels
    ax.tick_params(axis='x', rotation=45)

    # Only keep date in central plot
    if i != 1:
        ax.set_xlabel("")

sns.despine(fig_cumulative)

plt.tight_layout()
plt.show()

fig_cumulative.savefig(figures_folder / "cumulative.pdf")
