In [None]:
import glob
import numpy as np
import random
import torch
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


##### Data Loading

In [None]:
def load_and_split_traces(npy_path, label, seed=1024):
    data = np.load(npy_path, allow_pickle=True)  
    traces = list(data)
    random.seed(seed)
    random.shuffle(traces)
    n_total = len(traces)
    n_train = int(n_total * 0.7)
    n_val = int(n_total * 0.15)
    n_test = n_total - n_train - n_val
    train_samples = [(t, label) for t in traces[:n_train]]
    val_samples   = [(t, label) for t in traces[n_train:n_train + n_val]]
    test_samples  = [(t, label) for t in traces[n_train + n_val:]]
    return train_samples, val_samples, test_samples

cup_paths = glob.glob("../data/CuP-0.1mM.npy")
cup_train_samples = []
cup_val_samples = []
cup_test_samples = []

for path in cup_paths:
    train_part, val_part, test_part = load_and_split_traces(path, label=0, seed=1024)
    cup_train_samples.extend(train_part)
    cup_val_samples.extend(val_part)
    cup_test_samples.extend(test_part)



In [None]:
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import os

caf_paths = glob.glob("../data/CuP+CAF-1_2(0.2mM).npy")
caf_train_samples = []
caf_val_samples = []

for path in caf_paths:
    data = np.load(path, allow_pickle=True)
    traces = list(data)  

    random.seed(1024)
    random.shuffle(traces)
    n_total = len(traces)
    n_train = int(n_total * 0.8)
    n_val = n_total - n_train
    caf_train_samples.extend([(t, 1) for t in traces[:n_train]])
    caf_val_samples.extend([(t, 1) for t in traces[n_train:]])

train_samples = cup_train_samples + caf_train_samples
val_samples   = cup_val_samples   + caf_val_samples


In [None]:
from torch.utils.data import Dataset, DataLoader
class TraceDataset(Dataset):
    def __init__(self, samples, max_len=1500):
        self.samples = samples
        self.max_len = max_len

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        trace, label = self.samples[idx]
        trace = np.array(trace, dtype=np.float32)

        if len(trace) > self.max_len:
            trace = trace[:self.max_len]
        else:
            trace = np.pad(trace, (0, self.max_len - len(trace)), constant_values=0)
        x_time = torch.tensor(trace).unsqueeze(0)  
        freq = np.fft.rfft(trace)
        mag = np.abs(freq)
        x_freq = torch.tensor(mag[np.newaxis, :], dtype=torch.float32)

        return x_time, x_freq, label

##### Model

In [None]:
import torch.nn as nn
import torch.nn.functional as F
class TFC(nn.Module):
    def __init__(self, configs):
        super(TFC, self).__init__()
        self.encoder_time = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=9, padding=4),
            nn.GELU(),
            nn.LayerNorm([64, 1500]),
            nn.MaxPool1d(2), 

            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.GELU(),
            nn.LayerNorm([128, 750]),

            nn.Conv1d(128, 128, kernel_size=3, padding=1),
            nn.GELU(),
            nn.LayerNorm([128, 750]),

            nn.Flatten(),
            nn.Linear(128 * 750, 128),
            nn.GELU(),
            nn.LayerNorm(128),
            nn.Dropout(0.25)
        )

        self.projector_t = nn.Sequential(
            nn.Conv1d(128, 64, kernel_size=1),
            nn.LayerNorm([64, 1]),
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten()
        )
        self.encoder_freq = nn.Sequential(
            nn.Conv1d(1, 64, kernel_size=7, padding=3),
            nn.GELU(),
            nn.LayerNorm([64, 751]),
            nn.MaxPool1d(2), 

            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.GELU(),
            nn.LayerNorm([128, 375]),

            nn.Conv1d(128, 128, kernel_size=3, padding=1),
            nn.GELU(),
            nn.LayerNorm([128, 375]),

            nn.Flatten(),
            nn.Linear(128 * 375, 128),
            nn.GELU(),
            nn.LayerNorm(128),
            nn.Dropout(0.25)
        )

        self.projector_f = nn.Sequential(
            nn.Conv1d(128, 64, kernel_size=1),
            nn.LayerNorm([64, 1]),
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten()
        )

    def forward(self, x_in_t, x_in_f):
        h_t = self.encoder_time(x_in_t)      
        h_t = h_t.unsqueeze(-1)              
        z_t = self.projector_t(h_t)           

        h_f = self.encoder_freq(x_in_f)       
        h_f = h_f.unsqueeze(-1)               
        z_f = self.projector_f(h_f)           

        return z_t, z_f

