In [11]:
import os
import gc
import json
from pathlib import Path

import numpy as np

import src.load as load
import jsbeautifier
from src._operations import group_by
from src.deconvolution import Deconvolution
from tqdm import tqdm

from scipy.stats import pearsonr, entropy
from scipy.spatial.distance import jensenshannon

from sklearn.model_selection import train_test_split as tts
from sklearn.utils.validation import column_or_1d

In [3]:
# Everything this notebook reads or writes lives under data/<dataset>.
dataset = Path('IPF')
root = 'data' / dataset

# Make sure the dataset folder and both figure folders exist before use.
for out_dir in (root, root / 'images/png', root / 'images/svg'):
    out_dir.mkdir(parents=True, exist_ok=True)

<h2>Load adata</h2>
Deconvolution works best in linear space, so we do not log-transform the data.
`normalize` is set to True for all datasets; `filter_genes` is set to True for MC and HCA but to False for IPF.
`high_var` is set to True only for HCA, for run-time efficiency.

In [5]:
# Preprocessing switches handed to the loader. The trailing notes record
# which value each dataset (MC, HCA, IPF) is run with.
load_options = dict(
    filter_genes=False,  # set to true for MC, HCA, false for IPF
    normalize=True,
    high_var=False,  # set to true for HCA, false for MC, IPF
    log=False,  # don't log for cibersort
    scale=False,
)
adata, key = load.dataset(str(dataset), **load_options)

# Drop low-count label classes. The third argument (50) is presumably the
# minimum count a class must have -- TODO confirm against src.load.
adata = load.remove_low_count_ct(adata, key, 50)

adata.shape=(96301, 4443)
Removed low count classes.
adata.shape=(96196, 4443)
33 label combinations.


Split the data into train and test. Form a signature matrix using train data and form a pseudo-mixture using test data.

In [6]:
def get_signature(adata, key, *, rs=42, verbose=False):
    """Build a signature matrix, a pseudo-mixture and its true composition.

    ``adata`` is split 50/50 with a split stratified on ``adata.obs[key]``
    and seeded by ``rs``. The training half is passed to ``group_by`` to
    obtain the signature matrix, the test half is averaged feature-wise
    into a single mixture vector, and the label frequencies of the test
    half are returned as the ground truth.

    Parameters
    ----------
    adata : object exposing ``.X``, ``.obs`` and ``.shape``
    key : column of ``adata.obs`` holding the labels
    rs : random seed forwarded to the train/test split
    verbose : when true, print shapes and the signature maximum

    Returns
    -------
    (signature, mixture, ground_truth)
    """
    train_half, test_half = tts(
        adata,
        train_size=0.5,
        stratify=adata.obs[key],
        random_state=rs,
    )

    # One signature row per phenotype; group_by is expected to average the
    # training cells of each label.
    signature = np.asarray(group_by(train_half.X, train_half.obs[key]))

    # Feature-wise mean over all test cells, flattened to 1-D.
    mixture = np.array(test_half.X.mean(axis=0)).flatten()

    # np.unique reports counts in sorted-label order.
    # NOTE(review): assumes group_by orders its rows the same way -- confirm.
    n_test = test_half.shape[0]
    label_counts = np.unique(test_half.obs[key], return_counts=True)[1]
    ground_truth = label_counts.astype(float) / n_test

    if verbose:
        print(f"Formed signature matrix with shape {signature.shape}")
        print(f"Max value in the signature matrix: {signature.max()}")
        print(f"Formed mixture with shape {mixture.shape}")

    # Release both halves before returning to keep kernel memory low.
    del train_half, test_half
    gc.collect()
    return signature, mixture, ground_truth

Run deconvolution using nu_SVR (CIBERSORT).

