In [36]:
import matplotlib.pyplot as plt
import numpy as np
import typing
    
def make_values_list(values, types=(str, bool, int, float)):
    """Transform layer type to be sure a list of values is returned."""
    if type(values) == list:
        return values
    if type(values) == tuple:
        return list(values)
    elif type(values)==dict:
        return list(values.values())
    elif type(values) in types:
        return [values]
    else:
        raise TypeError('Layer name(s) {} should be given through a {},'.format(values, types)+\
                        'a list, or a dictionary(where the key would be the file_paths)')


def group_per_layer(
        multiplex_list
        ):
    """Group multiplex info per layer.

    Structure returned:
    {multiplex1: [layer1, layer2, ...],
     multiplex2: [layer1, layer2, ...]}

    """
    print(multiplex_list)
    multiplex_organised = dict()
    for multiplex_name in multiplex_list:
        # we instanciata a dict for each multiplex network
        multiplex_organised[multiplex_name] = dict()
        # we add the layer names
        multiplex_organised[multiplex_name]['layers'] =\
            [layer for layer in multiplex_list[multiplex_name]]
        # we add the graph type
        multiplex_organised[multiplex_name]['graph_type'] =\
            [multiplex_list[multiplex_name][layer] for layer in multiplex_list[multiplex_name]]
    return multiplex_organised


def save_config(config, filename):
    with open(filename, 'w') as f:
        yaml.dump(config, f)


def setup_proba_config(
        config: dict,
        eta: list[float],
        lamb: list[list[float]]):
    """ Setup the RWR probability for the exploration of hummus networks
    with the given eta and lambda values. """

    assert len(config['multiplex']) == len(eta),\
    'eta (length of {}) should be the same length as the number of layers ({})'\
        .format(len(eta), len(config['multiplex']))
    
    lamb = lamb.div(lamb.sum(axis=1), axis=0)
    config['eta'] = eta
    config['lambda'] = lamb.values.tolist()

    return config

# multilayer_f = '../../flattened_networks/ML_hESC_Chen_GeneNW_all_Peaks2Genes_UP0.5K_DOWN0.5K_nofilt_nofilt_TFlayer_nolinks'
# define_grn(multilayer_f, njobs=45)
def get_max_lamb(config):
    #get layer names connected by bipartites
    positions_bipartites = [(config['bipartite'][bipartite]['source'], config['bipartite'][bipartite]['target'])\
                            for bipartite in config['bipartite']]

    #create an empty dataframe with layer names that we'll fill where it's possible to be connected according to the bipartites
    max_lamb = pd.DataFrame(np.zeros((3,3)),
                            index = config['multiplex'].keys(),
                            columns = config['multiplex'].keys())

    # fill each position corresponding to bipartite
    for position in positions_bipartites:
        print(position[0], '<-->', position[1])
        max_lamb.loc[position] = 1
    # ( since we can inverse source and target layers)
    #could be conditionned by bipartite directionality
    max_lamb+=max_lamb.transpose()
    # filling the diagonal to allow intra-layer exploration
    max_lamb+=np.eye(3).astype(int)
    max_lamb = max_lamb.div(max_lamb.sum(axis=1), axis=0)

    
    return max_lamb


def check_lamb(lamb, config):
    
    max_lamb = (get_max_lamb(config)>0).astype(int)
    
    X = np.sum(np.sum((max_lamb - lamb)<0))
    if X>0:
        return False
    else:
        return True
    

def draw_lamb(df):
    
    fig, ax = plt.subplots()
    
    G=nx.from_pandas_adjacency(df, create_using=nx.DiGraph())
    pos = {list(G.nodes)[-i-1]:np.array((0, i)) for i in range(-len(G.nodes), 0)}
    print(list(pos.values())[0][1]-0.5, list(pos.values())[-1][1]+0.5 )
    labels = nx.get_edge_attributes(G, 'weight')
    nx.draw_networkx(G, with_labels=True, pos = pos,
                    node_size=1500, width=4, alpha=0.8, font_weight="bold", arrows=True,
                    connectionstyle='arc3, rad = 0.8')

    edge_labels = nx.get_edge_attributes(G, 'weight')
    edge_labels = {k: round(v, 2) for k,v in edge_labels.items()}
    print(edge_labels)
    self_edges = {}
    non_self_edges = {}
    for edge in edge_labels.keys():
        if edge[0]==edge[1]:
            self_edges[edge] = edge_labels[edge]
        else:
            non_self_edges[edge] = edge_labels[edge]


    pos_self_labels = {k:np.array([0, pos[k][1]+0.30]) for k in pos}
    my_draw_networkx_edge_labels(G, 
                                 pos_self_labels, 
                                 edge_labels=self_edges, 
                                 font_color='k', 
                                 font_size = 12, 
                                 label_pos=12, 
                                 rad = 0.8,
                                 rotate = False)
    
    my_draw_networkx_edge_labels(G, 
                                 pos, 
                                 edge_labels=non_self_edges, 
                                 font_color='k', 
                                 font_size = 12, 
                                 label_pos=0, 
                                 rad = 0.6,
                                 rotate = False)
    
    ax.set_ylim([list(pos.values())[0][1]-0.5,
                 list(pos.values())[-1][1]+0.5 ])
    plt.show()
    
    
