In [None]:
import nibabel as nib
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
from collections import defaultdict
from IPython.display import clear_output

import sys
sys.path.append("ComputeCanada/frequency_tagging")
from utils import (
    get_roi_colour_codes,
    change_font,
    HCP_ATLAS as hcp_label,
    dfm_generate_single_subject_maps,
    NORMAL_3T_SUB_IDS,
    NORMAL_7T_SUB_IDS,
    VARY_SUB_IDS,
    VARY_020_ENTRAIN_CONDITIONS,
    VARY_021_ENTRAIN_CONDITIONS,
    PICKLE_DIR,
    MAIN
)
change_font()

tmpdir = Path("/tmp")

#### Get HCP info
- `hcp_mappings`: dict of ROI: dscalars

In [None]:
"""Get HCP labels
"""
_HCP_INFO = !wb_command -file-information {hcp_label}
HCP_LABELS = []
HCP_COUNTER = 0
for i in _HCP_INFO:
    if len(i) == 60 and any(["L_" in i, "R_" in i]):
        hcp_colors = tuple([float(f"0.{k}") for k in [j.split(' ') [0] for j in i.split('0.')][-3:]] + [1])
        if ' R_' in i:
            roi = i.split("_ROI")[0].split(' R_')[1]
            HCP_LABELS.append(f"R_{roi}_ROI")
        if ' L_' in i:
            roi = i.split("_ROI")[0].split(' L_')[1]
            HCP_LABELS.append(f"L_{roi}_ROI")
        HCP_COUNTER += 1

"""Get HCP label coordinates
"""
hcp_mapping = {}
for roi_label in HCP_LABELS:
    out_dscalar = tmpdir / f"{roi_label}.dscalar.nii"
    if out_dscalar.exists():
        hcp_mapping[roi_label] = out_dscalar
        continue
    !wb_command -cifti-label-to-roi {hcp_label} {out_dscalar} -name {roi_label}
    assert out_dscalar.exists(), f"{out_dscalar.stem} does not exist."
    hcp_mapping[roi_label] = out_dscalar
hcp_rois = list(set([k.split('_')[1] for k in hcp_mapping.keys()]))

#### Generate dual frequency tagging maps:
- Runtime: `corr_types = "uncp"`: ~45 minutes
- Runtime: `corr_types = "fdrp"`: TBD

In [None]:
"""Set up for visualizing dual frequency tagging across each subject using fractional overlap
"""
ROI_FO = .8
corr_type = "fdrp"
FORCE_RUN = False

"""Save png
"""
df_map_pkl = PICKLE_DIR / f"df_map_corr-{corr_type}.pkl"
if df_map_pkl.exists() and not FORCE_RUN:
    df = pd.read_pickle(df_map_pkl)
else:
    # 3T control/entrain
    label = "3TNormal"
    df_data = None
    for _roi_task_id in ["entrain","control"]:
        experiment_id = "1_frequency_tagging" 
        mri_id = "3T"
        n_subjects = len(NORMAL_3T_SUB_IDS)
        roi_task_ids = [_roi_task_id] * n_subjects
        roi_f_1s = [.125] * n_subjects
        roi_f_2s = [.2] * n_subjects
        df_data = dfm_generate_single_subject_maps(
            hcp_mapping,
            label, experiment_id, mri_id, NORMAL_3T_SUB_IDS, 
            roi_task_ids, roi_f_1s, roi_f_2s, ROI_FO,
            df_data=df_data,
            corr_type=corr_type,
            ROI_FO=ROI_FO,
        )
        clear_output()
    # 7T AttendAway
    label = "7TNormal"
    experiment_id = "1_attention" 
    mri_id = "7T"
    n_subjects = len(NORMAL_7T_SUB_IDS)
    roi_task_ids = ["AttendAway"] * n_subjects
    roi_f_1s = [.125] * n_subjects
    roi_f_2s = [.2] * n_subjects
    df_data = dfm_generate_single_subject_maps(
        hcp_mapping,
        label, experiment_id, mri_id, NORMAL_7T_SUB_IDS, 
        roi_task_ids, roi_f_1s, roi_f_2s, ROI_FO,
        df_data=df_data,
        corr_type=corr_type,
        ROI_FO=ROI_FO
    )
    clear_output()
    # 3T/7T vary
    experiment_id = "1_frequency_tagging"
    for label, mri_id in zip(["3TVary","7TVary"], ["3T","7T"]):
        sub_ids = [VARY_SUB_IDS[0]] * 3 + [VARY_SUB_IDS[1]] * 3
        roi_task_ids = [i[0] for i in VARY_020_ENTRAIN_CONDITIONS] + [i[0] for i in VARY_021_ENTRAIN_CONDITIONS]
        roi_f_1s = [i[1] for i in VARY_020_ENTRAIN_CONDITIONS] + [i[1] for i in VARY_021_ENTRAIN_CONDITIONS]
        roi_f_2s = [i[2] for i in VARY_020_ENTRAIN_CONDITIONS] + [i[2] for i in VARY_021_ENTRAIN_CONDITIONS]
        df_data = dfm_generate_single_subject_maps(
            hcp_mapping,
            label, experiment_id, mri_id, sub_ids,
            roi_task_ids, roi_f_1s, roi_f_2s, ROI_FO,
            df_data=df_data,
            corr_type=corr_type,
            ROI_FO=ROI_FO
        )
        clear_output()
    df = pd.DataFrame(df_data)
    df.to_pickle(df_map_pkl)

