In [1]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import networkx as nx
import networkx.algorithms.bipartite as bipartite
from multiprocessing import Pool
import random
import itertools
import pandas as pd
from collections import Counter
from joblib import Parallel, delayed
import sys
from numba import njit,jit
from numba.typed import Dict,List
import numba as nb
from scipy.special import factorial
sys.path.append('lib/')
from degree_seq_bipartite import bipartite_degree_seq
import time


module loaded at  2020-06-23 08:06:53.508347


#### This test shows that parallel version in numba is slower

In [24]:
N_a = 50_000# n people
N_b = 50_000 #n clusters
a_mean = 4. #average degree of the distribution
#b_mean = (a_mean )* N_a / N_b 
#aseq,bseq = bipartite_degree_seq(N_a,N_b,'shifted_poisson','shifted_poisson',{'lam':a_mean},{'lam':N_a*a_mean/N_b})
aseq,bseq = bipartite_degree_seq(N_a,N_b,'poisson','poisson',{'lam':a_mean},{'lam':N_a*a_mean/N_b})
BG = bipartite.generators.configuration_model(aseq,bseq)
BG = nx.Graph(BG)#convert multilinks to simple 

<built-in method poisson of numpy.random.mtrand.RandomState object at 0x7f816e1dcaf0>


In [25]:
@njit(parallel=True)
def single_instance_numba_parallel(neigh,states,N_a,N_b,p,N_iterations=20):
    individuals = set(range(N_a)).intersection(set(neigh.keys()))
    clusters = set(range(N_a,N_a+N_b)).intersection(set(neigh.keys()))
    #start = time.time()
    for count in range(N_iterations):    #solving cavity equations through forward dynamics
        err = 0
        for j in individuals:#solving 3rd equation
                for idx,mu in enumerate(neigh[j]):
                    new=1.0
                    for nu in set(neigh[j])-{mu}:
                        new*=1-states[nu][neigh[nu]==j][0]
                    err = max(abs(states[j][idx]-1+new),err)
                    states[j][idx]= 1-new
        for mu in clusters: #running 2nd equation
                for idx,i in enumerate(neigh[mu]):
                    new=1.
                    for j in set(neigh[mu])-{i}:
                        new*=(1-p*states[j][neigh[j]==mu][0])
                    err = max(abs(1-new-states[mu][idx]),err)
                    states[mu][idx]= 1-new
        if err<0.001:
            print('Exit after ',count,' iterations')
            break
    if count==N_iterations-1:
        print('Attention! Maximum number of iteration reached')

    #end = time.time()
    #print('it took',end-start,' seconds')

    risk ={}
    for i in individuals:#solving 3rd equation
        new=1
        for mu in set(neigh[i]):
            if len(states[mu][neigh[mu]==i])!=1:#sanity check
                print('mu=',mu,'i=',i)
                raise ValueError('Sanity check failed!')
            new*=(1-states[mu][neigh[mu]==i][0])
        risk[i]= 1-new
    return risk
@njit()
def single_instance_numba(neigh,states,N_a,N_b,p,N_iterations=20):
    individuals = set(range(N_a)).intersection(set(neigh.keys()))
    clusters = set(range(N_a,N_a+N_b)).intersection(set(neigh.keys()))
    #start = time.time()
    for count in range(N_iterations):    #solving cavity equations through forward dynamics
        err = 0
        for j in individuals:#solving 3rd equation
                for idx,mu in enumerate(neigh[j]):
                    new=1.0
                    for nu in set(neigh[j])-{mu}:
                        new*=1-states[nu][neigh[nu]==j][0]
                    err = max(abs(states[j][idx]-1+new),err)
                    states[j][idx]= 1-new
        for mu in clusters: #running 2nd equation
                for idx,i in enumerate(neigh[mu]):
                    new=1.
                    for j in set(neigh[mu])-{i}:
                        new*=(1-p*states[j][neigh[j]==mu][0])
                    err = max(abs(1-new-states[mu][idx]),err)
                    states[mu][idx]= 1-new
        if err<0.001:
            print('Exit after ',count,' iterations')
            break
    if count==N_iterations-1:
        print('Attention! Maximum number of iteration reached')

    #end = time.time()
    #print('it took',end-start,' seconds')

    risk ={}
    for i in individuals:#solving 3rd equation
        new=1
        for mu in set(neigh[i]):
            if len(states[mu][neigh[mu]==i])!=1:#sanity check
                print('mu=',mu,'i=',i)
                raise ValueError('Sanity check failed!')
            new*=(1-states[mu][neigh[mu]==i][0])
        risk[i]= 1-new
    return risk
def Convert_typed(tup):
    ''' for undirected edges. I construct the neighbouring relationship.
    Returns:
        di: dictionary.
    Notes:
    For link (a,b)  it returns the di[a]+=[b], and di[b]+=[a]
    '''
    d_typed = Dict.empty(
            key_type=nb.types.int64,
            value_type=nb.types.int64[:],
        )
    for a, b in tup:
        if a not in d_typed:
            d_typed[a]=np.array([b])
        else:
            d_typed[a]=np.append(d_typed[a],b)
        if b not in d_typed:
            d_typed[b]=np.array([a])
        else:
            d_typed[b]=np.append(d_typed[b],a)
            
    '''     
    for key, value in di.items():
        d_typed[key] = np.array(value,dtype=int) 
    '''
    return d_typed
def dict_to_typed_states(neigh):
    ''' for undirected edges. I construct the neighbouring relationship.
    Returns:
        di: dictionary.
    Notes:
    For link (a,b)  it returns the di[a]+=[b], and di[b]+=[a]
    '''
    d_typed = Dict.empty(
            key_type=nb.types.int64,
            value_type=nb.types.float32[:],
        )

    for key, value in neigh.items():
        d_typed[key] = np.ones(len(value),dtype=np.float32) 
    return d_typed
        
def Convert(tup, di=None):
    ''' for undirected edges. I construct the neighbouring relationship.
    Returns:
        di: dictionary.
    Notes:
    For link (a,b)  it returns the di[a]+=[b], and di[b]+=[a]
    '''
    if di ==None:
        di={}
    for a, b in tup: 
        di.setdefault(a, []).append(b)
        di.setdefault(b, []).append(a)
    di = {key:np.array(value,dtype=int) for key, value in di.items()}
    return di 
#%time neigh = Convert(BG.edges())
#states = {key:np.ones(len(value)) for key,value in neigh.items()}#states[x][y] indicates the state of node "x cavity neigh[x][y]""


In [26]:
%time neigh = Convert_typed(list(BG.edges()))
states = dict_to_typed_states(neigh)

CPU times: user 7.77 s, sys: 148 ms, total: 7.92 s
Wall time: 7.92 s


In [27]:
p = 0.2
%time risk_parallel = single_instance_numba_parallel(neigh,states,N_a,N_b,p)
states = dict_to_typed_states(neigh)
%time risk = single_instance_numba(neigh,states,N_a,N_b,p)


Exit after  6  iterations
CPU times: user 4min 28s, sys: 37.3 s, total: 5min 5s
Wall time: 1min 51s
Exit after  6  iterations
CPU times: user 15.8 s, sys: 93.2 ms, total: 15.9 s
Wall time: 16.6 s


In [28]:
[x for x in risk_parallel.values()]==[x for x in risk.values()]

True