def my_draw_networkx_edge_labels(
    G,
    pos,
    edge_labels=None,
    label_pos=0.5,
    font_size=10,
    font_color="k",
    font_family="sans-serif",
    font_weight="normal",
    alpha=None,
    bbox=None,
    horizontalalignment="center",
    verticalalignment="center",
    ax=None,
    rotate=True,
    clip_on=True,
    rad=0
):
    """Draw edge labels.

    Parameters
    ----------
    G : graph
        A networkx graph

    pos : dictionary
        A dictionary with nodes as keys and positions as values.
        Positions should be sequences of length 2.

    edge_labels : dictionary (default={})
        Edge labels in a dictionary of labels keyed by edge two-tuple.
        Only labels for the keys in the dictionary are drawn.

    label_pos : float (default=0.5)
        Position of edge label along edge (0=head, 0.5=center, 1=tail)

    font_size : int (default=10)
        Font size for text labels

    font_color : string (default='k' black)
        Font color string

    font_weight : string (default='normal')
        Font weight

    font_family : string (default='sans-serif')
        Font family

    alpha : float or None (default=None)
        The text transparency

    bbox : Matplotlib bbox, optional
        Specify text box properties (e.g. shape, color etc.) for edge labels.
        Default is {boxstyle='round', ec=(1.0, 1.0, 1.0), fc=(1.0, 1.0, 1.0)}.

    horizontalalignment : string (default='center')
        Horizontal alignment {'center', 'right', 'left'}

    verticalalignment : string (default='center')
        Vertical alignment {'center', 'top', 'bottom', 'baseline', 'center_baseline'}

    ax : Matplotlib Axes object, optional
        Draw the graph in the specified Matplotlib axes.

    rotate : bool (deafult=True)
        Rotate edge labels to lie parallel to edges

    clip_on : bool (default=True)
        Turn on clipping of edge labels at axis boundaries

    Returns
    -------
    dict
        `dict` of labels keyed by edge

    Examples
    --------
    >>> G = nx.dodecahedral_graph()
    >>> edge_labels = nx.draw_networkx_edge_labels(G, pos=nx.spring_layout(G))

    Also see the NetworkX drawing examples at
    https://networkx.org/documentation/latest/auto_examples/index.html

    See Also
    --------
    draw
    draw_networkx
    draw_networkx_nodes
    draw_networkx_edges
    draw_networkx_labels
    """

    if ax is None:
        ax = plt.gca()
    if edge_labels is None:
        labels = {(u, v): d for u, v, d in G.edges(data=True)}
    else:
        labels = edge_labels
    text_items = {}
    for (n1, n2), label in labels.items():
        (x1, y1) = pos[n1]
        (x2, y2) = pos[n2]
        (x, y) = (
            x1 * label_pos + x2 * (1.0 - label_pos),
            y1 * label_pos + y2 * (1.0 - label_pos),
        )
        pos_1 = ax.transData.transform(np.array(pos[n1]))
        pos_2 = ax.transData.transform(np.array(pos[n2]))
        linear_mid = 0.5*pos_1 + 0.5*pos_2
        d_pos = pos_2 - pos_1
        rotation_matrix = np.array([(0,1), (-1,0)])
        ctrl_1 = linear_mid + rad*rotation_matrix@d_pos
        ctrl_mid_1 = 0.5*pos_1 + 0.5*ctrl_1
        ctrl_mid_2 = 0.5*pos_2 + 0.5*ctrl_1
        bezier_mid = 0.5*ctrl_mid_1 + 0.5*ctrl_mid_2
        (x, y) = ax.transData.inverted().transform(bezier_mid)

        if rotate:
            # in degrees
            angle = np.arctan2(y2 - y1, x2 - x1) / (2.0 * np.pi) * 360
            # make label orientation "right-side-up"
            if angle > 90:
                angle -= 180
            if angle < -90:
                angle += 180
            # transform data coordinate angle to screen coordinate angle
            xy = np.array((x, y))
            trans_angle = ax.transData.transform_angles(
                np.array((angle,)), xy.reshape((1, 2))
            )[0]
        else:
            trans_angle = 0.0
        # use default box of white with white border
        if bbox is None:
            bbox = dict(boxstyle="round", ec=(1.0, 1.0, 1.0), fc=(1.0, 1.0, 1.0))
        if not isinstance(label, str):
            label = str(label)  # this makes "1" and 1 labeled the same

        t = ax.text(
            x,
            y,
            label,
            size=font_size,
            color=font_color,
            family=font_family,
            weight=font_weight,
            alpha=alpha,
            horizontalalignment=horizontalalignment,
            verticalalignment=verticalalignment,
            rotation=trans_angle,
            transform=ax.transData,
            bbox=bbox,
            zorder=1,
            clip_on=clip_on,
        )
        text_items[(n1, n2)] = t

    ax.tick_params(
        axis="both",
        which="both",
        bottom=False,
        left=False,
        labelbottom=False,
        labelleft=False,
    )

    return text_items

In [None]:
def general_config(
        multiplexes: dict[dict[str]],
        bipartites: typing.Union[str, list[str], dict[str]],
        seed_path: str = 'seeds/seeds.txt',
        folder_multiplexes='multiplex',
        folder_bipartites='bipartite',
        self_loops=0,
        restart_prob=0.7,
        bipartites_type: typing.Union[str, list[str], dict[str]] = ['00', '00']
        ):

    """Create a very general config file for the hummus pipeline."""
    config = dict()
    config['multiplex'] = dict()
    config['bipartite'] = dict()
    config['seed'] = seed_path
    config['self_loops'] = self_loops

    # We add the multiplexes to the config
    for multiplex_name in multiplexes:
        print('multiplex_name', multiplex_name)
        # If folder_multiplexes is None, use the multiplex name as folder name
        config['multiplex'][multiplex_name] = dict()
        
        config['multiplex'][multiplex_name]['layers'] =\
            [(folder_multiplexes+'/'+multiplex_name+'/'+layer).replace('//',
                                                                       '/')
             for layer in multiplexes[multiplex_name]['layers']]
        for layer in multiplexes[multiplex_name]['layers']:
            print('layer', layer)
        config['multiplex'][multiplex_name]['graph_type'] =\
            multiplexes[multiplex_name]['graph_type']

    # if type of bipartites not associated to their names already,
    # we create a dict with the same order as the bipartites
    bipartites_type = make_values_list(bipartites_type)
    if type(bipartites_type) == list:
        temp = dict()
        for i in range(len(bipartites)):
            temp[list(bipartites.keys())[i]] = bipartites_type[i]
        bipartites_type = temp

    # we add the bipartites
    print(type(bipartites_type))
    for bipartite in bipartites:
        bipartite_loc = folder_bipartites+'/'+bipartite
        config['bipartite'][bipartite_loc] = dict()
        config['bipartite'][bipartite_loc]['source'] = \
            bipartites[bipartite]['multiplex_left']
        config['bipartite'][bipartite_loc]['target'] = \
            bipartites[bipartite]['multiplex_right']
        config['bipartite'][bipartite_loc]['graph_type'] = \
            bipartites_type[bipartite]

    config['r'] = restart_prob
    return config

