In [2]:
from typing import Optional, Iterable, Tuple, Union
from numbers import Integral, Real
from warnings import warn

import numpy as np
import pandas as pd
from scipy.sparse import issparse, csc_matrix, csr_matrix
from sklearn.mixture import GaussianMixture
from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression
from anndata import AnnData

In [9]:
def dsb_norm(
    protein_profile: Union[AnnData, pd.DataFrame],
    rna_profile: Union[AnnData, pd.DataFrame],
    pseudocount: Integral = 10,
    denoise_counts: bool = True,
    isotype_controls: Optional[Iterable[str]] = None,
    empty_counts_range: Tuple[Real, Real] = (0, 0.1),
    cell_counts_range: Tuple[Real, Real] = (0.1, 0.9),
    add_layer: bool = False,
    random_state: Optional[Union[int, np.random.RandomState, None]] = None,
    ) -> Union[AnnData, pd.DataFrame, None]:
    """
    Normalize the protein profile using dsb method
    Args:
        protein_profile: protein profile
        rna_profile: rna profile
        pseudocount: pseudocount to add to the protein profile
        denoise_counts: whether to denoise the protein profile
        isotype_controls: names of isotype controls. If None, it wont be used
        add_layer: whether to add the normalized protein profile to the adata object
        random_state: random seed
    Returns:
        normalized protein profile
    """
    # check the empty cells
    if max(*empty_counts_range) > min(*cell_counts_range):
        raise ValueError("empty_counts_range should be smaller than cell_counts_range")
    if protein_profile.shape[0] != rna_profile.shape[0]:
        raise ValueError("protein_profile and rna_profile should have the same number of cells")
    if type(protein_profile) == pd.DataFrame:
        protein_matrix = protein_profile.values
    if type(rna_profile) == pd.DataFrame:
        rna_matrix = rna_profile.values

    log10umi = np.log10(rna_matrix.sum(axis=1).squeeze() + 1)
    empty_idx = np.where(
        (log10umi >= min(*empty_counts_range)) & (log10umi <= max(*empty_counts_range))
    )[0]
    cell_idx = np.where(
        (log10umi >= min(*cell_counts_range)) & (log10umi <= max(*cell_counts_range))
    )[0]
    cellidx = protein_profile.index[cell_idx]
    empty = protein_profile.iloc[empty_idx, :]
    protein_profile = protein_profile.iloc[cell_idx, :]
    protein_matrix = protein_matrix[cell_idx, :]

    if pseudocount < 0:
        raise ValueError("pseudocount should be larger than 0")

    empty_scaled = np.log(empty + pseudocount)
    cells_scaled = np.log(protein_profile + pseudocount)
    cells_scaled = (cells_scaled - empty_scaled.mean(axis=0)) / empty_scaled.std(axis=0)

    if denoise_counts:
        background_means = np.empty(cells_scaled.shape[0], np.float32)
        # init_params needs to be random, otherwise fitted variance for one of the n_components somestimes is 0
        shared_var = GaussianMixture(n_components=2, covariance_type="tied", init_params="random", random_state=random_state)
        separate_var = GaussianMixture(n_components=2, covariance_type="full", init_params="random", random_state=random_state)
        for cell in range(cells_scaled.shape[0]):
            shared_var.fit(cells_scaled[cell, :, np.newaxis])
            separate_var.fit(cells_scaled[cell, :, np.newaxis])
            
            if shared_var.bic(cells_scaled[cell, :, np.newaxis]) < separate_var.bic(cells_scaled[cell, :, np.newaxis]):
                background_means[cell] = shared_var.means_[0]
            else:
                background_means[cell] = shared_var.means_[1]
        
        if isotype_controls is not None:
            pca = PCA(n_components=1, whiten=True)
            ctrl_idx = np.where(protein_profile.columns.isin(set(isotype_controls)))[0]
            if len(ctrl_idx) < len(isotype_controls):
                warn("Some isotype controls are not in the protein profile")
            covar = pca.fit_transform(np.hstack((cells_scaled[:, ctrl_idx], background_means.reshape(-1, 1))))
        else:
            covar = background_means[:, np.newaxis]
        
        reg = LinearRegression(fit_intercept=True, copy_X=False)
        reg.fit(covar, cells_scaled)

        cells_scaled -= reg.predict(covar) - reg.intercept_
    if add_layer:
        protein_profile.layers["dsb"] = cells_scaled
    else:
        protein_profile = cells_scaled
    return protein_profile

In [5]:
np.arange(0, 5, 0.1).reshape(2,-1)[1, :, np.newaxis].shape # extract one layer and flatten the array into a 1D array, each row is a 1D array
# np.arange(0, 5, 0.1).reshape(2,-1)
pd.DataFrame(np.arange(0, 5, 0.1).reshape(2,-1)) # get the number of non-zero elements in each row

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,15,16,17,18,19,20,21,22,23,24
0,0.0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,...,1.5,1.6,1.7,1.8,1.9,2.0,2.1,2.2,2.3,2.4
1,2.5,2.6,2.7,2.8,2.9,3.0,3.1,3.2,3.3,3.4,...,4.0,4.1,4.2,4.3,4.4,4.5,4.6,4.7,4.8,4.9


In [6]:
def clr_norm(
    protein_profile: Union[AnnData, pd.DataFrame],
    inplace: bool = False,
    axis: int = 0,
):
    '''
    Apply centered log-ratio transformation to the protein profile
    Args:
        protein_profile: protein profile
        inplace: whether to do the transformation inplace
        axis: axis to apply the transformation
    '''
    if axis not in [0, 1]:
        raise ValueError("axis should be 0 or 1")
    if not inplace:
        protein_profile = protein_profile.copy()
    if type(protein_profile) == AnnData:
        x = protein_profile.X
    else:
        x = protein_profile.values
    
    # x /= np.repeat(np.exp(np.log1p(x).sum(axis=axis).A / x.shape[axis]), x.getnnz)
    x = np.log1p(x / np.exp(np.log1p(x).sum(axis=axis, keepdims=True) / x.shape[axis]))

    if type(protein_profile) == AnnData:
        protein_profile.X = x

    return protein_profile