In [None]:
# code autoreload
%load_ext autoreload
%autoreload 2
import os
import sys

import collections
import random
import math
import numpy.random as nr
import numpy as np
import pandas as pd

import joblib

import xarray as xr
import dask
import dask.dataframe as ddf
import dask.array as da
import zarr

# set default scheduler to threaded
dask.config.set(scheduler='threads')

import tqdm

import scanpy.api as sc
import anndata as ad

import matplotlib.pyplot as plt
import matplotlib

matplotlib.style.use('ggplot')

import seaborn as sns

import plotnine as pn

## init plotly
# from plotly.offline import iplot, init_notebook_mode
# init_notebook_mode(connected=True)
import plotly.io as pio
pio.renderers.default = 'iframe_connected'
import plotly.graph_objs as go
import plotly.express as px

import datashader as ds
import holoviews as hv
import holoviews.operation.datashader as hd
hd.shade.cmap=["lightblue", "darkblue"]
hv.extension("bokeh", "matplotlib")
import hvplot

import sklearn

from scipy import (
    stats as scistats,
    special as scispecial,
)

import tensorflow as tf
import keras as k


In [2]:
# Show environment variables whose name or value mentions "cuda" (case-insensitive),
# presumably to check the CUDA/GPU configuration visible to this kernel.
!env|grep -i cuda


In [None]:
from dask.cache import Cache

# Opportunistic in-memory cache for dask task results; capacity is 8e9 bytes (~8 GB).
cache = Cache(8e9)
cache.register()  # activate the cache for subsequent dask computations

In [None]:
# Make the local `rep` checkout importable. Only append the path once, so that
# re-running this cell does not keep adding duplicate entries to sys.path.
REP_SOURCE_DIR = os.path.expanduser("~/Projects/REP/rep")
if REP_SOURCE_DIR not in sys.path:
    sys.path.append(REP_SOURCE_DIR)
import rep.random as rnd

In [None]:
# Root of the project data. It defaults to the shared location and can be overridden
# through the REP_PROJECT_DIR environment variable, so the notebook is not tied to one machine.
PROJECT_DIR = os.environ.get("REP_PROJECT_DIR", "/s/project/rep")
CACHE_DIR = os.path.join(PROJECT_DIR, "cache/")
RAW_DATA_DIR = os.path.join(PROJECT_DIR, "raw/")
PROCESSED_DATA_DIR = os.path.join(PROJECT_DIR, "processed/")
MODEL_DIR = os.path.join(PROCESSED_DATA_DIR, "training_results")

In [None]:
CURRENT_MODEL_DIR = os.path.join(MODEL_DIR, "expr_multitask")

# `makedirs` also creates missing parent directories (e.g. MODEL_DIR itself).
# With exist_ok=True it is safe to re-run and needs no separate (racy) existence check.
os.makedirs(CURRENT_MODEL_DIR, exist_ok=True)

In [None]:
# Lazily open the unstacked GTEx OUTRIDER results from their zarr store.
xrds = xr.open_zarr(
    store=os.path.join(PROCESSED_DATA_DIR, "gtex/OUTRIDER/xarray_unstacked.zarr"),
)
xrds

In [None]:
# Merge the `individual` and `genes` dimensions into one MultiIndexed `observations` dimension.
stacked_xrds = xrds.stack({"observations": ["individual", "genes"]})

In [None]:
# Whole-blood slice only; selecting a single label drops the `subtissue` dimension.
blood = stacked_xrds.sel({"subtissue": "Whole_Blood"})
blood

In [None]:
from typing import Union, Tuple, List
import numpy as np
import xarray as xr