In [None]:
m_list = {'TF': {'TF_network': '00'}, 'RNA': {'RNA_GENIE3': '10'}, 'peaks': {'peak_network_cicero': '10'}}
m_list = group_per_layer(m_list)

         {'TF': {'TF_network': '00'}, 'RNA': {'RNA_GENIE3': '10'}, 'peaks': {'peak_network_cicero': '10'}}
b_list = {'atac_rna':{'multiplex_right' : 'peaks', 'multiplex_left' :'RNA'},
        'tf_peak' : {'multiplex_right' : "TF_network", 'multiplex_left': "peaks"}}

In [None]:
m_list

In [None]:
general_config(m_list, b_list)

#### Lamb related checking functions

In [None]:
draw_lamb(lamb)

#### Random walks functions

In [None]:
def compute_multiple_RandomWalk(
    multilayer_f,
    config_name,
    output_f,
    list_seeds,
    config_folder='config',
    save=True,
    return_df=False,
    spec_layer_result_saved='all',
    njobs=1):

    ranking_all_dfs = pd.DataFrame(columns = ['layer', 'target', 'path_layer', 'score', 'seed'])

    l_ranking_df = Parallel(n_jobs=njobs)(delayed(compute_RandomWalk)(multilayer_f, config_name, seeds, config_folder, spec_layer_result_saved)\
                                                                          for seeds in tqdm(list_seeds,
                                                                                            position = 0, 
                                                                                           leave = True))
    ranking_all_dfs = pd.concat([ranking_all_dfs]+l_ranking_df)
    ranking_all_dfs = ranking_all_dfs.sort_values(by='score')

    if save:
        assert output_f!=None, 'You need to provide an output_f name to save the random walks result'
        ranking_all_dfs.to_csv(output_f, sep='\t', index=False, header=True)
    if return_df:
        return ranking_all_dfs

def compute_RandomWalk(
    multilayer_f,
    config_name,
    seeds,
    config_folder='config',
    spec_layer_result_saved='all',
    unnamed=False,
    njobs=1):

    # seeds file names
    seeds = make_values_list(seeds)
    if unnamed is True:
        seeds_filename = 'seeds.txt'
        if njobs>1:
            raise Exception("Impossible to use unnamed seeds files while parallelising random walks.")
    else:
        seeds_filename = '_'.join(seeds)

    # write seeds file
    with open(multilayer_f+'/seeds/'+seeds_filename+'.txt', 'w') as f:
        f.write('\n'.join(seeds)+'\n')        
    
    # config file personalised with seed file
    with open(multilayer_f+'/{}/'.format(config_folder)+config_name, 'r') as f:
        config = yaml.load(f, Loader=yaml.BaseLoader)
        config['seed'] = 'seeds/'+seeds_filename+'.txt'
    with open(multilayer_f+'/{}/'.format(config_folder)+seeds_filename+'_'+config_name, 'w') as f:
        yaml.dump(config, f)
        
    # multixrank
    multixrank_obj = mxr.Multixrank(config=multilayer_f+'/{}/'.format(config_folder)+seeds_filename+'_'+config_name, wdir=multilayer_f)
    ranking_df = multixrank_obj.random_walk_rank()
    
    # and filter df results andadd seeds name
    ranking_df['tf'] = '_'.join(seeds)
    ranking_df = ranking_df[ranking_df.score > 0]  # ??
    ranking_df.columns = ['layer', 'target', 'path_layer', 'score', 'seed']
    if spec_layer_result_saved != 'all':
        if type(spec_layer_result_saved)==str:
            spec_layer_result_saved = [spec_layer_result_saved]
        ranking_df = ranking_df[ranking_df['layer'].isin(spec_layer_result_saved)]

    return ranking_df

#### Probas for classic GRN definitions

In [None]:
import hummuspy.config

In [None]:
hummuspy.config.get_classic_grn_lamb

In [None]:
def get_classic_grn_lamb(config,
                         tf_multiplex='TF',
                         peak_multiplex='peaks',
                         rna_multiplex='RNA',
                         draw=True
                        ):
    
    for multiplex in [tf_multiplex, peak_multiplex, rna_multiplex]:
        assert multiplex in config['multiplex'].keys(), 'The multiplex {} is not in the config file provided'.format(multiplex)
        
    ordered_multiplexes = config['multiplex'].keys()
    lamb = pd.DataFrame(np.ones((3,3)),
                        index = ordered_multiplexes,
                        columns = ordered_multiplexes)
    # Remove proba between TF and RNA layers
    lamb.loc[tf_multiplex, rna_multiplex] = 0
    lamb.loc[rna_multiplex, tf_multiplex] = 0
    lamb = lamb.div(lamb.sum(axis=1), axis=0)

    # max_lambd check to see
    assert check_lamb(config=config, lamb=lamb), 'There seem to be a incoherence between bipartite source/targets and multiplex names provided in get_classic_grn_lamb'
    
    if draw == True:
        to_draw_lamb = lamb.loc[[tf_multiplex, peak_multiplex, rna_multiplex],
                                [tf_multiplex, peak_multiplex, rna_multiplex]]
        
        draw_lamb(to_draw_lamb)
        
    return lamb


