<a href="https://colab.research.google.com/github/50-Course/swarm-optimizers/blob/main/SwarmOptimization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Problem scope: Hydrostatic Thrust Bearing for Minimizing Power Loss

```math
The power loss depends on the decision vector x = (x1, x2, x3, x4)⊤ ∈ X ⊂ R4, where, x1 = Q
is the flow rate, x2 = R0 is the recess radius, x3 = R is the bearing step radius, and finally x4 = μ
is the viscosity of the fluid.
The optimisation problem is expressed as:
min
x∈X f (x) = P0(x)x1
0.7 + Ef (x)
```

The power loss in our system is influenced by four factors

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Todo: use numpy.random.seed somewhere
# You must ensure that all the results are pre-generated and repeatable2,
# otherwise you may lose marks.

In [None]:
# Counter for function evaluations with all values initialized to zero
# a simple hack to get by not using, `collections.Counter`
COUNTER_MAP = {
    'f': 0,
    'g1': 0,
    'g2': 0,
    'g3': 0,
    'g4': 0,
    'g5': 0,
    'g6': 0,
    'g7': 0
}

# Function to reset counters
def reset_counters() -> None:
    for key in COUNTER_MAP:
        COUNTER_MAP[key] = 0

# Function to increment counters
def increment_counter(func_name) -> None:
    COUNTER_MAP[func_name] += 1

# ==================================================
# Function Definitions
#
# - W (x) is the load carrying capacity
# - P0(x) is the inlet pressure,
# - Ef (x) is the friction loss
# - ∆T (x) is the temperature,
# - P (x) is the pressure
# - h(x) is the oil thickness.

def W(x):
  """
  Load carrying capacity
  """
  increment_counter('g1')
  return ((np.pi * P0(x)) / 2) * ((x**(2/3)) - (x**(3/2))) / np.log(x[3]/x[2])

def P0(x):
  """
  Inlet pressure
  """
  increment_counter('g2')
  return ((6e-6 * x[3] * x[0]) / np.pi * h(x)**3 ) * np.log(x[3] / x[2])

def Ef(x):
  """
  Friction loss
  """
  increment_counter('f')
  return 143.308 * Tdelta(x) * x[1]

def Tdelta(x):
  """
  Differential temperature
  """
  increment_counter('g3')
  return 2*(10**P(x) - 560)

def P(x):
  increment_counter('g4')
  return np.log10(np.log10(8.122 * x[3] + 0.8)) - 10.04 - 3.55


def h(x):
  increment_counter('g5')
  return (1500 * np.pi / 60)**2 * 2e-6 * np.pi * x[3] * Ef(x) * (x[3]**4 - x[3]**2) / 4


# ==================================================
# Objective function
# f(x) is the function we want to minimize (or optimize)

def f(x):
  """
  Finds the values of x that minimize this objective function
  within the defined optimization space X

  The objective function f(x) represents the operational power loss,
  considering the inlet pressure, flow rate, friction loss,
  and relevant geometrical and fluid parameters

  Mathematically,

    Operational power loss, f(x) = P0​(x)⋅x1/0.7 ​+ Ef​(x)
  """
  increment_counter('f')
  return ((P0(x) * x[0])/0.7) + Ef(x)


# ==================================================
# Constraint Definitions
#
# gi​(x), where gi [1,,7]
#
# where:
# - g1(x) is weight capacity > weight of generator
# - g2(x) is inlet oil pressure
# - g3(x) is oil temperature rise
# - g4(x) is oil film thickness
# - g5(x) is step radius > recess radius
# - g6(x) is limits on significance off exit loss < 0.001
# - g7(x) is the limit for contact pressure < 5000
def g1(x):
  increment_counter('g1')
  return 101000 - W(x)

def g2(x):
  increment_counter('g2')
  return P0(x) - 1000

def g3(x):
  increment_counter('g3')
  return Tdelta(x) - 50

def g4(x):
  increment_counter('g4')
  return 0.001 - h(x)

def g5(x):
  increment_counter('g5')
  return x[2] - x[3]

