### 1D-CNN model for phosphorylation identification at the single molecule level

In [None]:

import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dropout, Dense, BatchNormalization
import numpy as np
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import confusion_matrix, roc_curve
import seaborn as sns
from itertools import cycle


# -------------------- Utility Functions --------------------
def ensure_dir(directory):
    """Ensure the directory exists, create it if it does not."""
    if not os.path.exists(directory):
        os.makedirs(directory)

def plot_metrics(history, dataset_name, save_path):
    """Plot training loss and accuracy and save the figure."""
    ensure_dir(save_path)
    fig, ax1 = plt.subplots(figsize=(12, 6), dpi=800)
    
    # Increase font sizes
    font_size = 18
    tick_size = 16
    legend_size = 14

    # Plot Loss
    ax1.set_xlabel('Epoch', fontsize=font_size, fontweight='bold')
    ax1.set_ylabel('Loss', color='tab:blue', fontsize=font_size, fontweight='bold')
    ax1.plot(history.epoch, history.history['loss'], label='Train Loss', color='tab:blue', linestyle='-')
    ax1.tick_params(axis='y', labelcolor='tab:blue', labelsize=tick_size)
    ax1.tick_params(axis='x', labelsize=tick_size)

    # Plot Accuracy
    ax2 = ax1.twinx()
    ax2.set_ylabel('Accuracy', color='tab:green', fontsize=font_size, fontweight='bold')
    ax2.plot(history.epoch, history.history['accuracy'], label='Train Accuracy', color='tab:green', linestyle='--')
    ax2.tick_params(axis='y', labelcolor='tab:green', labelsize=tick_size)

    # Title
    plt.title(f'Training Loss and Accuracy - {dataset_name}', fontsize=font_size + 2, fontweight='bold')

    # Grid and Layout
    fig.tight_layout()
    plt.grid(False)

    # Save the Figure
    plt.savefig(os.path.join(save_path, f"{dataset_name}_training_metrics.png"))
    plt.close()


def plot_combined_roc(y_true_list, y_score_list, dataset_names, save_path):
    """Plot combined ROC curves and save the figure."""
    ensure_dir(save_path)
    plt.figure(figsize=(9, 7))

    # Increase font sizes
    font_size = 18
    tick_size = 16
    legend_size = 14

    colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'red', 'purple'])

    for y_true, y_score, dataset_name, color in zip(y_true_list, y_score_list, dataset_names, colors):
        fpr, tpr, _ = roc_curve(y_true, y_score)
        plt.plot(fpr, tpr, color=color, lw=2, label=f'{dataset_name}')

    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    
    # Axis Labels
    plt.xlabel('False Positive Rate', fontsize=font_size, fontweight='bold')
    plt.ylabel('True Positive Rate', fontsize=font_size, fontweight='bold')
    
    # Title
    plt.title('Combined ROC Curve for All Datasets', fontsize=font_size + 2, fontweight='bold')
    
    # Legend
    plt.legend(loc="lower right", fontsize=legend_size)

    # Grid
    plt.grid(False)

    # Save the Figure
    plt.savefig(os.path.join(save_path, "combined_roc_curve.png"))
    plt.close()


def plot_confusion_matrix(true_labels, pred_labels, dataset_name, evaluation_type, class_names, save_path):
    """Plot and save confusion matrix."""
    ensure_dir(save_path)
    cm = confusion_matrix(true_labels, pred_labels)
    cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100  # Convert to percentages
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm_percentage, annot=True, fmt='.2f', cmap="Blues", cbar=False,
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.title(f'Confusion Matrix - {dataset_name} ({evaluation_type})')
    plt.tight_layout()
    plt.savefig(os.path.join(save_path, f"{dataset_name}_{evaluation_type}_confusion_matrix.png"))
    plt.close()


# -------------------- Data Loading --------------------
def load_spectrum_file(dir_path):
    """Load spectrum data from the given directory."""
    data = []
    label = []
    file_dir_list = os.listdir(dir_path)
    for file_dir in file_dir_list:
        file_list = os.listdir(os.path.join(dir_path, file_dir))
        for filename in file_list:
            file_path = os.path.join(dir_path, file_dir, filename)
            x, y = np.loadtxt(file_path, dtype=float, comments='#', delimiter='\t', unpack=True)
            data.append(y)
            label.append(file_dir)
    return np.array(data), label