def get_classic_grn_eta(config,
                        rna_multiplex='RNA'):

    ordered_multiplex = config['multiplex'].keys()
    assert rna_multiplex in ordered_multiplex, "It seems rna_multiplex not in config['multiplex']"

    eta = [0 if multiplex!=rna_multiplex else 1 for multiplex in ordered_multiplex ]
    return eta


def define_grn(
    multilayer_f,
    config,
    gene_list = 'all',
    tf_list = 'all',
    config_name='grn_config.yml',
    config_folder = 'config',
    tf_multiplex:str='TF',
    peak_multiplex:str='peaks',
    rna_multiplex:str='RNA',
    update_config=True,
    save=False,
    return_df=True,
    output_f=None,
    njobs=1):
    
    # store mutliplex already because it will be when saving yaml file,
    # while eta and lambda won't.
    config['multiplex'] = {k:config['multiplex'][k] for k in sorted(config['multiplex'].keys())}
    
    if update_config:
        eta = get_classic_grn_eta(config)
        lamb = get_classic_grn_lamb(config, draw=False)
        config = setup_proba_config(config, eta, lamb)
    config_path = multilayer_f+'/'+config_folder+'/'+config_name
    save_config(config, config_path)
    
    if gene_list == 'all':
        gene_list = []
        for layer in config['multiplex'][rna_multiplex]['layers']:
            df_layer = pd.read_csv(multilayer_f+'/'+layer, sep='\t', header=None, index_col=None)
            layer_nodes = np.concatenate([np.unique(df_layer[0].values),
                                          np.unique(df_layer[1].values)])
            gene_list = np.unique(np.concatenate([gene_list,
                                                layer_nodes]))            

    df = compute_multiple_RandomWalk(multilayer_f,
                                config_name=config_name,
                                output_f=output_f,
                                list_seeds = gene_list,
                                config_folder=config_folder,
                                save=False,
                                return_df=return_df,
                                spec_layer_result_saved=tf_multiplex,
                                njobs=1)

    df['gene'] = df['seed']
    df['tf'] = df['target']
    del df['target']
    del df['seed']

    if tf_list == 'all':
        tf_list = []
        for layer in config['multiplex'][tf_multiplex]['layers']:
            df_layer = pd.read_csv(multilayer_f+'/'+layer, sep='\t', header=None, index_col=None)
            layer_nodes = np.concatenate([np.unique(df_layer[0].values),
                                          np.unique(df_layer[1].values)])
            tf_list = np.unique(np.concatenate([tf_list,
                                                layer_nodes]))

    # Add normalisation ?
    df = df[df['tf'].isin(tf_list)]

    if save:
        assert output_f!=None, 'You need to provide an output_f name to save the GRN result'
        ranking_all_dfs.sort_values(by='score').to_csv(output_f, sep='\t', index=False, header=True)

    if return_df:
        return df

In [None]:
import typing
import yaml
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed
import multixrank as mxr

In [None]:
filename = "./a/config.yaml"

with open(filename, 'r') as f:
        config = yaml.load(f, Loader=yaml.BaseLoader)

In [None]:
config['multiplex']['RNA']['graph_type'] = ['10']

In [None]:
config['multiplex']['peaks']['graph_type'] = ['10']

In [None]:
define_grn('a', config, njobs = 10)

In [None]:
config

In [None]:
multiplex_list = {'TF': {'TF_network': '00'}, 'RNA': {'RNA_GENIE3': '00'}, 'peak_network': {'peak_network_GENIE3': '00'}}

In [None]:
a = group_per_layer(multiplex_list)

In [None]:
bipartites_list = {'RNA_peak' : {'multiplex_right' : 'TF', 'multiplex_left' : 'peak'}, 
                   'TF_peak' : {'multiplex_right' : 'RNA', 'multiplex_left' : 'peak'}}

In [None]:
general_config(a, bipartites_list)

In [None]:
lamb = get_classic_grn_lamb(config)

In [None]:
draw_lamb(lamb)

In [None]:
bipartites_list

In [None]:
import typing
import os
import yaml
import re
import numpy as np
import pandas as pd
import multixrank as mxr
import time
from joblib import Parallel, delayed
from tqdm import tqdm

def make_values_list(values, types=(str, bool, int, float)):
    """Transform layer type to be sure a list of values is returned."""
    if type(values) == list:
        return values
    elif type(values)==dict:
        return list(values.values())
    elif type(values) in types:
        return [values]
    else:
        raise TypeError('Layer name(s) {} should be given through a {},'.format(values, types)+\
                        'a list, or a dictionary(where the key would be the file_paths)')



