In [1]:
!pip install anndata scanpy



In [2]:
import pandas as pd
import numpy as np
import sklearn
import scipy

import anndata as ad
import scanpy as sc

from dask import delayed
from dask.distributed import Client, LocalCluster

import os, binascii

In [3]:
# Copy the bulk-by-cell-type .h5ad file and the compound-ID mapping table into
# the current directory; --no-sign-request sends the requests unsigned.
! aws s3 cp s3://saturn-kaggle-datasets/open-problems-single-cell-perturbations-optional/train_or_control_bulk_by_cell_type_adata.h5ad --no-sign-request .
! aws s3 cp s3://saturn-kaggle-datasets/open-problems-single-cell-perturbations-optional/lincs_id_compound_mapping.parquet --no-sign-request .

download: s3://saturn-kaggle-datasets/open-problems-single-cell-perturbations-optional/train_or_control_bulk_by_cell_type_adata.h5ad to ./train_or_control_bulk_by_cell_type_adata.h5ad
download: s3://saturn-kaggle-datasets/open-problems-single-cell-perturbations-optional/lincs_id_compound_mapping.parquet to ./lincs_id_compound_mapping.parquet


## Loading expression data

Here we load the expression data (long format) and convert it into an AnnData object (wide sparse format).

You'll need to increase your instance RAM to at least 64 GB.

In [4]:
# Build a sparse (observation x gene) count matrix from the long-format table
# and wrap it, together with per-observation metadata, in an AnnData object.
data_dir = '.'
adata_train_df = pd.read_parquet(os.path.join(data_dir, 'adata_train.parquet'))
adata_obs_meta_df = pd.read_csv(os.path.join(data_dir, 'adata_obs_meta.csv'))

# Store the two identifier columns as categoricals.
adata_train_df['obs_id'] = adata_train_df['obs_id'].astype('category')
adata_train_df['gene'] = adata_train_df['gene'].astype('category')

# Give every distinct observation a consecutive integer position (matrix row).
obs_ids = adata_train_df['obs_id'].unique()
obs_id_map = dict(zip(obs_ids, range(len(obs_ids))))

# Give every distinct gene a consecutive integer position (matrix column).
genes = adata_train_df['gene'].unique()
gene_map = dict(zip(genes, range(len(genes))))

# Translate each long-format record into its (row, column) coordinate.
adata_train_df['obs_index'] = adata_train_df['obs_id'].map(obs_id_map)
adata_train_df['gene_index'] = adata_train_df['gene'].map(gene_map)

# NOTE(review): normalized_counts_values is not used in the visible part of
# the notebook; only the raw counts go into the matrix below.
normalized_counts_values = adata_train_df['normalized_count'].to_numpy()
counts_values = adata_train_df['count'].to_numpy()

row_indices = adata_train_df['obs_index'].to_numpy()
col_indices = adata_train_df['gene_index'].to_numpy()

# No explicit shape is passed, so the matrix shape is taken from the indices.
counts = scipy.sparse.csr_matrix((counts_values, (row_indices, col_indices)))

# obs/var frames are indexed by obs_id / gene, in the same order as the
# integer positions assigned above.
obs_df = pd.Series(obs_ids, name='obs_id').to_frame()
var_df = pd.Series(genes, name='gene').to_frame()

obs_df = obs_df.set_index('obs_id')
var_df = var_df.set_index('gene')

# Convert both indices to plain strings.
obs_df.index = obs_df.index.astype('str')
var_df.index = var_df.index.astype('str')

# Wrap everything in an AnnData object, requesting uint32 for X.
counts_adata = ad.AnnData(
    X=counts,
    obs=obs_df,
    var=var_df,
    dtype=np.uint32,
)

# Attach the per-observation metadata and verify that the join left the row
# order untouched, so that obs still lines up with the rows of X.
index_ordering_before_join = counts_adata.obs.index
counts_adata.obs = counts_adata.obs.join(adata_obs_meta_df.set_index('obs_id'))
index_ordering_after_join = counts_adata.obs.index
assert (index_ordering_before_join == index_ordering_after_join).all()



