In [2]:
from __future__ import annotations

import numpy as np
import pandas as pd
import tensorflow as tf
import typing as t

from pathlib import Path
from tensorflow import keras

2023-07-18 18:59:40.150639: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [39]:
from cdrpy.feat.encoders import PandasEncoder, DictEncoder
from cdrpy.models.dualgcn.util import normalize_adj
from cdrpy.util.types import PathLike
from cdrpy.util.io import read_pickled_dict

# Middle element of a pickled molecule record; presumably per-atom degrees
# (TODO confirm). The loaders in this notebook ignore it.
DegList = list[np.int32]
# Adjacency list: entry i holds the indices of the atoms adjacent to atom i.
AdjList = list[list[int]]
# One molecule record as (feature matrix, degree list, adjacency list); this
# matches the 3-tuples unpacked by `load_drug_features`.
ConvMolFeat = tuple[np.ndarray, DegList, AdjList]


def _process_drug_feature(
    feat_mat: np.ndarray, adj_lst: AdjList, max_atoms: int = 100
) -> tuple[np.ndarray, np.ndarray]:
    """Zero-pad one molecule's graph to a fixed number of atoms.

    Parameters
    ----------
        feat_mat: Per-atom feature matrix with one row per atom.
        adj_lst: Adjacency list; entry ``i`` holds the neighbour indices
            of atom ``i``.
        max_atoms: Number of rows (and adjacency columns) after padding.

    Returns
    -------
        A tuple of (feature_matrix, adjacency_matrix). The feature matrix
        has ``max_atoms`` rows and the adjacency matrix is
        ``(max_atoms, max_atoms)`` with dtype float32.
    """
    n_atoms = len(adj_lst)
    n_rows, n_feats = feat_mat.shape[0], feat_mat.shape[-1]
    assert n_rows == n_atoms

    # Real atoms occupy the leading rows; the remaining rows stay zero.
    padded_feat = np.zeros((max_atoms, n_feats))
    padded_feat[:n_rows, :] = feat_mat

    # Dense 0/1 adjacency built from the adjacency list.
    padded_adj = np.zeros((max_atoms, max_atoms), dtype="float32")
    for row, neighbors in enumerate(adj_lst):
        for col in neighbors:
            padded_adj[row, int(col)] = 1

    # The adjacency list is expected to describe an undirected graph.
    assert np.allclose(padded_adj, padded_adj.T)

    # The real-atom block and the padding block are normalized separately;
    # both results are computed before either is written back.
    real = slice(None, n_atoms)
    pad = slice(n_atoms, None)
    norm_real = normalize_adj(padded_adj[real, real])
    norm_pad = normalize_adj(padded_adj[pad, pad])
    padded_adj[real, real] = norm_real
    padded_adj[pad, pad] = norm_pad

    return padded_feat, padded_adj


def load_drug_features(
    mol_path: PathLike | Path,
) -> tuple[DictEncoder, DictEncoder]:
    """Load padded drug graph features from a pickled dictionary.

    Each pickled value is unpacked as a 3-tuple. Its first and last items
    (feature matrix and adjacency list) are padded with
    `_process_drug_feature`; the middle item is ignored.

    Parameters
    ----------
        mol_path: Path to the pickled dictionary of molecule records.

    Returns
    -------
        A tuple of (feature encoder, adjacency encoder), both holding
        float32 arrays keyed like the pickled dictionary.
    """
    raw_mols = read_pickled_dict(mol_path)

    drug_feat = {}
    drug_adj = {}
    for drug_id, (feat_mat, _, adj_lst) in raw_mols.items():
        padded_feat, padded_adj = _process_drug_feature(feat_mat, adj_lst)
        drug_feat[drug_id] = padded_feat.astype("float32")
        drug_adj[drug_id] = padded_adj.astype("float32")

    feat_encoder = DictEncoder(drug_feat, name="drug_feature_encoder")
    adj_encoder = DictEncoder(drug_adj, name="drug_adj_encoder")
    return feat_encoder, adj_encoder


