In [1]:
import scanpy as sc
import muon as mu
import numpy as np
import pandas as pd
import mofax as mofa
import seaborn as sns
import matplotlib.pyplot as plt
import pyro
from pyro.nn import PyroSample, PyroModule
from pyro.infer import SVI, Trace_ELBO, autoguide
import torch
import torch.nn.functional as F
from torch.nn.functional import softplus
from sklearn.metrics import mean_squared_error
import random
import seaborn as sns
import muon as mu
import anndata

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def to_device(t): return torch.tensor(t).to(device)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
dir="/scratch/deeplife/"
pbmc = sc.read_10x_h5(dir+"5k_pbmc_protein_v3_nextgem_filtered_feature_bc_matrix.h5", gex_only=False)
pbmc.var_names_make_unique()
pbmc.layers["counts"] = pbmc.X.copy()

  utils.warn_names_duplicates("var")


In [3]:
protein = pbmc[:, pbmc.var["feature_types"] == "Antibody Capture"].copy()
rna = pbmc[:, pbmc.var["feature_types"] == "Gene Expression"].copy()


In [4]:
class FA(PyroModule):
    def __init__(self, Y, K):
        """
        Args:
            Y: Tensor (Samples x Features)
            K: Number of Latent Factors
        """
        super().__init__()
        pyro.clear_param_store()
        
        # data
        self.obs_mask = torch.logical_not(torch.isnan(Y))
        self.Y = torch.nan_to_num(Y, nan=0)
        self.K = K
        self.batch_size = 64
        
        self.num_samples = self.Y.shape[0]
        self.num_features = self.Y.shape[1]
        
        self.sample_plate = pyro.plate("sample", self.num_samples, subsample_size=self.batch_size)
        self.feature_plate = pyro.plate("feature", self.num_features)
        self.latent_factor_plate = pyro.plate("latent factors", self.K)
        
        
    def model(self):
        """
        how to generate a matrix
        """
        with self.latent_factor_plate:
            with self.feature_plate:
                # sample weight matrix with Normal prior distribution
                W = pyro.sample("W", pyro.distributions.Normal(0., to_device(1.0)))                
                
            with self.sample_plate:
                # sample factor matrix with Normal prior distribution
                Z = pyro.sample("Z", pyro.distributions.Normal(0., to_device(1.0)))
        
        # estimate for Y
        Y_hat = torch.matmul(Z, W.t())
        # print(Z.shape, W.shape, self.Y.shape, Y_hat.shape)
        
        with pyro.plate("feature_", self.Y.shape[1]), pyro.plate("sample_", self.Y.shape[0], subsample_size=self.batch_size) as sample_shit:
            # print(sample_shit)
            # masking the NA values such that they are not considered in the distributions
            with pyro.poutine.mask(mask=self.obs_mask[sample_shit, :]):
                # a valid value for the NAs has to be defined even though these samples will be ignored later
        
                # sample scale parameter for each feature-sample pair with LogNormal prior (has to be positive)
                scale = pyro.sample("scale", pyro.distributions.LogNormal(0., to_device(1.0)))
                # compare sampled estimation to the true observation Y
                pyro.sample("obs", pyro.distributions.Normal(Y_hat, scale), obs=self.Y[sample_shit, :])


    def train(self):
        # set training parameters
        optimizer = pyro.optim.Adam({"lr": 0.02})
        elbo = Trace_ELBO()
        guide = autoguide.AutoNormal(self.model)
        
        # initialize stochastic variational inference
        svi = SVI(
            model = self.model,
            guide = guide,
            optim = optimizer,
            loss = elbo
        )
        
        num_iterations = 4000
        train_loss = []
        for j in range(num_iterations):
            # calculate the loss and take a gradient step
            loss = svi.step()

            train_loss.append(loss/self.Y.shape[0])
            if j % 200 == 0:
                print("[iteration %04d] loss: %.4f" % (j + 1, loss / self.Y.shape[0]))
        
        # Obtain maximum a posteriori estimates for W and Z
        map_estimates = guide(self.Y)
        
        return train_loss, map_estimates, guide


In [None]:
factor_model = FA(Y = torch.tensor(rna.X.toarray()).to(device), K = 5)
loss, map_estimates, trained_guide = factor_model.train()



[iteration 0001] loss: 275462.8760




[iteration 0201] loss: 129847.1281
[iteration 0401] loss: 91448.8441
[iteration 0601] loss: 67855.4114
[iteration 0801] loss: 50201.4829
[iteration 1001] loss: 39054.9678
[iteration 1201] loss: 32418.0226
[iteration 1401] loss: 30279.5930
[iteration 1601] loss: 30611.5712
[iteration 1801] loss: 31037.8897
[iteration 2001] loss: 32382.3728
[iteration 2201] loss: 33839.2646
[iteration 2401] loss: 37202.1101


In [None]:
print({ k: v.shape for k, v in map_estimates.items() })
print(trained_guide(factor_model.Y))

In [None]:
print(pyro.get_param_store().get_all_param_names())
pyro.get_param_store().get_param("AutoNormal.locs.W")