# Running the influence function on Spark

In [1]:
# Import libraries
%matplotlib inline
import matplotlib.pyplot as plt
import time
import numpy as np
from numpy import random
import math
import scipy
from scipy import stats
from random import choice
import networkx as nx
import json
from networkx.readwrite import json_graph
import line_profiler
import IPython
# Register line_profiler's `%lprun` line magic with the running IPython kernel.
# NOTE(review): `define_magic` and `line_profiler.magic_lprun` are older APIs;
# on newer IPython / line_profiler releases `%load_ext line_profiler` is the
# usual way to get `%lprun` — confirm against the installed versions.
ip = IPython.get_ipython()
ip.define_magic('lprun', line_profiler.magic_lprun)

In [2]:
# Load the network graph from a node-link JSON file.
# The file handle and the parsed payload get distinct names; the original
# reused `graph_data` for both. The graph is built after the file is closed.
GRAPH_PATH = "graph/nc_mini.json"

with open(GRAPH_PATH, "r") as graph_file:
    graph_data = json.load(graph_file)

NC_digraph = json_graph.node_link_graph(graph_data)

In [3]:
# findspark.init() makes the local Spark installation's `pyspark` package
# importable, so it has to run before `import pyspark` below.
import findspark
import os  # NOTE(review): not used in any of the cells shown here
findspark.init()
import pyspark
# Single SparkContext for the notebook; used later to parallelize the graph's nodes.
sc = pyspark.SparkContext()

In [6]:
def activateNodesSpark(detStartNodes):
    """Run one stochastic activation cascade over ``NC_digraph``.

    Every node's ``activated`` / ``explored`` flags are reset. Then the seed
    nodes are activated and a breadth-first traversal follows. Each
    explored node tries once to activate each successor that is not yet
    active. The edge ``u -> v`` fires when a Uniform(0, 1) draw is below
    ``sqrt(Beta(alpha, beta))``. Here ``alpha`` is the edge's ``weight``
    and ``beta`` is ``v``'s ``review_count``.

    Parameters
    ----------
    detStartNodes : iterable of node ids
        Seed nodes, activated unconditionally. Duplicates count once.

    Returns
    -------
    int
        Number of distinct activated nodes, seeds included. Returns 0 when
        no seeds are given.

    Side effects
    ------------
    Overwrites the ``activated`` and ``explored`` attributes of every node
    in the module-level ``NC_digraph``. Consumes draws from ``numpy.random``.
    """
    from collections import deque

    nx.set_node_attributes(NC_digraph, 'activated', False)
    nx.set_node_attributes(NC_digraph, 'explored', False)

    activated = 0
    frontier = deque()

    for n in detStartNodes:
        # Skip duplicate seeds so they are neither double-counted nor queued twice.
        if not NC_digraph.node[n]['activated']:
            NC_digraph.node[n]['activated'] = True
            frontier.append(n)
            activated += 1

    while frontier:
        startNode = frontier.popleft()

        # Defensive: each node is queued at most once (only when it flips to
        # activated), but never expand a node twice.
        if NC_digraph.node[startNode]['explored']:
            continue
        NC_digraph.node[startNode]['explored'] = True

        for succNode in NC_digraph.succ[startNode]:
            if NC_digraph.node[succNode]['activated']:
                continue

            alpha = NC_digraph[startNode][succNode]['weight']
            beta = NC_digraph.node[succNode]['review_count']

            # Draw order (uniform, then beta) matches the original simulation.
            randUnif = random.uniform(0, 1)
            randBeta = np.sqrt(random.beta(alpha, beta))

            if randUnif < randBeta:
                NC_digraph.node[succNode]['activated'] = True
                frontier.append(succNode)
                activated += 1

    return activated

def activateNodesLoopSpark(N, startNodes, seed=None):
    """Mean cascade size from ``startNodes`` over ``N`` simulations.

    Parameters
    ----------
    N : int
        Number of ``activateNodesSpark`` runs to average; must be >= 1.
    startNodes : iterable of node ids
        Seed nodes passed unchanged to every run. It must be re-iterable
        (e.g. a list) because it is consumed once per run.
    seed : int or None, optional
        Passed to ``numpy.random.seed`` before the runs. The default
        ``None`` reseeds from fresh entropy on every call, which was the
        original behaviour.

    Returns
    -------
    numpy.float64
        Mean number of activated nodes across the ``N`` runs.

    Raises
    ------
    ValueError
        If ``N < 1``. A mean over zero runs is NaN, which would silently
        corrupt any ranking of the results.
    """
    if N < 1:
        raise ValueError("N must be at least 1, got %r" % (N,))

    # Reseed on every call. NOTE(review): presumably so that Spark Python
    # workers do not share identical numpy RNG state — confirm.
    random.seed(seed)

    results = [float(activateNodesSpark(startNodes)) for _ in xrange(N)]
    # np.mean (not sum/N) keeps the numpy.float64 return type and its repr.
    return np.mean(results)

In [7]:
# Simulation settings.
N_SIMULATIONS = 1000  # cascades averaged per candidate seed node
N_PARTITIONS = 4      # Spark partitions for the node RDD

# Seeds the driver's numpy RNG only. The simulations run inside the
# `map` on the workers, and activateNodesLoopSpark reseeds there itself.
random.seed()

startNodes = sc.parallelize(NC_digraph.nodes(), N_PARTITIONS)

# Score every node, used alone as the seed set, by its mean cascade size.
activatedNodes = startNodes.map(
    lambda node: (node, activateNodesLoopSpark(N_SIMULATIONS, [node])))

# A negated key makes takeOrdered return the top-scoring (node, score)
# pair, wrapped in a 1-element list.
maxNode = activatedNodes.takeOrdered(1, key=lambda pair: -pair[1])
print(maxNode)

[(u'NzWLMPvbEval0OVg_YDn4g', 28.466999999999999)]