def load_cell_features(
    exp_path: PathLike | Path,
    mut_path: PathLike | Path,
    methyl_path: PathLike | Path | None = None,
) -> tuple[PandasEncoder, DictEncoder, PandasEncoder | None]:
    """Load cell line expression, mutation and (optional) methylation features.

    Parameters
    ----------
        exp_path: CSV of expression values; the first column is the index.
        mut_path: CSV of mutation values; the first column is the index.
        methyl_path: Optional CSV of methylation values; the first column
            is the index.

    Returns
    -------
        A tuple of (expression encoder, mutation encoder, methylation
        encoder). The methylation encoder is None when `methyl_path` is
        not given.
    """

    def _read_matrix(path, dtype):
        # All inputs share the same layout: row ids in the first column.
        return pd.read_csv(path, index_col=0).astype(dtype)

    exp_mat = _read_matrix(exp_path, "float32")
    mut_mat = _read_matrix(mut_path, "int32")

    # Each row is reshaped to (1, n_columns, 1) for the Conv2D-based
    # mutation subnetwork.
    mut_dict = {
        cell_id: mut_mat.loc[cell_id].values.reshape(1, -1, 1)
        for cell_id in mut_mat.index
    }

    if methyl_path is None:
        methyl_enc = None
    else:
        methyl_mat = _read_matrix(methyl_path, "float32")
        methyl_enc = PandasEncoder(methyl_mat, name="methyl_encoder")

    exp_enc = PandasEncoder(exp_mat, name="exp_encoder")
    mut_enc = DictEncoder(mut_dict, name="mut_encoder")
    return exp_enc, mut_enc, methyl_enc

In [40]:
# Input locations, relative to the notebook's working directory.
input_dir = Path("../../data/inputs/GDSCv2DepMap")
mol_path = input_dir / "DrugToConvMolFeatures.pickle"
exp_path = input_dir / "DeepCDR/FeatureCellToExpression717CGCGenesTPMLogp1.csv"
mut_path = (
    input_dir
    / "DeepCDR/FeatureCellToSomaticMutationsPositionEncoded716CGCGenesAll.csv"
)

# No methylation path is passed, so the third returned encoder is None.
cell_exp_enc, cell_mut_enc, _ = load_cell_features(
    exp_path=exp_path, mut_path=mut_path
)
drug_feat_enc, drug_adj_enc = load_drug_features(mol_path=mol_path)

In [24]:
# Scratch cell: re-read only the first 10 rows of the mutation CSV as int32
# (presumably for quick inspection; the result is not used by the cells shown here).
mut_mat = pd.read_csv(mut_path, index_col=0, nrows=10).astype("int32")

In [79]:
import keras.backend as K

from keras import layers
from cdrpy.layers.graph import GraphConvBlock


def _create_mut_subnetwork(mut_dim: int) -> keras.Sequential:
    """Creates the mutation subnetwork.

    The network takes inputs of shape ``(1, mut_dim, 1)`` and applies two
    Conv2D + max-pooling stages along the mutation axis, followed by a
    flatten, a 100-unit dense layer and dropout.

    Parameters
    ----------
        mut_dim: Length of the mutation axis of the input.

    Returns
    -------
        A `keras.Sequential` model named "cell_mut_subnet".
    """
    mut_input = layers.Input((1, mut_dim, 1), name="cell_mut_input")

    first_stage = [
        layers.Conv2D(
            50,
            activation="tanh",
            kernel_size=(1, 700),
            strides=(1, 5),
            name="cell_mut_conv2d_1",
        ),
        layers.MaxPooling2D(pool_size=(1, 5), name="cell_mut_mpool2d_1"),
    ]
    second_stage = [
        layers.Conv2D(
            30,
            activation="relu",
            kernel_size=(1, 5),
            strides=(1, 2),
            name="cell_mut_conv2d_2",
        ),
        layers.MaxPooling2D(pool_size=(1, 10), name="cell_mut_mpool2d_2"),
    ]
    head = [
        layers.Flatten(name="cell_mut_flatten_1"),
        layers.Dense(100, activation="relu", name="cell_mut_dense_1"),
        layers.Dropout(0.1, name="cell_mut_dropout_1"),
    ]

    return keras.Sequential(
        [mut_input, *first_stage, *second_stage, *head],
        name="cell_mut_subnet",
    )


def _create_exp_subnetwork(
    exp_dim: int, exp_norm: layers.Normalization | None = None
) -> keras.Model:
    """Creates the gene expression subnetwork.

    Parameters
    ----------
        exp_dim: Number of expression features per sample.
        exp_norm: Optional, already adapted normalization layer applied to
            the raw input before the first dense layer.

    Returns
    -------
        A `keras.Model` named "cell_exp_subnet" that maps the expression
        input to a 100-unit representation.

    Raises
    ------
        ValueError: If `exp_norm` is given but has not been adapted.
    """
    exp_input = layers.Input((exp_dim,), name="cell_exp_input")

    # Keep `exp_input` bound to the Input tensor. Rebinding it to the
    # normalized tensor would make the model's declared input start *after*
    # the normalization layer, leaving that layer outside the model.
    x = exp_input
    if exp_norm is not None:
        if not exp_norm.is_adapted:
            # FIXME: change this to a warning since you can still adapt later
            raise ValueError("requires adapted normalization layer...")
        x = exp_norm(x)

    x = layers.Dense(256, activation="tanh", name="cell_exp_dense_1")(x)
    x = layers.BatchNormalization(name="cell_exp_bnorm_1")(x)
    x = layers.Dropout(0.1, name="cell_exp_dropout_1")(x)
    x = layers.Dense(100, activation="relu", name="cell_exp_dense_2")(x)

    model = keras.Model(inputs=exp_input, outputs=x, name="cell_exp_subnet")

    return model