#### Generate f1, f2 locked maps across MRI strength and varying entrainment experiments
- sub_ids = 020, 021

In [None]:
from utils import (
    load_mean_dtseries,
    WORKING_DIR,
    crop_and_save,
)

from wbplot import dscalar

DFM_VARY_LOCKED_MAPS = WORKING_DIR / "figures" / "dual_frequency_mapping" / "locked_vary_maps"
if not DFM_VARY_LOCKED_MAPS.exists():
    DFM_VARY_LOCKED_MAPS.mkdir(parents=True, exist_ok=True)

VERTEX_TO = 59412
PALETTE_PARAMS = {
    "disp-zero": False,
    "disp-neg": False,
    "disp-pos": True,
    "pos-user": (0, 1.),
    "neg-user": (-1,0),
    "interpolate": True,
}
# Cropped settings
LEFT=590;TOP=80;RIGHT=1140;BOTTOM=460


In [None]:
expected_n_runs = 6

for sub_id in ["020", "021"]:

    if sub_id == "020":
        entrain_conditions = [i[0] for i in VARY_020_ENTRAIN_CONDITIONS]
    elif sub_id == "021":
        entrain_conditions = [i[0] for i in VARY_021_ENTRAIN_CONDITIONS]

    entrain_f1 = []
    entrain_f2 = []
    entrain1_activations = !ls /scratch/fastfmri/*merged*basic*roi-entrain*uncp*/sub-{sub_id}/bootstrap/*task-{entrain_conditions[0]}*data-train*activations*
    entrain_f1.append(entrain1_activations[0])
    entrain_f2.append(entrain1_activations[1])
    entrain_f1.append(entrain1_activations[2])
    entrain_f2.append(entrain1_activations[3])
    entrain2_activations = !ls /scratch/fastfmri/*merged*basic*roi-entrain*uncp*/sub-{sub_id}/bootstrap/*task-{entrain_conditions[1]}*data-train*activations*
    entrain_f1.append(entrain2_activations[0])
    entrain_f2.append(entrain2_activations[1])
    entrain_f1.append(entrain2_activations[2])
    entrain_f2.append(entrain2_activations[3])
    entrain3_activations = !ls /scratch/fastfmri/*merged*basic*roi-entrain*uncp*/sub-{sub_id}/bootstrap/*task-{entrain_conditions[2]}*data-train*activations*
    entrain_f1.append(entrain3_activations[0])
    entrain_f2.append(entrain3_activations[1])
    entrain_f1.append(entrain3_activations[2])
    entrain_f2.append(entrain3_activations[3])

    assert len(entrain_f1) == expected_n_runs, f"{len(entrain_f1)} vs {expected_n_runs}"
    assert len(entrain_f2) == expected_n_runs

    for entrain_descriptor, entrain_f in zip(["f1","f2"],[entrain_f1, entrain_f2]):
        png_out = DFM_VARY_LOCKED_MAPS / f"sub-{sub_id}_{entrain_descriptor}.png"
        for ix, activation in enumerate(entrain_f):
            _data = (load_mean_dtseries(activation)[:VERTEX_TO] > .8).astype(float)
            if ix == 0:
                data = _data
            else:
                data += _data
        data /= 6
        dscalar(
            png_out, data, 
            orientation="portrait", 
            hemisphere='right',
            palette="magma", 
            palette_params=PALETTE_PARAMS,
            transparent=False,
            flatmap=True,
            flatmap_style='plain',
        )
        crop_and_save(png_out, str(png_out).replace("png", "cropped.png"), LEFT, TOP, RIGHT, BOTTOM)
        clear_output()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Create some example data
data = np.random.rand(10, 10)

# Create a figure and axis
fig, ax = plt.subplots(figsize=(1,1), dpi=300)

# Plot data on the axis with the 'magma' colormap
cax = ax.imshow(data, cmap='magma')

# Add a colorbar to the figure
cbar = fig.colorbar(cax, ax=ax, fraction=.5, pad=.2)
cbar.set_ticks([0,1])
cbar.set_ticklabels([0,1], fontsize=6)
for spine in cbar.ax.spines.values():
    spine.set_visible(False)

cbar.ax.tick_params("both",pad=0, length=0,)
fig.savefig(MAIN / f"Fig1_magma_cbar.png", dpi=300)
#plt.close()
