In [38]:
import time, enum, math
import panel as pn
from panel import widgets as pnw
import random
import numpy as np
from numpy.random import choice
import pandas as pd
import pylab as plt
import networkx as nx
import itertools
from mesa import Agent, Model
from mesa.time import RandomActivation
from mesa.space import NetworkGrid
from mesa.datacollection import DataCollector
from matplotlib.colors import ListedColormap, LinearSegmentedColormap
import bokeh.palettes
from bokeh.plotting import figure, show
from bokeh.models import Legend, LegendItem, ColumnDataSource, Grid, Line, LinearAxis, Plot
from sklearn import preprocessing
import copy
from IPython.display import display
from ipywidgets import FloatSlider, IntSlider
from ipywidgets import interact, interact_manual
import ipywidgets as widgets
import statistics

### General functions

In [39]:
def model_data(model):
    '''Return the model-level values gathered by the model's data collector.

    The collector's frame has its index moved into a column, that column is
    renamed from 'index' to 'Step', and 'Step' is then set as the index again.
    '''
    collected = model.datacollector.get_model_vars_dataframe()
    return (
        collected
        .reset_index()
        .rename(columns={'index': 'Step'})
        .set_index('Step')
    )

def tournament_selection(choices, model, own_node_nr, p=0.5):
    '''Pick one neighbor node by a (at most) 3-way tournament on edge weight.

    Up to three distinct candidates are drawn at random from ``choices`` and
    ranked by the weight of their edge to ``own_node_nr`` (lowest weight first).
    The best-ranked candidate wins with probability ``p``, the next one with
    ``p*(1-p)``, and the last-ranked candidate takes whatever probability is
    left. With the default ``p=0.5`` that is 0.5 / 0.25 / 0.25 for three
    candidates and 0.5 / 0.5 for two.

    Parameters
    ----------
    choices : iterable of node ids that are neighbors of ``own_node_nr``.
    model : object exposing ``model.G[nb][own_node_nr]['weight']``.
    own_node_nr : node id of the agent doing the selection.
    p : probability that the best remaining candidate is accepted.

    Returns
    -------
    list
        ``[winner]`` or ``[]`` when ``choices`` is empty.
    '''
    # random.sample needs a sequence; accept any iterable of nodes
    candidates = list(choices)
    if not candidates:
        return []

    contestants = random.sample(candidates, min(3, len(candidates)))
    ranked = sorted(contestants, key=lambda nb: model.G[nb][own_node_nr]['weight'])

    # walk down the ranking; each candidate but the last is accepted with
    # probability p, which yields p, p*(1-p), ... unconditionally
    for nb in ranked[:-1]:
        if random.random() < p:
            return [nb]
    return [ranked[-1]]
    
def fans_percentage(model):
    '''Percentage of watchers whose favorite influencer is in the most popular 20%.

    Every agent in ``model.influencees`` is expected to carry one
    ``influencer<id>`` attribute per influencer holding its interest score.
    A watcher's favorite is the influencer with its highest score; influencers
    are ranked by how many watchers have them as favorite, and the fans of the
    top 20% (rounded up) are summed.

    Returns
    -------
    0 when every score of every watcher equals 0 (also when there are no
    watchers or no scores), otherwise the percentage rounded to 2 decimals.
    '''
    prefix = 'influencer'
    # Select score attributes by name instead of excluding a fixed set of
    # bookkeeping attributes, so unrelated instance attributes cannot leak in.
    scores_per_watcher = [
        {name: score for name, score in vars(agent).items()
         if name.startswith(prefix) and name[len(prefix):].isdigit()}
        for agent in model.influencees
    ]

    # if nobody has a score, return 0% --> there are no fans
    if all(score == 0 for scores in scores_per_watcher for score in scores.values()):
        return 0

    # for each watcher: which influencer is their favorite (ties: first attribute wins)
    # NOTE(review): a watcher whose best score is <= 0 still counts as a fan
    # of that influencer -- confirm this is intended.
    favorite_counts = {}
    for scores in scores_per_watcher:
        favorite = int(max(scores, key=scores.get)[len(prefix):])
        favorite_counts[favorite] = favorite_counts.get(favorite, 0) + 1

    # fans per influencer, most popular first
    fans_ranked = sorted(
        (favorite_counts.get(influencer.unique_id, 0) for influencer in model.influencers),
        reverse=True,
    )
    # most adored 20% of influencers (rounded up)
    most_pop_infl = math.ceil(len(fans_ranked) / 5)
    # fans in hands of top 20% of influencers
    nr_of_fans = sum(fans_ranked[:most_pop_infl])

    fanbase_perc = nr_of_fans / len(model.influencees)
    return round(fanbase_perc * 100, 2)