def g6(x):
  increment_counter('g6')
  return 0.0307 * x[0]**(772.8 * np.pi * P0(x) * h(x) * x[2]) - 0.001

def g7(x):
  increment_counter('g7')
  return W(x) / (np.pi * (x[2]**3 - x[1]**2)) - 50

In [None]:
# So here is how the random search algo works
# We have a method, that takes in some iterations, a function distribution, seed - a random intertia
def random_search(
    seed: int,
    iterations: int,
    fn_dist: list[t.Callable]
    ) -> int | float:

    # lets set a random see
    np.random.seed(seed)
    # set our best index to the starting point in our space
    best_index = None
    # setting our best value to negative infinity
    best_value = -np.inf

    # walk all the way through our iterations
    for idx in range(iterations):
      # grab a function index from our function distrbution
      # TODO: We don't just want to grab a function index, we want to randomly
      # select the function index
      fn_idx = np.random.randint(0, len(fn_dist))
      # then we call the function using its index
      val = fn_dist[fn_idx]()
      # compare the function returned from our function
      # the best value yet is if ours is greater than the best value
      # reset the index to the current's value index
      if val < best_value:
        best_value = val
        best_index = fn_idx
    return best_index, best_value


NameError: ignored

In [None]:
import typing as t
import secrets
import numpy as np


from numpy.typing import ArrayLike
from pprint import pprint

""" Seeds are 39 digit, 128-bit based integers that ensures the randomnesss of a function.

    In our case, we're defining a constant that would allow for reproducability of
    the results from a mathematical function.

    These can easily be generated using the `secrets` library from the Python's Standard Lib.

    Example:

      seed = secrets.randbits(128)
      pprint(f'Seed value: {seed}')

    Usage:

      f(seed=SEED_VAL, ...)

"""
SEED_VAL = 121460500873514285830857861626501934360


def random_search(seed: int, fn: t.Callable[[ArrayLike], t.Any], iterations: int = 21):
  reset_counters()
  np.random.seed(seed=seed) or np.random.seed(2023)
  best_val, best_idx = -np.inf, None

  return



'Seed value: 132913521146920668028001316869887677952'


In [None]:
# Let's test our random search


def fnOne:
  return np.random.rand() + 1


def fnTwo():
  return np.random.rand() * 2


def fnThree():
  import math
  return np.random.rand() + math.pow(3, 2)


def fnFour():
  return np.random.rand() + 5

# Our optimization space
decision_space = [fnOne, fnTwo, fnThree, fnFour]

# Set the seed
seed = 25

# perform random serch with 21 iterations
best_idx, best_val = random_search(seed, 1000, decision_space)

print(f'Best value: {best_val}, Best Index: {best_idx}')

In [None]:
# Validation code
x = np.array([4.19, 11.57, 6.69, 10.65])
print("Objective function output, f(x) = ", f(x))
print("Objective function output, g1(x) = ", g1(x))
print("Objective function output, g2(x) = ", g2(x))
print("Objective function output, g3(x) = ", g3(x))
print("Objective function output, g4(x) = ", g4(x))
print("Objective function output, g5(x) = ", g5(x))
print("Objective function output, g6(x) = ", g6(x))
print("Objective function output, g7(x) = ", g7(x))


Objective function output, f(x) =  -3.460621322707576e+24
Objective function output, g1(x) =  [-1.16758738e+25 -6.68779377e+25 -2.68633923e+25 -5.84310809e+25]
Objective function output, g2(x) =  -5.781467603568741e+23
Objective function output, g3(x) =  -1170.0
Objective function output, g4(x) =  2443562876.0280323
Objective function output, g5(x) =  -3.96
Objective function output, g6(x) =  inf
Objective function output, g7(x) =  [2.24492270e+22 1.28586351e+23 5.16503007e+22 1.12345562e+23]


  return 0.0307 * x[0]**(772.8 * np.pi * P0(x) * h(x) * x[2]) - 0.001


In [2]:
import matplotlib.pyplot as plt
import numpy as np
import typing as t
import numpy.typing as npt
from pprint import pprint

