# GRN between Cell Types

This notebook is part of David Wild's Bachelor's thesis.

The goal of this notebook is to infer gene-gene interactions between different cell types. The desired result is a GRN (gene regulatory network) that describes not only which genes regulate which, but also how strongly (weighted graph) and with which sign (activating or inhibiting).  
Identifying co-factors would be another interesting aspect.


In [None]:
!git clone https://github.com/DavidWild02/BachelorThesis-ML-Gene-Interactions.git
%cd BachelorThesis-ML-Gene-Interactions/
!pip install scanpy
!pip install torch
!pip install matplotlib
!pip install pandas
!pip install numpy


In [None]:
import scanpy as sc
import pandas as pd
import numpy as np

adata = sc.read_h5ad("./data/subdom_processed.h5ad")
adata

In [None]:
adata.to_df().head()

In [None]:
sc.pl.umap(adata, color="clusters")

In [None]:
sc.pl.umap(adata, color="ct_pseudotime")

The data set is already preprocessed and clustered, and a pseudo-time has been computed for it. The pseudo-time approximates time by how specialized a cell is, measured by the number of genes it expresses. The idea is that more specialized cells express fewer genes.
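
The following toy sketch only illustrates the intuition behind such a pseudo-time. It is not the method that produced `ct_pseudotime`; the function names `expressed_gene_counts` and `toy_pseudotime` and the threshold of 0 are assumptions made for this example.

```python
import numpy as np


def expressed_gene_counts(expression: np.ndarray, threshold: float = 0.0) -> np.ndarray:
    """Count, for every cell (row), how many genes exceed the expression threshold."""
    return (expression > threshold).sum(axis=1)


def toy_pseudotime(expression: np.ndarray, threshold: float = 0.0) -> np.ndarray:
    """Scale the gene counts to [0, 1]: many expressed genes -> early, few -> late."""
    counts = expressed_gene_counts(expression, threshold).astype(float)
    span = counts.max() - counts.min()
    if span == 0:
        # All cells express the same number of genes, so no ordering is possible
        return np.zeros_like(counts)
    return (counts.max() - counts) / span


# Hypothetical expression matrix: 3 cells x 4 genes
toy_expression = np.array([
    [1.0, 2.0, 0.5, 3.0],  # 4 expressed genes (least specialized)
    [0.0, 2.0, 0.0, 3.0],  # 2 expressed genes
    [0.0, 0.0, 0.0, 3.0],  # 1 expressed gene (most specialized)
])
toy_time = toy_pseudotime(toy_expression)
print(toy_time)
```

The cell with the most expressed genes gets pseudo-time 0 and the cell with the fewest gets pseudo-time 1.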

A GRN already exists, but it is global; we would prefer a separate GRN for each cell-type to cell-type transition. In addition, the existing GRN is a binary matrix, whereas we want a weighted one.
One way to construct such GRNs is a masked linear regression: the global GRN serves as the mask that determines which weights may be non-zero, and the learned weight matrix is then the adjacency matrix of the transition-specific GRN.
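
To make the idea concrete, here is a minimal sketch of a masked linear regression on toy data. It uses a closed-form least-squares fit per target gene instead of gradient descent, and it assumes that rows of the mask are target genes and columns are regulators. The function name `masked_least_squares` and all toy values are hypothetical.

```python
import numpy as np


def masked_least_squares(X: np.ndarray, Y: np.ndarray, mask: np.ndarray) -> np.ndarray:
    """Fit Y ~ X @ W.T where W[i, j] may be non-zero only if mask[i, j] != 0.

    Assumption: row i of W holds the weights of the regulators (columns)
    of target gene i.
    """
    n_targets, n_regulators = mask.shape
    W = np.zeros((n_targets, n_regulators))
    for i in range(n_targets):
        allowed = np.flatnonzero(mask[i])
        if allowed.size == 0:
            continue  # target gene without any allowed regulator
        # Ordinary least squares restricted to the allowed regulators
        coef, *_ = np.linalg.lstsq(X[:, allowed], Y[:, i], rcond=None)
        W[i, allowed] = coef
    return W


rng = np.random.default_rng(0)
mask = np.array([[0, 1, 0],
                 [1, 0, 1],
                 [0, 0, 0]])
true_W = np.array([[0.0, 2.0, 0.0],
                   [-1.5, 0.0, 0.5],
                   [0.0, 0.0, 0.0]])
X = rng.normal(size=(50, 3))  # 50 cells, 3 genes
Y = X @ true_W.T              # noise-free targets
W = masked_least_squares(X, Y, mask)
print(W.round(3))
```

Entries outside the mask stay exactly zero, and the sign of each recovered weight indicates whether the regulator increases or decreases the target.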

To check whether the approach is feasible at all, we first apply it only to the transition from stem cells (cluster 7) to transition cells (cluster 0).  
Afterwards we evaluate how good the predictions are (prediction error, residual plot) and how well the resulting GRN agrees with existing knowledge.
We also compare how non-linear models such as neural networks perform.
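
A minimal sketch of how such an evaluation could look, assuming the true and predicted expression values are available as NumPy arrays of shape (cells, genes). The function name `regression_report` and the toy values are hypothetical.

```python
import numpy as np


def regression_report(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """Return the overall MSE, the R^2 score per gene (column) and the residuals."""
    residuals = y_true - y_pred
    mse = float(np.mean(residuals ** 2))
    ss_res = np.sum(residuals ** 2, axis=0)
    ss_tot = np.sum((y_true - y_true.mean(axis=0)) ** 2, axis=0)
    # R^2 is undefined for genes with constant expression; report NaN there
    with np.errstate(divide="ignore", invalid="ignore"):
        r2_per_gene = np.where(ss_tot > 0, 1.0 - ss_res / ss_tot, np.nan)
    return {"mse": mse, "r2_per_gene": r2_per_gene, "residuals": residuals}


# Toy data: gene 0 is predicted perfectly, gene 1 is predicted by its mean
y_true = np.array([[1.0, 2.0], [2.0, 4.0], [3.0, 6.0]])
y_pred = np.array([[1.0, 4.0], [2.0, 4.0], [3.0, 4.0]])
report = regression_report(y_true, y_pred)
print(report["mse"], report["r2_per_gene"])
```

The `residuals` array can be plotted against the predictions, for example with a matplotlib scatter plot, to obtain a residual plot.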

In [None]:
from IPython.display import display

# Load the global GRN
df_tf_data = pd.read_csv("./data/transcription_factor_adjacency_matrix.csv", index_col="gene_ids")
print("transcription factor adjacency matrix")
display(df_tf_data.head())

# The transcription factor data only covers a subset of the genes, so create a zero-filled DataFrame for all genes and copy the known values into it
df_grn = pd.DataFrame(
    np.zeros((adata.n_vars, adata.n_vars), dtype=np.uint8),
    index=adata.var_names, columns=adata.var_names
)
df_grn.loc[df_tf_data.index, df_tf_data.columns] = df_tf_data
print("GRN adjacency matrix")
display(df_grn.head())

grn_matrix = df_grn.to_numpy()

# Density is the fraction of non-zero entries; sparsity is the fraction of zero entries
non_zero_count = np.count_nonzero(grn_matrix)
sparsity = 1.0 - non_zero_count / grn_matrix.size
print("Sparsity of the matrix: ", sparsity)
print("Count non zero entries of the matrix: ", non_zero_count)

The linear regression takes samples from cluster A as input and predicts samples from cluster B. The pairs a -> b are drawn at random.  
This approach could be improved with techniques that search for a better mapping, for example optimal transport.
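
As an illustration of a non-random mapping, the sketch below pairs cells one-to-one so that the total squared Euclidean distance between paired cells is minimal. This is a simple assignment problem used as a stand-in for a full optimal transport formulation, not the method used in this notebook. The function name `nearest_assignment_pairs` and the toy coordinates are hypothetical.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import cdist


def nearest_assignment_pairs(data_A: np.ndarray, data_B: np.ndarray):
    """Pair cells of A and B one-to-one, minimizing the total squared distance."""
    cost = cdist(data_A, data_B, metric="sqeuclidean")
    # For a rectangular cost matrix every cell of the smaller cluster gets a partner
    index_A, index_B = linear_sum_assignment(cost)
    return index_A, index_B


# Toy data: 2 cells in cluster A, 3 cells in cluster B
data_A = np.array([[0.0, 0.0], [10.0, 10.0]])
data_B = np.array([[9.0, 9.0], [1.0, 1.0], [50.0, 50.0]])
idx_A, idx_B = nearest_assignment_pairs(data_A, data_B)
print(list(zip(idx_A, idx_B)))
```

Each cell of A is paired with its closest counterpart in B, and the distant third cell of B stays unpaired. The cost matrix grows with the product of the two cluster sizes, so for large clusters this would have to be applied to subsamples.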

In [None]:
from typing import Iterator, Tuple
from torch.utils.data import Sampler

np.random.seed(42) # set seed to make reproducible outputs

# Infinite sampler that yields random (index_A, index_B) pairs; limit it with itertools.islice
class RandomMappingSampler(Sampler):
    def __init__(self, data_A: np.ndarray, data_B: np.ndarray):
        self._data_A = data_A
        self._data_B = data_B
        super().__init__()

    def __iter__(self) -> Iterator[Tuple[int, int]]:
        while True:
            index_A = np.random.randint(0, self._data_A.shape[0])
            index_B = np.random.randint(0, self._data_B.shape[0])
            # a sampler should just return the indices and not the samples
            yield index_A, index_B



In [None]:
from torch import nn
import torch

class SparseMaskedLinearRegression(nn.Module):
    def __init__(self, input_dim: int, output_dim: int, mask: torch.Tensor):
        super().__init__()

        assert (mask.shape == (input_dim, output_dim))

        self.input_dim = input_dim
        self.output_dim = output_dim

        sparse_mask = mask.to_sparse()
        weight_values_shape = sparse_mask.values().shape
        weight_values = nn.Parameter(torch.rand(weight_values_shape))
        self.register_buffer("mask_indices", sparse_mask.indices())
        self.register_parameter("weight_values", weight_values)

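    def forward(self, x: torch.Tensor):
        sparse_weight = self.get_weight_matrix()
        # Row i of the weight matrix holds the incoming weights of output gene i.
        # With a weight of shape (input_dim, output_dim), this product is only
        # defined when input_dim == output_dim, as is the case for a gene-by-gene GRN.
        out = x @ sparse_weight.T
        return out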

    def get_weight_matrix(self) -> torch.Tensor:
        return torch.sparse_coo_tensor(self.get_buffer("mask_indices"), self.get_parameter("weight_values"), (self.input_dim, self.output_dim))

In [None]:
from itertools import islice


def train_masked_linear_regression(X: torch.Tensor, y: torch.Tensor, mask: torch.Tensor, epochs=1000, learning_rate=0.01, ridge_lambda=0.001) -> torch.Tensor:
    input_dim = X.shape[1]
    output_dim = y.shape[1]
    model = SparseMaskedLinearRegression(input_dim, output_dim, mask)
    model.train()

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    for epoch in range(epochs):
        optimizer.zero_grad()
        y_pred = model(X)

        mse_loss = criterion(y_pred, y)
        # Ridge penalty: squared L2 norm of the weights (norm(1) would be an L1/lasso penalty)
        ridge_loss = ridge_lambda * model.get_parameter("weight_values").pow(2).sum()
        loss = mse_loss + ridge_loss
        print(f"Epoch {epoch}, Train Loss: {loss.item():.4f}")

        loss.backward()
        optimizer.step()

    return model.get_weight_matrix()



def create_grn_for_cluster_to_cluster_association(samples_cluster_a: np.ndarray, samples_cluster_b: np.ndarray, mask_grn: torch.Tensor, num_samples=1000):
    sampler = RandomMappingSampler(samples_cluster_a, samples_cluster_b)
    sample_indices = list(islice(iter(sampler), num_samples))
    sample_indices_a = list(map(lambda x: x[0], sample_indices))
    sample_indices_b = list(map(lambda x: x[1], sample_indices))

    X = torch.Tensor(samples_cluster_a[sample_indices_a])
    y = torch.Tensor(samples_cluster_b[sample_indices_b])

    W = train_masked_linear_regression(X, y, mask_grn)
    return W


In [None]:
stem_cells_cluster_id = '7'
transition_cells_cluster_id = '0'

stem_cells_cluster = adata.X[adata.obs["clusters"].values == stem_cells_cluster_id, :]
transition_cell_cluster = adata.X[adata.obs["clusters"].values == transition_cells_cluster_id, :]

print(f"Number of stem cells: {stem_cells_cluster.shape[0]}")
print(f"Number of transition cells: {transition_cell_cluster.shape[0]}")

In [None]:

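# The binary global GRN acts as the mask of trainable weights
grn_mask = torch.as_tensor(grn_matrix, dtype=torch.float32)
grn = create_grn_for_cluster_to_cluster_association(stem_cells_cluster, transition_cell_cluster, grn_mask)
# The result is a sparse tensor that requires grad; convert it to a dense NumPy array first
grn_df = pd.DataFrame(grn.detach().to_dense().numpy(), index=adata.var_names, columns=adata.var_names)
grn_df.head()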
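
Once a weight matrix is available as a DataFrame, it can be turned into a list of signed, weighted edges. The sketch below assumes that rows are target genes and columns are regulators, and that a positive weight means activation and a negative weight inhibition. The function name `weighted_edges`, the `min_abs_weight` parameter and the toy matrix are hypothetical.

```python
import numpy as np
import pandas as pd


def weighted_edges(weights: pd.DataFrame, min_abs_weight: float = 0.0) -> pd.DataFrame:
    """Convert a weight matrix (rows = targets, columns = regulators) to an edge list."""
    values = weights.to_numpy()
    target_idx, regulator_idx = np.nonzero(np.abs(values) > min_abs_weight)
    edge_weights = values[target_idx, regulator_idx]
    edges = pd.DataFrame({
        "regulator": weights.columns.to_numpy()[regulator_idx],
        "target": weights.index.to_numpy()[target_idx],
        "weight": edge_weights,
        "effect": np.where(edge_weights > 0, "activating", "inhibiting"),
    })
    # Strongest interactions first
    edges = edges.sort_values("weight", key=lambda s: s.abs(), ascending=False)
    return edges.reset_index(drop=True)


genes = ["g1", "g2", "g3"]
toy_weights = pd.DataFrame(
    [[0.0, 0.8, 0.0],
     [-1.2, 0.0, 0.0],
     [0.0, 0.0, 0.0]],
    index=genes, columns=genes,
)
edges = weighted_edges(toy_weights)
print(edges)
```

In this toy matrix `g1` inhibits `g2` and `g2` activates `g1`. Raising `min_abs_weight` drops weak edges before the GRN is compared with existing knowledge.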