In [1]:
import scanpy as sc
import torch
from torch import nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, TensorDataset
import lightning.pytorch as pl
from sklearn.preprocessing import LabelEncoder
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, f1_score, classification_report
import numpy as np
import pandas as pd
import os
import re
import copy

# Paths for datasets
train_data_dir = '../../dataset/tabula_sapiens_train_set_mapped.h5ad'
val_data_dir = '../../dataset/tabula_sapiens_val_set_mapped.h5ad'
test_data_dir = '../../dataset/tabula_sapiens_test_set_mapped.h5ad'

# Load the datasets (no change in loading)
adata_train = sc.read_h5ad(train_data_dir)
adata_val = sc.read_h5ad(val_data_dir)
adata_test = sc.read_h5ad(test_data_dir)


In [2]:
sc.pp.normalize_total(adata_train, target_sum=1e4)
sc.pp.log1p(adata_train)

sc.pp.normalize_total(adata_val, target_sum=1e4)
sc.pp.log1p(adata_val)

sc.pp.normalize_total(adata_test, target_sum=1e4)
sc.pp.log1p(adata_test)


In [3]:
# Step 1: Take the union of all unique labels across the three datasets
all_labels = np.concatenate([
    adata_train.obs['cell_type'].values, 
    adata_val.obs['cell_type'].values, 
    adata_test.obs['cell_type'].values
])

# Step 2: Fit LabelEncoder on the combined labels
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

# Directly use the 'cell_type' column, assuming it is already encoded as int64
X_train = adata_train.X.toarray()
y_train = label_encoder.transform(adata_train.obs['cell_type'])

X_val = adata_val.X
y_val = label_encoder.transform(adata_val.obs['cell_type'])

X_test = adata_test.X
y_test = label_encoder.transform(adata_test.obs['cell_type'])

In [4]:
import numpy as np
from sklearn.decomposition import PCA

n_components = 64
pca = PCA(n_components=n_components)
train_embeddings = pca.fit_transform(X_train)
val_embeddings = pca.transform(X_val)
test_embeddings = pca.transform(X_test)

In [5]:
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, f1_score, classification_report


    # 初始化和训练KNN分类器
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(val_embeddings, y_val)
    
    # 模型预测
predictions = knn.predict(test_embeddings)

In [6]:
unique_classes = np.unique(np.concatenate([y_test, predictions]))


accuracy = accuracy_score(y_test, predictions)  
f1 = f1_score(y_test, predictions, average='weighted')  
macro_f1 = f1_score(y_test, predictions, average='macro')  

print(f"KNN Accuracy: {accuracy}")  
print(f"Weighted F1 Score: {f1}")  
print(f"Macro F1 Score: {macro_f1}")  

# Get the class names for only the classes present in the data  
present_classes = [label_encoder.classes_[i] for i in unique_classes]  
report = classification_report(y_test, predictions,   
                             labels=unique_classes,  # specify which labels to include  
                             target_names=present_classes)  # their corresponding names  
print(report)  

# Optionally, print which class is missing  
all_classes_set = set(range(len(label_encoder.classes_)))  
present_classes_set = set(unique_classes)  
missing_classes = all_classes_set - present_classes_set  
if missing_classes:  
    print("\nMissing class indices:", missing_classes)  
    print("Missing class names:", [label_encoder.classes_[i] for i in missing_classes])
    
random_seed = 42

import numpy as np
import os
import json

# Create directory to store embeddings and predictions
output_dir = os.path.join('./prediction_results', f'PCA_seed_{random_seed}')
os.makedirs(output_dir, exist_ok=True)

# Save embeddings
np.save(os.path.join(output_dir, 'train_embeddings.npy'), train_embeddings)
np.save(os.path.join(output_dir, 'val_embeddings.npy'), val_embeddings) 
np.save(os.path.join(output_dir, 'test_embeddings.npy'), test_embeddings)

