In [None]:
import matplotlib.pyplot as plt
import time
import numpy as np
import numba as nb
from random import random, randint, shuffle, choices
from numba import jit #acceleration
from google.colab import files  #to download the text

# Define functions

In [None]:
@jit(nopython=True, parallel=True, fastmath=True, cache=True)
def rand_sym(N_max, L):
  '''
  generate L*L random hollow symmetric matrices
  '''
  
  Lambda_k = np.random.rand(N_max, L, L)
  for C_ij in Lambda_k:
    np.fill_diagonal(C_ij, 0)
    C_ij += C_ij.T
    C_ij /= 2

  Lambda_d = [Lambda_k[0]]
  
  for i in range(1, N_max):
    Lambda_d.append(Lambda_d[i - 1] + Lambda_k[i])
  
  return Lambda_d, Lambda_k

def Choose_Prob_index(P):
  '''
  give a probability distribution P = [P1, P2, ..., PN], 
  choose one of then according to P and return the index (begin from 1)
  '''

  a = random()
  P_index = 1
  P_boundary = 0
  for P_i in P:
    P_boundary += P_i
    if P_boundary > a:
      return P_index
    else:
      P_index += 1
  return P_index

@jit(nopython=True, fastmath=True, cache=True)
def Prob_Iq(q, Lambda_d, vec_N):
  '''
  In mode (I)
  calcualte total FC vector for q and probaility that f_q using s_j
  '''
  tot_FC_q = [Lambda_d[ min(vec_N[q-1], vec_N[j]) -1 ][q-1][j] for j in range(q)]
  
  tot_Prob_I = 0
  for i in tot_FC_q:
    tot_Prob_I += i

  Prob_I = [x/tot_Prob_I for x in tot_FC_q] #Prob_I(q, j)
  
  return Prob_I, tot_FC_q

@jit(nopython=True, fastmath=True, cache=True)
def h_p(p, freq):
  '''
  calculate h_p(B) based on the definition
  '''

  a = 0
  for i in freq:
    if i != 0: #lim_(x->0)(-x log x) = 0
      a -= i * np.log(i)
  return a/np.log(p)

def total_effort(lam, h, p, q):
  '''
  calculate total effort
  
  ---paras
  lam: system parameter
  h: h_p(B)
  p: size of Block inventory
  q: size of Book
  '''
  return ( (1 - lam * (1 + np.log(p)/np.log(q) )) * h + (1 - lam)* q* np.log(q)/np.log(p) ) / q

def I_ht(h0, freq_t, p):
  '''
  calculate h_p(B) at time t in mode (I)

  freq_t is the frequency of selected block at time t
  '''
  
  return h0 - (freq_t * np.log(freq_t) - (freq_t - 1) * np.log(freq_t - 1)) / np.log(p)

@jit(nopython=True, fastmath=True, cache=True)
def P_kq(z, k, q, Lambda_k, vec_N):
  '''
  return P_k used in mode (II)
  '''
  #ref: https://stackoverflow.com/questions/9987483/elif-in-list-comprehension-conditionals
  weight_k = [Ck_qj if (vec_N[q-1] >= k) and (vec_N[j] >= k) else 0
           for j, Ck_qj in enumerate(Lambda_k[k-1][q-1][:q-1])
           ]
  weight_k.append(z)

  sum_weight_k = 0
  for i in weight_k:
    sum_weight_k += i

  P_k = [n/sum_weight_k for n in weight_k]

  return P_k

@jit(nopython=True, fastmath=True, cache=True)
def P_kj_mut(z, k, q, j, Lambda_k, vec_N):
  '''
  return P_k used in de novo like mutation

  ---parameters
  z: effective connection
  k: k-th component, k = 1, ...
  q: the Book length
  j: j is a number in range(q)
  
  '''
  #ref: https://stackoverflow.com/questions/9987483/elif-in-list-comprehension-conditionals
  #Since C_jj =0, the de novo block can't use its original component to construct new block
  weight_k = [Ck_ju if (vec_N[j] >= k) and (vec_N[u] >= k) else 0
           for u, Ck_ju in enumerate(Lambda_k[k-1][j][:q])
           ]
  weight_k.append(z) #len(weight_k = q + 1)

  sum_weight_k = 0
  for i in weight_k:
    sum_weight_k += i

  P_k = [n/sum_weight_k for n in weight_k]

  return P_k