# -------------------- Model Training and Evaluation --------------------
def run_analysis(train_path, post_eval_path, dataset_name, save_path):
    """Train model, evaluate on validation and post-evaluation sets, and save results."""
    print(f"\nLoading dataset from: {train_path}")
    data, label = load_spectrum_file(train_path)
    data = data.reshape((data.shape[0], data.shape[1], 1))

    le = LabelEncoder()
    int_labels = le.fit_transform(label)
    labels = int_labels
    class_names = le.classes_

    train_data, val_data, train_label, val_label = train_test_split(
        data, labels, train_size=0.7, test_size=0.3, stratify=labels, shuffle=True
    )

    # Define Model
    model = Sequential()
    model.add(tf.keras.layers.InputLayer(input_shape=(data.shape[1], 1)))
    conv_layers = [16,32,64]
    reg_strength = 1e-4

    for cl in conv_layers:
        model.add(Conv1D(cl, 3, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)))
        model.add(BatchNormalization())
        model.add(Conv1D(cl, 3, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)))
        model.add(BatchNormalization())
        model.add(MaxPooling1D(2))
        model.add(Dropout(0.3))

    model.add(Flatten())
    model.add(Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))
    model.add(Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(reg_strength)))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))
    model.add(Dense(1, activation='sigmoid'))

    model.compile(
        optimizer=Adam(learning_rate=1e-3),
        loss='binary_crossentropy',
        metrics=['accuracy', Precision(), Recall(), AUC(name='auc')]
    )

    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_auc', patience=10, mode='max', restore_best_weights=True, verbose=2
    )

    history = model.fit(
        train_data, train_label,
        validation_data=(val_data, val_label),
        batch_size=16,
        epochs=50,
        callbacks=[early_stopping],
        verbose=1
    )

    plot_metrics(history, dataset_name, save_path)

    print("\nEvaluating on validation set...")
    val_pred_probs = model.predict(val_data, batch_size=32).ravel()
    val_pred_classes = (val_pred_probs > 0.5).astype(int)
    val_true_classes = val_label

    plot_confusion_matrix(val_true_classes, val_pred_classes, dataset_name, "Validation", class_names, save_path)

    print(f"\nLoading post-evaluation dataset from: {post_eval_path}")
    post_data, post_label = load_spectrum_file(post_eval_path)
    post_data = post_data.reshape((post_data.shape[0], post_data.shape[1], 1))
    post_label_encoded = le.transform(post_label)

    print("\nEvaluating on post-evaluation set...")
    post_pred_probs = model.predict(post_data, batch_size=32).ravel()
    post_pred_classes = (post_pred_probs > 0.5).astype(int)

    plot_confusion_matrix(post_label_encoded, post_pred_classes, dataset_name, "Post-Evaluation", class_names, save_path)

    return val_true_classes, val_pred_probs, post_label_encoded, post_pred_probs


# -------------------- Run and Save Results --------------------
save_path = ""  # Directory to save all generated figures

datasets = [
    {"train_path": "",
     "post_eval_path": "",
     "name": "Cit-free"},
    {"train_path": "",
     "post_eval_path": "",
     "name": "Cit-affected"}
]

val_labels_list = []
val_probs_list = []
post_labels_list = []
post_probs_list = []
dataset_names = []

for dataset in datasets:
    val_label, val_probs, post_label, post_probs = run_analysis(
        dataset["train_path"], dataset["post_eval_path"], dataset["name"], save_path
    )
    val_labels_list.append(val_label)
    val_probs_list.append(val_probs)
    post_labels_list.append(post_label)
    post_probs_list.append(post_probs)
    dataset_names.append(dataset["name"] + " (Validation)")
    dataset_names.append(dataset["name"] + " (Post-Evaluation)")

# Save combined ROC curve
plot_combined_roc(val_labels_list + post_labels_list, val_probs_list + post_probs_list, dataset_names, save_path)
model_save_path =""

# Save the model
model.save(model_save_path)
print(f"Model saved successfully at {model_save_path}")

### 1D-Gradient Feature extractor

In [None]:
import os, numpy as np, tensorflow as tf, matplotlib.pyplot as plt
from scipy.signal import find_peaks

# ────────── paths ──────────
DATA_DIR   = r"" #enter the directory of the dataset
MODEL_PATH = r"" # directory of model

# ────────── helper functions ───────────────────────────────────────────
def load_spectra_and_labels(root):
    data, labels, raman = [], [], None
    for cls in sorted(os.listdir(root)):
        sub = os.path.join(root, cls)
        if not os.path.isdir(sub): continue
        for fn in os.listdir(sub):
            if fn.lower().endswith('.txt'):
                x, y = np.loadtxt(os.path.join(sub, fn), delimiter='\t', unpack=True)
                if raman is None: raman = x
                data.append(y)
                labels.append(cls)
    return np.vstack(data), np.array(labels), raman

