In [1]:
from ppsim import Simulation, StatePlotter, HistoryPlotter
from dataclasses import dataclass
import dataclasses
import numpy as np
from matplotlib import pyplot as plt
import ipywidgets as widgets
%matplotlib widget

In [2]:
def make_display(sim):
    sp, hp = StatePlotter(), HistoryPlotter()
    plt.ioff()
    sim.add_snapshot(sp)
#     sim.add_snapshot(hp)
    

    sim.layout = widgets.AppLayout(
        pane_heights = [1,10,1],
        center=sp.fig.canvas
#         right_sidebar=hp.fig.canvas
    )
    plt.ion()
    display(sim.layout)
    def set_yscale(yscale):
        for snap in sim.snapshots:
            if hasattr(snap, 'ax'):
                snap.ax.set_yscale(yscale)
                snap.update()

    yscale_button = widgets.interactive(set_yscale, yscale=widgets.ToggleButtons(options=['linear', 'symlog'], description = 'y scale:'))
    sim.layout.header=yscale_button
    sim.pane_heights = [1,10]
    
def add_time_bar(sim):
    time_bar = widgets.interactive(sim.set_snapshot_time, time=widgets.FloatSlider(min=sim.times[0], max=sim.times[-1], layout=widgets.Layout(width='100%'), step=0.01))
    sim.layout.footer=time_bar
    
def add_index_bar(sim):
    index_bar = widgets.interactive(sim.set_snapshot_index, index=widgets.IntSlider(min=0, max=len(sim.times)-1, layout=widgets.Layout(width='100%'), step=1))
    sim.layout.footer=index_bar

In [3]:
from fractions import Fraction

@dataclass(unsafe_hash=True)
class Agent:
    opinion: int = 0
    exponent: int = 0
        
    @property
    def bias(self):
        return self.opinion * 2 ** self.exponent
    
    @bias.setter
    def bias(self, value):
        if value == 0:
            self.opinion = self.exponent = 0
        else:
            self.opinion = int(np.sign(value))
            exponent = np.log2(abs(value))
            if exponent.is_integer():
                self.exponent = int(exponent)
            else:
                raise ValueError(f'bias = {value} must an integer power of 2')
    
    def __str__(self):
        if self.bias == 0:
            return '0'
        s = ''
        if self.bias > 0:
            s += '+'
        if abs(self.bias) > 1/100:
            s += str(Fraction(self.bias))
        else:
            if self.bias < 0:
                s += '-'
            s += '1/2^' + str(abs(self.exponent))
        return s
    
def init_config(a, b):
    return {Agent(opinion = 1): a, Agent(opinion = -1): b}

In [4]:
def cancel_split(a, b, L):
    a, b = dataclasses.replace(a), dataclasses.replace(b)
    
    # cancel reaction
    if a.bias == -b.bias:
        a.opinion = b.opinion = 0
        a.exponent = b.exponent = 0
    
    # split reaction
    if a.bias == 0 and abs(b.bias) > 2 ** (-L):
        a.opinion = b.opinion
        a.exponent = b.exponent = b.exponent - 1
    
    if b.bias == 0 and abs(a.bias) > 2 ** (-L):
        b.opinion = a.opinion
        b.exponent = a.exponent = a.exponent - 1
    
    return a, b

Simulation(init_config(1, 1), cancel_split, L = 4).print_reactions()

    0,    +1  -->   +1/2,  +1/2
    0,  +1/2  -->   +1/4,  +1/4
    0,  +1/4  -->   +1/8,  +1/8
    0,  +1/8  -->  +1/16, +1/16
    0,    -1  -->   -1/2,  -1/2
   +1,    -1  -->      0,     0
    0,  -1/2  -->   -1/4,  -1/4
 +1/2,  -1/2  -->      0,     0
    0,  -1/4  -->   -1/8,  -1/8
 +1/4,  -1/4  -->      0,     0
    0,  -1/8  -->  -1/16, -1/16
 +1/8,  -1/8  -->      0,     0
+1/16, -1/16  -->      0,     0


In [5]:
from itertools import product

def bias_average(a, b, L):
    a, b = dataclasses.replace(a), dataclasses.replace(b)
    
    # all allowable bias values
    biases = [0] + [2 ** i for i in range(-L,1)] + [-2 ** i for i in range(-L, 1)]
    # all pairs of bias values that preserve the sum
    legal_outputs = [(x,y) for (x,y) in product(biases, biases) if x + y == a.bias + b.bias]
    # choose the pair of bias values which are closest together
    a.bias, b.bias = legal_outputs[np.argmin(np.array([abs(x-y) for (x,y) in legal_outputs]))]
    
    return a, b