def detailed_config(
    request: str,
    filename: str,
    tf_layers: typing.Union[str, list[str], dict[str]],
    atac_layers: typing.Union[str, list[str], dict[str]],
    rna_layers: typing.Union[str, list[str], dict[str]],
    tf_atac_links: str, 
    atac_rna_links: str,
    seed_path: str = 'seeds/seeds.txt',
    folder_tf_layers = 'layer_TFS',
    folder_atac_layers = 'layer_PEAKS',
    folder_rna_layers = 'layer_GENES',
    tf_layers_weighted: typing.Union[bool, list[bool], dict[bool]] = False, 
    atac_layers_weighted: typing.Union[bool, list[bool], dict[bool]] = True, 
    rna_layers_weighted: typing.Union[bool, list[bool], dict[bool]] = True,
    tf_layers_directed: typing.Union[bool, list[bool], dict[bool]] = False, 
    atac_layers_directed: typing.Union[bool, list[bool], dict[bool]] = False, 
    rna_layers_directed: typing.Union[bool, list[bool], dict[bool]] = False,
    tf_atac_links_weighted: bool = False,
    atac_rna_links_weighted: bool = False,
    tf_atac_links_directed: bool = False, 
    atac_rna_links_directed: bool = False,
    r:float = 0.7,
    self_loops = 0, 
    return_config = False):
    
    config = define_minimal_config(
        tf_layers=tf_layers,
        atac_layers=atac_layers,
        rna_layers=rna_layers,
        tf_atac_links=tf_atac_links, 
        atac_rna_links=atac_rna_links,
        seed_path=seed_path,
        folder_tf_layers = folder_tf_layers,
        folder_atac_layers = folder_atac_layers,
        folder_rna_layers = folder_rna_layers,
        tf_layers_weighted=tf_layers_weighted, 
        atac_layers_weighted=atac_layers_weighted, 
        rna_layers_weighted=rna_layers_weighted,
        tf_layers_directed=tf_layers_directed, 
        atac_layers_directed=atac_layers_directed, 
        rna_layers_directed=rna_layers_directed,
        tf_atac_links_weighted=tf_atac_links_weighted,
        atac_rna_links_weighted=atac_rna_links_weighted,
        tf_atac_links_directed=tf_atac_links_directed, 
        atac_rna_links_directed=atac_rna_links_directed,
        r=r,
        self_loops = self_loops)
    
    if request.upper()=='GRN':
        layers_eta = [1, 0, 0]
        
        #Check direction of lamb entry in the last multixrank version !!!
        go_to_tfs   = [0, '1/3', 0]
        go_to_peaks = [1, '1/3', 0.5]
        go_to_genes = [0, '1/3', 0.5]

    elif request.enhancers()=='enhancers':
        layers_eta = [0, 0, 1]

        #Check direction of lamb entry in the last multixrank version !!!
        go_to_tfs   = [0, 0, 0]
        go_to_peaks = [1, 1, 1]
        go_to_genes = [0, 0, 0]

    elif request.enhancers()=='target_regions':
        layers_eta = [1, 0, 0]

        #Check direction of lamb entry in the last multixrank version !!!
        go_to_tfs   = [0, 0, 0]
        go_to_peaks = [1, 1, 1]
        go_to_genes = [0, 0, 0]

    layers_proba = [go_to_tfs, go_to_peaks, go_to_genes]
    layers_names = [folder_tf_layers, folder_atac_layers, folder_rna_layers]
    
    #sort transition matrix based on order of layers in the config file, aka alphabetical order
    config['lamb'] = [[val for _, val in sorted(zip(layers_names, go_to_proba))]\
                      for _, go_to_proba in sorted(zip(layers_names, layers_proba))]
    config['eta']  = [eta for _, eta  in sorted(zip(layers_names, layers_eta))]
    
    with open(filename, 'w') as f:
        yaml.dump(config, f)

    if return_config:
        return config

define_grn()

In [None]:
def define_grn(
    multilayer_f: str,
    TFs: typing.Union[str, list[str], dict[str]] = 'all',
    save: bool = True,
    output_f_grn = 'RW_grn.tsv',
    Return: bool = False,
    njobs:int = 1,
    target_saved: typing.Union['genes', 'peaks', 'tfs'] = 'genes',
    tf_layers: typing.Union[str, list[str], dict[str]] = 'tf_edges.tsv',
    atac_layers: typing.Union[str, list[str], dict[str]] = 'peak_edges.tsv',
    rna_layers: typing.Union[str, list[str], dict[str]] = 'gene_edges.tsv',
    tf_atac_links: str = 'bipartite/tfs2peaks.tsv', 
    atac_rna_links: str = 'bipartite/peaks2genes.tsv',
    config_name='grn_config.yml',
    config_folder='config',
    seed_path: str = 'seeds/seeds.txt',
    folder_tf_layers: str = 'multiplex/layer_TFS',
    folder_atac_layers: str = 'multiplex/layer_PEAKS',
    folder_rna_layers: str = 'multiplex/layer_GENES',
    tf_layers_weighted: typing.Union[bool, list[bool], dict[bool]] = False, 
    atac_layers_weighted: typing.Union[bool, list[bool], dict[bool]] = True, 
    rna_layers_weighted: typing.Union[bool, list[bool], dict[bool]] = True,
    tf_layers_directed: typing.Union[bool, list[bool], dict[bool]] = False, 
    atac_layers_directed: typing.Union[bool, list[bool], dict[bool]] = False, 
    rna_layers_directed: typing.Union[bool, list[bool], dict[bool]] = False,
    tf_atac_links_weighted: bool = False,
    atac_rna_links_weighted: bool = False,
    tf_atac_links_directed: bool = False, 
    atac_rna_links_directed: bool = False,
    r:float = 0.7,
    self_loops: bool = 0):

    detailed_config(
        request='grn',
        filename=multilayer_f+'/'+config_folder+'/'+config_name,
        tf_layers=tf_layers,
        atac_layers=atac_layers,
        rna_layers=rna_layers,
        tf_atac_links=tf_atac_links,
        atac_rna_links=atac_rna_links,
        seed_path=seed_path,
        folder_tf_layers=folder_tf_layers,
        folder_atac_layers=folder_atac_layers,
        folder_rna_layers=folder_rna_layers,
        tf_layers_weighted=tf_layers_weighted,
        atac_layers_weighted=atac_layers_weighted,
        rna_layers_weighted=rna_layers_weighted,
        tf_layers_directed=tf_layers_directed,
        atac_layers_directed=atac_layers_directed,
        rna_layers_directed=rna_layers_directed,
        tf_atac_links_weighted=tf_atac_links_weighted,
        atac_rna_links_weighted=atac_rna_links_weighted,
        tf_atac_links_directed=tf_atac_links_directed,
        atac_rna_links_directed=atac_rna_links_directed,
        r=r,
        self_loops=self_loops,
        return_config=False)
    
    if TFs == 'all':
        TFs = list(pd.read_csv(multilayer_f + '/' + tf_atac_links, sep="\t", header=None)[0].str.strip().unique())
    TFs = make_values_list(TFs)
    
    rna_layers = make_values_list(rna_layers)
    print([folder_rna_layers+'/'+rna_layer for rna_layer in rna_layers])

    if Return is True:
        grn = compute_multiple_RandomWalk(
            multilayer_f=multilayer_f,
            config_name=config_name,
            output_f=output_f_grn,
            list_seeds=TFs,
            config_folder=config_folder,
            save=save,
            Return=Return,
            spec_layer_result_saved=folder_rna_layers,
            njobs=njobs)
        return grn
    else:
        grn = compute_multiple_RandomWalk(
            multilayer_f=multilayer_f,
            config_name=config_name,
            output_f=output_f_grn,
            list_seeds=TFs,
            config_folder=config_folder,
            save=save,
            Return=Return,
            spec_layer_result_saved=folder_rna_layers,
            njobs=njobs)