class target_classifier(nn.Module):
    def __init__(self, configs):
        super(target_classifier, self).__init__()
        self.logits = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 2)
        )

    def forward(self, z_t, z_f):
        z = torch.cat([z_t, z_f], dim=1)  
        out = self.logits(z)
        return out


##### Model Training

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay,classification_report,accuracy_score
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from collections import Counter
import matplotlib.colors as mcolors
def ConsistencyLoss(z1, z2):
    z1 = F.normalize(z1, dim=1)
    z2 = F.normalize(z2, dim=1)
    return F.mse_loss(z1, z2)
def train_one_epoch(tfc_model, classifier, dataloader, optimizer, criterion, contrastive_weight=0.1):
    tfc_model.train()
    classifier.train()
    total_loss = 0
    for x_t, x_f, y in dataloader:
        x_t, x_f, y = x_t.to(device), x_f.to(device), y.to(device)
        z_t, z_f = tfc_model(x_t, x_f)
        logits = classifier(z_t, z_f)
        ce_loss = criterion(logits, y)
        loss_c = ConsistencyLoss(F.normalize(z_t, dim=1), F.normalize(z_f, dim=1))
        loss = ce_loss + 0.2 * loss_c
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

def evaluate(tfc_model, classifier, dataloader, return_acc=False):

    tfc_model.eval()
    classifier.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for x_t, x_f, y in dataloader:
            x_t, x_f = x_t.to(device), x_f.to(device)
            z_t, z_f = tfc_model(x_t, x_f)
            logits = classifier(z_t, z_f)
            
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(y.numpy())

    print("\n Classification Report:")
    print(classification_report(all_labels, all_preds, digits=4))
    acc = accuracy_score(all_labels, all_preds)
    print(f"\n Accuracy: {acc:.4f}")
    
    cm_normalized = confusion_matrix(all_labels, all_preds, normalize='true')
    
    display_labels = ["CuP", "CuP+CAF"]
    fig, ax = plt.subplots(figsize=(8, 6))
    colors = ["#FFFFFF", "#F3A697", "#F17D65"]
    cmap_custom = mcolors.LinearSegmentedColormap.from_list("white_to_red", colors)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm_normalized,
                                  display_labels=display_labels)

    disp.plot(cmap=cmap_custom, values_format='.3f', ax=ax, colorbar=False)
    im = disp.im_
    im.set_clim(0, 1)
    plt.rcParams.update({
        'font.size': 20,
        # 'font.family': 'Arial',
        'xtick.labelsize': 20,
        'ytick.labelsize': 20
    })

    cbar = fig.colorbar(im, ax=ax)
    cbar.ax.tick_params(labelsize=20)     
    cbar.ax.yaxis.label.set_size(22)      
    
    plt.setp(ax.get_yticklabels(), ha="right", multialignment="center", fontsize=20)
    ax.set_xlabel("Predicted label", fontsize=22, fontweight='normal')
    ax.set_ylabel("True label", fontsize=22, fontweight='normal')
    plt.setp(ax.get_xticklabels(), ha='center', fontsize=20)
    plt.setp(ax.get_yticklabels(), rotation=90, va='center', fontsize=20)

    
    for text in disp.ax_.texts:
        text.set_fontsize(22)
        bg_val = disp.confusion_matrix[int(text.get_position()[1])][int(text.get_position()[0])]
        if bg_val > 0.5:
            text.set_color('white')
        else:
            text.set_color('black')

    plt.tight_layout()
        
    plt.show()
    
    if return_acc:
        return acc