Simulation(init_config(1, 1), bias_average, L = 4).print_reactions()

    0,    +1  -->   +1/2,  +1/2
    0,  +1/2  -->   +1/4,  +1/4
    0,  +1/4  -->   +1/8,  +1/8
    0,  +1/8  -->  +1/16, +1/16
    0,    -1  -->   -1/2,  -1/2
   +1,    -1  -->      0,     0
 +1/2,    -1  -->   -1/4,  -1/4
 +1/4,    -1  -->   -1/2,  -1/4
    0,  -1/2  -->   -1/4,  -1/4
   +1,  -1/2  -->   +1/4,  +1/4
 +1/2,  -1/2  -->      0,     0
 +1/4,  -1/2  -->   -1/8,  -1/8
 +1/8,  -1/2  -->   -1/4,  -1/8
    0,  -1/4  -->   -1/8,  -1/8
   +1,  -1/4  -->   +1/2,  +1/4
 +1/2,  -1/4  -->   +1/8,  +1/8
 +1/4,  -1/4  -->      0,     0
 +1/8,  -1/4  -->  -1/16, -1/16
+1/16,  -1/4  -->   -1/8, -1/16
    0,  -1/8  -->  -1/16, -1/16
 +1/2,  -1/8  -->   +1/4,  +1/8
 +1/4,  -1/8  -->  +1/16, +1/16
 +1/8,  -1/8  -->      0,     0
+1/16,  -1/8  -->      0, -1/16
 +1/4, -1/16  -->   +1/8, +1/16
 +1/8, -1/16  -->      0, +1/16
+1/16, -1/16  -->      0,     0


