forked from raphael-group/hotnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hotnet.py
executable file
·116 lines (89 loc) · 4.79 KB
/
hotnet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# -*- coding: iso-8859-1 -*-
import networkx as nx, scipy as sp, numpy as np
from collections import defaultdict
################################################################################
# Influence and similarity matrix functions
def induce_infmat(infmat, index2gene, genelist, quiet=True):
"""Create and return induced influence matrix containing influence scores only for those genes
in the given gene list and a index to gene mapping for the returned matrix.
Arguments:
infmat -- 2D ndarray representing the full influence matrix
index2gene -- dict mapping an index in the matrix to the name of the gene represented at that
index in the influence matrix
genelist -- list of genes for which influence scores should be preserved in the returned matrix.
This should be the genes that have heat scores.
"""
if not quiet: print "* Inducing infmat..."
start_index = min(index2gene.keys())
# Reformat gene index
gene2index = dict([(gene, index) for index, gene in index2gene.items()])
# Identify genes in the given list that are also in the network
genelist = [g for g in genelist if g in gene2index.keys()]
indices = [gene2index[g]-start_index for g in genelist]
if not quiet: print "\t- Genes in list and network:", len( indices )
# Create an induced influence matrix
M = np.zeros( (len(genelist), len(genelist)) )
for i in range(len(indices)):
M[i,] = infmat[gene2index[genelist[i]] - start_index, indices]
# Create new gene index and score function
index2gene = dict([(i, genelist[i]) for i in range(len(genelist))])
return M, index2gene
def heat_vec(gene2heat, index2gene):
"""Create and return a linear ndarray of heat scores ordered by gene index.
Arguments:
gene2heat -- dict mapping a gene name to the heat score for that gene
index2gene -- dict mapping an index in the matrix to the name of the gene represented at that
index in the influence matrix. Heat scores will only be included in the returned
ndarray for entries in this dict.
"""
v = [gene2heat[gene] for _, gene in sorted(index2gene.iteritems())]
return np.array(v)
def similarity_matrix(M, heat):
"""Create and return a similarity matrix from the given influence matrix and heat vector.
Arguments:
M -- 2D ndarray representing the induced influence matrix obtained from induce_infmat
heat -- 1D ndarray representing the heat score vector obtained from heat_vec
"""
M = np.minimum(M, M.transpose()) #ensure that the influence matrix is symmetric
sim = np.empty_like(M)
for i in range(M.shape[0]):
for j in range(M.shape[1]):
sim[i][j] = max(heat[i], heat[j]) * M[i][j]
return sim
################################################################################
# Weighted graph functions
def weighted_graph(sim_mat, index2gene, delta):
"""Construct and return weighted graph in which nodes are labeled with gene names and edges
between nodes have weight equal to the similarity score of the two genes.
Arguments:
sim_mat -- similarity matrix obtained from similarity_matrix
index2gene -- dict mapping an index in the matrix to the name of the gene represented at that
index in the similarity matrix
delta -- weight threshold for edge inclusion. Only gene pairs with similarity >= delta will
have edges connecting their corresponding nodes in the returned graph.
"""
e = zip( *sp.where(sim_mat >= delta))
edges = [(int(j), int(i), dict(weight=sim_mat[i,j])) for i, j in e]
G = nx.Graph()
G.add_edges_from([(index2gene[i], index2gene[j], d) for i, j, d in edges])
return G
def connected_components(G, min_size=1):
"""Find connected components in the given graph and return as a list of lists of gene names.
If the graph contains no connected components of size at least min_size, an empty list is returned.
Arguments:
G -- weighted graph in which connected components should be found
min_size -- minimum size for connected components included in the returned component list
"""
ccs = [cc for cc in nx.connected_components(G) if len(cc) >= min_size]
return ccs
def component_sizes(ccs):
"""Return dict mapping a CC size to the number of connected components of that size.
Only sizes for which there is at least one connected component of that size will have an entry
in the returned dict. If the given component list is empty, an empty dict is returned.
Arguments:
ccs -- list of lists of gene names representing connected components
"""
size_dict = defaultdict(int)
for cc in ccs:
size_dict[len(cc)] += 1
return size_dict