def _create_methyl_subnetwork(methyl_dim: int) -> keras.Sequential:
    """Creates the methylation subnetwork.

    Parameters
    ----------
        methyl_dim: Number of methylation features per sample.

    Returns
    -------
        A `keras.Sequential` model named "cell_methyl_subnet" that maps the
        methylation input to a 100-unit representation.
    """
    # FIXME: add option for normalization layer since this is float data
    model = keras.Sequential(
        [
            layers.Input((methyl_dim,), name="cell_methyl_input"),
            layers.Dense(256, activation="tanh", name="cell_methyl_dense_1"),
            layers.BatchNormalization(name="cell_methyl_bnorm_1"),
            layers.Dropout(0.1, name="cell_methyl_dropout_1"),
            layers.Dense(100, activation="relu", name="cell_methyl_dense_2"),
        ],
        name="cell_methyl_subnet",
    )

    # The model must be returned: callers read `.input` and `.output` from it.
    return model


def _create_cell_subnetwork(
    exp_dim: int,
    mut_dim: int,
    methyl_dim: int | None = None,
    exp_norm: layers.Normalization | None = None,
) -> keras.Model:
    """Creates the combined cell line subnetwork.

    The expression and mutation subnetworks are always built; the
    methylation subnetwork is added only when `methyl_dim` is given. Their
    outputs are concatenated into a single representation.

    Parameters
    ----------
        exp_dim: Number of expression features.
        mut_dim: Length of the mutation axis.
        methyl_dim: Optional number of methylation features.
        exp_norm: Optional normalization layer for the expression input.

    Returns
    -------
        A `keras.Model` named "cell_subnet" whose inputs are the subnetwork
        inputs in the order (expression, mutation[, methylation]).
    """
    subnets = [
        _create_exp_subnetwork(exp_dim, exp_norm),
        _create_mut_subnetwork(mut_dim),
    ]
    if methyl_dim is not None:
        subnets.append(_create_methyl_subnetwork(methyl_dim))

    subnet_inputs = [subnet.input for subnet in subnets]
    subnet_outputs = [subnet.output for subnet in subnets]
    merged = layers.Concatenate(name="cell_subnet_concat")(subnet_outputs)

    return keras.Model(
        inputs=subnet_inputs, outputs=merged, name="cell_subnet"
    )


def _create_drug_subnetwork(feat_dim: int) -> keras.Model:
    """Creates the drug subnetwork.

    Four `GraphConvBlock` layers are stacked; each receives the previous
    node representation together with the adjacency input. The result is
    max-pooled into one vector per drug.

    Parameters
    ----------
        feat_dim: Number of features per atom.

    Returns
    -------
        A `keras.Model` named "drug_subnet" with inputs
        ``[drug_feat_input, drug_adj_input]``.
    """
    feat_input = layers.Input((None, feat_dim), name="drug_feat_input")
    adj_input = layers.Input((None, None), name="drug_adj_input")

    # (units, layer name) for each graph convolution, in application order.
    gconv_specs = (
        (256, "drug_gconv_1"),
        (256, "drug_gconv_2"),
        (256, "drug_gconv_3"),
        (100, "drug_gconv_4"),
    )

    hidden = feat_input
    for units, layer_name in gconv_specs:
        gconv = GraphConvBlock(units=units, step_num=1, name=layer_name)
        hidden = gconv([hidden, adj_input])

    pooled = layers.GlobalMaxPooling1D(name="drug_pool")(hidden)

    return keras.Model(
        inputs=[feat_input, adj_input], outputs=pooled, name="drug_subnet"
    )