In [6]:
n = 10 ** 6
sim = Simulation(init_config(n // 2 + 1, n // 2 - 1), bias_average, L=int(np.log2(n)))
sim.run(recording_step = 0.5)

 Time: 14749.140

In [10]:
import seaborn as sns
plt.figure()
sns.barplot(['a', 'b', 'c'], [1,2,3])

Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …



<AxesSubplot:>

In [7]:
make_display(sim)
add_index_bar(sim)

AppLayout(children=(Canvas(layout=Layout(grid_area='center'), toolbar=Toolbar(toolitems=[('Home', 'Reset origi…

In [14]:
sim.reset(init_config(n // 2 + 1, n // 2 - 1))
sim.run(recording_step = 0.5)
add_index_bar(sim)

In [6]:
@dataclass(unsafe_hash=True)
class ClockedAgent(Agent):
    role: str = 'main'
    hour: int = 0
    minute: int = 0
    
    def __str__(self):
        if self.bias != 0:
            return super().__str__()
        if self.role == 'clock':
            return 'c' + str(self.hour) + ':' + str(self.minute)
        else:
            return 'u' + str(self.hour)
    
def clocked_init_config(a, b, c):
    return {ClockedAgent(opinion = 1): a, ClockedAgent(opinion = -1): b, ClockedAgent(role = 'clock'): c}

In [7]:
def delayed_split(a, b, L, m):
    a, b = dataclasses.replace(a), dataclasses.replace(b)
    
    if a.role == b.role == 'main':
        # cancel reaction
        if a.bias == -b.bias:
            a.opinion = b.opinion = 0
            a.exponent = b.exponent = 0

        # split reaction
        if a.bias == 0 and b.bias != 0 and a.hour > abs(b.exponent):
            a.opinion = b.opinion
            a.exponent = b.exponent = b.exponent - 1
            a.hour = b.hour = 0

        if b.bias == 0 and a.bias != 0 and b.hour > abs(a.exponent) :
            b.opinion = a.opinion
            b.exponent = a.exponent = a.exponent - 1
            a.hour = b.hour = 0
        
    # clock ticks forward one minute per interaction
    for i in [a,b]:
        if i.role == 'clock' and i.hour < L:
            i.minute += 1
            if i.minute == m:
                i.minute = 0
                i.hour += 1
                
    # unbiased agents propagate max hour
    if a.bias == b.bias == 0:
        if a.hour > b.hour:
            b.minute = 0
            b.hour = a.hour
        if b.hour > a.hour:
            a.minute = 0
            a.hour = b.hour
    
    return a, b

s = Simulation(clocked_init_config(1, 1, 1), delayed_split, L = 4, m = 2)

In [8]:
n = 10 ** 2
sim = Simulation(clocked_init_config(n // 4 + 1, n // 4 - 1, n // 2), delayed_split, L=int(np.log2(n)), m = 10 * int(np.log(n)))
sim.run(recording_step = 0.5)

 Time: 98.360

In [9]:
sim.history

opinion,0,0,0,0,0,0,0,0,0,0,...,1,1,1,-1,-1,-1,-1,-1,-1,-1
exponent,0,0,0,0,0,0,0,0,0,0,...,-4,-5,-6,0,-1,-2,-3,-4,-5,-6
role,clock,clock,clock,clock,clock,clock,clock,clock,clock,clock,...,main,main,main,main,main,main,main,main,main,main
hour,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
minute,0,1,2,3,4,5,6,7,8,9,...,0,0,0,0,0,0,0,0,0,0
time,Unnamed: 1_level_5,Unnamed: 2_level_5,Unnamed: 3_level_5,Unnamed: 4_level_5,Unnamed: 5_level_5,Unnamed: 6_level_5,Unnamed: 7_level_5,Unnamed: 8_level_5,Unnamed: 9_level_5,Unnamed: 10_level_5,Unnamed: 11_level_5,Unnamed: 12_level_5,Unnamed: 13_level_5,Unnamed: 14_level_5,Unnamed: 15_level_5,Unnamed: 16_level_5,Unnamed: 17_level_5,Unnamed: 18_level_5,Unnamed: 19_level_5,Unnamed: 20_level_5,Unnamed: 21_level_5
0.00,50,0,0,0,0,0,0,0,0,0,...,0,0,0,24,0,0,0,0,0,0
0.54,14,24,9,2,1,0,0,0,0,0,...,0,0,0,17,0,0,0,0,0,0
1.20,3,9,19,9,6,1,1,1,1,0,...,0,0,0,14,0,0,0,0,0,0
1.84,0,4,13,12,6,5,4,3,1,2,...,0,0,0,10,0,0,0,0,0,0
2.41,0,2,7,4,12,7,7,4,3,2,...,0,0,0,10,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
96.27,0,0,0,0,0,0,0,0,0,0,...,14,36,0,0,0,0,0,0,0,0
96.97,0,0,0,0,0,0,0,0,0,0,...,14,36,0,0,0,0,0,0,0,0
97.86,0,0,0,0,0,0,0,0,0,0,...,14,36,0,0,0,0,0,0,0,0
98.36,0,0,0,0,0,0,0,0,0,0,...,14,36,0,0,0,0,0,0,0,0


In [49]:
int(np.log(n))

4

In [1]:
sim.state_list

NameError: name 'sim' is not defined

In [56]:
make_display(sim)
add_index_bar(sim)

AppLayout(children=(Canvas(layout=Layout(grid_area='center'), toolbar=Toolbar(toolitems=[('Home', 'Reset origi…

In [81]:

# TODO: figure out best way to implement symmetric reactions in the function code

class MajorityAgent(NamedTuple):
    # input: str = 'A'
    # output: Optional[str] = 'A'
    role: str = 'Main'
    minute: Optional[int] = None
    hour: Optional[int] = None
    exponent: Optional[int] = None
    bias: Optional[int] = None


def make_agent(input):
    if input == 'A':
        return MajorityAgent(bias=1, exponent=0)
    if input == 'B':
        return MajorityAgent(bias=-1, exponent=0)
    if input == 'C':
        return MajorityAgent(role='Clock', minute=0)


def majority_main_averaging(a: MajorityAgent, b: MajorityAgent, L: int, k: int, p: float = 1):
    new_a = a._asdict()
    new_b = b._asdict()
    if a.role == b.role == 'Clock':
        if a.minute == b.minute < L * k:
            # clock drip reaction
            new_a['minute'] += 1
            return {(MajorityAgent(**new_a), MajorityAgent(**new_b)): p}
        else:
            # clock epidemic reaction
            new_a['minute'] = new_b['minute'] = max(a.minute, b.minute)

    # clock update reaction
    if a.role == 'Main' and a.bias == 0 and b.role == 'Clock':
        new_a['hour'] = max(a.hour, b.minute // k)
    if b.role == 'Main' and b.bias == 0 and a.role == 'Clock':
        new_b['hour'] = max(b.hour, a.minute // k)

    if a.role == b.role == 'Main':
        # cancel reaction
        if {-1, 1}.issubset({a.bias, b.bias}) and a.exponent == b.exponent:
            new_a['bias'] = new_b['bias'] = 0
            new_a['exponent'] = new_b['exponent'] = None
            new_a['hour'] = new_b['hour'] = -a.exponent
        # split reaction
        if a.bias == 0 and b.bias != 0 and abs(a.hour) > abs(b.exponent):
            new_a['bias'] = b.bias
            new_a['hour'] = None
            new_a['exponent'] = new_b['exponent'] = b.exponent - 1
        if b.bias == 0 and a.bias != 0 and abs(b.hour) > abs(a.exponent):
            new_b['bias'] = a.bias
            new_b['hour'] = None
            new_a['exponent'] = new_b['exponent'] = a.exponent - 1
    return MajorityAgent(**new_a), MajorityAgent(**new_b)


def get_one_field(df, field):
    return df.transpose().groupby(level=field).sum().transpose()

NameError: name 'NamedTuple' is not defined