In [7]:
def deconv(
        signature,
        mixture,
        *,
        dict_key,
        ground_truth,
        feature_selector=None,
        selected=None,
        json_path=None,
        report=None,
    ):
    """Run CIBERSORT using the given signature matrix
    and mixture by using only features returned by
    feature selector.

    Parameters
    ----------
    signature : array indexed as (phenotypes, features)
    mixture : 1-D (or column) vector with one value per feature
    dict_key : key under which this run is stored in ``report``
    ground_truth : reference proportions the estimate is compared with
    feature_selector : object with ``get_support(indices=True)``; only
        consulted when ``selected`` is None
    selected : feature indices to use; takes precedence over
        ``feature_selector``
    json_path : if given, the whole ``report`` is written there as
        beautified JSON (the file is overwritten)
    report : dict to extend in place; a new dict is created when None

    Returns
    -------
    dict
        ``report`` with ``report[dict_key]`` holding ``n_features``,
        ``phenotypes``, ``n_features_in``, ``JS`` (Jensen-Shannon
        distance), ``entropy`` (relative entropy of the estimate with
        respect to ``ground_truth``, since two distributions are passed),
        ``pearson``, ``deconvolution`` and ``solution``.
    """
    if selected is None:
        assert feature_selector is not None, \
            "either `selected` or `feature_selector` must be provided"
        selected = feature_selector.get_support(indices=True)

    dv = Deconvolution(verbose=False)
    # Restrict both the signature (transposed so features come first)
    # and the mixture to the selected features.
    deconvolved = dv.fit_predict(
        signature.T[selected],
        column_or_1d(mixture)[selected])

    if report is None:
        report = {}
    # Plain lists keep the entry JSON-serialisable.
    if isinstance(selected, np.ndarray):
        selected = selected.tolist()
    report[dict_key] = {
        'n_features': len(selected),
        'phenotypes': signature.shape[0],
        'n_features_in': signature.shape[1],
        'JS': jensenshannon(deconvolved, ground_truth),
        'entropy': entropy(deconvolved, ground_truth),
        'pearson': pearsonr(deconvolved, ground_truth)[0],
        'deconvolution': deconvolved.tolist(),
        'solution': selected,
    }

    # Serialise only when there is a file to write; building the
    # beautified string on every call is wasted work otherwise.
    if json_path is not None:
        options = jsbeautifier.default_options()
        options.indent_size = 4
        beau_report = jsbeautifier.beautify(json.dumps(report), options)
        with open(json_path, "w") as f:
            f.write(beau_report)

    return report

def load_rep(path_to_json):
    """Read the JSON file at ``path_to_json`` and return its parsed content."""
    with open(path_to_json, "r") as handle:
        return json.load(handle)

<h2>Read reports and run deconvolution</h2>

In [8]:
# File names used inside each method directory:
#   report_filename -- feature-selection report that is read,
#   deconv_filename -- deconvolution report that is written.
report_filename, deconv_filename = 'report.json', 'deconv.json'

In [None]:
# Feature-selection methods to evaluate; each name is a sub-directory of the
# per-seed folder. Uncomment an entry to include that method in the run.
base_dirs = [
    # 'GreedyCover',
    # 'DT',
    # 'TopDE',
    # 'ReliefF',
    'scGeneFit',
    # 'Fval',
    # 'MI',
    # 'mRMR',
    # 'CrossEntropy',
]

# Outer loop: one train/test split (signature + pseudo-mixture) per seed.
for rs in range(44, 47):
    signature, mixture, ground_truth = get_signature(adata, key, rs=rs)
    sub_root = 'data' / dataset / 'RS' / f'rs{rs}'
    bd = [sub_root / method for method in base_dirs]

    # Middle loop: one directory per feature-selection method.
    for rd in tqdm(bd):
        f_report = load_rep(rd / report_filename)
        deconv_path = rd / deconv_filename

        # Collects the results of all coverage factors for this method.
        report = {}

        # Inner loop: one deconvolution per coverage factor, each using the
        # feature set stored for it. The JSON file is rewritten after every
        # factor with the report accumulated so far.
        for cov in tqdm(f_report):
            _ = deconv(
                signature,
                mixture,
                dict_key=cov,
                ground_truth=ground_truth,
                selected=f_report[cov]['solution'],
                json_path=deconv_path,
                report=report,
            )