def concat_by_axis(
    darrs: Union[List[xr.DataArray], Tuple[xr.DataArray, ...]],
    dims: Union[List[str], Tuple[str, ...]],
    axis: Union[int, None] = None,
    drop_coords=True,
    **kwargs
):
    """
    Concat arrays along some axis similar to `np.concatenate`. Automatically renames the dimensions to `dims`.
    Please note that this renaming happens by the axis position, therefore make sure to transpose all arrays
    to the correct dimension order.

    `darrs` may be nested, similar to `np.block` (lists of lists of arrays). Each nesting level is
    concatenated recursively: the innermost level along `dims[-1]`, the next one along `dims[-2]`, etc.

    :param darrs: (Possibly nested) list or tuple of xr.DataArrays
    :param dims: The dimension names of the resulting array. Renames axes where necessary.
    :param axis: The axis which should be concatenated along. If None, it is inferred as `-depth`,
        where `depth` is the nesting depth of `darrs`.
    :param drop_coords: If True, drop the non-index coordinates of every input before concatenating
        (applied at every nesting level).
    :param kwargs: Additional arguments which will be passed to `xr.concat()`
    :return: Concatenated xr.DataArray with dimensions `dims`.
    :raises ValueError: if `darrs` is not a non-empty (possibly nested) list or tuple, if an input is not
        an xr.DataArray, or if an input does not have exactly `len(dims)` dimensions.
    """
    if axis is None:
        # Infer the nesting depth by following the first element of every level.
        # Assumes `darrs` is consistently nested (list of lists of ... of xr.DataArrays).
        axis = 0
        level = darrs
        # Descend while `level` is a NON-EMPTY list or tuple (an empty one cannot be indexed).
        while isinstance(level, (list, tuple)) and len(level) > 0:
            # increase depth by one
            axis -= 1
            level = level[0]
        if axis == 0:
            raise ValueError("`darrs` has to be a (possibly nested) list or tuple of xr.DataArrays!")

    to_concat = []
    # `darr` rather than `da` so the module-level `dask.array as da` alias is not shadowed.
    for i, darr in enumerate(darrs):
        # Recursive call for nested inputs:
        # the innermost call has axis = -1, the outermost call has axis = -depth_of_darrs.
        if isinstance(darr, (list, tuple)):
            darr = concat_by_axis(darr, dims=dims, axis=axis + 1, drop_coords=drop_coords, **kwargs)

        if not isinstance(darr, xr.DataArray):
            raise ValueError("Input %d must be a xr.DataArray" % i)
        if len(darr.dims) != len(dims):
            raise ValueError("Input %d must have the same number of dimensions as specified in the `dims` argument!" % i)

        # force-rename dimensions by position
        darr = darr.rename(dict(zip(darr.dims, dims)))

        # remove non-index coordinates
        if drop_coords:
            darr = darr.reset_coords(drop=True)

        to_concat.append(darr)

    return xr.concat(to_concat, dim=dims[axis], **kwargs)

In [None]:
# Stack the whole-blood variables into one (observations, features) matrix.
# Each variable gets a length-1 "features" axis labelled with its own name.
features = concat_by_axis(
    [
        blood[feature_name].expand_dims({"features": [feature_name]}, axis=-1)
        for feature_name in ("cdf", "padj", "hilo_padj", "missing")
    ],
    dims=("observations", "features"),
    coords="minimal",
    fill_value=0.,
)
# `fill_value` only fills entries introduced by alignment; NaNs already in the data are zeroed here.
features = features.fillna(0)
features

In [None]:
stacked_xrds["c_features"] = features
stacked_xrds["cdf"] = stacked_xrds["cdf"].fillna(0)
stacked_xrds["normppf"] = stacked_xrds["normppf"].fillna(0)
stacked_xrds["padj"] = stacked_xrds["padj"].fillna(0)

In [None]:
stacked_xrds

In [None]:
pd.Series(blood.hilo_padj.values[~ blood.missing.values]).value_counts()

In [None]:
pd.Series(stacked_xrds.hilo_padj.values[~ stacked_xrds.missing.values]).value_counts()

