In [1]:
import scanpy as sc
import os,csv,re
import pandas as pd
import numpy as np
import math
from scipy.sparse import issparse
import random, torch
import warnings
warnings.filterwarnings("ignore")
import matplotlib.colors as clr
import matplotlib.pyplot as plt
import cv2

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
adata = sc.read("sample_data.h5ad")

In [3]:
adj_matrix = pd.read_csv("adj.csv")

# Calculate the node degree (sum of connections per node)
node_degree = np.sum(adj_matrix, axis=1)

# Calculate the mean connection strength
mean_connection_strength = np.mean(adj_matrix, axis=1)


# Add these features to the obs (observations) of AnnData
adata.obs['node_degree'] = node_degree
adata.obs['mean_connection_strength'] = mean_connection_strength

from sklearn.preprocessing import StandardScaler

# Create a scaler object
scaler = StandardScaler()

# Scale the newly added features
adata.obs['node_degree_scaled'] = scaler.fit_transform(adata.obs[['node_degree']])
adata.obs['mean_connection_strength_scaled'] = scaler.fit_transform(adata.obs[['mean_connection_strength']])


In [4]:
from PIL import Image
import numpy as np
img = Image.open("histology.tif")
img_array = np.array(img)

s = 1
b = 49

# Function to extract color using coordinates, now includes averaging over an area
def extract_color(x_pixels, y_pixels, image_array, beta, area_size=10):
    colors = []
    half_area = area_size // 2
    for x, y in zip(x_pixels, y_pixels):
        # Define area bounds
        x_min = max(x - half_area, 0)
        y_min = max(y - half_area, 0)
        x_max = min(x + half_area, image_array.shape[1])
        y_max = min(y + half_area, image_array.shape[0])
        # Extract and average color in the area
        area = image_array[y_min:y_max, x_min:x_max]
        area_average = np.mean(area, axis=(0, 1))
        adjusted_color = area_average * beta / 255
        colors.append(np.mean(adjusted_color))
    return np.array(colors)

# Apply color extraction
x_pixel = np.array(adata.obs["x4"].tolist())
y_pixel = np.array(adata.obs["x5"].tolist())
adata.obs["color"] = extract_color(x_pixel, y_pixel, img_array, b)

z_scale = np.std(adata.obs["color"]) * s

# Apply normalization and scaling to the 'color' attribute to derive 'z'
adata.obs["z"] = (adata.obs["color"] - np.mean(adata.obs["color"])) / np.std(adata.obs["color"]) * z_scale

# Clean up by deleting loaded images if no longer needed
del img, img_array

In [5]:
from anndata import AnnData
adata.obs["x_pixel"]=x_pixel
adata.obs["y_pixel"]=y_pixel

In [6]:
adata.var_names_make_unique()
sc.pp.normalize_per_cell(adata, min_counts=0)
sc.pp.log1p(adata)

In [7]:
adata

AnnData object with n_obs × n_vars = 3639 × 33538
    obs: 'x1', 'x2', 'x3', 'x4', 'x5', 'x_array', 'y_array', 'x_pixel', 'y_pixel', 'node_degree', 'mean_connection_strength', 'node_degree_scaled', 'mean_connection_strength_scaled', 'color', 'z', 'n_counts'
    var: 'gene_ids', 'feature_types', 'genome', 'genename'
    uns: 'log1p'

In [8]:
# from dhg import Hypergraph
# from sklearn.neighbors import NearestNeighbors

# # X is your feature matrix derived from spatial coordinates and any additional features
# X = np.array([adata.obs["x_pixel"], adata.obs["y_pixel"], adata.obs["z"]]).T.astype(np.float32)
# nbrs = NearestNeighbors(n_neighbors=10, n_jobs=-1).fit(X)  # 10 or 5
# edges = nbrs.kneighbors_graph(X, mode='connectivity').toarray()

# # Convert the adjacency matrix to list of hyperedges
# hyperedges = [list(np.where(row)[0]) for row in edges if np.any(row)]

# # Number of vertices is the number of rows in your data
# num_vertices = X.shape[0]

# # Create the Hypergraph using the hyperedges
# G = Hypergraph(num_vertices, hyperedges)


In [9]:
import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
import networkx as nx
import matplotlib.pyplot as plt
from dhg import Hypergraph

# Load and preprocess data
selected_features = ["x_pixel", "y_pixel", "z", "color", "node_degree_scaled", "mean_connection_strength_scaled"]  # Spatial coordinates and additional features
X = np.array([adata.obs[f] for f in selected_features]).T.astype(np.float32)