Let's go over what we know:

    Decision Vector, x:
    
    x is a set of parameters that defines the design of the hydrostatic thrust bearing. It's a vector (x1,x2,x3,x4)⊤, where:
      * x1​ is the flow rate,
      * x2​ is the recess radius,
      * x3 is the bearing step radius,
      * x4​ is the viscosity of the fluid.

    Optimization Space, X:

    x belongs to the optimization space X, which is a subset of R4; meaning each component of x can take real value.
    Hence, the optimization problem is concerned with finding the values of x within this space that minimize a certain objective function.

Objective Function:

    Objective Function f(x):
    
    The optimization problem, to minimize the operational power loss, given by the objective function f(x), expressed as:
    $𝑓(x)=P0(x)*x1/0.7 + Ef(x)

    where:
        P0(x) is the inlet pressure,
        Ef(x) is the friction loss.



In [6]:
class HydrostaticThrustBearing:

  def __init__(self) -> None:
    # counter map for evaluating functions
    self.counter_map = {
        'f': 0,
        'g1': 0,
        'g2': 0,
        'g3': 0,
        'g4': 0,
        'g5': 0,
        'g6': 0,
        'g7': 0
    }
    self.constraint_fn_type = ('g1', 'g2', 'g3', 'g4', 'g5' 'g6', 'g7')

  def increment_counter(self, func):
    if func in self.counter_map.keys():
      self.counter_map[func] += 1

  def reset_counter(self):
    for fn in self.counter_map:
      self.counter_map[fn] = 0

  def W(self, x):
    self.increment_counter('g1')
    return ((np.pi * self.P0(x)) / 2) * ((x*(2/3)) - (x*(2/2))) / np.log(x[2]/x[1])

  def P0(self, x):
      self.increment_counter('g2')
      return ((6e-6 * x[3] * x[0]) / (np.pi * self.h(x)**3)) * np.log(x[2] / x[1])

  def Ef(self, x):
      self.increment_counter('f')
      return 143.308 * self.Tdelta(x) * x[0]

  def Tdelta(self, x):
      self.increment_counter('g3')
      return 2*(10**self.P(x) - 560)

  def P(self, x):
      self.increment_counter('g4')
      return (np.log10(np.log10(8.122 * x[3] + 0.8)) - 10.04) / -3.55

  def h(self, x):
      self.increment_counter('g5')
      return ((1500 * np.pi / 60)**2) * (2e-6 * np.pi * x[3] / self.Ef(x)) * ((x*(4/3)/4) - (x *(4/2) / 4))

  def f(self, x):
    """
    Finds the values of x that minimize this objective function
    within the defined optimization space X

    The objective function f(x) represents the operational power loss,
    considering the inlet pressure, flow rate, friction loss,
    and relevant geometrical and fluid parameters

    Mathematically,

      Operational power loss, f(x) = (P0​(x) * x1/0.7) ​+ Ef​(x)
    """
    self.increment_counter('f')
    return ((self.P0(x) * x[0])/0.7) + self.Ef(x)

  # ==================================================
  # Constraint Definitions
  #
  # gi​(x), where gi [1,,7]
  #
  # where:
  # - g1(x) is weight capacity > weight of generator
  # - g2(x) is inlet oil pressure
  # - g3(x) is oil temperature rise
  # - g4(x) is oil film thickness
  # - g5(x) is step radius > recess radius
  # - g6(x) is limits on significance off exit loss < 0.001
  # - g7(x) is the limit for contact pressure < 5000

  def g1(self, x):
      self.increment_counter('g1')
      return 101000 - self.W(x)

  def g2(self, x):
      self.increment_counter('g2')
      return self.P0(x) - 1000

  def g3(self, x):
      self.increment_counter('g3')
      return self.Tdelta(x) - 50

  def g4(self, x):
      self.increment_counter('g4')
      return 0.001 - self.h(x)

  def g5(self, x):
      self.increment_counter('g5')
      return x[1] - x[2]

  def g6(self, x):
      self.increment_counter('g6')
      return (0.0307 * x[0]/(772.8 * np.pi * self.P0(x) * self.h(x) * x[2])) - 0.001

  def g7(self, x):
      self.increment_counter('g7')
      return self.W(x) / (np.pi * (x*(2/3) - x(2/2))) - 5000