## Pseudobulking counts by cell type

In [5]:
from scipy import sparse

def sum_by(adata: ad.AnnData, col: str) -> ad.AnnData:
    """
    Sum the rows of `adata.X` within each category of `adata.obs[col]`.

    Adapted from this forum post:
    https://discourse.scverse.org/t/group-sum-rows-based-on-jobs-feature/371/4

    Parameters
    ----------
    adata
        Input AnnData; `adata.X` is left-multiplied by a sparse indicator matrix.
    col
        Name of a categorical column of `adata.obs` that defines the groups.

    Returns
    -------
    A new AnnData with one observation per category of `col` (in category
    order) and the same `.var` as `adata`. Its `.obs` holds `col` plus every
    other `.obs` column that takes exactly one value per group, in the same
    column order as in `adata.obs`; the obs index is a string range index.

    Raises
    ------
    AssertionError
        If `adata.obs[col]` is not categorical, or if joining the per-group
        metadata would change the obs index.
    """
    grouping = adata.obs[col]
    # `pd.api.types.is_categorical_dtype` is deprecated in recent pandas;
    # checking the dtype instance is the supported equivalent.
    assert isinstance(grouping.dtype, pd.CategoricalDtype)

    # sum `.X` entries for each unique value in `col`:
    # indicator[g, i] is True when observation i belongs to category g, so
    # `indicator @ X` adds up the rows of each group.
    cat = grouping.values
    indicator = sparse.coo_matrix(
        (
            np.broadcast_to(True, adata.n_obs),
            (cat.codes, np.arange(adata.n_obs))
        ),
        shape=(len(cat.categories), adata.n_obs),
    )
    sum_adata = ad.AnnData(
        indicator @ adata.X,
        var=adata.var,
        obs=pd.DataFrame(index=cat.categories),
        dtype=adata.X.dtype,
    )

    # copy over `.obs` values that have a one-to-one-mapping with `.obs[col]`;
    # iterating over the columns directly (instead of a set difference) keeps
    # the resulting column order deterministic across runs.
    nunique_in_col = grouping.nunique()
    one_to_one_mapped_obs_cols = [
        other_col
        for other_col in adata.obs.columns
        if other_col != col
        and len(adata.obs[[col, other_col]].drop_duplicates()) == nunique_in_col
    ]

    joining_df = adata.obs[[col] + one_to_one_mapped_obs_cols].drop_duplicates().set_index(col)
    # join once, check that it neither reordered nor duplicated groups, then reuse it
    joined_obs = sum_adata.obs.join(joining_df)
    assert (sum_adata.obs.index == joined_obs.index).all()
    sum_adata.obs = joined_obs
    sum_adata.obs.index.name = col
    sum_adata.obs = sum_adata.obs.reset_index()
    sum_adata.obs.index = sum_adata.obs.index.astype('str')

    return sum_adata

In [6]:
# Label every observation with its "<plate>_<well>_<cell type>" combination
# and store the label as a categorical, which is what sum_by() groups on.
counts_adata.obs['plate_well_cell_type'] = (
    counts_adata.obs['plate_name'].astype('str')
    + '_' + counts_adata.obs['well'].astype('str')
    + '_' + counts_adata.obs['cell_type'].astype('str')
).astype('category')

# Sum the counts within each label, then drop the helper label column.
bulk_adata = sum_by(counts_adata, 'plate_well_cell_type')
bulk_adata.obs = bulk_adata.obs.drop(columns='plate_well_cell_type')

# Store X as a dense float64 array.
bulk_adata.X = bulk_adata.X.toarray().astype('float64')
bulk_adata = bulk_adata.copy()



In [7]:
# Plate relabelling applied via `.map()` further down: plates 0-3 are rotated
# (0->1, 1->2, 2->3, 3->0) while plates 4 and 5 keep their names.
plate_reordering = dict(
    plate_0='plate_1',
    plate_1='plate_2',
    plate_2='plate_3',
    plate_3='plate_0',
    plate_4='plate_4',
    plate_5='plate_5',
)

