In [None]:
# Tchourine, K., Vogel, C., and Bonneau, R. (2018)
# Condition-Specific Modeling of Biophysical Parameters Advances Inference of Regulatory Networks
# Cell Reports 23, 376–388. https://doi.org/10.1016/j.celrep.2018.03.048

# Load modules

from inferelator import utils
from inferelator.distributed.inferelator_mp import MPControl
from inferelator import workflow

# Set verbosity level to "Talky"
utils.Debug.set_verbose_level(1)

In [None]:
# Set the location of the input data and the desired location of the output files

DATA_DIR = '../data/tchourine'
OUTPUT_DIR = '~/tchourine_2018/'

# The expression and gene metadata are separated into clusters based on the biclustering approach in the
# published manuscript. 

EXPRESSION_FILE_NAME = ['expr_clust1.tsv.gz', 'expr_clust2.tsv.gz', 'expr_clust3.tsv.gz', 'expr_clust4.tsv.gz']
METADATA_FILE_NAME = ['meta_clust1.tsv.gz', 'meta_clust2.tsv.gz', 'meta_clust3.tsv.gz', 'meta_clust4.tsv.gz']
GENE_CLUSTER_FILE = ['genes_clust1.tsv.gz', 'genes_clust2.tsv.gz', 'genes_clust3.tsv.gz', 'genes_clust4.tsv.gz', 'genes_clust5.tsv.gz']
GENE_METADATA_FILE_NAME = 'orfs.tsv'
TF_LIST_FILE_NAME = 'tf_names_restrict.tsv'

In [None]:
# Define a new workflow class to handle crossvalidation of the Tau parameter for each individual cluster

from inferelator import amusr_workflow
from inferelator import utils
from inferelator.postprocessing.results_processor import ResultsProcessor
import numpy as np
import pandas as pd


class InferecladrRP(ResultsProcessor):

    def summarize_network(self, cluster_gene_list, gold_standard, priors):

        cluster_aupr = []
        for cluster_genes in cluster_gene_list:
            cluster_gold = gold_standard.reindex(gold_standard.index.intersection(cluster_genes))
            cluster_priors = priors.reindex(priors.index.intersection(cluster_genes))
            cluster_aupr.append(super(InferecladrRP, self).summarize_network(None, cluster_gold, cluster_priors))

        return cluster_aupr


def patch_puppet(obj):
    """
    Fix the emit results in a puppet for the inferecladr
    :param obj: PuppetWorkflow
    """

    import types

    def emit_results(self, betas, rescaled_betas, gold_standard, priors):
        if self.is_master():
            rp = InferecladrRP(betas, rescaled_betas, filter_method=self.gold_standard_filter_method)
            self.aupr = rp.summarize_network(self.cluster_gene_list, gold_standard, priors)
            return self.aupr

    obj.emit_results = types.MethodType(emit_results, obj)