def run_social_model(runs,fill_val,nr_steps,N,pos_sig_atn,mab,neg_sig_atn,max_pos_sig_str,neg_sig_str,neg_sig_prob,frac_size,nb_parameter):
    '''Run the social influence model with user-given parameter values and plot the result.

    For each of ``runs`` repetitions a fresh ``SocialModel`` is built from the
    given parameters and stepped ``nr_steps`` times; the first reporter column
    of ``model_data`` is kept per run. The plot starts every series with a 0 at
    step 0, followed by the collected value of each step.

    Parameters
    ----------
    runs : number of independent model runs.
    fill_val : 'lines' draws one line per run; any other value draws the
        min-max band plus mean and standard deviation over the runs
        (falls back to lines when ``runs == 1``).
    nr_steps : number of model steps per run.
    N, pos_sig_atn, mab, neg_sig_atn, max_pos_sig_str, neg_sig_str,
    neg_sig_prob, frac_size, nb_parameter : passed to ``SocialModel`` in this order.
    '''
    store_results = []
    for _ in range(runs):
        model = SocialModel(N,pos_sig_atn,mab,neg_sig_atn,max_pos_sig_str,neg_sig_str,neg_sig_prob,frac_size,nb_parameter)
        for _step in range(nr_steps):
            model.step()
        store_results.append(list(model_data(model).iloc[:,0]))

    steps = list(range(nr_steps+1))
    fig, ax = plt.subplots(figsize=(10, 6))

    if fill_val == 'lines' or runs == 1:
        # one unlabeled line per run, so no legend is drawn in this mode
        for run_results in store_results:
            ax.plot(steps, [0]+run_results, linewidth=3)
    else:
        # regroup: one tuple per step holding that step's value from every run
        per_step = list(zip(*store_results))
        means = [statistics.mean(vals) for vals in per_step]
        stdevs = [statistics.stdev(vals) for vals in per_step]
        minima = [min(vals) for vals in per_step]
        maxima = [max(vals) for vals in per_step]
        # plot range of vals, mean, stdev
        ax.fill_between(steps, [0]+minima, [0]+maxima, color='k', alpha=.3, label='range of values')
        ax.errorbar(steps, [0]+means, [0]+stdevs, marker='.', label='\u03BC and \u03C3')
        ax.legend(loc="best")

    ax.set_xticks([0,*range(10, nr_steps+1, 10)])
    ax.set_yticks([*range(0, 101, 10)])
    ax.set_ylabel('% of watchers')
    ax.set_xlabel('Step')
    ax.set_title('% of watchers that are dominated by most popular 20% of SMIs per step in model ({} runs)'.format(runs))
    plt.show()
    