def create_model(
    cell_exp_dim: int,
    cell_mut_dim: int,
    drug_feat_dim: int,
    cell_methyl_dim: int | None = None,
    cell_exp_norm: layers.Normalization | None = None,
) -> keras.Model:
    """Creates the DeepCDR model.

    Parameters
    ----------
        cell_exp_dim: Number of expression features.
        cell_mut_dim: Length of the mutation axis.
        drug_feat_dim: Number of features per atom.
        cell_methyl_dim: Optional number of methylation features; the
            methylation branch is only built when this is given.
        cell_exp_norm: Optional normalization layer for the expression
            input.

    Returns
    -------
        A `keras.Model` named "DeepCDR" whose inputs are the cell subnetwork
        inputs followed by the drug subnetwork inputs, and whose output is
        a single linear unit.
    """
    cell_subnet = _create_cell_subnetwork(
        cell_exp_dim, cell_mut_dim, cell_methyl_dim, cell_exp_norm
    )
    drug_subnet = _create_drug_subnetwork(drug_feat_dim)

    subnet_inputs = [*cell_subnet.input, *drug_subnet.input]
    subnet_outputs = [cell_subnet.output, drug_subnet.output]

    x = layers.Concatenate(name="shared_concat_1")(subnet_outputs)
    x = layers.Dense(300, activation="tanh", name="shared_dense_1")(x)
    x = layers.Dropout(0.1, name="shared_dropout_1")(x)
    # (batch, 300) -> (batch, 1, 300, 1) so the Conv2D stack can slide along
    # the feature axis. A named Reshape layer is used instead of two
    # anonymous Lambda layers: Lambda layers wrapping Python lambdas are
    # serialized as bytecode and are not portable when saving the model.
    x = layers.Reshape((1, -1, 1), name="shared_reshape_1")(x)
    x = layers.Conv2D(
        30,
        kernel_size=(1, 150),
        strides=(1, 1),
        activation="relu",
        name="shared_conv2d_1",
    )(x)
    x = layers.MaxPooling2D(pool_size=(1, 2), name="shared_mpool2d_1")(x)
    x = layers.Conv2D(
        filters=10,
        kernel_size=(1, 5),
        strides=(1, 1),
        activation="relu",
        name="shared_conv2d_2",
    )(x)
    x = layers.MaxPooling2D(pool_size=(1, 3), name="shared_mpool2d_2")(x)
    x = layers.Conv2D(
        filters=5,
        kernel_size=(1, 5),
        strides=(1, 1),
        activation="relu",
        name="shared_conv2d_3",
    )(x)
    x = layers.MaxPooling2D(pool_size=(1, 3), name="shared_mpool2d_3")(x)
    x = layers.Dropout(0.1, name="shared_dropout_2")(x)
    x = layers.Flatten(name="shared_flat_1")(x)
    x = layers.Dropout(0.2, name="shared_dropout_3")(x)
    output = layers.Dense(1, name="output")(x)

    model = keras.Model(inputs=subnet_inputs, outputs=output, name="DeepCDR")

    return model

In [68]:
# Scratch check of the Keras Normalization layer: adapt an inverting
# normalizer on a small float32 matrix (the same two rows repeated twice).
adapt_data = np.tile(
    np.array([[0.0, 7.0, 4.0], [2.0, 9.0, 6.0]], dtype="float32"),
    (2, 1),
)
input_data = np.array([[1.0, 2.0, 3.0]], dtype="float32")

layer = layers.Normalization(axis=-1, invert=True)
layer.adapt(adapt_data)

In [77]:
# Feature dimensions for the model, read from the encoders' shapes.
exp_dim, mut_dim, drug_dim = (
    cell_exp_enc.shape[-1],
    cell_mut_enc.shape[1],
    drug_feat_enc.shape[-1],
)



In [80]:
# Build the model without the optional methylation branch or expression
# normalization layer.
model = create_model(
    cell_exp_dim=exp_dim,
    cell_mut_dim=mut_dim,
    drug_feat_dim=drug_dim,
)

In [82]:
# Print the layer-by-layer summary (output shapes, parameter counts, connections).
model.summary()

Model: "DeepCDR"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 cell_mut_input (InputLayer  [(None, 1, 33290, 1)]        0         []                            
 )                                                                                                
                                                                                                  
 cell_mut_conv2d_1 (Conv2D)  (None, 1, 6519, 50)          35050     ['cell_mut_input[0][0]']      
                                                                                                  
 cell_mut_mpool2d_1 (MaxPoo  (None, 1, 1303, 50)          0         ['cell_mut_conv2d_1[0][0]']   
 ling2D)                                                                                          
                                                                                            

[<KerasTensor: shape=(None, 200) dtype=float32 (created by layer 'cell_subnet_concat')>,
 <KerasTensor: shape=(None, 100) dtype=float32 (created by layer 'drug_pool')>]

In [17]:
# Scratch cell: index the first two axes of an (800, 1, 100, 1) float32 zero array.
np.zeros((800, 1, 100, 1), dtype="float32")[0][0]

array([0.], dtype=float32)