# Influence function - standard error

In [1]:
# Import libraries
%matplotlib inline
import matplotlib.pyplot as plt
import time
import numpy as np
from numpy import random
import math
import scipy
from scipy import stats
from random import choice
import networkx as nx
import json
from networkx.readwrite import json_graph
import line_profiler
import IPython
ip = IPython.get_ipython()
ip.define_magic('lprun', line_profiler.magic_lprun)

In [2]:
import findspark
import os
# findspark.init() is called before pyspark is imported; keep this order.
findspark.init()
import pyspark
# getOrCreate() reuses an already-running context, so re-executing this cell
# on a live kernel does not fail with "Cannot run multiple SparkContexts".
sc = pyspark.SparkContext.getOrCreate()

In [3]:
# Read the node-link JSON export from disk, then rebuild the NetworkX graph
# from the parsed data once the file has been closed.
with open("graph/nc_full.json", "r") as graph_file:
    graph_data = json.load(graph_file)

NC_digraph = json_graph.node_link_graph(graph_data)

### Calculate variance of simulation results

In [4]:
def influenceFunction(detStartNodes, t=999999):
    """Run one random cascade from the seed nodes and return its size.

    Starting from ``detStartNodes``, the cascade proceeds in rounds. In each
    round every node activated in the previous round gets one attempt to
    activate each of its not-yet-activated successors in ``NC_digraph``.
    An attempt succeeds when a uniform(0, 1) draw is below the square root of
    a Beta(alpha, beta) draw, where ``alpha`` is the edge's ``'weight'`` and
    ``beta`` is the successor node's ``'review_count'``.

    Parameters
    ----------
    detStartNodes : iterable of node ids
        Seed nodes that are active at round 0. Repeated entries are counted
        and expanded only once. An empty iterable yields 0.
    t : int, optional
        Maximum number of rounds to simulate.

    Returns
    -------
    int
        Number of distinct activated nodes, seeds included.
    """
    activated = set()
    frontier = []
    for node in detStartNodes:
        # De-duplicate seeds so that no node is expanded more than once.
        if node not in activated:
            activated.add(node)
            frontier.append(node)

    # A counter-driven loop replaces `for i in range(t)`: on Python 2 that
    # range call materialised a ~1e6-element list on every simulation.
    steps = 0
    while frontier and steps < t:
        successors = []
        # Iterate over the frontier directly; popping the head of a list for
        # every node made each round quadratic in the frontier size.
        for startNode in frontier:
            for succNode in NC_digraph.succ[startNode]:
                if succNode in activated:
                    continue
                alpha = NC_digraph[startNode][succNode]['weight']
                beta = NC_digraph.node[succNode]['review_count']
                if random.uniform(0, 1) < np.sqrt(random.beta(alpha, beta)):
                    activated.add(succNode)
                    successors.append(succNode)
        frontier = successors
        steps += 1
    return len(activated)

In [5]:
def influenceFunctionNotPar(detStartNodes, N, t=999999):
    """Estimate the expected cascade size by averaging N serial simulations.

    Parameters
    ----------
    detStartNodes : list of node ids
        Seed nodes handed unchanged to ``influenceFunction``.
    N : int
        Number of independent simulations to average; must be at least 1.
    t : int, optional
        Maximum number of rounds per simulation, forwarded to
        ``influenceFunction``.

    Returns
    -------
    float
        Mean of the N simulated cascade sizes.

    Raises
    ------
    ValueError
        If ``N`` is smaller than 1. The mean of zero runs is undefined and
        would otherwise come back as NaN.
    """
    if N < 1:
        raise ValueError("N must be at least 1, got %r" % (N,))
    # `range` instead of the Python-2-only `xrange`, matching the other
    # functions in this notebook.
    result = [float(influenceFunction(detStartNodes, t)) for _ in range(N)]
    return np.mean(result)

In [6]:
def influenceFunctionPar(detStartNodes, N, t=999999):
    """Average the cascade size over N simulations dispatched through Spark.

    The N runs are spread over 4 partitions of the SparkContext ``sc``; each
    run calls ``influenceFunction(detStartNodes, t)``. All results are
    collected on the driver and their mean is returned.
    """
    def simulate(_run_index):
        # The RDD element only numbers the run; every run uses the same seeds.
        return influenceFunction(detStartNodes, t)

    run_ids = sc.parallelize(range(N), 4)
    sizes = run_ids.map(simulate).collect()
    return np.mean(sizes)

In [7]:
def greedySearch(k=3, N=1000, t=999999):
    """Greedily pick k seed nodes that maximise the estimated influence.

    In each of the k rounds every candidate node is scored, in parallel on
    the SparkContext ``sc``, by ``influenceFunctionNotPar(best_s + [n], N, t)``.
    The best-scoring node is printed and appended to the seed set. Nodes that
    are already selected score 0.

    Parameters
    ----------
    k : int, optional
        Number of seed nodes to select. Values <= 0 select nothing.
    N : int, optional
        Number of simulations averaged per candidate evaluation.
    t : int, optional
        Maximum number of rounds per simulation.

    Returns
    -------
    tuple
        ``(best_s, max_inf)``: the selected nodes in selection order, and the
        estimated influence recorded when the last node was selected
        (``([], 0)`` when nothing is selected).
    """
    best_s = []
    max_inf = 0
    if k <= 0:
        # Nothing to select; skip building the candidate RDD altogether.
        return best_s, max_inf

    # Candidates are the nodes that appear as an endpoint of at least one
    # edge. A set comprehension gathers them in one pass; sum(edges, ())
    # concatenated tuples quadratically in the number of edges.
    candidates = {node for edge in NC_digraph.edges() for node in edge}
    nodeRDD = sc.parallelize(list(candidates), 4)

    for i in range(k):
        # Forward t so that the round cap requested by the caller is applied;
        # it was previously accepted but never used.
        infRDD = nodeRDD.map(
            lambda n: (n, 0.) if n in best_s
            else (n, influenceFunctionNotPar(best_s + [n], N, t)))
        next_s, next_i = infRDD.reduce(lambda a, b: a if a[1] > b[1] else b)
        print(next_s)
        best_s += [next_s]
        max_inf = next_i

    return best_s, max_inf

In [10]:
# Test influence function for given start nodes
start = ['NzWLMPvbEval0OVg_YDn4g','ts7EG6Zv2zdMDg29nyqGfA','VhI6xyylcAxi0wOy2HOX3w']

print "No cap on t (serial):"
%time print influenceFunctionNotPar(start, 1)

print "t capped at 10 (serial):"
%time print influenceFunctionNotPar(start, 1, 10)

print "t capped at 10 (parallel):"
%time print influenceFunctionNotPar(start, 1, 10)

No cap on t (serial):
8928.0
CPU times: user 352 ms, sys: 12.4 ms, total: 364 ms
Wall time: 474 ms
t capped at 10 (serial):
8946.0
CPU times: user 288 ms, sys: 3.6 ms, total: 292 ms
Wall time: 462 ms
t capped at 10 (parallel):
8940.0
CPU times: user 270 ms, sys: 808 µs, total: 270 ms
Wall time: 355 ms


In [9]:
# Test greedy algorithm

print "No cap on t:"
%time print greedySearch(3, 1)

print "t capped at 10:"
%time print greedySearch(3, 1, 10)

No cap on t:
woOvQPOamYerXTjo8g4yxg


KeyboardInterrupt: 