## Colab Setup

Run this cell if you are on Google Colab (takes 30 sec)

In [None]:
# install our package
!git clone https://github.com/harrlol/gutinstinct.git > /dev/null 2>&1
%cd gutinstinct
!pip install -e . -q

# install some extras
!pip install spatialdata -q

# link google drive
from google.colab import drive
drive.mount('/content/drive')

# hugginface
from huggingface_hub import login
login()

# get good path
import sys
sys.path.append("/content/gutinstinct/src/")

/content/gutinstinct/gutinstinct
  Preparing metadata (setup.py) ... [?25l[?25hdone
Mounted at /content/drive


VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

## Imports

In [None]:
import os
import pandas as pd
import numpy as np
import os.path as osp
import spatialdata as sd
from skimage.measure import regionprops
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset
import timm
from timm.data import resolve_data_config
from timm.data.transforms_factory import create_transform

from config import DATA_PATH
from data import patch_john, patch_harry
from models import utils as model_utils
from eval import utils as eval_utils

## Main Pipeline

### Load data

In [None]:
sdata = sd.read_zarr(osp.join(DATA_PATH, "UC6_I.zarr/UC6_I.zarr"))

### Process data

In [None]:
dataset_patch_train, dataset_patch_val, dataset_patch_test = patch_harry.get_patches(sdata)

AttributeError: module 'data.patch_harry' has no attribute 'get_expression'

In [None]:
dataset_expression_train = model_utils.get_expression(sdata, dataset_patch_train)
dataset_expression_val = model_utils.get_expression(sdata, dataset_patch_val)
dataset_expression_test = model_utils.get_expression(sdata, dataset_patch_test)

### Extract features

In [None]:
train_loader = DataLoader(dataset_patch_train, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset_patch_val, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset_patch_test, batch_size=32, shuffle=True)

In [None]:
## instantiate a uni model
timm_kwargs = {
   'img_size': 224,
   'patch_size': 32,
   'depth': 24,
   'num_heads': 24,
   'init_values': 1e-5,
   'embed_dim': 1536,
   'mlp_ratio': 2.66667*2,
   'num_classes': 0,
   'no_embed_class': True,
   'mlp_layer': timm.layers.SwiGLUPacked,
   'act_layer': torch.nn.SiLU,
   'reg_tokens': 8,
   'dynamic_img_size': True
  }
model_uni = timm.create_model("hf-hub:MahmoodLab/UNI2-h", pretrained=True, **timm_kwargs)
model_uni = model_uni.to('cuda')
transform = create_transform(**resolve_data_config(model_uni.pretrained_cfg, model=model_uni))
model_uni.eval()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/587 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/2.73G [00:00<?, ?B/s]

VisionTransformer(
  (patch_embed): PatchEmbed(
    (proj): Conv2d(3, 1536, kernel_size=(32, 32), stride=(32, 32))
    (norm): Identity()
  )
  (pos_drop): Dropout(p=0.0, inplace=False)
  (patch_drop): Identity()
  (norm_pre): Identity()
  (blocks): Sequential(
    (0): Block(
      (norm1): LayerNorm((1536,), eps=1e-06, elementwise_affine=True)
      (attn): Attention(
        (qkv): Linear(in_features=1536, out_features=4608, bias=True)
        (q_norm): Identity()
        (k_norm): Identity()
        (attn_drop): Dropout(p=0.0, inplace=False)
        (proj): Linear(in_features=1536, out_features=1536, bias=True)
        (proj_drop): Dropout(p=0.0, inplace=False)
      )
      (ls1): LayerScale()
      (drop_path1): Identity()
      (norm2): LayerNorm((1536,), eps=1e-06, elementwise_affine=True)
      (mlp): GluMlp(
        (fc1): Linear(in_features=1536, out_features=8192, bias=True)
        (act): SiLU()
        (drop1): Dropout(p=0.0, inplace=False)
        (norm): Identity()
    

In [None]:
train_embeddings, train_cell_ids = model_utils.extract_features(train_loader, model_uni)
val_embeddings, val_cell_ids = model_utils.extract_features(val_loader, model_uni)
test_embeddings, test_cell_ids = model_utils.extract_features(test_loader, model_uni)

### Prepare data

In [None]:
train_dataset = TensorDataset(
    torch.tensor(train_embeddings, dtype=torch.float32),
    torch.tensor(dataset_expression_train, dtype=torch.float32)
)

val_dataset = TensorDataset(
    torch.tensor(val_embeddings, dtype=torch.float32),
    torch.tensor(dataset_expression_val, dtype=torch.float32)
)

test_dataset = TensorDataset(
    torch.tensor(test_embeddings, dtype=torch.float32),
    torch.tensor(dataset_expression_test, dtype=torch.float32)
)

# create dataloader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# fit fcn model
resnet_feature_dim = train_embeddings.shape[1]
model = nn.Sequential(
    nn.Linear(resnet_feature_dim, 512),
    nn.ReLU(),
    nn.Linear(512, 512),
    nn.ReLU(),
    nn.Linear(512, 460)
)
model.to('cuda')

# some setups
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
train_losses = []
val_losses = []

# es logic
best_r2 = float('inf')
patience = 5
counter = 0

for epoch in range(20):
    model.train()
    running_train_loss = 0.0
    for x_batch, y_batch in train_loader:
        x_batch, y_batch = x_batch.to('cuda'), y_batch.to('cuda')
        optimizer.zero_grad()
        y_pred = model(x_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        running_train_loss += loss.item()

    train_losses.append(running_train_loss / len(train_loader))

    # validation
    model.eval()
    running_val_loss = 0.0
    y_true_all, y_pred_all = [], []
    with torch.no_grad():
        for x_val, y_val in val_loader:
            x_val, y_val = x_val.to('cuda'), y_val.to('cuda')
            y_pred_val = model(x_val)
            val_loss = criterion(y_pred_val, y_val)
            running_val_loss += val_loss.item()

            y_true_all.append(y_val.cpu().numpy())
            y_pred_all.append(y_pred_val.cpu().numpy())
    val_losses.append(running_val_loss / len(val_loader))
    val_r2, val_pearson = compute_r2_pearson(y_true_all, y_pred_all)

    print(f"Epoch {epoch+1}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_losses[-1]:.4f}, Val R2: {val_r2:.4f}, Val Pearson: {val_pearson:.4f}")

    if val_r2 < best_r2:
        best_r2 = val_r2
        counter = 0
        best_model_state = model.state_dict()
    else:
        counter += 1
        if counter >= patience:
            print(f"Early stopping at epoch {epoch+1}")
            break

# save the best model
model.load_state_dict(best_model_state)