## Loading pseudobulked counts from correctly filtered AnnData (@Daniel: delete this section once the fixed RNA expression AnnData is uploaded to Kaggle)

In [8]:
# Read the bulk-by-cell-type AnnData file from the working directory.
fixed_bulk_adata = sc.read_h5ad('train_or_control_bulk_by_cell_type_adata.h5ad')
# Make X point at the 'counts' layer so that downstream code reading X sees
# whatever that layer holds.
fixed_bulk_adata.X = fixed_bulk_adata.layers['counts']

In [9]:
# Pair the i-th sorted plate name of fixed_bulk_adata with the i-th sorted
# plate name of bulk_adata.
# NOTE(review): assumes both objects have the same number of distinct plates
# and that sorted order matches them up correctly - confirm.
new_plate_names = bulk_adata.obs['plate_name'].sort_values().unique()
original_plate_names = fixed_bulk_adata.obs['plate_name'].sort_values().unique()
plate_name_map = dict(zip(original_plate_names, new_plate_names))

# First translate to bulk_adata's naming, then apply the plate relabelling.
fixed_bulk_adata.obs['plate_name'] = fixed_bulk_adata.obs['plate_name'].map(plate_name_map)
fixed_bulk_adata.obs['plate_name'] = fixed_bulk_adata.obs['plate_name'].map(plate_reordering)

In [10]:
# Pair the i-th sorted 'raw_cell_id' of fixed_bulk_adata with the i-th sorted
# 'donor_id' of bulk_adata.
# NOTE(review): assumes the two columns have the same number of distinct
# values and that sorted order matches them up correctly - confirm.
new_donor_ids = bulk_adata.obs['donor_id'].sort_values().unique()
original_donor_ids = fixed_bulk_adata.obs['raw_cell_id'].sort_values().unique()
donor_id_map = dict(zip(original_donor_ids, new_donor_ids))

# Derive a 'donor_id' column for fixed_bulk_adata from its 'raw_cell_id'.
fixed_bulk_adata.obs['donor_id'] = fixed_bulk_adata.obs['raw_cell_id'].map(donor_id_map)

In [11]:
# Table with (at least) the columns compound_id, sm_lincs_id, sm_name and
# smiles, which are the ones read from it below.
lincs_id_mapping_df = pd.read_parquet('lincs_id_compound_mapping.parquet')

In [12]:
# Index the mapping table by compound_id once and derive one lookup dict per
# annotation column from it.
_lincs_by_compound_id = lincs_id_mapping_df.set_index('compound_id')
compound_id_to_sm_lincs_id = _lincs_by_compound_id['sm_lincs_id'].to_dict()
compound_id_to_sm_name = _lincs_by_compound_id['sm_name'].to_dict()
compound_id_to_smiles = _lincs_by_compound_id['smiles'].to_dict()

# Annotate every pseudobulk sample with its compound's LINCS id, name and SMILES.
fixed_bulk_adata.obs['sm_lincs_id'] = (
    fixed_bulk_adata.obs['compound_id'].map(compound_id_to_sm_lincs_id)
)
fixed_bulk_adata.obs['sm_name'] = (
    fixed_bulk_adata.obs['compound_id'].map(compound_id_to_sm_name)
)
fixed_bulk_adata.obs['SMILES'] = (
    fixed_bulk_adata.obs['compound_id'].map(compound_id_to_smiles)
)

In [13]:
# Reorder the observations of fixed_bulk_adata by plate, compound name and
# cell type.
sorted_index = fixed_bulk_adata.obs.sort_values(['plate_name', 'sm_name', 'cell_type']).index
fixed_bulk_adata = fixed_bulk_adata[sorted_index].copy()

In [14]:
# Apply the same (plate, compound name, cell type) ordering to bulk_adata.
# NOTE(review): at this point bulk_adata's plate names have not yet been
# passed through plate_reordering, whereas fixed_bulk_adata's have - confirm
# that the two orderings are meant to be comparable.
sorted_index = bulk_adata.obs.sort_values(['plate_name', 'sm_name', 'cell_type']).index
bulk_adata = bulk_adata[sorted_index].copy()