def parameter_space_social_model(runs,nr_steps,N,pos_sig_atn,mab,neg_sig_atn,max_pos_sig_str,neg_sig_str,neg_sig_prob,frac_size,neighbors):
    '''Sweep one model parameter and plot the final fan percentage per tested value.

    Exactly one argument (other than ``runs``) should be a list: the values to
    test for that parameter. For every value a fresh ``SocialModel`` is built,
    stepped ``nr_steps`` times, and the last collected value of the first
    reporter column is recorded. This is repeated ``runs`` times; the plot shows
    mean and standard deviation per value (plus the min-max band for numeric
    values, or bars for non-numeric values).

    If several arguments are lists, the first one (in signature order) defines
    the x-axis and all lists are indexed in lock-step.

    Raises
    ------
    ValueError
        If ``runs < 1``, if no argument is a list, or if the list is empty.
    '''
    if runs < 1:
        raise ValueError('"runs" must be at least 1.')

    # explicit name -> value mapping; does not depend on the ordering of locals()
    params = {
        'nr_steps': nr_steps,
        'N': N,
        'pos_sig_atn': pos_sig_atn,
        'mab': mab,
        'neg_sig_atn': neg_sig_atn,
        'max_pos_sig_str': max_pos_sig_str,
        'neg_sig_str': neg_sig_str,
        'neg_sig_prob': neg_sig_prob,
        'frac_size': frac_size,
        'neighbors': neighbors,
    }
    # positional order expected by SocialModel ('neighbors' fills nb_parameter)
    model_arg_order = ('N', 'pos_sig_atn', 'mab', 'neg_sig_atn', 'max_pos_sig_str',
                       'neg_sig_str', 'neg_sig_prob', 'frac_size', 'neighbors')

    swept = [name for name, value in params.items() if isinstance(value, list)]
    if not swept:
        raise ValueError('One parameter must be given as a list of values to test.')
    param_name = swept[0]
    x_axis = params[param_name]
    if not x_axis:
        raise ValueError('The list of values to test is empty.')

    all_runs = []
    for _ in range(runs):
        results = []
        for j in range(len(x_axis)):
            # pick the j-th entry of every list-valued parameter, keep scalars as-is
            values = {name: (value[j] if isinstance(value, list) else value)
                      for name, value in params.items()}
            model = SocialModel(*(values[name] for name in model_arg_order))
            for _step in range(values['nr_steps']):
                model.step()
            # last collected row, first reporter column
            results.append(model_data(model).iloc[-1, 0])
        all_runs.append(results)

    # regroup: one tuple per tested value holding the result of every run
    per_value = list(zip(*all_runs))
    means = [statistics.mean(vals) for vals in per_value]
    # stdev needs at least two data points; a single run has no spread to show
    stdevs = [statistics.stdev(vals) if len(vals) > 1 else 0.0 for vals in per_value]

    fig, ax = plt.subplots(figsize=(10, 6))

    first = x_axis[0]
    # if param values are cardinal
    if isinstance(first, (int, float)) and not isinstance(first, bool):
        minima = [min(vals) for vals in per_value]
        maxima = [max(vals) for vals in per_value]
        # plot range of vals, mean, stdev
        ax.fill_between(x_axis, minima, maxima, color='k', alpha=.2, label='range of values')
        ax.errorbar(x_axis, means, stdevs, marker='o', label='\u03BC and \u03C3')
    else:
        # plot barplot
        ax.bar(x_axis, means, label='\u03BC')
        ax.errorbar(x_axis, means, stdevs, marker='.', linestyle='None', label='\u03C3', color='black')

    ax.set_xlabel(f'{param_name} parameter')
    ax.set_ylabel('% of watchers')
    ax.set_title('% of watchers that are dominated by most popular 20% of SMIs per parameter value (avg. over {} runs; {} steps per run)'.format(runs,nr_steps))
    ax.set_xticks(x_axis)
    ax.set_yticks([*range(0,110,10)])
    ax.legend(loc="best")
    plt.show()

### Class for agents

In [40]:
class SocialAgent(Agent):
    '''Network agent that spreads positive (influencer) or negative (watcher) pulses.

    The model gives every agent one ``influencer<id>`` attribute per influencer,
    holding the agent's interest score for that influencer, and gives influencer
    agents a ``sig_str`` attribute with their positive signal strength.
    '''

    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)

    def _pulse_targets(self):
        '''Return the neighboring agents this agent contacts, per ``model.nb_parameter``.

        1: one random neighbor, 2: tournament winner, 3: random-sized random
        subset, anything else: all neighbors. Returns ``[]`` when the agent has
        no neighbors.
        '''
        grid = self.model.grid
        neighbor_nodes = grid.get_neighbors(self.pos, include_center=False)
        mode = self.model.nb_parameter

        # tournament selection
        if mode == 2:
            winner = tournament_selection(neighbor_nodes, self.model, self.pos)
            return grid.get_cell_list_contents(winner) if winner else []

        neighbors = grid.get_cell_list_contents(neighbor_nodes)
        # only 1 neighbor
        if mode == 1:
            return [random.choice(neighbors)] if neighbors else []
        # random fraction of neighbors
        if mode == 3:
            return random.sample(neighbors, random.randrange(0, len(neighbors) + 1))
        return neighbors

    def _relay(self, influencer_id, senders, updated, sig, attenuation, sign):
        '''Shared cascade logic for positive (``sign=+1``) and negative (``sign=-1``) pulses.

        ``senders`` and ``updated`` are lists of agent ids shared by the whole
        cascade and are extended in place.
        '''
        score_attr = 'influencer{}'.format(influencer_id)
        # strength that arrives at the contacted agents
        relayed = sig * attenuation
        targets = self._pulse_targets()

        # An agent that contacted nobody is not marked as sender, so it can be
        # asked to relay again if reached later in the same cascade.
        if targets:
            senders.append(self.unique_id)

        for agent in targets:
            # if agent didn't already have their score updated for this influencer this step
            if agent.unique_id not in updated:
                setattr(agent, score_attr, getattr(agent, score_attr) + sign * relayed)
                updated.append(agent.unique_id)
            # if agent didn't already send a pulse for this influencer this step
            # and the relayed signal is at least 1
            if agent.unique_id not in senders and relayed >= 1:
                agent._relay(influencer_id, senders, updated, relayed, attenuation, sign)

    def send_pulse(self, influencer_id, sent_pos_pulse, increased_score, sig):
        '''INFLUENCERS: send out a positive pulse.

        Contacted agents gain ``sig * model.pos_sig_atn`` interest in
        ``influencer_id`` (at most once per cascade) and relay the attenuated
        signal while it is still >= 1.
        '''
        self._relay(influencer_id, sent_pos_pulse, increased_score, sig,
                    self.model.pos_sig_atn, 1)

    def send_neg_pulse(self, influencer_id, sent_neg_pulse, decreased_score, sig):
        '''INFLUENCEES: send out a negative pulse.

        Contacted agents lose ``sig * model.neg_sig_atn`` interest in
        ``influencer_id`` (at most once per cascade) and relay the attenuated
        signal while it is still >= 1.
        '''
        self._relay(influencer_id, sent_neg_pulse, decreased_score, sig,
                    self.model.neg_sig_atn, -1)

    def decrease_own_interest(self, influencer_id):
        '''Lower this agent's own interest in ``influencer_id`` by ``model.neg_sig_str``.'''
        score_attr = 'influencer{}'.format(influencer_id)
        setattr(self, score_attr, getattr(self, score_attr) - self.model.neg_sig_str)

    def step(self):
        '''Influencers start a positive pulse; watchers may start a negative one.'''
        # influencer agents: positive signal
        if self in self.model.influencers:
            self.send_pulse(self.unique_id, sent_pos_pulse=[], increased_score=[], sig=self.sig_str)
            return

        # without influencers there is nobody to send a negative signal about
        if not self.model.influencers_ids:
            return

        # influencees: negative signal with a predefined strength,
        # started with probability neg_sig_prob
        if choice([1,0], 1, p=[self.model.neg_sig_prob,1-self.model.neg_sig_prob])[0] == 1:
            # which influencer to send negative signal about
            influencer_id = random.choice(self.model.influencers_ids)
            # decrease own interest of that influencer and spread negative signal
            self.decrease_own_interest(influencer_id)
            self.send_neg_pulse(influencer_id, sent_neg_pulse=[], decreased_score=[], sig=self.model.neg_sig_str)
        