In [10]:
config = {'bipartite': {'bipartite/atac_rna.tsv': {'graph_type': '00',
   'source': 'RNA',
   'target': 'peaks'},
  'bipartite/tf_peak.tsv': {'graph_type': '00',
   'source': 'peaks',
   'target': 'TF'}},
 'eta': ['1', '0', '0'],
 'lambda': [['1', '0', '0'], ['1/3', '1/3', '1/3'], ['0', '1/2', '1/2']],
 'multiplex': {'TF': {'graph_type': ['00'],
   'layers': ['multiplex/TF/TF_network.tsv']},
  'RNA': {'graph_type': ['00'], 'layers': ['multiplex/RNA/RNA_GENIE3.tsv']},
  'peaks': {'graph_type': ['00'],
   'layers': ['multiplex/peaks/peak_network_cicero.tsv']}},
 'seed': 'seeds/seeds.txt',
 'self_loops': '0'}
config

In [None]:
multixrank_obj = mxr.Multixrank(config=filename, wdir='a')
ranking_df = multixrank_obj.random_walk_rank()

In [None]:
ranking_df

#### Errors
Graph types of multiplex should be lists

No empty tsv file as layers

bipartites instead of bipartite in path config bipartites

Add .tsv at the end of bipartites path

seeds/seed.tsv doesn't exist

remove eta and lamb from config file if null cause must be numbers etc

In [None]:
config['multiplex']

In [None]:
lamb = get_max_lamb(config)
lamb

In [None]:
eta = get_classic_grn_eta(config)
lamb = get_classic_grn_lamb(config, draw=True)
config = setup_proba_config(config, eta, lamb)
eta, lamb, config

### Test hummuspy pip version

In [None]:
import hummuspy.config

In [None]:
hummuspy.config.general_config(multiplex_list, bipartites_list)

In [None]:
bipartites_type

In [None]:
make_values_list(('00', '00'))

Copy pasta of config functions

In [None]:
seeds = make_values_list('A2M')

In [3]:
multiplex_list = {'TF': {'TF_network': '00'}, 'RNA': {'RNA_GENIE3': '00'}, 'peaks': {'peak_network_cicero': '00'}}

In [4]:
bipartites_list = {'atac_rna.tsv' : {'multiplex_right' : 'RNA', 'multiplex_left' : 'peaks'}, 
                   'tf_peak.tsv' : {'multiplex_right' : 'TF', 'multiplex_left' : 'peaks'}}

In [None]:
df = define_grn(
    'b',
    c,
    gene_list=None,
    njobs = 8,
    draw=True)

In [4]:
import numpy as np

In [5]:
a = np.array([1,2,3])
a = a[a!=1]

In [6]:
a

array([2, 3])

In [None]:
df[df['gene']=='ARID3A'].score.value_counts(), df[df['gene']=='ARID3A'][df[df['gene']=='ARID3A']['tf']=='fake_node']

In [None]:
get_classic_grn_lamb(c)

In [None]:
df

In [None]:
bipartites_list

In [1]:
c = {'multiplex': {'RNA': {'layers': ['multiplex/RNA/RNA_GENIE3.tsv'],
   'graph_type': ['00']},
  'TF': {'layers': ['multiplex/TF/TF_network.tsv'], 'graph_type': ['00']},
  'peaks': {'layers': ['multiplex/peaks/peak_network_GENIE3.tsv'],
   'graph_type': ['00']}},
 'bipartite': {'bipartite/RNA_peak': {'source': 'peaks',
   'target': 'TF',
   'graph_type': '00'},
  'bipartite/TF_peak': {'source': 'peaks',
   'target': 'RNA',
   'graph_type': '00'}},
 'seed': 'seeds/seeds.txt',
 'self_loops': 0,
 'r': 0.7,
 'eta': [1, 0, 0],
 'lambda': [[0.5, 0.0, 0.5],
  [0.0, 0.5, 0.5],
  [0.3333333333333333, 0.3333333333333333, 0.3333333333333333]]}

In [2]:
import hummuspy.explore_network

In [6]:
multilayer_f = 'b'

In [21]:
multiplexes_list =  {'TF': {'TF_network': '00'}, 'RNA': {'RNA_GENIE3': '10'}, 'peaks': {'peak_network_cicero': '10'}}
bipartites_list =  {'atac_rna.tsv': {'multiplex_right': 'peaks', 'multiplex_left': 'RNA'},
                    'tf_peak.tsv': {'multiplex_right': 'TF', 'multiplex_left': 'peaks'}}

In [27]:
grn = hummuspy.explore_network.define_grn_for_R(
    multilayer_f,
    multiplex_list,
    bipartites_list,
    tf_multiplex = "TF",
    peak_multiplex = "peaks",
    rna_multiplex = "RNA",
    update_config = True,
    njobs=15)

