In [1]:
import pathlib
from PIL import Image
import numpy as np
import skunk
import json
import matplotlib.pyplot as plt
import xmltodict
import sqlite3
from sqlalchemy import create_engine
import pandas as pd
import npc_session
from annotation_qc import get_surface_image_and_insertion_json_paths

In [2]:
# sqlite3 only binds native Python types, so numpy integer scalars are converted
# to plain Python ints before being written to the database.
sqlite3.register_adapter(np.int64, int)
sqlite3.register_adapter(np.int32, int)
# ------------------------------------------------------------------------------------
DB_PATH = pathlib.Path('//allen/programs/mindscope/workgroups/dynamicrouting/dynamic_gating_insertions/dr_master.db')
# Open and immediately close a connection; by default sqlite3.connect creates
# the database file when it does not exist yet.
sqlite3.connect(DB_PATH).close()
DB = f"sqlite:///{DB_PATH}"
ENGINE = create_engine(DB, echo=False)

# Whole `session_metadata` table, used by the cells below.
session_metadata = pd.read_sql('session_metadata', con=ENGINE)

In [None]:
# `con` is a required argument of pd.read_sql; reuse the engine from the config cell.
min_distance = pd.read_sql('min_distance', con=ENGINE)

In [9]:
# Selects which layout/SVG the later cells use.
templeton = False

# Subject id to inspect; converted to int for the comparison against the 'MID' column.
mid = '660023'
sessions = session_metadata.loc[session_metadata['MID'] == int(mid), 'session'].values

In [10]:
def get_surface_image(session_id: str, templeton: bool) -> Image.Image:
    """Open the surface image for a session.

    Args:
        session_id: Session identifier forwarded to
            ``get_surface_image_and_insertion_json_paths``.
        templeton: Forwarded unchanged to the path helper as ``templeton=``.

    Returns:
        The image returned by ``PIL.Image.open`` for the located path.
    """
    # NOTE(review): the helper's name suggests it may return more than one path
    # (image + insertion json); Image.open needs a single path - confirm.
    surface_image_path = get_surface_image_and_insertion_json_paths(session_id, templeton=templeton)
    return Image.open(surface_image_path)

In [11]:
def get_hole_letters(session_id: str) -> dict[str, str]:
    """Map the value of each probe column for a session to that probe's letter.

    Looks up the row of the notebook-level ``session_metadata`` table whose
    ``session`` value equals ``<subject id>_<date id>`` (both taken from
    ``npc_session.SessionRecord(session_id)``). Every column whose name
    contains ``'Probe'`` contributes one entry.

    Args:
        session_id: Identifier passed to ``npc_session.SessionRecord``.

    Returns:
        Dict keyed by the value stored in each probe column (presumably a hole
        label such as ``'A1'`` - confirm against the table), with the last
        character of the column name as the value.

    Raises:
        IndexError: If ``session_metadata`` has no row for the session.
    """
    session = npc_session.SessionRecord(session_id)
    session_key = f'{str(session.subject.id)}_{session.date.id}'
    probes = [column for column in session_metadata if 'Probe' in column]

    # Filter the table once instead of re-filtering it for every probe column.
    session_rows = session_metadata[session_metadata['session'] == session_key]
    if session_rows.empty:
        # Same exception type `.values[0]` would raise, but with a usable message.
        raise IndexError(f'no row for session {session_key!r} in session_metadata')

    return {session_rows[probe].values[0]: probe[-1] for probe in probes}

In [12]:
# 1-indexed hole indices available for each probe; the layout is chosen by the
# `templeton` flag.
if templeton:
    probe_hole_idx = {
        "A": [1, 2, 3],
        "B": [1, 2, 3],
        "C": [1, 2, 3, 4],
        "D": [1,],
        "E": [],
        "F": [1, 2],
    }
else:
    probe_hole_idx = {
        "A": [1, 2, 3],
        "B": [1, 2, 3, 4],
        "C": [1, 2, 3, 4],
        "D": [1, 2, 3],
        "E": [1, 2, 3, 4],
        "F": [1, 2, 3],
    }

# Every hole label in the selected layout, e.g. "A1", "A2", ... - built once
# for whichever layout was selected above.
hole_labels: tuple[str, ...] = tuple(
    f"{probe}{index}"
    for probe, indices in probe_hole_idx.items()
    for index in indices
)

In [13]:
# Pick the SVG template that matches the `templeton` flag, then read it once.
if templeton:
    svg_template_path = pathlib.Path(r"\\allen\programs\mindscope\workgroups\dynamicrouting\arjun\Templeton_probes.svg")
else:
    svg_template_path = pathlib.Path(r"\\allen\programs\mindscope\workgroups\dynamicrouting\arjun\DR1_no_shading.svg")

# Raw SVG text; later cells substitute the hole labels inside it.
data = svg_template_path.read_text()

In [14]:
# Show the session ids selected for `mid` before building an SVG for each of them.
print(sessions)

['660023_2023-08-08' '660023_2023-08-09']


In [15]:
# Maps each session id to a copy of the SVG template in which every hole label
# is replaced by the letter of the probe recorded for it, or blanked if unused.
session_svg = {}
# Kept so the name is defined even when `sessions` is empty.
data_svg_replaced = ''

for session in sessions:
    holes_letters = get_hole_letters(session)
    # Start every session from the untouched template so replacements made for
    # one session never leak into the next.
    data_svg_replaced = data
    for hole in hole_labels:
        if hole in holes_letters:
            # The leading space before the letter is part of the original output.
            replacement = f"> {holes_letters[hole]}</tspan>"
        else:
            replacement = "></tspan>"
        data_svg_replaced = data_svg_replaced.replace(f">{hole}</tspan>", replacement)

    session_svg[session] = data_svg_replaced


In [16]:
# One figure per session: surface image on the left, labelled SVG on the right.
for session in sessions:
    fig, ax = plt.subplots(ncols=2, figsize=(16, 8))
    fig.suptitle(str(session))

    # Dashes are stripped from the session id before the image lookup.
    surface_image = get_surface_image(session.replace('-', ''), templeton)
    ax[0].imshow(surface_image)

    # Mark the right-hand axes for skunk, then swap in this session's SVG.
    skunk.connect(ax[1], 'sk')
    svg = skunk.insert({'sk': session_svg[session]})

    # Close the matplotlib figure; the result is shown through skunk.display.
    plt.close(fig)
    skunk.display(svg)