### Class for model

In [41]:
class SocialModel(Model):
    '''Agent-based model of influencers and watchers on a Barabasi-Albert network.

    One ``SocialAgent`` is placed on every node. The nodes with the highest
    degree centrality become influencers; all other agents are watchers
    ("influencees"). After each step the data collector records
    ``fans_percentage``.

    Parameters
    ----------
    N : number of nodes passed to ``nx.barabasi_albert_graph``.
    pos_sig_atn : factor by which a positive signal weakens per hop.
    mab : edges attached from each new node in the Barabasi-Albert graph.
    neg_sig_atn : factor by which a negative signal weakens per hop.
    max_pos_sig_str : upper bound of each influencer's random signal strength
        (strength is 0 when this is not positive).
    neg_sig_str : strength of a negative signal.
    neg_sig_prob : probability that a watcher starts a negative signal in a step.
    frac_size : fraction of nodes that become influencers (must be > 0).
    nb_parameter : 'one', 'tournament', 'fraction' or 'all'.

    Raises
    ------
    ValueError
        If ``frac_size <= 0`` or ``nb_parameter`` is not one of the four names.
    '''

    # neighbor-interaction modes, stored as integer codes on the instance
    _NB_MODES = {'one': 1, 'tournament': 2, 'fraction': 3, 'all': 4}

    def __init__(self, N, pos_sig_atn, mab, neg_sig_atn, max_pos_sig_str, neg_sig_str, neg_sig_prob, frac_size, nb_parameter):
        # let Mesa set up its own model state before agents are created
        super().__init__()

        self.G = nx.barabasi_albert_graph(N,mab)
        # number of nodes
        N = len(self.G.nodes())
        # parameters
        # how much a positive signal weakens
        self.pos_sig_atn = pos_sig_atn
        # how much a negative signal weakens
        self.neg_sig_atn = neg_sig_atn
        # max positive signal strength
        self.max_pos_sig_str = max_pos_sig_str
        # negative signal strength
        self.neg_sig_str = neg_sig_str
        # probability to start a negative signal
        self.neg_sig_prob = neg_sig_prob
        # fraction of nodes that become an influencer
        self.frac_size = frac_size
        if self.frac_size <= 0:
            raise ValueError('Please input a valid value for fraction size')
        # interact with random nb, tournament winner, random fraction, or all nbs
        try:
            self.nb_parameter = self._NB_MODES[nb_parameter]
        except (KeyError, TypeError):
            raise ValueError('"neighbors" parameter does not exist.')

        self.grid = NetworkGrid(self.G)
        self.schedule = RandomActivation(self)

        for u, v in self.G.edges():
            # add random weights between 0 and 1
            self.G[u][v]["weight"] = random.random()

        # create one agent per node; unique id == node label == position
        agents_by_id = {}
        for node in sorted(self.G.nodes()):
            agent = SocialAgent(node, self)
            self.schedule.add(agent)
            self.grid.place_agent(agent, node)
            agents_by_id[node] = agent

        # sort nodes/agent_ids by degree centrality, most central first
        centrality = nx.degree_centrality(self.G)
        ranked_nodes = sorted(centrality, key=centrality.get, reverse=True)
        # the most central nodes/agent_ids are the influencers
        self.influencers_ids = ranked_nodes[:int(N*self.frac_size)]
        # the remaining nodes/agent_ids are the watchers
        influencer_set = set(self.influencers_ids)
        self.influencees_ids = [uid for uid in agents_by_id if uid not in influencer_set]
        # look the agents up by id rather than by position in the schedule
        self.influencers = [agents_by_id[uid] for uid in self.influencers_ids]
        self.influencees = [agents_by_id[uid] for uid in self.influencees_ids]

        # initiate interest in every influencer for each agent
        for agent in agents_by_id.values():
            for influencer_id in self.influencers_ids:
                setattr(agent, 'influencer{}'.format(influencer_id), 0)
        # initiate positive signal strength (from 1 to max) for each influencer agent
        for influencer in self.influencers:
            if self.max_pos_sig_str > 0:
                influencer.sig_str = random.randint(1,self.max_pos_sig_str)
            else:
                influencer.sig_str = 0

        self.datacollector = DataCollector(
                             model_reporters= {"% of influenced that are fan of the 20% most popular influencers": fans_percentage}
                            )

    def step(self):
        '''Advance all agents by one step, then record the model-level reporter.'''
        self.schedule.step()
        self.datacollector.collect(self)

