In [None]:
import numpy as np
#from math import comb (deprecated)
from scipy.special import comb
from time import time
from tqdm.notebook import tqdm, trange

In [None]:
class Distribution:
#     A class to manage discret distributions
#     Contains a support list (self.n)
#     and a probability list (self.p) 
#     giving the probability we draw the corresponding value from the support
    def __init__(self, supports, values):
        self.n = supports
        self.p = values
        
    def order(self):
#         to sort the support, keeping the corresponding probability 
        self.n, self.p = zip(*sorted(zip(self.n, self.p)))
    
    def fuse(self):
#         if a support is sorted, and has repeated values, this fuses them with their probability
        supp = [self.n[0]]
        prob = [self.p[0]]
        for n,p in zip(self.n[1:],self.p[1:]):
            if n == supp[-1]:
                prob[-1] += p
            else : 
                supp.append(n)
                prob.append(p)
        self.n = supp
        self.p = prob
        
    def normalize(self):
#         some manipulations (like filtering) lose normalization, this brings it back
        prob_sum = sum(self.p)
        self.p = [p/prob_sum for p in self.p]
    
    def order_n_fuse(self):
#         applies the corresponding methods
        self.order()
        self.fuse()
        
    def filtering(self, cond):
#         filters over a condition (then normalize)
        self.p = [p for n,p in zip(self.n, self.p) if cond(n)]
        self.n = [n for n in self.n if cond(n)]
        self.normalize()
    
    def sum_cond(self,cond):
#         gives the probability we are in a set defined by the condition cond
        return sum([p for n,p in zip(self.n, self.p) if cond(n)])
        
    def function(a,f,b, sup_filter=None):
#         returns the probability distribution of f(a,b), we can force a condition sup_filter(a,b) over the supports of a and b 
        if type(b) == Distribution :
            true_func = lambda x,y:True
            if not sup_filter : 
                sup_filter = true_func
            support_values = []
            prob_values = [] 
            for a_v, a_p in zip(a.n, a.p) :
                for b_v, b_p in zip(b.n, b.p) :
                    if sup_filter(a_v,b_v) :
                        support_values.append(f(a_v, b_v))
                        prob_values.append(a_p * b_p)
            distrib = Distribution(support_values, prob_values) 
            distrib.order_n_fuse()
            if sup_filter != true_func:
                distrib.normalize()
            return distrib
        distrib = Distribution([f(a,b) for n in a.n], a.p) 
        return distrib
            
    def __add__(a,b):
#         returns the probability distribution of a+b, allows to actually use the a+b syntax
        return a.function(lambda x, y: x+y, b)
    
    def __mul__(a,b):
#         returns the probability distribution of a*b, allows to actually use the a*b syntax
        return a.function(lambda x, y: x*y, b)

    def __truediv__(a,b):
#         returns the probability distribution of a/b, allows to actually use the a/b syntax
#         WARNING: no control for /0
        return a.function(lambda x, y: x+y, b)

    def __neg__(a):
#         returns the probability distribution of -b, allows to actually use the -b syntax
        distrib = Distribution([-n for n in a.n], a.p) 
        return distrib
    
    def __sub__(a,b):
#         returns the probability distribution of a-b, allows to actually use the a-b syntax
        return a.function(lambda x, y: x-y, b)
    
    def cut_geometric(N,p):
#         To create distribution that correspond to a truncated geometric distribution 
#         This corresponds to the probability distribution of the best priority you get at any level
        support = list(range(N+1))
        probs = []
        for n in support :
            if n==N: 
                probs.append((1-p)**N)
            else :
                probs.append(p*(1-p)**n)
        return Distribution(support, probs)
    
    def binomial(N,p):
#         To create distribution that correspond to a binomial distribution 
#         This corresponds to the probability distribution of the number of endorsement we get at a level 
        supports = list(range(N+1))
        probs = [comb(N,n) * s**n * (1-s)**(N-n) for n in supports]
        return Distribution(supports, probs)
        
    def __str__(self):
        s = ''
        for n, p in zip(self.n, self.p):
            s = s + f'{n:10d} : {p:.2e}\n'
        return s[:-1]

note: probably want to upgrade the distribution class's fields to numpy arrays or torch tensors for speed

In [None]:
# very basic class for representing delay functions, 
# and sub class for emmy family
class Delay:
    def __init__(self, f):
        self.f = f
        
    def __call__(self, prio, endo):
        return self.f(prio, endo)
    
    def distrib(self, prio_distrib, endo_distrib):
        delay_values = []
        prob_values = []
        for prio_v, prio_p in zip(prio_distrib.n, prio_distrib.p) :
            for endo_v, endo_p in zip(endo_distrib.n, endo_distrib.p) :
                delay_values.append(self(prio_v,endo_v))
                prob_values.append(prio_p * endo_p)
        delay_distrib = Distribution (delay_values, prob_values) 
        delay_distrib.order_n_fuse()
        return delay_distrib
        
class Delay_emmy(Delay):
    def __init__(self, emmy):
        
        if emmy=='emmy':
            self.E = 32
            super().__init__( lambda p,e: 60 + 75*p)
        elif emmy=='emmy+':
            self.E = 32
            super().__init__( lambda p,e: 60 + 40*p + 8*max(self.E*3/4-e,0))
        elif emmy=='emmy*':
            self.E = 256
            def f(p,e):
                if p==0 and e>=3*self.E/5:
                    return 30
                else:
                    return 60 + 40*p + 8*max(self.E*3/4-e,0)
                    
            super().__init__( f)