# Example: Fill NaNs with the mean of each column
from sklearn.impute import SimpleImputer

imputer = SimpleImputer(strategy='mean')
X_imputed = imputer.fit_transform(X)

# Now scale the data
X_scaled = scaler.fit_transform(X_imputed)


# Constructing a spatial graph based on Euclidean distances
nbrs = NearestNeighbors(n_neighbors=15, metric='euclidean', n_jobs=-1).fit(X_scaled)
edges = nbrs.kneighbors_graph(X_scaled, mode='connectivity').toarray()

def create_weighted_hyperedges(edges):
    """
    Create hyperedges with weights based on the inverse of Euclidean distances,
    simulating a stronger connection between closer nodes.
    """
    hyperedges = []
    weights = []
    for i in range(edges.shape[0]):
        if np.any(edges[i]):
            center = X_scaled[i]
            neighbors = np.where(edges[i])[0]
            distances = np.linalg.norm(X_scaled[neighbors] - center, axis=1)
            weights.append(1 / (1 + distances))  # Inverse distance, avoiding division by zero
            hyperedges.append(list(neighbors))
    return hyperedges, weights

# Generate hyperedges with corresponding weights
hyperedges, weights = create_weighted_hyperedges(edges)

# Initialize the hypergraph
num_vertices = X_scaled.shape[0]
G = Hypergraph(num_vertices, hyperedges)

In [10]:
# import numpy as np
# from sklearn.neighbors import NearestNeighbors
# from sklearn.preprocessing import StandardScaler
# from typing import List, Tuple  # For type hinting
# import networkx as nx
# import matplotlib.pyplot as plt

# # Function to select features based on type (optional)
# def get_features_by_type(adata: AnnData, feature_type: str) -> List[str]:
#   """
#   This function retrieves features from obs based on their type in var.

#   Args:
#       adata: An AnnData object.
#       feature_type: The desired feature type (e.g., "continuous", "categorical").

#   Returns:
#       A list of feature names that match the specified type.
#   """
#   return [f for f in adata.obs.keys() if adata.var["feature_types"][f] == feature_type]

# # Feature selection (replace with your desired features)
# selected_features = ["x_pixel", "y_pixel", "z", "color", "node_degree_scaled", "mean_connection_strength_scaled"]

# # Feature pre-processing (consider feature type and normalization techniques)
# X = np.array([adata.obs[f] for f in selected_features]).T.astype(np.float32)

# from sklearn.impute import SimpleImputer

# imputer = SimpleImputer(strategy='mean')
# X_imputed = imputer.fit_transform(X)

# # Now scale the data
# X_scaled = scaler.fit_transform(X_imputed)

# # Hypergraph construction using NetworkX for efficiency
# def create_hypergraph(X_scaled: np.ndarray, n_neighbors: int) -> Tuple[nx.Graph, List[List[int]], List[float]]:
#   """
#   This function constructs a hypergraph using NetworkX.

#   Args:
#       X_scaled: The preprocessed feature matrix.
#       n_neighbors: The number of nearest neighbors for each data point.

#   Returns:
#       A tuple containing the NetworkX graph object, a list of hyperedges, and a list of weights.
#   """
#   G = nx.Graph()
#   G.add_nodes_from(range(X_scaled.shape[0]))  # Add nodes (data points)

#   # Create hyperedges with optional weighting
#   hyperedges = []
#   weights = []
#   for i in range(X_scaled.shape[0]):
#     neighbors = nbrs.kneighbors(X_scaled[i].reshape(1, -1))[1][0]  # Get nearest neighbors
#     if np.any(neighbors):
#       center_index = i
#       neighbor_list = neighbors.tolist()
#       G.add_edge(center_index, *neighbor_list)  # Add hyperedge using unpacking
#       # Optional weighting (consider alternative weighting functions)
#       distances = np.linalg.norm(X_scaled[neighbors] - X_scaled[center_index], axis=1)
#       weights.append(1 / (1 + distances))  # Inverse distance weighting

#   return G, hyperedges, weights

# # Construct the hypergraph
# nbrs = NearestNeighbors(n_neighbors=15, metric='euclidean', n_jobs=-1).fit(X_scaled)
# G, hyperedges, weights = create_hypergraph(X_scaled, n_neighbors=15)

# # Hypergraph analysis using NetworkX or other libraries
# # ... (Your analysis code here)


In [11]:
import torch

# Convert the sparse matrix to a dense array first
dense_features = adata.X.toarray()  # Converts the sparse matrix to a dense numpy array
features = torch.tensor(dense_features, dtype=torch.float32)