### Visualisation of steps and results for single model run

Widget for running the model with different parameter values

In [36]:
# Interactive front-end for run_social_model: pick values, then press the button.
interact_manual(run_social_model,
                runs=IntSlider(min=1, max=50, step=1, value=1),
                fill_val=['fill','lines'],
                nr_steps=IntSlider(min=0, max=200, step=10, value=150),
                # the Barabasi-Albert generator needs more nodes than `mab` (max 10 here),
                # so the smallest selectable N is 20 rather than 0
                N=IntSlider(min=20, max=500, step=20, value=500),
                pos_sig_atn=FloatSlider(min=0, max=1, step=0.1, value=0.8),
                mab=IntSlider(min=1, max=10, step=1, value=3),
                neg_sig_atn=FloatSlider(min=0, max=1, step=0.1, value=0.8),
                max_pos_sig_str=IntSlider(min=0, max=20, step=1, value=10),
                neg_sig_str=IntSlider(min=0, max=20, step=1, value=5),
                neg_sig_prob=FloatSlider(min=0, max=1, step=0.05, value=0.2),
                # SocialModel rejects frac_size <= 0, so start the slider at 0.1
                frac_size=FloatSlider(min=0.1, max=1, step=0.1, value=0.1),
                nb_parameter=['one','tournament','fraction','all']);

interactive(children=(IntSlider(value=1, description='runs', max=50, min=1), Dropdown(description='fill_val', …

### Individual parameter testing

Make a list of (valid!) values for the parameter that you want to test, then pass that list to the function in place of that parameter's single value, as below:

In [None]:
# Sweep the negative-signal attenuation; every other parameter stays fixed.
social_model_inputs = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
parameter_space_social_model(
    runs=10,
    nr_steps=150,
    N=500,
    pos_sig_atn=0.8,
    mab=3,
    neg_sig_atn=social_model_inputs,
    max_pos_sig_str=10,
    neg_sig_str=5,
    neg_sig_prob=0.2,
    frac_size=0.1,
    neighbors='fraction',
)

In [None]:
# Sweep the neighbor-interaction mode; every other parameter stays fixed.
social_model_inputs = ['one', 'tournament', 'fraction', 'all']
parameter_space_social_model(
    runs=10,
    nr_steps=150,
    N=500,
    pos_sig_atn=0.8,
    mab=3,
    neg_sig_atn=0.8,
    max_pos_sig_str=10,
    neg_sig_str=5,
    neg_sig_prob=0.2,
    frac_size=0.1,
    neighbors=social_model_inputs,
)