If the new expression data is uploaded to Kaggle (or placed in `'/home/jovyan/kaggle/input/open-problems-multimodal-2023'` in the correct format), the assertion below (currently commented out) should pass.

In [15]:
# Apply the plate relabelling to the locally computed pseudobulk as well.
# Not idempotent: re-running this cell rotates the plate names again.
bulk_adata.obs['plate_name'] = bulk_adata.obs['plate_name'].map(plate_reordering)

In [16]:
# assert np.allclose(fixed_bulk_adata.X, bulk_adata.X)

In [17]:
# From here on `bulk_adata` refers to a copy of the pseudobulk loaded from the
# .h5ad file; the one computed from the long-format table is discarded.
bulk_adata = fixed_bulk_adata.copy()

## Running Limma

In [18]:
# bulk_adata.obs['plate_name'] = bulk_adata.obs['plate_name'].map(plate_reordering)

In [30]:
# Compound that serves as the reference level: every other compound is
# contrasted against it.
control_compound = 'Dimethyl Sulfoxide'

# Metadata columns copied onto every contrast result. The first entry is the
# column holding the compound name.
de_pert_cols = ['sm_name', 'sm_lincs_id', 'SMILES', 'dose_uM', 'timepoint_hr', 'cell_type']

In [31]:
# Re-import the local helper module so that edits to limma_utils.py are picked
# up without restarting the kernel. `imp` is deprecated and was removed in
# Python 3.12; `importlib.reload` is the supported replacement.
import importlib
import limma_utils
importlib.reload(limma_utils)


<module 'limma_utils' from '/home/jovyan/workspace/limma_utils.py'>

In [32]:
# Create the directory for intermediate limma files; no error if it exists.
# Pure Python instead of shelling out, so it does not depend on a POSIX shell.
os.makedirs('output', exist_ok=True)

In [36]:
def _run_limma_for_cell_type(bulk_adata, cell_type):
    """
    Fit limma on one cell type's pseudobulk samples and contrast every
    compound against the control compound.

    Reads the notebook-level names `de_pert_cols` (its first entry is the
    compound-name column) and `control_compound`.

    Parameters
    ----------
    bulk_adata
        AnnData restricted to a single cell type. It is copied, so the
        caller's object is not modified.
    cell_type
        Label of that cell type; used verbatim in the names of the files
        written (`output/{cell_type}_y.rds`, `output/{cell_type}_lmfit.rds`,
        `{cell_type}_contrasts.tsv`).

    Returns
    -------
    pandas.DataFrame
        The per-compound contrast tables concatenated row-wise, each annotated
        with `Rpert` and the `de_pert_cols` values of that compound. The same
        table is written to `{cell_type}_contrasts.tsv`.

    NOTE(review): cell type labels are interpolated into paths unchanged. The
    recorded failure for 'T cells CD8+' involves a path containing spaces and
    '+' - confirm that limma_utils passes such paths to R safely.
    """
    # imported here so the function is self-contained when shipped to a worker
    import limma_utils
    bulk_adata = bulk_adata.copy()

    compound_name_col = de_pert_cols[0]

    # limma doesn't like dashes etc. in the compound names, so every compound
    # is replaced by a running integer (stored as a string) in `Rpert`
    rpert_mapping = bulk_adata.obs[compound_name_col].drop_duplicates() \
        .reset_index(drop=True).reset_index() \
        .set_index(compound_name_col)['index'].to_dict()

    bulk_adata.obs['Rpert'] = bulk_adata.obs.apply(
        lambda row: rpert_mapping[row[compound_name_col]],
        axis='columns',
    ).astype('str')

    compound_name_to_Rpert = bulk_adata.obs.set_index(compound_name_col)['Rpert'].to_dict()
    ref_pert = compound_name_to_Rpert[control_compound]

    # the fit is written once and read back by every contrast below
    fit_path = f'output/{cell_type}_lmfit.rds'

    limma_utils.limma_fit(
        bulk_adata,
        design='~0+Rpert+donor_id+plate_name+row',
        y_output_path=f'output/{cell_type}_y.rds',
        output_path=fit_path,
        # plot_output_path=f'output/{cell_type}_voom',
        exec_path='limma_fit.r',
        verbose=True,
    )

    pert_de_dfs = []

    for pert in bulk_adata.obs['Rpert'].unique():
        # the control has nothing to be contrasted against
        if pert == ref_pert:
            continue

        pert_de_df = limma_utils.limma_contrast(
            fit_path=fit_path,
            contrast='Rpert'+pert+'-Rpert'+ref_pert,
            exec_path='limma_contrast.r',
        )

        pert_de_df['Rpert'] = pert

        # annotate the result with the metadata of this compound, taking the
        # first distinct value of each column among its samples
        pert_obs = bulk_adata.obs[bulk_adata.obs['Rpert'].eq(pert)]
        for col in de_pert_cols:
            pert_de_df[col] = pert_obs[col].unique()[0]
        pert_de_dfs.append(pert_de_df)

    de_df = pd.concat(pert_de_dfs, axis=0)
    de_df.to_csv(f'{cell_type}_contrasts.tsv', sep='\t')

    # return the table so that the caller gathering the task results receives
    # the contrasts instead of None
    return de_df