In [3]:
c = {'multiplex': {'RNA': {'layers': ['multiplex/RNA/RNA_GENIE3.tsv'],
   'graph_type': ['00']},
  'TF': {'layers': ['multiplex/TF/TF_network.tsv'], 'graph_type': ['00']},
  'peaks': {'layers': ['multiplex/peaks/peak_network_GENIE3.tsv'],
   'graph_type': ['00']}},
 'bipartite': {'bipartite/RNA_peak': {'source': 'peaks',
   'target': 'TF',
   'graph_type': '00'},
  'bipartite/TF_peak': {'source': 'peaks',
   'target': 'RNA',
   'graph_type': '00'}},
 'seed': 'seeds/seeds.txt',
 'self_loops': 0,
 'r': 0.7,
 'eta': [1, 0, 0],
 'lambda': [[0.5, 0.0, 0.5],
  [0.0, 0.5, 0.5],
  [0.3333333333333333, 0.3333333333333333, 0.3333333333333333]]}



In [55]:
def initialise_lamb(config,
                    tf_multiplex='TF',
                    peak_multiplex='peaks',
                    rna_multiplex='RNA',
                    value = 1):
    
    for multiplex in [tf_multiplex, peak_multiplex, rna_multiplex]:
        assert multiplex in config['multiplex'].keys(),\
            "The multiplex {}".format(multiplex) +\
            " is not in the config file provided"

    ordered_multiplexes = config['multiplex'].keys()

    if value == 1:
        array = np.ones((3, 3))
    elif value == 0:
        array = np.zeros((3,3))
    else:
        raise ValueError('value param of initialise_lamb should only be 1 or 0')
    
    lamb = pd.DataFrame(array,
                        index=ordered_multiplexes,
                        columns=ordered_multiplexes)
    return lamb


In [70]:
#################################
#  Get enhancers classic lamb   #
#################################
def get_classic_enhancers_lamb(config,
                         tf_multiplex='TF',
                         peak_multiplex='peaks',
                         rna_multiplex='RNA',
                         draw=True
                         ):

    lamb = initialise_lamb(config,
                           tf_multiplex,
                           peak_multiplex,
                           rna_multiplex,
                           value=0) # because enhancer lamb is mostly 0s
    print(lamb)
    
    # Remove proba between TF and RNA layers
    lamb.loc[peak_multiplex, rna_multiplex] = 1
    lamb.loc[peak_multiplex, peak_multiplex] = 1
    lamb.loc[rna_multiplex, peak_multiplex] = 1
    lamb = lamb.div(lamb.sum(axis=1),
                    axis=0)
    lamb = lamb.fillna(0)
    print(lamb)

    # max_lambd check to see
    assert check_lamb(config=config, lamb=lamb), "There seem to be a " +\
        "incoherence between bipartite source/targets and multiplex names" +\
        "provided in get_classic_grn_lamb"

    if draw is True:
        to_draw_lamb = lamb.loc[[tf_multiplex, peak_multiplex, rna_multiplex],
                                [tf_multiplex, peak_multiplex, rna_multiplex]]
        draw_lamb(to_draw_lamb)

    return lamb

In [75]:
#################################
#  Get binding regions classic lamb   #
#################################
def get_target_genes_lamb(config,
                         tf_multiplex='TF',
                         peak_multiplex='peaks',
                         rna_multiplex='RNA',
                         draw=True
                         ):

    lamb = initialise_lamb(config,
                           tf_multiplex,
                           peak_multiplex,
                           rna_multiplex,
                           value=1) # because enhancer lamb is mostly 0s
    print(lamb)
    
    # Remove proba between TF and RNA layers
    lamb.loc[tf_multiplex, rna_multiplex] = 0
    lamb.loc[rna_multiplex, tf_multiplex] = 0
    lamb = lamb.div(lamb.sum(axis=1),
                    axis=0)
    lamb = lamb.fillna(0)
    print(lamb)

    # max_lambd check to see
    assert check_lamb(config=config, lamb=lamb), "There seem to be a " +\
        "incoherence between bipartite source/targets and multiplex names" +\
        "provided in get_classic_grn_lamb"

    if draw is True:
        to_draw_lamb = lamb.loc[[tf_multiplex, peak_multiplex, rna_multiplex],
                                [tf_multiplex, peak_multiplex, rna_multiplex]]
        draw_lamb(to_draw_lamb)

    return lamb

In [76]:
def get_single_layer_eta(config, starting_multiplex='RNA'):

    ordered_multiplex = config['multiplex'].keys()
    assert rna_multiplex in ordered_multiplex, "It seems rna_multiplex not " +\
        "in config['multiplex']"

    eta = [0 if multiplex != starting_multiplex else 1
           for multiplex in ordered_multiplex]

    return eta

In [8]:
hummuspy.config.get_target_genes_lamb(config)

In [38]:
import pandas as pd

In [1]:
import hummuspy.explore_network
import hummuspy.config

In [17]:
hummuspy.explore_network.define_grn_from_config('a', config, njobs=5)