In [None]:
# very basic class for representing baking reward functions, 
# and sub class for emmy family
class Reward_baking:
    def __init__(self, f):
        self.f = f
        
    def __call__(self, prio, endo):
        return self.f(prio, endo)

class Reward_baking_emmy(Reward_baking):
    def __init__(self, emmy):
        
        if emmy=='emmy':
            self.E = 32
            super().__init__( lambda p,e: 16)
        elif emmy=='emmy+':
            self.E = 32
            def f(p,e):
                if p==0 :
                    return 40 * e / E
                else:
                    return 6 * e / E
            super().__init__( f)
        elif emmy=='emmy*':
            self.E = 256
            def f(p,e):
                if p==0 :
                    return 20 * e / E
                else:
                    return 3 * e / E
            super().__init__( f)

In [None]:
# very basic class for representing endorsement reward functions, 
# and sub class for emmy family
class Reward_endors:
    def __init__(self, f):
        self.f = f
        
    def __call__(self, prio):
        return self.f(prio)

class Reward_endors_emmy(Reward_baking):
    def __init__(self, emmy):
        
        if emmy=='emmy':
            self.E = 32
            super().__init__( lambda p: 2)
        elif emmy=='emmy+':
            self.E = 32
            super().__init__( lambda p: 40/E if p==0 else 40/E*2/3)
        elif emmy=='emmy*':
            self.E = 256
            super().__init__( lambda p: 20/E if p==0 else 20/E*2/3)

### Test
for emmy+ case

In [None]:
delay = Delay_emmy('emmy+')
E = 32
P = 32
s = 0.2 # stake of attacker

# distribution of the attacker's number of endorsement
endo = Distribution.binomial(E,s)
# distribution of the attacker's best priority
prio = Distribution.cut_geometric(P,s)
# distribution of the honest bakers' best priority 
prio_minus = Distribution.cut_geometric(P,1-s)

In [None]:
# we join the two priority distributions into 1 where we get an integer different from 0
# when negative the attacker has the priority 0 up to the |value|-1, 
# so the best priority for honest bakers is the value (in positive)
# when positive the honest bakers have the priority 0 up to the value-1, 
# so the best priority for the attacker is the value (in positive)
support = [-n for n in prio_minus.n[:0:-1]] + prio.n[1:]
probas = prio_minus.p[:0:-1] + prio.p[1:]
prio_compil = Distribution(support, probas)

# the functions to generate the two Delta distribution detailed in the overleaf 
delta_0_func = lambda p_c, e: delay(max(0,-p_c), E-e)-delay(max(0,p_c), E)
delta_l_func = lambda p_c, e: delay(max(0,-p_c), E-e)-delay(max(0,p_c), e)
delta_0 = prio_compil.function(delta_0_func,endo)
delta_l = prio_compil.function(delta_l_func,endo)

# the list for the probabilities of attacks of length l
F_l = delta_0
F = [None, F_l.sum_cond(lambda x:x>=0)]

F

each execution of the next cell will append a new value to F_l, so we get the probabilities for longer attacks,   
WARNING: takes much more time with each execution

In [None]:
F_l = F_l + delta_l
F.append(F_l.sum_cond(lambda x:x>=0))
F

# Nomadic blog post implementation for length-1

In [None]:
E= 32
P= 32
s = 0.2

def p_e (e, s, E=E):
    return comb(E,e) * s**e * (1-s)**(E-e)

def p_prio(p,s,sat=False, P=P):
    if sat :
        if p > P:
            return 0
        if p == P :
            return (1-s)**p
    return (1-s)**p * s

def p_p(p_d, p_h, s, mode = 'meme_prio', sat = False):
    if mode == 'meme_prio' :
        if p_h == 0:
            return p_prio(p_d, s, sat)
        if p_d == 0:
            return p_prio(p_h, s, sat)
    if mode == 'diff_prio':
        if p_h == 0:
            return p_prio(p_d, s, sat)
        if p_d == 0:
            return p_prio(p_h, 1-s, sat)

def in_P2(p_d,p_h, P2_mode='article'):
    if P2_mode == '00' :
        return (p_d == 0) or (p_h == 0)
    if P2_mode == 'no_00':
        return (p_d == 0) ^ (p_h == 0)

def diff_first (p_d, p_h, e, E=E):
    def delay (p,e) :
        return 60 + 40 * p + 8* max(3/4*E-e, 0)
    return int(delay(p_d, 32) - delay(p_h,E - e))



In [None]:
num_sample = 2000
P2_mode = 'no_00' # 00 no_00
sat= True # False True
mode = 'diff_prio' # meme_prio diff_prio

p_chain_diff = [0] * (2*num_sample + 1)
for p_d in range(P+1):
    for p_h in range(P+1):
        if not in_P2(p_d, p_h, P2_mode) :
            continue
        for e in range(E+1) :
            delta = diff_first(p_d, p_h, e, E)
            if -num_sample <= delta <= num_sample:
                p_chain_diff[delta + num_sample] += p_p(p_d, p_h, s, mode, sat) * p_e (e, s)
sum(p_chain_diff[:num_sample+1])