# Save predictions and ground truth
np.save(os.path.join(output_dir, 'test_predictions.npy'), predictions)
np.save(os.path.join(output_dir, 'test_ground_truth.npy'), y_test)
np.save(os.path.join(output_dir, 'train_ground_truth.npy'), y_train)
np.save(os.path.join(output_dir, 'val_ground_truth.npy'), y_val)

# Save training history if exists
if 'train_losses' in globals() and 'val_losses' in globals():
    np.save(os.path.join(output_dir, 'train_losses.npy'), np.array(train_losses))
    np.save(os.path.join(output_dir, 'val_losses.npy'), np.array(val_losses))

# Save label encoder classes (target names)
label_mapping = {i: label_name for i, label_name in enumerate(label_encoder.classes_)}
with open(os.path.join(output_dir, 'label_mapping.json'), 'w') as f:
    json.dump(label_mapping, f, indent=4)

print(f"Saved embeddings, predictions and label mapping to {output_dir}")

KNN Accuracy: 0.7870209663982227
Weighted F1 Score: 0.771187745907886
Macro F1 Score: 0.5663548548929224
                                                                            precision    recall  f1-score   support

                                                                    B cell       0.77      0.50      0.61       154
                     CD16-negative, CD56-bright natural killer cell, human       0.66      0.69      0.67       555
                                                CD4-positive helper T cell       0.72      0.78      0.75      5067
                                 CD4-positive, alpha-beta cytotoxic T cell       0.20      0.15      0.17       567
                                 CD8-positive, alpha-beta cytotoxic T cell       0.59      0.88      0.70      7122
                            central memory CD8-positive, alpha-beta T cell       0.66      0.30      0.42      2156
                                              class switched memory B cell       0

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Saved embeddings, predictions and label mapping to ./prediction_results/PCA_seed_42


In [7]:

import pandas as pd
import os
import re

# 当前 Notebook 文件名
notebook_name = "tabula_sapiens_PCA_42.ipynb"

# 初始化需要打印的值
init_train_loss = train_losses[0] if 'train_losses' in globals() else None
init_val_loss = val_losses[0] if 'val_losses' in globals() else None
converged_epoch = len(train_losses) - patience if 'train_losses' in globals() else None
converged_val_loss = best_val_loss if 'best_val_loss' in globals() else None

# 打印所有所需的指标
print("Metrics Summary:")
if 'train_losses' in globals():
    print(f"init_train_loss\tinit_val_loss\tconverged_epoch\tconverged_val_loss\tmacro_f1\tweighted_f1\tmicro_f1")
    print(f"{init_train_loss:.3f}\t{init_val_loss:.3f}\t{converged_epoch}\t{converged_val_loss:.3f}\t{macro_f1:.3f}\t{f1:.3f}\t{accuracy:.3f}")
else:
    print(f"macro_f1\tweighted_f1\tmicro_f1")
    print(f"{macro_f1:.3f}\t{f1:.3f}\t{accuracy:.3f}")

# 保存结果到 CSV 文件
output_data = {
    'dataset_split_random_seed': [int(random_seed)],
    'dataset': ['Tabula Sapiens'],
    'method': [re.search(r'tabula_sapiens_(.*?)_\d+', notebook_name).group(1)],
    'init_train_loss': [init_train_loss if init_train_loss is not None else ''],
    'init_val_loss': [init_val_loss if init_val_loss is not None else ''],
    'converged_epoch': [converged_epoch if converged_epoch is not None else ''],
    'converged_val_loss': [converged_val_loss if converged_val_loss is not None else ''],
    'macro_f1': [macro_f1],
    'weighted_f1': [f1],
    'micro_f1': [accuracy]
}
output_df = pd.DataFrame(output_data)

# 保存到当前目录下名为 results 的文件夹中
if not os.path.exists('results'):
    os.makedirs('results')

csv_filename = f"results/{os.path.splitext(notebook_name)[0]}_results.csv"
output_df.to_csv(csv_filename, index=False)


Metrics Summary:
macro_f1	weighted_f1	micro_f1
0.566	0.771	0.787
