# Linear GP

Let us implement (from scratch) a linear GP system based on stack architecture.

Recall that a stack has, like a stack of plates, two main operations:
- ```push``` put a new element on the top of the stack
- ```pop``` remove the element at the top of the stack

To illustrate how a stack architecture works we start with an example.

The program is ```3 4 5 + * 30 -``` and the stack is initially empty (i.e., ```[]```). Here the execution one instruction at a time:

| Instruction | Stack |
|-|-|
| ```3``` | ```[3]``` |
| ```4``` | ```[3, 4]``` |
| ```5``` | ```[3, 4, 5]``` |
| ```+``` | ```[3, 9]``` |
| ```*``` | ```[27]``` |
| ```30``` | ```[27 30]``` |
| ```-``` | ```[3]``` |

There are entire programming languages (called stack-based languages) where programming is done in this way. The most famous is Forth (see [Starting Forth](https://www.forth.com/starting-forth/) for an introduction).

In [131]:
import enum
import math
import random

The opcodes that we use are:
- PLUS, MINUS, TIMES, DIVIDE, MOD. They perform exactly what their names imply using as arguments the two elments on the top of the stack.
- DUP. Duplicate the element at the top of the stack.
- SWAP. Swap the two elements at the top of the stack.
- NOP. Perform no operation.

In [132]:
opcodes = enum.Enum('opcodes', 'PLUS MINUS TIMES DIVIDE MOD DUP SWAP NOP')

In [133]:
def print_prg(prg : list):
    for block in prg:
        if type(block) is not int:
            print(f"{block.name} ", end = "")
        else:
            print(f"{block} ", end = "")
    print()

A program $p = p_1, \ldots, p_n$ is a sequence of instructions and constants. 

The stack has initial state $s = s_1, \ldots, s_m$ with $s_m$ at the _top_ of the stack.

The evaluation of $p_i$ is as follows:
- If $p_i$ is an instruction then its arguments are removed from the top of the stack, the instruction is executed, and the results are pushed on the stack.
- If $p_i$ is a constant, then it is pushed on the stack.
- In any case, we move to evaluate $p_{i+1}$ or we halt returning the current stack if $i = n$.

In [134]:
def eval(stack, program):
  while program != []:
      op = program[0]
      program = program[1:]
      if op == opcodes.PLUS:
        op1 = stack.pop()
        op2 = stack.pop()
        stack.append(op1 + op2)
      elif op == opcodes.MINUS:
        op1 = stack.pop()
        op2 = stack.pop()
        stack.append(op1 - op2)
      elif op == opcodes.TIMES:
        op1 = stack.pop()
        op2 = stack.pop()
        stack.append(op1 * op2)
      elif op == opcodes.DIVIDE:
        op1 = stack.pop()
        op2 = stack.pop()
        stack.append(op1 / op2)
      elif op == opcodes.MOD:
        op1 = stack.pop()
        op2 = stack.pop()
        stack.append(op1 % op2)
      elif op == opcodes.DUP:
        tmp = stack.pop()
        stack.append(tmp)
        stack.append(tmp)
      elif op == opcodes.SWAP:
        tmp1 = stack.pop()
        tmp2 = stack.pop()
        stack.append(tmp1)
        stack.append(tmp2)
      elif op == opcodes.NOP:
        pass
      else:
        stack.append(op)
  return stack

We generate a random program of length $n$ alternating randomly selected opcodes and constants (in this case integers between $-2$ and $2$).

In [135]:
def random_program(n):
  prg = []
  func = list(opcodes)
  for i in range(0, n):
    if random.random() < 0.5:
      op = random.choice(func)
    else:
      op = random.randint(-2, 2)
    prg.append(op)
  return prg

In [136]:
print_prg(random_program(6))

0 PLUS MINUS 2 1 2 


Tournament selection, being based on the phenotype, requires no changes.

In [137]:
def tournament_selection(fit, pop, t_size=4):
  tournament = random.choices(pop, k=t_size)
  return min(tournament, key=fit)

Also two points crossover is a direct generalization of one-point crossover. One important difference  here is that we are selecting _possibly different_ crossover points for the two parents, thus allowing individuals to shrink and grow.

In [138]:
def two_points_crossover(x, y):
  k1 = random.randint(0, len(x)-1)
  k2 = random.randint(k1, len(x)-1)
  h1 = random.randint(0, len(y)-1)
  h2 = random.randint(h1, len(y)-1)
  of1 = x[0:k1] + y[h1:h2] + x[k2:]
  of2 = y[0:h1] + x[k1:k2] + y[h2:]
  return of1, of2

One easy mutation is to replace an instuction (opcode or constant) with another random opcode or constant.

In [139]:
def mutation(x, p_m):
  def change(b):
    if random.random() < p_m:
      if random.random() < 0.5:
        op = random.choice(list(opcodes))
      else:
        op = random.randint(-2, 2)
      return op
    else:
      return b

  return [change(b) for b in x]

Now LinearGP can be implemented in a way similar to GA.

In [None]:
def linear_GP(fit, pop_size, n_iter = 100):
  p_m = 0.3
  pop = [random_program(7) for _ in range(0, pop_size)]
  best = []
  for i in range(0, n_iter):
    p_m = max(p_m * 0.99, 0.1) # linear scheduler for the mutation probability 
    selected = [tournament_selection(fit, pop) for _ in range(0, pop_size)]
    pairs = zip(selected, selected[1:] + [selected[0]])
    offsprings = []
    for x, y in pairs:
      of1, of2 = two_points_crossover(x, y)
      offsprings.append(of1)
      # offsprings.append(of2)
    pop = [mutation(x, p_m) for x in offsprings]
    candidate_best = min(pop, key=fit)
    if fit(candidate_best) < fit(best):
      best = candidate_best
    if fit(best) == 0:
      return best
    # print(f"Best individual at generation {i}: {best}")
    print(f"Best fitness at generation {i}: {fit(best)} [pm = {p_m}]")
  return best

Our fitness function computes the MSE on $100$ points (all integers from $0$ to $99$) where the target function that we want to learn is $x^2 + 3x + 2$.

Notice that the input is given by putting a non-empty stack in the initial state and that if an error occur (e.g., trying to pop for an empty stack) $+ \infty$ fitness value is returned

In [145]:
def fit_with_target(target_fun):
  def fit(prg):
    data = [(i, target_fun(i)) for i in range(0, 100)]
    sq_errors = 0
    for x, y in data:
      try:
        stack = eval([x], prg)
      except Exception:
        return math.inf
      if stack == []:
        return math.inf
      else:
        sq_errors += (y - stack.pop())**2
    return sq_errors/len(data)
  return fit

Let us see if we can learn the function...

In [None]:
random.seed(0)
best = linear_GP(fit_with_target(lambda x : x**2), 300)

Best fitness at generation 0: 477640270.5 [pm = 0.297]
Best fitness at generation 1: 475284826.0 [pm = 0.29402999999999996]
Best fitness at generation 2: 475284826.0 [pm = 0.29108969999999995]
Best fitness at generation 3: 465733162.0 [pm = 0.28817880299999993]
Best fitness at generation 4: 465733162.0 [pm = 0.28529701496999993]
Best fitness at generation 5: 465733162.0 [pm = 0.28244404482029994]
Best fitness at generation 6: 465733162.0 [pm = 0.27961960437209693]
Best fitness at generation 7: 302154058.3 [pm = 0.276823408328376]
Best fitness at generation 8: 302154058.3 [pm = 0.27405517424509224]
Best fitness at generation 9: 302154058.3 [pm = 0.2713146225026413]
Best fitness at generation 10: 302154058.3 [pm = 0.2686014762776149]
Best fitness at generation 11: 302154058.3 [pm = 0.26591546151483875]
Best fitness at generation 12: 302154058.3 [pm = 0.2632563068996904]
Best fitness at generation 13: 302154058.3 [pm = 0.2606237438306935]
Best fitness at generation 14: 302154058.3 [pm = 0

In [144]:
print(eval([1], best))
print_prg(best)

[-4]
DUP -2 -2 PLUS TIMES TIMES 


$$
[x] \\
[x,x] \\
[(x*x)] \\
[(x*x),-2] \\
[(x*x)-(-2)]\\
x^2 + 2
$$