In [5]:
import secrets

def generate_seed() -> int:
  """ Seeds are 39 digit, 128-bit based integers that ensures the randomnesss of a function.

    In our case, we're defining a constant that would allow for reproducability of
    the results from a mathematical function.

    These can easily be generated using the `secrets` library from the Python's Standard Lib.


    Example:

      seed = secrets.randbits(128)
      pprint(f'Seed value: {seed}')


    Usage:

      SEED_VAL = generate_seed()
      f(seed=SEED_VAL, ...)
  """
  return secrets.randbits(128)


SeedGenerator = t.NewType('SeedGenerator', t.Callable[[...], int])


class Optimizer:
  """
  This is the main algorithm of our system. It runs input through `random_search`
  to find the probablistic distrbution and calculate the global optimal solution
  using the `simulated_annealing` method.

  Usage:

    optimizer = Optimizer(HydrostaticThrustBearing.f, constraint_functions, search_space)

  """

  def __init__(self) -> None:
    pass

  @staticmethod
  def random_search(self, seed: int | SeedGenerator, fn: t.Callable[[npt.ArrayLike], t.Any], iterations: int = 21):
    np.random.seed(seed)

    best_solution = None
    best_fitness = np.inf

    for _ in range(iterations):
        candidate_solution = np.random.uniform(self.search_space[:, 0], self.search_space[:, 1])

        # Check constraints
        constraints = [g(candidate_solution) for g in self.constraint_functions]
        if all(c <= 0 for c in constraints):
            fitness = self.objective_function(candidate_solution)

            if fitness < best_fitness:
                best_fitness = fitness
                best_solution = candidate_solution

    return best_solution, best_fitness

  def simulated_annealing(self, iterations=10000, seed=None):
      np.random.seed(seed)

      current_solution = np.random.uniform(self.search_space[:, 0], self.search_space[:, 1])
      current_fitness = self.objective_function(current_solution)

      best_solution = current_solution
      best_fitness = current_fitness

      temperature = 1.0
      cooling_rate = 0.95

      for _ in range(iterations):
          new_solution = np.random.uniform(self.search_space[:, 0], self.search_space[:, 1])

          # Check constraints
          constraints = [g(new_solution) for g in self.constraint_functions]
          if all(c <= 0 for c in constraints):
              new_fitness = self.objective_function(new_solution)

              if new_fitness < current_fitness or np.random.rand() < np.exp((current_fitness - new_fitness) / temperature):
                  current_solution = new_solution
                  current_fitness = new_fitness

              if current_fitness < best_fitness:
                  best_solution = current_solution
                  best_fitness = current_fitness

          temperature *= cooling_rate

      return best_solution, best_fitness


In [None]:
# Validation for testing our swarm optimization algorithm
#
# Driver code
# Validation code
x = np.array([4.19, 11.57, 6.69, 10.65])

hydrostatic_problem = HydrostaticThrustBearing()

# Objective function
print("Objective function output, f(x) = ", hydrostatic_problem.f(x))

# Constraint functions
print("Constraint function output, g1(x) = ", hydrostatic_problem.g1(x))
print("Constraint function output, g2(x) = ", hydrostatic_problem.g2(x))
print("Constraint function output, g3(x) = ", hydrostatic_problem.g3(x))
print("Constraint function output, g4(x) = ", hydrostatic_problem.g4(x))
print("Constraint function output, g5(x) = ", hydrostatic_problem.g5(x))
print("Constraint function output, g6(x) = ", hydrostatic_problem.g6(x))
print("Constraint function output, g7(x) = ", hydrostatic_problem.g7(x))

Objective function output, f(x) =  -1857042.3872
Constraint function output, g1(x) =  [101000. 101000. 101000. 101000.]
Constraint function output, g2(x) =  -1000.0
Constraint function output, g3(x) =  -1170.0
Constraint function output, g4(x) =  2443562876.0280323
Constraint function output, g5(x) =  -3.96
Constraint function output, g6(x) =  0.0297
Constraint function output, g7(x) =  [-50. -50. -50. -50.]
