<a href="https://colab.research.google.com/github/djurjo/DABI/blob/main/PSO-project/pso_groups.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install numba
from numba import jitclass, types, typed, njit, prange
from numba import int32, float32
import random
import numpy as np
import itertools
import time
import csv



In [None]:
def create_random_array(size, list_intervals):
    r = np.zeros(size, dtype = np.float32)
    for i in range(0, size):
      b = random.random() # We use random randoms instead of np.randoms
      interval = list_intervals[i]
      b = (interval[1] - interval[0]) * b + interval[0]
      r[i] += b
    return r

@njit
def add(v1, v2):
  r = np.zeros(v1.size, dtype = np.float32)
  if v1.size == v2.size:
    for i in range(0, v1.size):
      r[i] = v1[i] + v2[i]
  return r

@njit
def escalar(v1, val):
  r = np.zeros(v1.size, dtype = np.float32)
  for i in range(v1.size):
    r[i] = v1[i]* val
  return r

@njit
def sub(v1, v2):
  r = np.zeros(v1.size, dtype = np.float32)
  if v1.size == v2.size:
    for i in range(0, v1.size):
      r[i] = v1[i] - v2[i]
  return r

In [None]:
def create_swarn(search_space,dimension, B):
  swarn = []
  for i in prange(B):
    position = create_random_array(dimension, search_space)
    direction = create_random_array(dimension, search_space)
    b = np.vstack((position, direction, position))
    swarn.append(b)
  t_swarn = typed.List()    
  [t_swarn.append(x) for x in swarn]   
  return t_swarn

@njit
def in_search_space(current_pos, search_space):
    for i in range(len(search_space)):
        if (search_space[i][0] <= current_pos[i]) and (current_pos[i] <= search_space[i][1]):
           pass
        else:
            if (abs(search_space[i][0] - current_pos[i]) < abs(search_space[i][1] - current_pos[i])):
                current_pos[i] = current_pos[i] % search_space[i][0]
            else:
                current_pos[i] = current_pos[i] % search_space[i][1]
    return current_pos

@njit
def take_best_global(swarn, f):
  b_g_p = swarn[0][2]
  b_g_v = f(b_g_p)
  for i in prange(len(swarn)):
    bird = swarn[i]
    if f(bird[2]) < b_g_v :
      b_g_p = bird[2]
      b_g_v = f(bird[2])
    else:
      pass
  return (b_g_p, b_g_v)


In [None]:
@njit
def move_bird(bird, search_space):
    n_pos = add(bird[0], bird[1])
    #bird[0] = in_search_space(n_pos, search_space)
    return bird
@njit
def bird_update_local(bird, f):
    if f(bird[0]) < f(bird[2]):
        bird[2] = bird[0]
    return bird
@njit
def update_bird(bird, best_global_postion, cog, soc, inertia, r1, r2):
    s1 = escalar(bird[1], inertia)
    s2 = escalar(sub(bird[2], bird[0]) , cog*r1)
    s3 = escalar(sub(best_global_postion, bird[0]) , soc*r2)
    bird[1] = add(add(s1, s2), s3)
    return bird
@njit
def bird_upd(bird, b_g_p, cog, soc, inertia ,r1, r2, search_space, f):
    bird = update_bird(bird, b_g_p, cog, soc, inertia ,r1, r2)
    bird = move_bird(bird, search_space)
    bird = bird_update_local(bird, f)
    return bird

@njit(parallel=True)
def swarn_upd(swarn, b_g_p, cog, soc, inertia ,r1, r2, search_space, f):
  for j in prange(0, len(swarn)):
    swarn[j] = bird_upd(swarn[j], b_g_p, cog, soc, inertia ,r1, r2, search_space, f)
  return swarn


In [None]:
def splitBySize(rectangle, sizes):
  """
  A rectangle is an n-dimensional list of tuples (min, max)
  Sizes is a n-dimensional list (or tuple) of positive floats with the expected size
  """
  rects = list([])
  for i in range(len(rectangle)):
    interval  = rectangle[i]
    sz = interval[1] - interval[0]
    wanted_sz = sizes[i]
    complete_ints = int(sz/wanted_sz) #int rounds to floor 
    parts = [(interval[0] + k*wanted_sz, interval[0] + (k + 1)*wanted_sz) for k in range(0, complete_ints)]
    if complete_ints != int(sz/2):
      parts.append((interval[0] + complete_ints * wanted_sz, interval[1]))  
    rects.append(parts)
  lt = list(itertools.product(*rects))
  t_lt = typed.List()
  for x in lt:
    t_lt.append(x)
  return t_lt