In [None]:
# split data into test and train sets
testDelim = int(blood.dims["observations"] * 0.8)

train = stacked_xrds.isel(observations=slice(None, testDelim))
test  = stacked_xrds.isel(observations=slice(testDelim, None))

print("train:")
print(train)
print("test:")
print(test)



In [None]:
n_input_features = train.dims["features"]
n_input_features

In [None]:
n_targets = train.dims["subtissue"]
n_targets

In [None]:
model = k.Sequential([
#     k.layers.Dense(units=20, activation="relu", input_shape=(n_input_features,)),
    k.layers.Dense(units=n_targets, activation="linear", input_shape=(n_input_features,)),
])

model.summary()

In [None]:
model.compile(
    optimizer='adam',
    loss="mse",
    metrics=["mse", "mae", "mape", 'cosine'],
)

In [None]:
model.fit(
    x=train.c_features.transpose("observations", "features", transpose_coords=False), 
    y=train.normppf.transpose("observations", "subtissue", transpose_coords=False), 
    batch_size=315230, 
    shuffle=False,
    validation_split=0.1,
    epochs=100,
    class_weight=train.missing.transpose("observations", "subtissue", transpose_coords=False),
    callbacks=[
        k.callbacks.EarlyStopping(patience=4),
    ],
)

In [None]:
model.save(os.path.join(CURRENT_MODEL_DIR, "model.h5"))

In [None]:
def predict_lazy(features, model, feature_dim="features", output_dim="subtissue", output_size=None):
    """
    Predicts using a (Keras-) model with a two-dimensional input and two-dimensional output,
    keeps xarray metadata and dask chunks.

    :param features: xr.DataArray with a `feature_dim` dimension, or an array (numpy / dask)
        whose last axis holds the input features
    :param model: object with a `predict` method mapping a (n, n_features) batch to an
        (n, output_size) batch, e.g. a Keras model
    :param feature_dim: name of the input feature dimension (xr.DataArray input only)
    :param output_dim: name of the new output dimension (xr.DataArray input only)
    :param output_size: number of model outputs; inferred from `model.output.shape[-1]` if None
    :return: lazy predictions. An xr.DataArray (with `feature_dim` replaced by `output_dim`)
        if `features` is an xr.DataArray, otherwise a dask array.
    """
    if output_size is None:
        last_dim = model.output.shape[-1]
        # TF1 shapes hold `Dimension` objects (size in `.value`); TF2 shapes hold plain ints.
        output_size = getattr(last_dim, "value", last_dim)

    # Map every (chunk, features) block through model.predict as a whole (vectorize=False).
    model_predict_lazy = da.gufunc(
        model.predict,
        signature="(features)->(classes)",
        output_dtypes="float32",
        output_sizes={"classes": output_size},
        allow_rechunk=True,
        vectorize=False
    )
    if isinstance(features, xr.DataArray):
        # apply_ufunc moves `feature_dim` to the last axis before calling the gufunc.
        # The gufunc is passed directly, so an explicitly given `output_size` is honoured.
        return xr.apply_ufunc(
            model_predict_lazy, features,
            input_core_dims=[[feature_dim]],
            output_core_dims=[[output_dim]],
            dask="allowed",
        )
    else:
        return model_predict_lazy(features)

In [None]:
predicted = predict_lazy(test["c_features"], model)

In [None]:
to_save = predicted.reset_coords(drop=True).reset_index("observations").rename("predicted")
to_save

In [None]:
print("size of data to save: %.2f MB" % (to_save.nbytes/2**20))

In [None]:
to_save.sizes

In [None]:
with dask.config.set(scheduler='single-threaded'):
    to_save.chunk({"observations": 2**22 // to_save.sizes["subtissue"]}).to_dataset(name="predicted").to_zarr(os.path.join(CURRENT_MODEL_DIR, "predicted.zarr"), mode="w")

In [None]:
np.any(np.isnan(to_save)).compute()