In [28]:
from hummuspy.explore_network import *
def get_output_from_dicts(
        output_request: Union['grn', 'enhancers', 'binding_regions', 'target_genes'],
        multilayer_f,
        multiplexes_list,
        bipartites_list,
        folder_multiplexes='multiplex',
        folder_bipartites='bipartite',
        gene_list=None,
        tf_list=None,
        config_filename='grn_hummuspy.config.yml',
        config_folder='config',
        tf_multiplex: str = 'TF',
        peak_multiplex: str = 'peaks',
        rna_multiplex: str = 'RNA',
        update_config=True,
        save=False,
        return_df=True,
        output_f=None,
        njobs=1):
    """Define a GRN from a multilayer network and a config file.
    Random walks are computed for each gene in the gene list and we keep
    the probability to reach each TF in the TF list.
    You can provide a list of genes and TFs to restrict the GRN.
    You can choose to save the result in a file and/or return it.

    Parameters
    ----------
    multilayer_f : str
        Path to the multilayer folder.
    config : dict
        Config dictionnary.
    gene_list : list, optional
        List of genes. The default is 'all'.
    tf_list : list, optional
        List of TFs. The default is 'all'.
    config_name : str, optional
        Name of the config file. The default is 'grn_hummuspy.config.yml'.
    config_folder : str, optional
        Name of the config folder. The default is 'config'.
    tf_multiplex : str, optional
        Name of the TF multiplex. The default is 'TF'.
    peak_multiplex : str, optional
        Name of the peak multiplex. The default is 'peaks'.
    rna_multiplex : str, optional
        Name of the RNA multiplex. The default is 'RNA'.
    update_config : bool, optional
        Update the config file. The default is True.
    save : bool, optional
        Save the result. The default is False.
    return_df : bool, optional
        Return the result. The default is True.
    output_f : str, optional
        Name of the output file. The default is None.
    njobs : int, optional
        Number of jobs. The default is 1.

    Returns
    -------
    df : pd.DataFrame
        Dataframe containing the random walks's results that defines the GRN.
        Columns:
            layer : str
                Name of the target layer.

            path_layer : str
                Name of the layer of the path.
            score : float
                Score of the random walk.
            gene : str
                Name of the gene-seed.
            tf : str
                Name of the TF-target.

    """

    njobs = int(njobs)
    print('multiplexes_list : ', multiplexes_list)
    print('bipartites_list : ', bipartites_list)
    print('folder_multiplexes : ', folder_multiplexes)
    print('folder_bipartites : ', folder_bipartites)
    print('gene_list : ', gene_list)
    print('tf_list : ', tf_list)
    print('config_filename : ', config_filename)
    print('config_folder : ', config_folder)
    print('tf_multiplex : ', tf_multiplex)
    print('peak_multiplex : ', peak_multiplex)
    print('rna_multiplex : ', rna_multiplex)
    print('update_config : ', update_config)
    print('save : ', save)
    print('return_df : ', return_df)
    print('output_f : ', output_f)
    print('njobs : ', njobs)

    # Create general config file
    config = hummuspy.config.general_config(
        multiplexes=multiplexes_list,
        bipartites=bipartites_list,
        folder_multiplexes=folder_multiplexes,
        folder_bipartites=folder_bipartites,
        suffix='.tsv',
        self_loops=0,
        restart_prob=0.7,
        bipartites_type=('00', '00'),
        save_configfile=False,
        config_filename=config_filename)

    parameters = {
        'multilayer_f':   multilayer_f,
        'config':         config,
        'gene_list':      gene_list,
        'tf_list':        tf_list,
        'config_name':    config_filename,
        'config_folder':  config_folder,
        'tf_multiplex':   tf_multiplex,
        'peak_multiplex': peak_multiplex,
        'rna_multiplex':  rna_multiplex,
        'update_config':  update_config,
        'save':           save,
        'return_df':      return_df,
        'output_f':       output_f,
        'njobs':          njobs
    }
    
    if output_request == 'grn':
        df = define_grn_from_config(**parameters)
        
    elif output_request == 'enhancers':
        df = define_enhancers_from_config(**parameters)

    elif output_request == 'binding_regions':
        df = define_binding_regions_from_config(**parameters)

    elif output_request == 'target_genes':
        df = define_target_genes_from_config(**parameters)

    return df

In [29]:
def make_values_list(
        values,
        types: Union[str, bool, int, float] = (str, bool, int, float)):
    """Transform layer type to be sure a list of values is returned.
    Parameters
    ----------
    values: list, dict, str, bool, int, float
        The values to transform into a list.
    types: list, dict, str, bool, int, float
        The types of the values to transform into a list.
    Returns
    -------
    values: list
        The values transformed into a list.
    Raises
    ------
    TypeError if the values are not of the given types.

    """
    types = list(types)
    if type(values) == list:
        return values
    elif type(values) == tuple:
        return list(values)
    elif type(values) == dict:
        return list(values.values())
    elif type(values) in types:
        return [values]
    else:
        raise TypeError('Layer name(s) {} should be given through {},'.format(
            values,
            types) +
            'a list, or a dictionary(where the key would be the file_paths)')

In [30]:
from typing import Union

In [31]:
multiplex_list = {'TF': {'TF_network': '00'}, 'RNA': {'RNA_GENIE3': '00'}, 'peaks': {'peak_network_cicero': '00'}}

In [32]:
bipartites_list = {'atac_rna.tsv' : {'multiplex_right' : 'RNA', 'multiplex_left' : 'peaks'}, 
                   'tf_peak.tsv' : {'multiplex_right' : 'TF', 'multiplex_left' : 'peaks'}}

In [36]:
df = get_output_from_dicts('grn', 
                      'b',
                      multiplexes_list=multiplex_list,
                      bipartites_list=bipartites_list,
                     njobs = 8)

In [39]:
df.reset_index(drop=True)

In [2]:
hummuspy.explore_network.__dict__

{'__name__': 'hummuspy.explore_network',
 '__doc__': None,
 '__package__': 'hummuspy',
 '__loader__': <_frozen_importlib_external.SourceFileLoader at 0x7fcbf45c1240>,
 '__spec__': ModuleSpec(name='hummuspy.explore_network', loader=<_frozen_importlib_external.SourceFileLoader object at 0x7fcbf45c1240>, origin='/home/rtrimbou/miniconda3/lib/python3.10/site-packages/hummuspy/explore_network.py'),
 '__file__': '/home/rtrimbou/miniconda3/lib/python3.10/site-packages/hummuspy/explore_network.py',
 '__cached__': '/home/rtrimbou/miniconda3/lib/python3.10/site-packages/hummuspy/__pycache__/explore_network.cpython-310.pyc',
 '__builtins__': {'__name__': 'builtins',
  '__doc__': "Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil' object; Ellipsis represents `...' in slices.",
  '__package__': '',
  '__loader__': _frozen_importlib.BuiltinImporter,
  '__spec__': ModuleSpec(name='builtins', loader=<class '_frozen_importlib.BuiltinImporter'>, origin='built-in'),
  '__