class InferecladrWorkflow(amusr_workflow.MultitaskLearningWorkflow):
    seeds = None
    auprs = None
    taus = None

    # Gene cluster lists
    gene_cluster_list_file = None
    cluster_gene_list = None

    # What type of modeling used to crossvalidate taus
    cv_regression_type = "bbsr"
    cv_workflow_type = "tfa"

    final_network_file = "network.tsv"
    final_pr_curve = "pr_curve.pdf"

    def get_data(self):
        # Load the cluster lists and then call the normal get_data
        self.read_gene_cluster_lists()
        super(InferecladrWorkflow, self).get_data()

    def startup_finish(self):
        # Handle data preprocessing in the individual CV runs
        pass

    def read_gene_cluster_lists(self, file=None):
        """
        Load a list of gene list files or break a single dataframe with clusters as the second column
        into a list of genes
        """

        file = file if file is not None else self.gene_cluster_list_file

        if isinstance(file, list):
            self.cluster_gene_list = [self.input_dataframe(cluster_genes) for cluster_genes in file]
        else:
            cluster_df = self.input_dataframe(file, index_col=None, header=None, names=("gene", "cluster"))
            clusters = cluster_df.loc[:, "cluster"].sort_values().unique()
            self.cluster_gene_list = [cluster_df.loc[cluster_df["cluster"] == gene_clust, "gene"].tolist()
                                      for gene_clust in clusters]

    def run(self):
        """
        Execute workflow, after all configuration.
        """

        # Set the random seed (for bootstrap selection)
        np.random.seed(self.random_seed)

        # Call the startup workflow
        self.startup()

        # Crossvalidate clusters for tau
        self.crossvalidate_taus()
        self.tau = self.find_optimal_taus()

        # Pile up cluster data
        self.pileup_clusters()

        # Reset the workflow for the final regression
        self.cv_result_processor_type = ResultsProcessor
        final_task = self.new_puppet(self.expression_matrix, self.meta_data,
                                     gold_standard=self.gold_standard, priors_data=self.priors_data)
        final_task.network_file_name = self.final_network_file
        final_task.pr_curve_file_name = self.final_pr_curve
        final_task.write_network = True
        final_task.output_dir = self.output_dir
        final_task.run()

        return final_task.network

    def pileup_clusters(self):
        """
        Glue together the cluster data into one large data set
        :return:
        """
        self.expression_matrix = pd.concat(self.expression_matrix, axis=1)
        self.meta_data = pd.concat(self.meta_data, axis=0)

    def crossvalidate_taus(self):
        """
        Run a regression (2-fold CV) on each expression cluster with every random seed and every possible tau
        Set self.auprs[gene_cluster][tau] to a list of AUPR results for that gene cluster at a given tau
        """
        self.auprs = {k: {t: [] for t in self.taus} for k in range(len(self.expression_matrix))}
        for expr, meta in zip(self.expression_matrix, self.meta_data):
            assert expr.shape[1] == meta.shape[0]
            for tau in self.taus:
                for seed in self.seeds:
                    task = self.new_puppet(expr, meta, seed=seed)
                    task.split_gold_standard_for_crossvalidation = True
                    task.cv_split_ratio = 0.5
                    task.tau = tau
                    task.cluster_gene_list = self.cluster_gene_list
                    patch_puppet(task)
                    task.run()
                    for gene_cluster in range(len(self.cluster_gene_list)):
                        self.auprs[gene_cluster][tau].append(task.aupr[gene_cluster])

    def find_optimal_taus(self):

        n_clusters = len(self.cluster_gene_list)

        # Find the median AUPR for each gene_cluster & tau combination
        median_by_seed = {cluster: {tau: np.median(self.auprs[cluster][tau])
                                    for tau in self.tau}
                          for cluster in range(n_clusters) }

        # Find the TAU which maximizes median AUPR for each gene cluster
        best_aupr = {k: max(median_by_seed[k], key=median_by_seed[k].get) for k in range(n_clusters)}

        taus = []
        for gene_clust, genes in enumerate(self.cluster_gene_list):
            utils.Debug.vprint("Optimal Tau={tau} for cluster {cl}".format(tau=best_aupr[gene_clust], cl=gene_clust))
            taus.extend([best_aupr[gene_clust]] * len(genes))
        return taus


In [None]:
# Start Multiprocessing Engine
# Default to a single computer. Setting up a cluster is left as an exercise to the reader.

n_cores_dask = 200
activate_path = '~/.local/anaconda3/bin/activate'
dask_engine = False

n_cores_local = 3
local_engine = True

# The if __name__ is __main__ pragma protects against runaway multiprocessing
# Dask requires a slurm controller in an HPC environment.
# The conda or venv activate script is necessary to set the worker environment
# This code does NOT set the environment for the current process, only for workers

if __name__ == '__main__' and dask_engine:
    MPControl.set_multiprocess_engine("dask-cluster")
    MPControl.client.minimum_cores = n_cores_dask
    MPControl.client.maximum_cores = n_cores_dask
    MPControl.client.walltime = '48:00:00'
    MPControl.client.add_worker_env_line('module load slurm')
    MPControl.client.add_worker_env_line('module load gcc/8.3.0')
    MPControl.client.add_worker_env_line('source ' + activate_path)
    MPControl.client.cluster_controller_options.append("-p ccb")
    MPControl.connect()
    
# Multiprocessing uses the pathos implementation of multiprocessing (with dill instead of cPickle)
# This is suited for a single computer, but will likely be too slow for the example here
    
if __name__ == '__main__' and local_engine:
    MPControl.set_multiprocess_engine("multiprocessing")
    MPControl.client.processes = n_cores_local
    MPControl.connect()

# Set up the workflow

worker = workflow.inferelator_workflow(regression="bbsr", workflow=InferecladrWorkflow)
worker.input_dir = DATA_DIR
worker.output_dir = OUTPUT_DIR
worker.expression_matrix_file = EXPRESSION_FILE_NAME
worker.meta_data_file = METADATA_FILE_NAME
worker.gene_metadata_file = GENE_METADATA_FILE_NAME
worker.tf_names_file = TF_LIST_FILE_NAME
worker.gene_cluster_list_file = GENE_CLUSTER_FILE

# Set the seeding and the potential Tau values for crossvalidation

worker.seeds = range(42,62)
worker.taus = [0, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 120, 140, 160, 200, 250]

final_network = worker.run()