@jit(nopython=True, fastmath=True, cache=True)
def U(n_1, n_2):
  '''
  generates a uniform random integer (not n_1) in the range [0, n_2 - 1]
  '''
  a = randint(0, n_2 - 2)
  if a < n_1:
    return a
  elif a >= n_1:
    return a + 1

@jit(nopython=True, fastmath=True, cache=True)
def mutation_list(q, p, P_mu):
  '''
  For all blocks in Book, they have the mutation probability P_mu.
  When mutation happens, the block will be replaced by another block
  This function is uniform mutation

  ---paras
  q: size of Book
  p: size of Block inventory
  P_mu: mutation probability
  '''
  m = [(j, U(j, p)) for j in range(q) if random() < P_mu]
  return m

@jit(nopython=True, fastmath=True, cache=True)
def mutation_list_FC(q, B, P_mu, Prob_FC):
  '''
  For all blocks in Book, they have the mutation probability P_mu.
  When mutation happens, the block will be replaced by another block
  This function is non-uniform mutation (based on FC probability)

  ---paras
  q: size of Book
  B: Block inventory
  P_mu: mutation probability
  Prob_FC: FC probability
  '''
  m = [(j, choices(B, weights = Prob_FC)[0]) for j in range(q) if random() < P_mu]
  return m


# Define parameters and build FC network

In [None]:
#define parameters (affect FC)
L = 10000
N_max = 5

#build FC
Lambda_d, Lambda_k = rand_sym(N_max, L)
Lambda_d = np.array(Lambda_d)

In [None]:
#define parameters (not affect FC)
P_N = np.array([0.15, 0.40, 0.25, 0.15, 0.05])
lam = 0.47        #system parameter lambda
z_cof = 0.004
z = z_cof * L     #effective connection
P_mu = 0.0005      #mutation probability
T = 1             #repeat counts for mutation
ini_num_Block = 3 #initial number of Blocks in Book
#https://stackoverflow.com/questions/658763/how-to-suppress-scientific-notation-when-printing-float-values
#the digit after decimal point is set to be 15
name = f'{z_cof:.15f}'.rstrip('0').split('.')[1] + '_' + f'{lam:.15f}'.rstrip('0').split('.')[1]
name = name + '_' + f'{P_mu:.15f}'.rstrip('0').split('.')[1] + '_' + str(T)

# The program in this section will generate the initial "Book".

One should remind if the initial Book only contain one kind of block, there wil be `error`. 

When this *happens*, just run the following codes again. 

In [None]:
#restore information
vec_N = np.array([Choose_Prob_index(P_N) for j in range(L)], dtype = int) #number of components
#Book[j] = [j+1, c1, c2, ...], where ck is the component at position k
Book = [[j+1] for j in range(L)] #Block[j] = Book[j][1:] = [c1, c2, ...]
Block_inventory = [] #namely the set B = [Block[i]] (non-repeat)
Block_freq = {} #frequency of Block appears in Book, format: {str(Block[i]): frequency}
index_Block = {} #index in B, format: {str(Block[i]): index}
Component_inventory = [0] #Component = 0,1,2,....
Direction_count = [] #count Direction of evolution with 1: co-option, 2:de novo
#count Direction of mutation with 0: reamin, 1: co-option like, 2:de novo like
Direction_mut = []

#total FC tensor = [C_1, C_2, ...], where C_i = [C_i1, C_i2, ...]
tot_FC_list = [ [] for q in range(L)]
for q in range(ini_num_Block):
    tot_FC_list[q] = [Lambda_d[ min(vec_N[q-1], vec_N[j]) -1 ][q-1][j] for j in range(q)]


#initialize begining blocks
ib = -1 #index of block in B
for j in range(ini_num_Block):
  b = []
  for i in range(vec_N[j]):
    b.append(i)
    if i > Component_inventory[-1]:
      Component_inventory.append(i)
  shuffle(b)
  Book[j].extend(b)
  str_b = str(b)
  if str_b not in Block_freq:
    ib += 1
    index_Block[str_b] = ib
    Block_freq[str_b] = 1
    Block_inventory.append(b)
  else:
    Block_freq[str_b] += 1


