In [None]:
import random
from pathlib import Path

import anndata as ad
import numpy as np
import pandas as pd
from PIL import Image
import scanpy as sc
import scipy as sp
import SpaGCN as spg
import torch


# Remove Pillow's pixel-count limit so that DecompressionBombWarning is not
# raised when images are opened (presumably needed for the large H_E.tiff
# files read in get_anndata -- confirm against the actual image sizes).
Image.MAX_IMAGE_PIXELS = None

In [None]:
# --- Input / output locations -------------------------------------------
data_dir, result_dir = Path("./data"), Path("./results")

# --- Run settings -------------------------------------------------------
# `seed` feeds the global RNG seeding cell and the seeded resolution search.
seed = 42
# Platform label; not referenced by any other cell visible in this notebook.
technology = "Visium"

# --- SpaGCN hyperparameters ---------------------------------------------
# `alpha` and `beta` are forwarded to spg.calculate_adj_matrix (histology
# mode); `p` is the target value handed to spg.search_l.
alpha, beta, p = 1, 49, 0.5

In [None]:
# Sample sheet: keep only the sample directory name and the requested
# number of clusters for each sample.
metadata = (
    pd.read_table(data_dir / "samples.tsv")
    .loc[:, ["directory", "n_clusters"]]
)

In [None]:
# Seed the Python, PyTorch and NumPy global random number generators (in that
# order) with the same configured seed.
for _seed_fn in (random.seed, torch.manual_seed, np.random.seed):
    _seed_fn(seed)

In [None]:
def get_anndata(path):
    """Assemble an AnnData object from the files of one sample directory.

    Parameters
    ----------
    path : pathlib.Path
        Directory containing ``counts.mtx``, ``observations.tsv``,
        ``features.tsv``, ``coordinates.tsv`` and ``H_E.tiff``.

    Returns
    -------
    anndata.AnnData
        ``X`` is the count matrix converted to CSR, ``obs`` / ``var`` are the
        observation and feature tables (first column used as index),
        ``obsm["spatial_pixel"]`` holds the coordinate table reordered to the
        row order of ``obs``, and ``uns["image"]`` holds the ``H_E.tiff``
        image as a NumPy array.

    Raises
    ------
    KeyError
        If ``coordinates.tsv`` lacks an entry for any observation index.
    """
    X = sp.io.mmread(path / "counts.mtx").tocsr()

    observations = pd.read_table(path / "observations.tsv", index_col=0)
    features = pd.read_table(path / "features.tsv", index_col=0)

    # Reorder the coordinates so that row i matches observation i.
    coordinates = (
        pd.read_table(path / "coordinates.tsv", index_col=0)
        .loc[observations.index, :]
        .to_numpy()
    )

    adata = ad.AnnData(
        X=X, obs=observations, var=features, obsm={"spatial_pixel": coordinates}
    )

    # Open the image in a context manager so the underlying file handle is
    # released once the pixel data has been copied into the array; this
    # function is called once per sample and would otherwise leave each
    # image file open.
    with Image.open(path / "H_E.tiff") as image:
        adata.uns["image"] = np.array(image)

    return adata

In [None]:
# Run SpaGCN on every sample listed in the sample sheet and write one label
# table per sample to result_dir/<directory>/SpaGCN.tsv.
for _, sample in metadata.iterrows():
    print("Processing " + sample.directory)

    sample_dir = data_dir / sample.directory
    out_dir = result_dir / sample.directory
    adata = get_anndata(sample_dir)

    # Adjacency matrix built from the array positions (obs "col"/"row"), the
    # pixel coordinates and the histology image.
    adj = spg.calculate_adj_matrix(
        adata.obs["col"],
        adata.obs["row"],
        adata.obsm["spatial_pixel"][:, 0],
        adata.obsm["spatial_pixel"][:, 1],
        image=adata.uns["image"],
        alpha=alpha,
        beta=beta,
        histology=True,
    )

    # Gene filtering, per-cell normalisation and log transform.
    spg.prefilter_genes(adata, min_cells=3)
    spg.prefilter_specialgenes(adata)
    adata.X = adata.X.astype("float64")
    sc.pp.normalize_per_cell(adata)
    sc.pp.log1p(adata)

    clf = spg.SpaGCN()

    # Find the l value given p
    l = spg.search_l(p, adj)
    clf.set_l(l)

    # Search for a resolution that yields the requested number of clusters
    res = spg.search_res(
        adata, adj, l, sample.n_clusters, r_seed=seed, t_seed=seed, n_seed=seed
    )

    clf.train(adata, adj, res=res)

    y_pred, prob = clf.predict()

    adata.obs["cluster"] = pd.Series(y_pred, index=adata.obs_names, dtype="category")

    # Cluster refinement: uses an adjacency matrix computed from the array
    # positions only (no histology).
    adj_2d = spg.calculate_adj_matrix(
        adata.obs["col"], adata.obs["row"], histology=False
    )

    refined_pred = spg.refine(
        sample_id=adata.obs_names.tolist(),
        pred=adata.obs["cluster"].tolist(),
        dis=adj_2d,
        shape="hexagon",
    )

    adata.obs["refined_cluster"] = pd.Series(
        refined_pred, index=adata.obs_names, dtype="category"
    )

    # Export the refined labels under a single "label" column. Previously
    # `label_df` was never assigned (its definition was commented out), so
    # the write step below failed with a NameError. To export the unrefined
    # labels instead, select "cluster" here.
    label_df = adata.obs[["refined_cluster"]].rename(
        columns={"refined_cluster": "label"}
    )

    ## Write output
    out_dir.mkdir(parents=True, exist_ok=True)
    label_df.to_csv(out_dir / "SpaGCN.tsv", sep="\t", index_label="")