In [12]:
import torch
from torch import nn
from dhg.models import HGNNP
import torch.optim as optim
import torch.nn.functional as F


class HGNN_Autoencoder(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(HGNN_Autoencoder, self).__init__()
        # Define encoder using nn.ModuleDict
        self.encoder = nn.ModuleDict({
            'hgnnp1': HGNNP(input_dim, hidden_dim, hidden_dim, use_bn=True),
            'relu': nn.ReLU(),
            'dropout': nn.Dropout(0.5),
            'hgnnp2': HGNNP(hidden_dim, hidden_dim, output_dim, use_bn=True)
        })
        # Define decoder using nn.Sequential
        self.decoder = nn.Sequential(
            nn.Linear(output_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, input_dim)
        )

    def forward(self, x, G):
        x = self.encode(x, G)  # Use the new encode method
        x = self.decoder(x)
        return x

    def encode(self, x, G):
        # Explicitly define the forwarding through encoder layers
        for layer in ['hgnnp1', 'relu', 'dropout', 'hgnnp2']:
            if 'hgnnp' in layer:
                x = self.encoder[layer](x, G)
            else:
                x = self.encoder[layer](x)
        return x

    
# Parameters
input_dim = features.shape[1]
hidden_dim = 32
output_dim = 32  # Could be tuned based on the complexity of the data

# Initialize the model
model = HGNN_Autoencoder(input_dim, hidden_dim, output_dim)
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4) # 0.1 is close to the spaGCN


In [13]:
def train(model, features, G, optimizer, epochs):
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(features, G)
        loss = F.mse_loss(outputs, features)  # Mean Squared Error loss for reconstruction
        loss.backward()
        optimizer.step()
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.4f}")

train(model, features, G, optimizer, 200)


Epoch 0, Loss: 0.1048
Epoch 10, Loss: 0.0595
Epoch 20, Loss: 0.0563
Epoch 30, Loss: 0.0553
Epoch 40, Loss: 0.0549
Epoch 50, Loss: 0.0550
Epoch 60, Loss: 0.0550
Epoch 70, Loss: 0.0549
Epoch 80, Loss: 0.0548
Epoch 90, Loss: 0.0547
Epoch 100, Loss: 0.0545
Epoch 110, Loss: 0.0540
Epoch 120, Loss: 0.0510
Epoch 130, Loss: 0.0375
Epoch 140, Loss: 0.0355
Epoch 150, Loss: 0.0346
Epoch 160, Loss: 0.0344
Epoch 170, Loss: 0.0340
Epoch 180, Loss: 0.0335
Epoch 190, Loss: 0.0333


In [14]:
model.eval()
with torch.no_grad():
    encoded_features = model.encode(features, G)  # Get the encoded features using the new method

from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=10, random_state=0)  # Adjust the number of clusters as needed
cluster_labels = kmeans.fit_predict(encoded_features.cpu().numpy())


In [15]:
from sklearn.metrics import silhouette_score
from sklearn.cluster import KMeans
labels = kmeans.labels_
score = silhouette_score(encoded_features, cluster_labels)
print("Silhouette Score: ", score)

Silhouette Score:  0.56419754


In [16]:
colors_use = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#bcbd22', '#17becf', 
              '#aec7e8', '#ffbb78', '#98df8a', '#ff9896', '#bec1d4', '#bb7784', '#0000ff', '#111010', '#FFFF00', 
              '#1f77b4', '#800080', '#959595', '#7d87b9', '#bec1d4', '#d6bcc0', '#bb7784', '#8e063b', '#4a6fe3', 
              '#8595e1', '#b5bbe3', '#e6afb9', '#e07b91', '#d33f6a', '#11c638', '#8dd593', '#c6dec7', '#ead3c6', 
              '#f0b98d', '#ef9708', '#0fcfc0', '#9cded6', '#d5eae7', '#f3e1eb', '#f6c4e1', '#f79cd4']
# Visualization or further analysis can be done using `cluster_labels`
adata.obs["pred"] = pd.Categorical(cluster_labels)
num_celltype = len(adata.obs['pred'].cat.categories)
adata.uns['pred_colors'] = colors_use[:num_celltype]

# Plotting
ax = sc.pl.scatter(adata, alpha=1, x='y_pixel', y='x_pixel', color='pred', title='Spatial Clustering',
                   palette=colors_use[:num_celltype], show=False, size=150000 / adata.shape[0])  # Adjust size as needed
ax.set_aspect('equal', 'box')
ax.axes.invert_yaxis()

# Saving the plot
plt.savefig("results/Another/multi_sections_domains_adjacency_2.png", dpi=600)
plt.close()
