In [1]:
import numpy as np

class PSOConstrained:
    """
    Particle Swarm Optimization (PSO) for constrained
    continuous / mixed-integer minimization.

    - Feasible-only: initial positions are feasible.
    - Mixed variables: bounds = [(min, max, "any" or "int"), ...]
    """

    def __init__(
        self,
        objective_func,
        bounds,
        constraint_func=None,
        num_particles=30,
        max_iter=1000,
        w=0.7,           # inertia weight
        c1=1.5,          # cognitive coefficient
        c2=1.5,          # social coefficient
        vmax_frac=0.2,   # max velocity as fraction of variable span
        max_resample=10000,
        seed=5
    ):
        """
        Parameters
        ----------
        objective_func : callable
            f(x) to minimize. x is 1D numpy array.
        bounds : list of tuple
            [(x1_min, x1_max, "any" or "int"), ...]
            3rd element omitted => "any".
        constraint_func : callable or None
            g(x) -> bool, True if feasible.
        num_particles : int
            Number of particles.
        max_iter : int
            Number of iterations.
        w, c1, c2 : float
            PSO parameters.
        vmax_frac : float
            Max velocity magnitude = vmax_frac * (max-min) per dimension.
        max_resample : int
            Max attempts for sampling / repairing feasible position.
        seed : int or None
            RNG seed.
        """
        self.objective_func = objective_func
        self.constraint_func = constraint_func

        self.dim = len(bounds)
        self.bounds = np.array([(b[0], b[1]) for b in bounds], dtype=float)
        self.types = [(b[2].lower() if len(b) > 2 else "any") for b in bounds]

        self.num_particles = num_particles
        self.max_iter = max_iter
        self.w = w
        self.c1 = c1
        self.c2 = c2
        self.vmax_frac = vmax_frac
        self.max_resample = max_resample

        self.rng = np.random.default_rng(seed)

        # For velocity limits / mutation scales
        self.span = self.bounds[:, 1] - self.bounds[:, 0]
        self.vmax = self.vmax_frac * self.span

        # Swarm state
        self.X = None          # positions (num_particles, dim)
        self.V = None          # velocities (num_particles, dim)
        self.pbest_x = None
        self.pbest_f = None
        self.gbest_x = None
        self.gbest_f = None

        self.history_best_f = []

    # ----------------- Helpers -----------------
    def _is_feasible(self, x):
        if self.constraint_func is None:
            return True
        return bool(self.constraint_func(x))

    def _sample_random_position(self):
        """Sample one feasible position within bounds."""
        for _ in range(self.max_resample):
            x = self.rng.uniform(self.bounds[:, 0], self.bounds[:, 1])
            # integer rounding
            for j in range(self.dim):
                if self.types[j] == "int":
                    x[j] = round(x[j])
            x = np.clip(x, self.bounds[:, 0], self.bounds[:, 1])
            if self._is_feasible(x):
                return x
        raise RuntimeError("PSO: Failed to sample a feasible position.")

    def _initialize_swarm(self):
        self.X = np.empty((self.num_particles, self.dim))
        for i in range(self.num_particles):
            self.X[i] = self._sample_random_position()

        # Random initial velocities in [-vmax, vmax]
        self.V = self.rng.uniform(-self.vmax, self.vmax, size=(self.num_particles, self.dim))

        # Personal bests = initial positions
        self.pbest_x = self.X.copy()
        self.pbest_f = np.apply_along_axis(self.objective_func, 1, self.X)

        # Global best
        best_idx = np.argmin(self.pbest_f)
        self.gbest_x = self.pbest_x[best_idx].copy()
        self.gbest_f = self.pbest_f[best_idx]

    def _repair_feasible(self, x):
        """
        If x is infeasible, try some small random moves; if still infeasible,
        resample a new feasible position.
        """
        if self._is_feasible(x):
            return x

        y = x.copy()
        for _ in range(self.max_resample // 2):
            # small uniform step within 10% of span
            step = self.rng.uniform(-0.1 * self.span, 0.1 * self.span)
            y = y + step

            # type handling
            for j in range(self.dim):
                if self.types[j] == "int":
                    y[j] = round(y[j])

            y = np.clip(y, self.bounds[:, 0], self.bounds[:, 1])
            if self._is_feasible(y):
                return y

        # fallback: random feasible
        return self._sample_random_position()

    # ----------------- Main loop -----------------
    def run(self, verbose=False):
        """
        Run PSO optimization.

        Returns
        -------
        gbest_x : np.ndarray
            Best solution found.
        gbest_f : float
            Best objective value.
        history_best_f : list[float]
            Global best f per iteration.
        """
        self._initialize_swarm()
        self.history_best_f = [self.gbest_f]

        for it in range(1, self.max_iter + 1):
            for i in range(self.num_particles):
                # Update velocity
                r1 = self.rng.random(self.dim)
                r2 = self.rng.random(self.dim)

                cognitive = self.c1 * r1 * (self.pbest_x[i] - self.X[i])
                social = self.c2 * r2 * (self.gbest_x - self.X[i])

                self.V[i] = self.w * self.V[i] + cognitive + social

                # Clamp velocity
                self.V[i] = np.clip(self.V[i], -self.vmax, self.vmax)

                # Update position
                new_x = self.X[i] + self.V[i]

                # Type handling
                for j in range(self.dim):
                    if self.types[j] == "int":
                        new_x[j] = round(new_x[j])

                # Bounds
                new_x = np.clip(new_x, self.bounds[:, 0], self.bounds[:, 1])

                # Constraint repair (if needed)
                new_x = self._repair_feasible(new_x)

                # Evaluate
                new_f = self.objective_func(new_x)

                # Update current position
                self.X[i] = new_x

                # Update personal best
                if new_f < self.pbest_f[i]:
                    self.pbest_f[i] = new_f
                    self.pbest_x[i] = new_x.copy()

                    # Update global best
                    if new_f < self.gbest_f:
                        self.gbest_f = new_f
                        self.gbest_x = new_x.copy()

            self.history_best_f.append(self.gbest_f)

            if verbose and (it % max(1, self.max_iter // 10) == 0):
                print(f"Iter {it:6d} | Best f = {self.gbest_f:.6e}")

        return self.gbest_x, self.gbest_f, self.history_best_f


In [2]:
# ==========================================================
# Example 1: Tension/compression spring design problem
# ==========================================================

def spring_obj(x):
    """
    Tension/compression spring design objective function.
    x: [x1, x2, x3]
    f(x) = (x3 + 2) * x2 * x1^2
    """
    x = np.asarray(x, dtype=float)
    x1, x2, x3 = x
    return (x3 + 2.0) * x2 * x1**2


def spring_cons(x):
    """
    Spring design constraints g1..g4(x) <= 0.
    Returns True if all constraints are satisfied.
    """
    x = np.asarray(x, dtype=float)
    x1, x2, x3 = x

    # g1(x) = 1 - (x2^3 * x3) / (71,785 x1^4) <= 0
    g1 = 1.0 - (x2**3 * x3) / (71_785.0 * x1**4)

    # g2(x) = (4x2^2 - x1 x2) / (12,566 (x2 x1^3 - x1^4)) + 1/(5,108 x1^2) - 1 <= 0
    denom = 12_566.0 * (x2 * x1**3 - x1**4)
    if abs(denom) < 1e-12:
        return False
    g2 = (4.0 * x2**2 - x1 * x2) / denom + 1.0 / (5_108.0 * x1**2) - 1.0

    # g3(x) = 1 - 140.45 x1 / (x2^2 x3) <= 0
    g3 = 1.0 - (140.45 * x1) / (x2**2 * x3)

    # g4(x) = (x1 + x2) / 1.5 - 1 <= 0
    g4 = (x1 + x2) / 1.5 - 1.0

    return (g1 <= 0.0) and (g2 <= 0.0) and (g3 <= 0.0) and (g4 <= 0.0)


bounds_spring = [
    (0.05, 2.0, "any"),   # x1
    (0.25, 1.3, "any"),   # x2
    (2.0, 15.0, "any")    # x3
]


# ==========================================================
# Example 2: Pressure vessel design problem
# ==========================================================

def pressure_vessel_obj(x):
    """
    Pressure vessel design objective function.
    x: [x1, x2, x3, x4]
    f(x) = 0.6224 x1 x3 x4 + 1.7781 x2 x3^2 + 3.1661 x1^2 x4 + 19.84 x1^2 x3
    """
    x = np.asarray(x, dtype=float)
    x1, x2, x3, x4 = x

    return (
        0.6224 * (0.0625*x1) * x3 * x4 + 1.7781 * (0.0625*x2) * x3**2 + 3.1661 * (0.0625*x1)**2 * x4 + 19.84  * (0.0625*x1)**2 * x3
    )


def pressure_vessel_cons(x):
    """
    Pressure vessel design constraints g1..g4(x) <= 0.
    Returns True if all constraints are satisfied.
    """
    x = np.asarray(x, dtype=float)
    x1, x2, x3, x4 = x

    # g1(x) = -x1 + 0.0193 x3 <= 0
    g1 = -0.0625*x1 + 0.0193 * x3

    # g2(x) = -x2 + 0.00954 x3 <= 0
    g2 = -0.0625*x2 + 0.00954 * x3

    # g3(x) = -pi x3^2 x4 - (4/3) pi x3^3 + 1,296,000 <= 0
    g3 = -np.pi * x3**2 * x4 - (4.0 / 3.0) * np.pi * x3**3 + 1_296_000.0

    # g4(x) = x4 - 240 <= 0
    g4 = x4 - 240.0

    return (g1 <= 0.0) and (g2 <= 0.0) and (g3 <= 0.0) and (g4 <= 0.0)


bounds_pressure_vessel = [
    (1.0, 99.0, "int"),  # x1 (thickness, integer multiple of 0.0625 if scaled)
    (1.0, 99.0, "int"),  # x2
    (10.0,   200.0,        "any"),   # x3
    (10.0,   200.0,        "any"),   # x4
]

# ==========================================================
# Example 3: 3bar truss design
# ==========================================================
def truss_obj(x):
    """
    Objective function:
        f(x) = (2 * sqrt(2) * x1 + x2) * L
    x : [x1, x2]
    """
    x = np.asarray(x, dtype=float)
    x1, x2 = x

    return (2.0 * np.sqrt(2.0) * x1 + x2) * 100

def truss_cons(x):
    """
    Constraint set:
        g1(x) = ((sqrt(2)*x1 + x2) / sqrt(2*x1^2 + 2*x1*x2)) * P - SIGMA <= 0
        g2(x) = (x2 / sqrt(2*x1^2 + 2*x1*x2)) * P - SIGMA <= 0
        g3(x) = (1 / sqrt(2*x2 + x1)) * P - SIGMA <= 0

    Returns True if all constraints are satisfied.
    """
    x = np.asarray(x, dtype=float)
    x1, x2 = x

    # Common denominator term for g1, g2
    denom = np.sqrt(2.0) * x1**2 + 2.0 * x1 * x2
    if denom < 1e-12:
        # Avoid division by zero -> treat as infeasible
        return False

    g1 = ((np.sqrt(2.0) * x1 + x2) / denom) * 2 - 2
    g2 = (x2 / denom) * 2 - 2

    # Denominator for g3
    denom3 = np.sqrt(2.0) * x2 + x1
    if denom3 < 1e-12:
        return False

    g3 = (1.0 / denom3) * 2 - 2

    return (g1 <= 0.0) and (g2 <= 0.0) and (g3 <= 0.0)

bounds_truss = [
    (0.0, 1.0, "any"),  # x1
    (0.0, 1.0, "any")   # x2
]

# ==========================================================
# Example 4: speed reducer optimization
# ==========================================================

def speed_obj(x):
    x = np.asarray(x, dtype=float)
    x1, x2, x3, x4, x5, x6, x7 = x

    return (
        0.7854 * x1 * x2**2 * (3.3333 * x3**2 + 14.9334 * x3 - 43.0934)
        - 1.508 * x1 * (x6**2 + x7**2)
        + 7.4777 * (x6**3 + x7**3)
        + 0.7854 * (x4 * x6**2 + x5 * x7**2)
    )

def speed_cons(x):
    """
    Return True if all constraints g1..g11(x) <= 0.
    """
    x = np.asarray(x, dtype=float)
    x1, x2, x3, x4, x5, x6, x7 = x

    # g1 ~ g4
    g1 = 27.0 / (x1 * x2**2 * x3) - 1.0
    g2 = 397.5 / (x1 * x2**2 * x3**2) - 1.0
    g3 = (1.93 * x4**3) / (x2 * x3 * x6**4) - 1.0
    g4 = (1.93 * x5**3) / (x2 * x3 * x7**4) - 1.0

    # g5, g6
    g5 = (1.0 / (110.0 * x6**3)) * np.sqrt(((745.0 * x4) / (x2 * x3))**2 + 16.9e6) - 1.0
    g6 = (1.0 / (85.0 * x7**3)) * np.sqrt(((745.0 * x5) / (x2 * x3))**2 + 157.5e6) - 1.0

    # g7 ~ g11
    g7  = (x2 * x3) / 40.0 - 1.0
    g8  = (5.0 * x2) / x1 - 1.0
    g9  = (x1 / (12.0 * x2)) - 1.0
    g10 = (1.5 * x6 + 1.9) / x4 - 1.0
    g11 = (1.1 * x7 + 1.9) / x5 - 1.0

    return (
        g1 <= 0 and g2 <= 0 and g3 <= 0 and g4 <= 0 and
        g5 <= 0 and g6 <= 0 and g7 <= 0 and g8 <= 0 and g9 <= 0 and g10 <= 0 and g11 <= 0
    )

bounds_speed = [
    (2.6, 3.6, "any"),   # x1
    (0.7, 0.8, "any"),   # x2
    (17, 28, "int"), # x3
    (7.3, 8.3, "any"),   # x4
    (7.8, 8.3, "any"),   # x5
    (2.9, 3.9, "any"),   # x6
    (5.0, 5.5, "any"),   # x7
]

In [6]:
pso = PSOConstrained(
    objective_func=spring_obj,
    bounds=bounds_spring,
    constraint_func=spring_cons
)

pso_best_x, pso_best_f, pso_hist = pso.run(verbose=True)
print("Best feasible solution:", pso_best_x)
print("Best objective value:", pso_best_f)

Iter    100 | Best f = 1.293939e-02
Iter    200 | Best f = 1.280173e-02
Iter    300 | Best f = 1.280173e-02
Iter    400 | Best f = 1.280173e-02
Iter    500 | Best f = 1.280173e-02
Iter    600 | Best f = 1.280173e-02
Iter    700 | Best f = 1.280173e-02
Iter    800 | Best f = 1.275781e-02
Iter    900 | Best f = 1.275781e-02
Iter   1000 | Best f = 1.275781e-02
Best feasible solution: [ 0.05        0.31718053 14.089026  ]
Best objective value: 0.0127578146636751