def predict_class_distribution(tfc_model, classifier, dataloader, device, class_names=["CuP", "CAF"]):
    tfc_model.eval()
    classifier.eval()
    all_preds = []
    with torch.no_grad():
        for x_t, x_f, _ in dataloader:
            x_t, x_f = x_t.to(device), x_f.to(device)
            z_t, z_f= tfc_model(x_t, x_f)  
            logits = classifier(z_t, z_f)            
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            all_preds.extend(preds)

    counter = Counter(all_preds)
    total = sum(counter.values())
    for cls_idx in sorted(counter.keys()):
        count = counter[cls_idx]
        percent = 100 * count / total
        cls_name = class_names[cls_idx] if cls_idx < len(class_names) else f"Class {cls_idx}"
        print(f"{cls_name:>6} ：Proportion {percent:.2f}%")
    return counter

In [None]:
set_seed(42)
train_loader = DataLoader(TraceDataset(train_samples), batch_size=32, shuffle=True, drop_last=True)
val_loader   = DataLoader(TraceDataset(val_samples), batch_size=32, shuffle=False, drop_last=True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class Config:
    TSlength_aligned_t = 1500
    TSlength_aligned_f = 751
    num_classes_target = 2
configs = Config()

tfc_model = TFC(configs).to(device)
classifier = target_classifier(configs).to(device)
optimizer = torch.optim.Adam(list(tfc_model.parameters()) + list(classifier.parameters()), lr=4e-4)
num_cup = sum(label == 0 for _, label in train_samples)
num_caf = sum(label == 1 for _, label in train_samples)
total = num_cup + num_caf
print("Train set: CuP=", num_cup, ", CAF=", num_caf)

weights = torch.tensor([
    total / num_cup,
    total / num_caf
], dtype=torch.float32).to(device)
print(f"Using class weights: CuP={weights[0]:.2f}, CAF={weights[1]:.2f}")

criterion = nn.CrossEntropyLoss(weight=weights)

for epoch in range(1, 21):
    loss = train_one_epoch(tfc_model, classifier, train_loader, optimizer, criterion)
    print(f"[Epoch {epoch}] Loss: {loss:.4f}")
    evaluate(tfc_model, classifier, val_loader)



In [None]:
def load_trace_level_samples(file_label_pairs, max_len):
    samples = []
    for path, label in file_label_pairs:
        data = np.load(path, allow_pickle=True) 
        for trace in data:
            trace = np.array(trace, dtype=np.float32)
            if len(trace) > max_len:
                trace = trace[:max_len]
            else:
                pad_len = max_len - len(trace)
                trace = np.concatenate([trace, np.zeros(pad_len, dtype=np.float32)])
            samples.append((trace, label))
    return samples

In [None]:
import random
from torch.utils.data import DataLoader

final_test_samples = [(trace, 0) for trace, _ in cup_test_samples]
final_test_loader = DataLoader(TraceDataset(final_test_samples), batch_size=32, shuffle=False)
evaluate(tfc_model, classifier, final_test_loader)
predict_class_distribution(tfc_model, classifier, final_test_loader, device)

##### Prediction

In [None]:
from torch.utils.data import DataLoader
from collections import Counter
manual_paths = [
    ("../data/CuP+CAF-1_1(0.1mM).npy", 1E-4),
    ("../data/CuP+CAF-1_0.5(0.05mM).npy", 5E-5),
    ("../data/CuP+CAF-10_1(0.01mM).npy", 1E-5),
    ("../data/CuP+CAF-100_1(0.001mM).npy", 1E-6),
    ("../data/CuP+CAF-1000_1(0.0001mM).npy", 1E-7),
    ("../data/CuP+CAF-10000_1(0.00001mM).npy", 1E-8),
    ("../data/CuP+CAF-100000_1(0.000001mM).npy", 1E-9),
    ("../data/CuP+CAF-1000000_1(0.0000001mM).npy", 1E-10),
    ("../data/CuP+CAF-10000000_1(0.00000001mM).npy", 1E-11),
    ("../data/CuP+CAF-10^10_1(10^-14mM).npy", 1E-14),
    ("../data/CuP+CAF-10^12_1(10^-16mM).npy", 1E-16),
    ("../data/CuP+CAF-10^14_1(10^-18mM).npy", 1E-18),

]

def load_trace_level_samples(file_label_pairs, max_len=1500):
    samples = []
    for path, label in file_label_pairs:
        data = np.load(path, allow_pickle=True)
        for trace in data:
            trace = np.array(trace, dtype=np.float32)
            if len(trace) > max_len:
                trace = trace[:max_len]
            else:
                trace = np.pad(trace, (0, max_len - len(trace)), constant_values=0)
            samples.append((trace, label))
    return samples
concentration_list = []
caf_proportion_list = []

for path, conc in manual_paths:
    try:
        samples = load_trace_level_samples([(path, 1)], max_len=1500)
        if len(samples) == 0:
            continue
        loader = DataLoader(TraceDataset(samples), batch_size=32, shuffle=False, drop_last=True)

        tfc_model.eval()
        classifier.eval()
        preds = []
        with torch.no_grad():
            for x_t, x_f, _ in loader:
                x_t, x_f = x_t.to(device), x_f.to(device)
                z_t, z_f = tfc_model(x_t, x_f)
                out = classifier(z_t, z_f)
                pred = torch.argmax(out, dim=1).cpu().numpy()
                preds.extend(pred.tolist())
        count = Counter(preds)
        total = len(preds)
        ratio = count.get(1, 0) / total
        print(f"{conc:.1e} mM:  CAF Ratio = {ratio:.2f}")

        concentration_list.append(conc)
        caf_proportion_list.append(ratio)

    except Exception as e:
        print(f"error")

##### Limit of Detection

In [None]:
import matplotlib.ticker as ticker
from scipy.stats import linregress
from matplotlib.ticker import FuncFormatter, LogLocator, NullFormatter

valid_indices = [i for i, c in enumerate(concentration_list) if c > 0]
concentration_array = np.array(concentration_list)[valid_indices]
proportion_array = np.array(caf_proportion_list)[valid_indices]

log_conc = np.log10(concentration_array)
slope, intercept, r_value, _, _ = linregress(log_conc, proportion_array)
log_x_min = np.log10(min(concentration_array))
log_x_max = np.log10(max(concentration_array))
fit_x = np.linspace(log_x_min, log_x_max, 100)
fit_y = slope * fit_x + intercept


plt.rcParams.update({
    'font.size': 20,
    # 'font.family': 'Arial',
    'xtick.labelsize': 20,
    'ytick.labelsize': 20
})

fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(concentration_array, proportion_array, 
           marker='o', s=180, c='#F17D65', alpha=0.8, edgecolors='none',zorder=10, label='CuP+CAF')
ax.plot(10**fit_x, fit_y, '-', color='black', lw=2.5)


ax.set_xscale("log")
ax.set_ylim(0, 1.05)
ax.set_xlim(min(concentration_array) * 0.1, max(concentration_array) * 10)
ax.set_xlabel("CAF concentration / mol/L", fontsize=24, fontweight='normal')
ax.set_ylabel("CAF ratio", fontsize=24, fontweight='normal')

def sci_notation_formatter(x, pos):
    if x == 0:
        return "0"
    exponent = int(np.log10(x))
    return r"$10^{{{}}}$".format(exponent)
ax.xaxis.set_major_locator(LogLocator(base=10.0, subs=[1.0]))
ax.xaxis.set_major_formatter(FuncFormatter(sci_notation_formatter))
plt.setp(ax.get_xticklabels(), rotation=0, ha="center")

for spine in ['top', 'bottom', 'left', 'right']:
    ax.spines[spine].set_visible(True)
    ax.spines[spine].set_linewidth(1.5)
ax.tick_params(width=1.5, length=6)

sign = '+' if intercept >= 0 else '-'
equation_text = (
    f"y = {slope:.2f}x {sign} {abs(intercept):.2f}\n"
    f"$R^2$ = {r_value**2:.2f}"
)
ax.text(0.05, 0.65, equation_text,
        transform=ax.transAxes,
        fontsize=24,
        verticalalignment='top')

ax.legend(loc='upper left', fontsize=24, frameon=False, handletextpad=0.1, markerscale=1.5)
plt.tight_layout()
plt.show()

##### Response Time

In [None]:
import matplotlib.ticker as ticker
from scipy.stats import linregress
from collections import Counter

sample_sizes_to_test = [40,50,66, 79,200, 'all']
set_seed(43)
output_dir = "analysis_by_sample_size"
os.makedirs(output_dir, exist_ok=True)

for n_samples in sample_sizes_to_test:
    
    concentration_list = []
    caf_proportion_list = []
    for path, conc in manual_paths:

        all_samples = load_trace_level_samples([(path, 1)], max_len=1500)
        if n_samples == 'all':
            samples_to_use = all_samples
        else:
            num_to_take = min(n_samples, len(all_samples))
            samples_to_use = random.sample(all_samples, num_to_take)

        if not samples_to_use:
            continue

        loader = DataLoader(TraceDataset(samples_to_use), batch_size=32, shuffle=False, drop_last=True)
        
        tfc_model.eval()
        classifier.eval()
        preds = []

        with torch.no_grad():
            for x_t, x_f, _ in loader:
                x_t, x_f = x_t.to(device), x_f.to(device)
                z_t, z_f = tfc_model(x_t, x_f)
                out = classifier(z_t, z_f)
                pred = torch.argmax(out, dim=1).cpu().numpy()
                preds.extend(pred.tolist())
        

        count = Counter(preds)
        total = len(preds)
        ratio = count.get(1, 0) / total 
        print(f"{conc:.1e} mM: CAF Ratio = {ratio:.3f}")

        concentration_list.append(conc)
        caf_proportion_list.append(ratio)

    valid_indices = [i for i, c in enumerate(concentration_list) if c > 0]
    concentration_array = np.array(concentration_list)[valid_indices]
    proportion_array = np.array(caf_proportion_list)[valid_indices]
    

    log_conc = np.log10(concentration_array)
    slope, intercept, r_value, _, _ = linregress(log_conc, proportion_array)
    log_x_min = np.log10(min(concentration_array))
    log_x_max = np.log10(max(concentration_array))
    fit_x = np.linspace(log_x_min, log_x_max, 100)
    fit_y = slope * fit_x + intercept

    plt.rcParams.update({
        'font.size': 20,
        # 'font.family': 'Arial',
        'xtick.labelsize': 20,
        'ytick.labelsize': 20
    })

    fig, ax = plt.subplots(figsize=(8, 6))

    label = f'CuP+CAF (n = {n_samples})' if n_samples != 'all' else 'CuP+CAF (entire dataset)'

    ax.scatter(concentration_array, proportion_array, 
               marker='o', s=180, c='#F17D65', alpha=0.8, edgecolors='none',zorder=10, label=label)
    ax.plot(10**fit_x, fit_y, '-', color='black', lw=2.5)

    ax.set_xscale("log")
    ax.set_ylim(0, 1.05)
    ax.set_xlim(min(concentration_array) * 0.1, max(concentration_array) * 10)
    ax.set_xlabel("CAF concentration / mol/L", fontsize=24)
    ax.set_ylabel("CAF ratio", fontsize=24) 

    def sci_notation_formatter(x, pos):
        if x == 0:
            return "0"
        exponent = int(np.log10(x))
        return r"$10^{{{}}}$".format(exponent)

    ax.xaxis.set_major_locator(LogLocator(base=10.0, subs=[1.0]))
    ax.xaxis.set_major_formatter(FuncFormatter(sci_notation_formatter))
    plt.setp(ax.get_xticklabels(), rotation=0, ha="center")

    for spine in ['top', 'bottom', 'left', 'right']:
        ax.spines[spine].set_visible(True)
        ax.spines[spine].set_linewidth(1.5)
    ax.tick_params(width=1.5, length=6)

    sign = '+' if intercept >= 0 else '-'
    equation_text = (
        f"y = {slope:.2f}x {sign} {abs(intercept):.2f}\n"
        f"$R^2$ = {r_value**2:.2f}"
    )
    ax.text(0.05, 0.65, equation_text,
            transform=ax.transAxes,
            fontsize=24,
            verticalalignment='top')

    ax.legend(loc='upper left',fontsize=24, frameon=False, handletextpad=0.1, markerscale=1.5)
    plt.tight_layout()
    plt.show()



##### Response Time-plot

In [None]:
from scipy.stats import linregress
from torch.utils.data import DataLoader
from collections import Counter

set_seed(42)
MAX_N              = 1600                          
SAMPLING_FREQ      = 20000              
TRACE_LENGTH       = 1500               
DECISION_TIME_S    = 5.0                
START_N_MIN        = 10                 
all_data_by_conc = {}
min_samples_across_all_files = float('inf')

for path, conc in manual_paths:
    samples = load_trace_level_samples([(path, 1)], max_len=TRACE_LENGTH)
    all_data_by_conc[conc] = samples
    min_samples_across_all_files = min(min_samples_across_all_files, len(samples))

MAX_N = min(MAX_N, min_samples_across_all_files)
time_per_trace = TRACE_LENGTH / SAMPLING_FREQ   

def build_sample_points(max_n,
                        start_n=START_N_MIN,
                        time_per_trace=time_per_trace,
                        decision_time_s=DECISION_TIME_S,
                        num_points=38,      
                        power=2.4,          
                        min_step=4):        
    t_min = start_n * time_per_trace
    t_max = max_n  * time_per_trace
    u = np.linspace(0.0, 1.0, num_points)
    t = t_min + (t_max - t_min) * (u ** power)
    n = np.rint(t / time_per_trace).astype(int)
    decision_n = int(round(decision_time_s / time_per_trace))
    n = np.concatenate(([start_n], n, [max_n, decision_n]))
    n = np.unique(n)
    n = n[(n >= start_n) & (n <= max_n)]
    n_sorted = [int(n[0])]
    for k in n[1:]:
        if k - n_sorted[-1] >= min_step:
            n_sorted.append(int(k))
    return n_sorted

sample_points = build_sample_points(MAX_N)
time_points   = [n * time_per_trace for n in sample_points]

def _predict_ratio_for_conc(n, all_samples):
    k = min(n, len(all_samples))
    samples_to_use = random.sample(all_samples, k)
    loader = DataLoader(TraceDataset(samples_to_use), batch_size=32, shuffle=False, drop_last=False)
    if len(loader) == 0:
        return None
    preds = []
    with torch.no_grad():
        for x_t, x_f, _ in loader:
            x_t, x_f = x_t.to(device), x_f.to(device)
            z_t, z_f = tfc_model(x_t, x_f)
            out = classifier(z_t, z_f)
            pred = torch.argmax(out, dim=1).cpu().numpy()
            preds.extend(pred.tolist())
    if not preds:
        return None
    return Counter(preds).get(1, 0) / len(preds)


def get_fit_params_at_n_samples(n, data_source):
    concentration_list, caf_proportion_list = [], []
    for conc, all_samples in data_source.items():
        ratio = _predict_ratio_for_conc(n, all_samples)
        if ratio is None:
            continue
        concentration_list.append(conc)
        caf_proportion_list.append(ratio)
    if len(concentration_list) < 2:
        return None, None, None
    logc = np.log10(concentration_list)
    slope, intercept, r_value, *_ = linregress(logc, caf_proportion_list)
    r2 = r_value**2
    return slope, intercept, r2
tfc_model.eval(); classifier.eval()
slopes, intercepts, r2s = [], [], []

for n in sample_points:
    s, b, r2 = get_fit_params_at_n_samples(n, all_data_by_conc)
    slopes.append(np.nan if s  is None else s)
    intercepts.append(np.nan if b  is None else b)
    r2s.append(np.nan if r2 is None else r2)
    print(f"  - n={n}: slope={s:.6f}, intercept={b:.6f}, R^2={r2:.4f}")

final_slope, final_intercept, final_r2 = get_fit_params_at_n_samples(min_samples_across_all_files, all_data_by_conc)
print(f"  - slope_all={final_slope:.6f}, intercept_all={final_intercept:.6f}, R^2_all={final_r2:.4f}")

eps = 1e-12
fs = final_slope     if abs(final_slope)     > eps else eps
fi = final_intercept if abs(final_intercept) > eps else eps
fr = final_r2        if abs(final_r2)        > eps else eps  

norm_slopes     = np.array(slopes, dtype=float)     / fs
norm_intercepts = np.array(intercepts, dtype=float) / fi
r2_array        = np.array(r2s, dtype=float)
norm_r2         = r2_array / fr                    

In [None]:
from matplotlib.ticker import MultipleLocator, FixedLocator
import matplotlib.gridspec as gridspec
from math import floor

plt.rcParams.update({
    'font.size': 24,
    # 'font.family': 'Arial', 
    'font.weight': 'normal',
    'xtick.labelsize': 24, 
    'ytick.labelsize': 24
})

def _interp_at_break(tp, yA, yB, break_point):
    idxR = int(np.searchsorted(tp, break_point))
    idxL = max(idxR - 1, 0)
    valid_indices_A = np.where(~np.isnan(yA))[0]
    valid_indices_B = np.where(~np.isnan(yB))[0]
    
    if idxR == 0 or idxR >= len(tp) or np.isclose(tp[idxL], tp[min(idxR, len(tp)-1)]):
        yB_A = yA[idxL] if idxL in valid_indices_A else np.nan
        yB_B = yB[idxL] if idxL in valid_indices_B else np.nan
        tB = break_point
    else:
        t0, t1 = tp[idxL], tp[idxR]
        w = (break_point - t0) / (t1 - t0)
        yB_A = yA[idxL] + w * (yA[idxR] - yA[idxL]) if idxL in valid_indices_A and idxR in valid_indices_A else np.nan
        yB_B = yB[idxL] + w * (yB[idxR] - yB[idxL]) if idxL in valid_indices_B and idxR in valid_indices_B else np.nan
        tB = break_point

    tL, A_L, B_L = np.r_[tp[:idxR], tB], np.r_[yA[:idxR], yB_A], np.r_[yB[:idxR], yB_B]
    tR, A_R, B_R = np.r_[tB, tp[idxR:]], np.r_[yB_A, yA[idxR:]], np.r_[yB_B, yB[idxR:]]
    return tB, (tL, A_L, B_L), (tR, A_R, B_R)

fig = plt.figure(figsize=(10, 6))
gs = gridspec.GridSpec(1, 2, width_ratios=[1, 1])
ax_left  = fig.add_subplot(gs[0])
ax_right = fig.add_subplot(gs[1], sharey=ax_left) 

ax_left.spines['right'].set_visible(False)
ax_right.spines['left'].set_visible(False)

tp = np.asarray(time_points, dtype=float)
ys = np.asarray(norm_slopes, dtype=float)
yi = np.asarray(norm_intercepts, dtype=float)

BAND = 0.05
y_min = min(np.nanmin(ys), np.nanmin(yi), np.nanmin(norm_r2), 1 - BAND) - 0.025
y_max = max(np.nanmax(ys), np.nanmax(yi), np.nanmax(norm_r2), 1 + BAND) + 0.025
ax_left.set_ylim(y_min, y_max)
ax_left.set_ylabel("Normalized Value", fontsize=24, labelpad=6)

for ax in (ax_left, ax_right):
    ax.axhspan(1 - BAND, 1 + BAND, facecolor='gray', alpha=0.15, zorder=-20)
    ax.axhline(1.0, color='black', linestyle='--', lw=1.3, zorder=-5)
    ax.set_facecolor('none')

BREAK_POINT = 20.0
tB,  (tL,  yL,  iL), (tR,  yR,  iR)  = _interp_at_break(tp, ys,      yi,       BREAK_POINT)
tB2, (tL2, r2L, _ ), (tR2, r2R, _ ) = _interp_at_break(tp, norm_r2,  norm_r2,  BREAK_POINT)

SLOPE_COLOR     = 'crimson'     
INTERCEPT_COLOR = 'royalblue'  
R2_COLOR = 'forestgreen'
ALPHA_VAL = 0.8
line_s_left, = ax_left.plot(tL,  yL, 'o-',  lw=2.0, ms=7, color=SLOPE_COLOR,
                            label='Slope',     zorder=5, alpha=ALPHA_VAL)
line_i_left, = ax_left.plot(tL,  iL, 's--', lw=2.0, ms=6, color=INTERCEPT_COLOR,
                            label='Intercept', zorder=5, alpha=ALPHA_VAL)
line_r2_left,= ax_left.plot(tL2, r2L, '^-.', lw=2.0, ms=7, color=R2_COLOR,
                            label=r'$R^2$',    zorder=4, alpha=ALPHA_VAL)

ax_right.plot(tR,  yR,  'o-',  lw=2.0, ms=7, color=SLOPE_COLOR,     zorder=5, alpha=ALPHA_VAL)
ax_right.plot(tR,  iR,  's--', lw=2.0, ms=6, color=INTERCEPT_COLOR, zorder=5, alpha=ALPHA_VAL)
ax_right.plot(tR2, r2R, '^-.', lw=2.0, ms=7, color=R2_COLOR,        zorder=4, alpha=ALPHA_VAL)
ax_left.plot(tB,  yL[-1],  'o', ms=8, color=SLOPE_COLOR,     zorder=6, alpha=ALPHA_VAL)
ax_left.plot(tB,  iL[-1],  's', ms=7, color=INTERCEPT_COLOR, zorder=6, alpha=ALPHA_VAL)
ax_left.plot(tB2, r2L[-1], '^', ms=8, color=R2_COLOR,        zorder=6, alpha=ALPHA_VAL)
ax_right.plot(tB,  yR[0],  'o', ms=8, color=SLOPE_COLOR,     zorder=6, alpha=ALPHA_VAL)
ax_right.plot(tB,  iR[0],  's', ms=7, color=INTERCEPT_COLOR, zorder=6, alpha=ALPHA_VAL)
ax_right.plot(tB2, r2R[0], '^', ms=8, color=R2_COLOR,        zorder=6, alpha=ALPHA_VAL)

ax_right.tick_params(labelleft=False, left=False)
ax_left.set_xlim(0, BREAK_POINT)
ax_left.xaxis.set_major_locator(FixedLocator([0, 5, 10, 15, 20]))
ax_left.xaxis.set_minor_locator(MultipleLocator(1))
x_max = float(np.max(time_points)) * 1.02
ax_right.set_xlim(BREAK_POINT, x_max)
ticks_right = [20, 40, 60, 90, 120]
ax_right.xaxis.set_major_locator(FixedLocator(ticks_right))
ax_right.xaxis.set_minor_locator(MultipleLocator(10))

fig.subplots_adjust(wspace=0)
ax_left.axvline(6, color='gray', linestyle='--', lw=1.6, zorder=-4)

fig.supxlabel("Response time (s)", fontsize=24, y=-0.012)
ax_right.legend([line_s_left, line_i_left, line_r2_left],
                ['Slope', 'Intercept', r'$R^2$'],
                loc='lower right', fontsize=22, frameon=True)
plt.show()
