Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel Computation of Transition Matrix #19

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 85 additions & 24 deletions pathpy/HigherOrderNetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import scipy.linalg as _la
import scipy as _sp

import functools as fu
import multiprocessing as _mp

from pathpy.Log import Log
from pathpy.Log import Severity

Expand All @@ -53,7 +56,7 @@ class HigherOrderNetwork:
"""


def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrderTransitions', lanczosVecs=15, maxiter=1000):
def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrderTransitions', lanczosVecs=15, maxiter=1000, n_cores = 1):
"""
Generates a k-th-order representation based on the given path statistics.

Expand All @@ -78,6 +81,8 @@ def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrde
a k-order edge is set to the transition probability T['v_k', 'v_k+1'] in the first order network.
For method='KOrderPi' the entry pi['v1-...-v_k'] in the stationary distribution of the
k-order network is used instead.

@param n_cores: for some calculations, parallel computation on multiple cores is used. This parameter determines the number of cores to use.
"""

assert nullModel == False or (nullModel and k>1)
Expand Down Expand Up @@ -108,6 +113,9 @@ def __init__(self, paths, k=1, separator='-', nullModel=False, method='FirstOrde
## A dictionary containing the sets of predecessors of all nodes
self.predecessors = _co.defaultdict( lambda: set() )

## number of cores used in calculating the transition matrix
self.cores = n_cores

if k>1:
# For k>1 we need the first-order network to generate the null model
# and calculate the degrees of freedom
Expand Down Expand Up @@ -864,6 +872,41 @@ def getAdjacencyMatrix(self, includeSubPaths = True, weighted = True, transposed
return _sparse.coo_matrix( (data, (row, col)), shape=(self.vcount(), self.vcount()) ).tocsr()


def partialTransitionMatrix(node_data, degree_data, includeSubPaths, chunk):
    """
    Computes the entries of one chunk of the sparse transition matrix.

    Takes a chunk of the self.edges dictionary and returns its part of the
    sparse matrix as three parallel lists [row, col, data], suitable for
    assembling a scipy coo_matrix.

    @param node_data: ordered list of node names; a node's position in this
        list defines its row/column index in the matrix
    @param degree_data: weighted out-degrees, indexed like node_data
    @param includeSubPaths: whether sub-path counts contribute to edge weights
    @param chunk: dict mapping (source, target) edge tuples to weight arrays,
        where entry [0] is the sub-path count and entry [1] the path count
        (presumably numpy arrays, since .sum() is called — TODO confirm)
    @returns: [row, col, data] lists for the chunk's matrix entries
    """
    # Build the name -> index mapping once, instead of calling the O(n)
    # list.index() several times per edge inside the loop.
    index_of = {name: i for i, name in enumerate(node_data)}
    D = degree_data

    row = []
    col = []
    data = []

    for (s, t), value in chunk.items():
        # Skip zero-weight edges that may have been added implicitly by a
        # defaultdict; keep an edge if it was observed as a longest path,
        # or (when requested) as a sub-path.
        if (value[1] > 0) or (includeSubPaths and value[0] > 0):
            s_idx = index_of[s]
            row.append(index_of[t])
            col.append(s_idx)

            if includeSubPaths:
                count = value.sum()
            else:
                count = value[1]

            assert D[s_idx] > 0, 'Encountered zero out-degree for node ' + str(s) + ' while weight of link (' + str(s) + ', ' + str(t) + ') is non-zero.'
            prob = count / D[s_idx]
            if prob < 0 or prob > 1:
                # Fixed: original referenced undefined name 'tn'; Log is
                # imported at module level (from pathpy.Log import Log).
                Log.add('Encountered transition probability outside [0,1] range.', Severity.ERROR)
                raise ValueError()
            data.append(prob)

    return [row, col, data]


def getTransitionMatrix(self, includeSubPaths=True):
"""
Expand All @@ -873,31 +916,49 @@ def getTransitionMatrix(self, includeSubPaths=True):
@param includeSubpaths: whether or not to include subpath statistics in the
transition probability calculation (default True)
"""
row = []
col = []
data = []
D = self.degrees(includeSubPaths=includeSubPaths, weighted=True, mode='OUT')
for (s,t) in self.edges:
# either s->t has been observed as a longest path, or we are interested in subpaths as well

# the following makes sure that we do not accidentially consider zero-weight edges (automatically added by default_dic)
if (self.edges[(s,t)][1] > 0) or (includeSubPaths and self.edges[(s,t)][0]>0):
row.append(self.nodes.index(t))
col.append(self.nodes.index(s))
if includeSubPaths:
count = self.edges[(s,t)].sum()

edges_length= len(self.edges)

n_chunks = self.cores

### Splits dictionary into roughly equal parts which will be passed on to a worker pool
def split_dict_equally(input_dict, chunks=n_chunks):
"""Splits dict by keys. Returns a list of dictionaries."""
"""
Credit to http://enginepewpew.blogspot.ch/2012/03/splitting-dictionary-into-equal-chunks.html
"""
# prep with empty dicts
return_list = [dict() for idx in range(chunks)]
idx = 0
for k,v in input_dict.items():
return_list[idx][k] = v
if idx < chunks-1: # indexes start at 0
idx += 1
else:
count = self.edges[(s,t)][1]
#print((s,t))
#print(D[self.nodes.index(s)])
#print(count)
assert D[self.nodes.index(s)]>0, 'Encountered zero out-degree for node ' + str(s) + ' while weight of link (' + str(s) + ', ' + str(t) + ') is non-zero.'
prob = count / D[self.nodes.index(s)]
if prob < 0 or prob > 1:
tn.Log.add('Encountered transition probability outside [0,1] range.', Severity.ERROR)
raise ValueError()
data.append( prob )

idx = 0
return return_list


# split the dictionary into smaller chunks to be passed to the processing pool
dicts = split_dict_equally(self.edges, n_chunks)

# partially apply the function so that it can be passed to the pool
partiallyCalculated = fu.partial(HigherOrderNetwork.partialTransitionMatrix, self.nodes, D, includeSubPaths)

a = 0
# initializing the processingPool
with _mp.Pool(n_chunks) as pool:
# calculating the elements for the sparse matrix
a = pool.map(partiallyCalculated, dicts)
pool.close()

# assign the correct data to the variables, accounting for the per-chunk structure in which pool.map() returns values
row = _np.hstack([a[i][0] for i in range(n_chunks)])
col = _np.hstack([a[i][1] for i in range(n_chunks)])
data = _np.hstack([a[i][2] for i in range(n_chunks)])


data = _np.array(data)
data = data.reshape(data.size,)

Expand Down