def gradient_saliency_x_input(model, X, positive=True):
    """
    positive=True  ⇒ returns | ∂p/∂X  * X |     (p = P(pSer))
    positive=False ⇒ returns | ∂(1−p)/∂X * X |
    """
    Xtf = tf.convert_to_tensor(X, tf.float32)
    with tf.GradientTape() as tape:
        tape.watch(Xtf)
        p = model(Xtf)[:,0]                     
        score =      p if positive else (1.0 - p)
    grads = tape.gradient(score, Xtf).numpy().squeeze()  
    X_np  = X.squeeze()                                   
    sal   = np.abs(grads * X_np)                          # elementwise
    return sal


def peak_frequency(X, prom=0.4, h=0.5):
    Xf = X.reshape(X.shape[0], -1)
    n, m = Xf.shape
    present = np.zeros((n, m), bool)
    for i in range(n):
        y = (Xf[i] - Xf[i].min()) / Xf[i].ptp()
        pk, _ = find_peaks(y, prominence=prom, height=h)
        present[i, pk] = True
    return present.sum(0) / n

# ────────── main ───────────────────────────────────────────────────────
model       = tf.keras.models.load_model(MODEL_PATH)
data, labels, raman = load_spectra_and_labels(DATA_DIR)
X           = data[..., None]               
y           = (labels == "pSer Cit-affected").astype(int)

G_ser  = gradient_saliency_x_input(model, X[y==0], positive=False)
G_pser = gradient_saliency_x_input(model, X[y==1], positive=True)

ser_w  = G_ser.mean(0)
pser_w = G_pser.mean(0)
ser_w /= ser_w.max()
pser_w /= pser_w.max()


# per-class peak frequencies
freq_ser  = peak_frequency(X[y==0])
freq_pser = peak_frequency(X[y==1])

# ────────── plot ───────────────────────────────────────────────────────
fig, (ax1, ax2) = plt.subplots(2,1, sharex=True, figsize=(12,6), dpi=200,
                               gridspec_kw={'hspace':0.05})
w = raman[1] - raman[0]

# Ser
ax1.plot(raman, ser_w,  color='C0', lw=1.6)
ax1.set_ylabel("Normalized\nFeature Weight", color='C0', fontsize=12, fontweight='bold')  # Add newline for wrapping
ax1.tick_params(axis='y', labelcolor='C0')

# Adjust y-axis limits for ax1 to add space
y_min_ax1, y_max_ax1 = ax1.get_ylim() 
ax1.set_ylim(y_min_ax1, y_max_ax1 * 1.1)  

ax1b = ax1.twinx()
ax1b.bar(raman, freq_ser, width=w, color='0.25', alpha=0.5)
ax1b.set_ylabel("Peak Occurrence\nFrequency", color='0.25', fontsize=12, fontweight='bold')  # Add newline for wrapping
ax1b.tick_params(axis='y', labelcolor='0.25')

# Adjust y-axis limits for ax1b to add space if necessary
y_min_ax1b, y_max_ax1b = ax1b.get_ylim()  # Get current y-axis limits for twin axis
ax1b.set_ylim(y_min_ax1b, y_max_ax1b * 1.1)  # Increase the upper limit by 10%

# pSer
ax2.plot(raman, pser_w, color='C3', lw=1.6)
ax2.set_ylabel("Normalized\nFeature Weight", color='C3', fontsize=12, fontweight='bold')  # Add newline for wrapping
ax2.tick_params(axis='y', labelcolor='C3')

# Adjust y-axis limits for ax2 to add space
y_min_ax2, y_max_ax2 = ax2.get_ylim()  # Get current y-axis limits
ax2.set_ylim(y_min_ax2, y_max_ax2 * 1.1)  # Increase the upper limit by 10%

ax2b = ax2.twinx()
ax2b.bar(raman, freq_pser, width=w, color='0.25', alpha=0.5)
ax2b.set_ylabel("Peak Occurrence\nFrequency", color='0.25', fontsize=12, fontweight='bold')  # Add newline for wrapping
ax2b.tick_params(axis='y', labelcolor='0.25')

# Adjust y-axis limits for ax2b to add space if necessary
y_min_ax2b, y_max_ax2b = ax2b.get_ylim()  # Get current y-axis limits for twin axis
ax2b.set_ylim(y_min_ax2b, y_max_ax2b * 1.1)  # Increase the upper limit by 10%

ax2.set_xlabel("Raman Shift (cm$^{-1}$)", fontsize=12, fontweight='bold')
ax2.set_xlim(raman[0], raman[-1])
plt.tight_layout()
plt.show()

out_dir = r"C" # out directory
os.makedirs(out_dir, exist_ok=True)

# save high-res figure
fname = os.path.join(out_dir, '')
fig.savefig(fname, dpi=300, format='png', bbox_inches='tight')