run_limma_for_cell_type = delayed(_run_limma_for_cell_type)

In [37]:
%%capture

# Start a local Dask cluster with 6 single-threaded worker processes and
# connect a client to it (%%capture has to stay on the first line of the cell).
cluster = LocalCluster(
    n_workers=6,
    processes=True,
    threads_per_worker=1,
    memory_limit='20GB',
)

c = Client(cluster)

In [38]:
%%capture

# Build one delayed limma task per cell type, then run them all on the cluster.
cell_types = bulk_adata.obs['cell_type'].unique()
de_dfs = []

for cell_type in cell_types:
    # restrict the pseudobulk to the samples of this cell type
    cell_type_selection = bulk_adata.obs['cell_type'].eq(cell_type)
    cell_type_bulk_adata = bulk_adata[cell_type_selection].copy()
    
    # keep a copy of the sample metadata next to the results
    cell_type_bulk_adata.obs.to_csv(f'meta-{cell_type}.tsv', sep='\t')
    
    # only schedules the work; `de_df` is a Delayed object at this point
    de_df = run_limma_for_cell_type(cell_type_bulk_adata, cell_type)
    
    de_dfs.append(de_df)

# execute all tasks and block until each one has finished; `de_dfs` then holds
# whatever each task returned, in the order of `cell_types`
de_dfs = c.compute(de_dfs, sync=True)

Key:       _run_limma_for_cell_type-f2e6e721-ba40-4817-ba1b-f1d8418f1378
Function:  _run_limma_for_cell_type
args:      (AnnData object with n_obs × n_vars = 507 × 18211
    obs: 'bio_sample_id', 'cell_type', 'raw_cell_id', 'bio_exp_id', 'raw_plate_name', 'compound_id', 'cell_id', 'plate_name', 'dose_uM', 'pert', 'clt_batch_id', 'hashtag_id', 'library_id', 'compound_batch_id', 'cy_id', 'container_format', 'row', 'cy_batch_id', 'compound_name', 'timepoint_hr', 'clt_id', 'col', 'well', 'plate_id', 'readable_pert', 'split', 'donor_id', 'sm_lincs_id', 'sm_name', 'SMILES'
    uns: 'log1p'
    layers: 'counts', 'T cells CD8+')
kwargs:    {}
Exception: "RuntimeError('Error in limma \\n Error message: Error in readRDS(opt$input_fit) : error reading from connection\\nExecution halted\\n \\n \\n stdout: output/T cells CD8+_lmfit.rdsRpert105-Rpert22/home/jovyan/.tmp/limma/aljk7auh/contrast_result.csv')"

Key:       _run_limma_for_cell_type-aa7a3d5c-d188-413d-a3b1-4fb93edb92f3
Function:  _run_limm