#initial effort
h0 = 0
p0 = len(Block_freq)
q0 = ini_num_Block
h0 = h_p(p0, np.fromiter(Block_freq.values(), dtype = int))

print(Block_freq)
print(Book)

# The main algorithm

In [None]:
EFFORT = [] #record the effort after decide the direction of evolution
mut_success = 0

for t in range(1, L - q0 + 1):
  '''
  First part: decide the direction of evolution
  '''
  q = q0 + t

  #mode (I) co-option
  p0 = len(Block_freq)
  Prob_I, tot_FC_list[q-1] = Prob_Iq(q, Lambda_d, vec_N) #Prob_I(q, j)
  nu = Choose_Prob_index(Prob_I)

  s_nu = Book[nu - 1][1:]
  str_s_nu = str(s_nu)
  index_nu = index_Block[str_s_nu]  

  Bf_cooption = np.fromiter(Block_freq.values(), dtype = int)  
  Bf_cooption[index_nu] += 1
  h_0I = h_p(p0, Bf_cooption)
  TF_I = total_effort(lam, h_0I, p0, q) #total effort in mode (I)

  #mode (II) de novo
  p = p0 + 1
  b = []
  II_compo = [Component_inventory[-1] + 1] #component list used in mode (II)
  ini_len_II = 1
  
  #create new block
  for k in range(1, vec_N[q-1] + 1):
    
    P_k = P_kq(z, k, q, Lambda_k, vec_N) #len(P_k) = q
    j_k = Choose_Prob_index(P_k) #j_k <= q

    if j_k == q:
      #create new component
      if ini_len_II != 1:
        new_comp = II_compo[-1] + 1
        II_compo.append(new_comp)
      else:
        new_comp = II_compo[-1]
        ini_len_II += 1
      b.append(new_comp)
    else:
      #used old component
      b.append(Book[j_k - 1][k])


  str_b = str(b)
  Bf_denovo = np.fromiter(Block_freq.values(), dtype = int)

  try:
    #if we "create" an existing block
    Bf_denovo[index_Block[str_b]] += 1
    existing = True
    h_0II = h_p(p0, Bf_denovo)
    TF_II = total_effort(lam, h_0II, p0, q) #total effort in mode (II)

  except:
    #if we create a new block
    existing = False
    h_0II = h_p(p, Bf_denovo)
    TF_II = total_effort(lam, h_0II, p, q) #total effort in mode (II)

  #Principle of least effort
  #decide the direction of evolution: (I) co-option or (II) de novo
  

  if TF_I >= TF_II: #choose de novo
    Component_inventory.extend(II_compo)    

    if existing: #de novo but using existing block
      Block_freq[str_b] += 1
      Direction_count.append(1.5)

    else:
      Block_freq[str_b] = 1
      ib += 1
      index_Block[str_b] = ib
      #Block_inventory.append(b)
      Direction_count.append(2)

    Book[q-1].extend(b)
    EFFORT.append(TF_II)

  else: #choose co-option
    Direction_count.append(1)
    Block_freq[str_s_nu] += 1
    Book[q-1].extend(s_nu)
    vec_N[q-1] = vec_N[nu-1]
    EFFORT.append(TF_I)
  
  '''
  Second part: mutation
  '''
  
  for mut_loop in range(T): #run mutation for T times

    for j in range(q):
      if random() <= P_mu:
        str_s_j = str(Book[j][1:])
        index_j = index_Block[str_s_j] #index of s_j in B
        r_j = Block_freq[str_s_j] #original freq of s_j in B

        #step1: decide the direction of evolution
        ##(1) remain
        Bfv = np.fromiter(Block_freq.values(), dtype = int)

        h_old = h_p(p, Bfv)
        TF_old = total_effort(lam, h_old, p, q)

        ##(2) become existing (uniformly mutate s_j -> s_xi, co-option like)
        xi = U(j, q)
        str_s_xi = str(Book[xi][1:])
        index_xi = index_Block[str_s_xi] #index of s_xi in B

        if str_s_xi == str_s_j: #nothing changes
          ht_I = h_p(p, Bfv)
          TF_I = total_effort(lam, ht_I, p, q)

        else: #s_j -> s_xi and str_s_xi != str_s_j
          Bfv_cooption = Bfv.copy()
          Bfv_cooption[index_j] -= 1
          Bfv_cooption[index_xi] += 1

          ht_I = h_p(p, Bfv_cooption)
          TF_I = total_effort(lam, ht_I, p, q)
        
        ##(3) become new (s_j -> new block, de novo like)
        Bfv_denovo = Bfv.copy()
        Bfv_denovo[index_j] -= 1

        if r_j == 1: #this situation does not affect size of B and effort
          ht_II = h_old
          TF_II = TF_old
        elif r_j > 1:
          ht_II = h_p(p + 1, Bfv_denovo)
          TF_II = total_effort(lam, ht_II, p + 1, q)

        #step3: principle of least effort
        if (TF_old >= TF_I) and (TF_old >= TF_II):
          #mutation successful
          mut_success += 1

          if TF_II > TF_I: #co-option like
            EFFORT.append(TF_I)
            
            if str_s_j != str_s_xi:
              Direction_mut.append(1)
              Block_freq[str_s_j] -= 1
              Block_freq[str_s_xi] += 1
              Book[j][1:] = Book[xi][1:]
              vec_N[j] = len(Book[xi][1:])
            else:
              Direction_mut.append(0.5)


          else: #de novo like
            EFFORT.append(TF_II)

            if r_j == 1: #frequency one word becomes another frequency one
              Direction_mut.append(1.5)
              
            elif r_j > 1:
              Direction_mut.append(2)
              the_same = True  #True when de novo word == existing word
              
              #"while loop" fits our need but it is too slow. Instead, we use "for loop".
              for w in range(100000000000000000000):
                if the_same == False:
                  break
                b_new = []
                II_compo = [Component_inventory[-1] + 1] #components used in de novo
                ini_len_II = 1

                for k in range(1, vec_N[j] + 1):

                  P_k = P_kj_mut(z, k, q, j, Lambda_k, vec_N) #len(P_k) == q+1
                  mu_k = Choose_Prob_index(P_k) #mu_k <= q+1

                  if mu_k == q + 1:
                    #create new component
                    if ini_len_II != 1:
                      new_comp = II_compo[-1] + 1
                      II_compo.append(new_comp)
                    else:
                      new_comp = II_compo[-1]
                      ini_len_II += 1
                    b_new.append(new_comp)
                  else:
                    #used old component
                    b_new.append(Book[mu_k - 1][k])

                str_b_new = str(b_new)
                if str_b_new not in Block_freq:
                  the_same = False
              if the_same == True:
                #after 100000000000000000000, the mutation still falses
                raise ValueError("P_mu is too high to produce de novo like mutation, lower it.")
              
              Book[j][1:] = b_new
              Block_freq[str_s_j] -= 1
              Block_freq[str_b_new] = 1
              #Block_inventory.append(b_new)
              Component_inventory.extend(II_compo)

              p = p + 1 #new block change size of B
              ib += 1   #B becomes larger
              index_Block[str_b_new] = ib

        elif (TF_I > TF_old) and (TF_II > TF_old): #remains old
          Direction_mut.append(0)
          EFFORT.append(TF_old)
  
  '''
  Third part: before next iteration
  '''
  
  B_freq_copy = Block_freq.copy()
  index_Block = {}
  ib = -1
  #Block_inventory = []
  for s_blk, freq in B_freq_copy.items():
    #notice: s_blk is string
    if freq != 0:
      #Block_inventory.append(eval(s_blk)) #the eval can translate string to normal format
      ib += 1
      index_Block[s_blk] = ib
    else:
      del Block_freq[s_blk]

# Show result and save

In [None]:
plt.title('Direction')
plt.plot(Direction_count, '.', markersize = 2)
plt.show()
plt.title('Direction of mutation')
plt.plot(Direction_mut, '.', markersize = 2)
plt.show()
plt.title('effort')
plt.plot(EFFORT)
plt.show()
plt.title('FRD')
rho = list(np.fromiter(Block_freq.values(),  dtype = int))
rho.sort(reverse = True)
rank = [i for i in range(len(rho))]
plt.plot(rank, rho, '.')
plt.xscale('log')
plt.yscale('log')
plt.show()

In [None]:
f = open(name + '.txt', 'w')
for wi in Book:
  k = '-'.join(map(str, wi[1:]))
  f.write(k)
  f.write(' ')
f.close()
files.download(name + '.txt')