@njit
def in_space(point, space):
  for i in range(len(point)):
    if not (point[i] >= space[i][0] and point[i] <= space[i][1]):
      return False
  return True

@njit
def agrupate_swarn(swarn, spl_space):
  n_groups = len(spl_space)
  sw_g = typed.List()
  for i in range(n_groups):
    group = typed.List()
    for bd in swarn:
      if in_space(bd[0], spl_space[i]):
        group.append(bd) 
      sw_g.append(group)
  return sw_g

@njit
def merge(gr_swarn):
  sw = typed.List()
  sw.append(gr_swarn[0]) #This force the sw typedList to have the needed type
  for k in range(1, len(gr_swarn)):
    if not (gr_swarn[k] in sw):
      sw.append(gr_swarn[k])
  return sw

@njit
def run_pso(swarn, iterations, space, target, cog, soc, inertia, r1, r2):
  t = take_best_global(swarn, target)
  b_g_p = t[0]
  b_g_v = t[1]
  for it in range(1, iterations + 1):
    swarn = swarn_upd(swarn, b_g_p, cog, soc, inertia ,r1, r2, space, target) ## In each iteration share the info.
    t = take_best_global(swarn, target)
    b_g_p = t[0]
    b_g_v = t[1]
  return swarn

@njit(parallel = True)
def par_swarns(gr_swarn, b_g_p, cog, soc, inertia ,r1, r2, space, target):
  new_swarn = []
  for i in prange(len(gr_swarn)):
    new_swarn.append(run_pso(gr_swarn[i], 50,space, target, cog, soc, inertia ,r1, r2))
  return new_swarn

def run_pso_gr(iterations, B, space, sizes, dimensions, target, cog, soc, inertia, r1, r2):
  swarn = create_swarn(space, dimensions,B)
  t = take_best_global(swarn, target)
  b_g_p = t[0]
  b_g_v = t[1]
  splited_space = splitBySize(space, sizes)
  gr_swarn = agrupate_swarn(swarn, splited_space)
  k = round(iterations /50) 
  for it in range(1, k + 1):
      swarn = par_swarns(gr_swarn, b_g_p, cog, soc, inertia ,r1, r2, space, target)
      t = take_best_global(swarn, target)
      b_g_p = t[0]  
      b_g_v = t[1]
      print("Best global in iteration ", k, " is ", b_g_v)
      gr_swarn = agrupate_swarn(swarn, splited_space)
  return b_g_v

In [None]:
t = typed.List()
x = [1, 2, 3, 4, 1, 1, 5]
t.append(x[0])
for i in x:
  if not (i in t):
    t.append(i)
print(t)

[1, 2, 3, 4, 5]


In [None]:
@njit
def sphere(x):
  s = 0
  for i in range(len(x)):
    s += x[i]**2
  return s


In [None]:
runs = 10

#### Global variables for rosenbrock and sphere.

cog = 1.8  # Cognitive importance
soc = 2.3   # Social importance
inertia = 0.8

# Two random values that will change each time
r1 = random.random() 
r2 = random.random()

# About the search space:

dimensions = 5
search_space = [(-5, 5)]*dimensions
sizes = [2.5]*dimensions

t_space = typed.List()
[t_space.append(x) for x in search_space]

# Number of birds
B = 500

# Number of iterations

iterations = 1000


In [None]:
t1 = time.time()
sol = run_pso_gr(iterations, B, search_space, sizes, dimensions, sphere, cog, soc, inertia, r1, r2)
ttotal = time.time() - t1

print("We spent: ", ttotal)
print("We get: ", sol)

Encountered the use of a type that is scheduled for deprecation: type 'reflected list' found for argument 'search__space' of function '__numba_parfor_gufunc_0x7f4db7ae9390'.

For more information visit https://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-reflection-for-list-and-set-types

File "<string>", line 1:
<source missing, REPL/exec in use?>

  new_swarn.append(run_pso(gr_swarn[i], 50,space, target, cog, soc, inertia ,r1, r2))
Encountered the use of a type that is scheduled for deprecation: type 'reflected list' found for argument 'search_space' of function 'bird_upd'.

For more information visit https://numba.pydata.org/numba-doc/latest/reference/deprecation.html#deprecation-of-reflection-for-list-and-set-types

File "<ipython-input-4-4a69dc34897d>", line 19:
@njit
def bird_upd(bird, b_g_p, cog, soc, inertia ,r1, r2, search_space, f):
^

  new_swarn.append(run_pso(gr_swarn[i], 50,space, target, cog, soc, inertia ,r1, r2))
Encountered the use of a 