In [2]:
import numpy as np
import pandas as pd

In [78]:
def var12_func(x, params):
    b1, b2, b3, b4 = params
    return b1 - b2 * x - (1 / np.pi) * np.arctan(b3 / (x - b4))

In [91]:
class Model:
    def __init__(self, X, y, regression_function, *, train_split=0.8):
        '''
        Intialize Model class

        X: 1-D array. Features array
        y: 1-D array. Labels array
        train_split: float in (0, 1) 
        regression_funcion: function to be fitted.
        '''
        self.train_test_partition(X, y, train_split)

        self.fit_function = regression_function

    def train_test_partition(self, X, y, train_split):
        '''
        Splitting data into test and train sets
        '''
        n = int(len(X) * train_split)

        data = np.array(list(zip(X, y)))
        np.random.shuffle(data)
        self.X_train, self.y_train = data[:n, 0], data[:n, 1]
        self.X_test, self.y_test = data[n:, 0], data[n:, 1]


    def loss(self, params):
        '''
        Computes loss function
        '''
        y_hat = self.fit_function(x=self.X_train, params=params)
        mse = np.mean((y_hat - self.y_train) ** 2) / 2
        return mse
    
    def test_loss(self, params):
        '''
        Computes loss function on test set
        '''
        y_hat = self.fit_function(x=self.X_test, params=params)
        mse = np.mean((y_hat - self.y_test) ** 2) / 2
        return mse

In [94]:
class PSO:
    def __init__(self, num_pop, num_dim, cognitive, social, bounds, min_velocity, max_velocity):
        '''
        num_pop: number of particles
        num_dim: dimesionality
        cognitive: cognitive component coeficient
        social: social component coeficient
        left_bound: left bound of particles search area
        right_bound right bound of particles search area
        min_velocity: minimal allowed velocity
        max_velocity: maximum allowed velocity
        '''
        assert 0 < cognitive < 4, "Cognitive coeficient must be in (0, 4)"
        assert 0 < social < 4, "Social coeficient must be in (0, 4)"
        assert np.all(max_velocity > 0), "Maximum velocity must be positive"

        self.num_pop = num_pop
        self.num_dim = num_dim

        self.cognitive = cognitive
        self.social = social

        self.left_bound, self.right_bound = bounds[:, 0], bounds[:, 1]

        self.min_velocity = min_velocity
        self.max_velocity = max_velocity

        # Initializing particles
        
        self.positions = np.random.rand(self.num_pop, self.num_dim) * (self.right_bound - self.left_bound) + self.left_bound
        
        self.best_positions = np.copy(self.positions) # initializing best positions as starting positions

        self.velocities = np.random.rand(self.num_pop, self.num_dim) * (self.max_velocity - self.min_velocity) + self.min_velocity

    def clip_position_higer(self, position):
        return np.where(position > self.right_bound, self.right_bound - abs(position - self.right_bound), position)
    
    def clip_positon_low(self, position):
        return np.where(position > self.left_bound, self.left_bound + abs(position - self.left_bound), position)
    
    def main_loop(self, model, num_iter=20, *, verbose=True):
        objective_function = model.loss
        best_fittnes = np.apply_along_axis(objective_function, axis=1,  arr=self.positions)
        mask = np.zeros(self.num_pop, bool)

        best_fittnes = np.where(mask, np.inf, best_fittnes) # Filtering values

        leader = np.argmin(best_fittnes)

        # Initializing result variables
        global_minimum_position = self.positions[leader]
        global_minimum_value = np.min(best_fittnes)

        for i in range(num_iter):
            # Random vectors
            r1 = np.random.rand(1, self.num_dim)
            r2 = np.random.rand(1, self.num_dim)

            # Modifying velocities
            self.velocities += (self.cognitive * (self.best_positions - self.positions) * r1 + 
                                    self.social * (global_minimum_position - self.positions) * r2)
            
            self.velocities = np.maximum(self.velocities, self.min_velocity)
            self.velocities = np.minimum(self.velocities, self.max_velocity)
            
            # Modifying positons
            self.positions += self.velocities

            # inverting velocities depending on particle position
            self.velocities = np.where((self.positions < self.left_bound) | (self.best_positions > self.right_bound), -self.velocities, self.velocities)

            # Clipping positions
            self.positions = self.clip_position_higer(self.positions)
            self.positions = self.clip_positon_low(self.positions)

            # Updating personal record
            fittnes = np.apply_along_axis(objective_function, axis=1,  arr=self.positions)

            # Updating postions
            for j in range(self.num_pop):
                if fittnes[j] < best_fittnes[j]:
                    self.best_positions[j] = self.positions[j]

            best_fittnes = np.minimum(best_fittnes, fittnes)

            best_fittnes = np.where(mask, np.inf, best_fittnes) # Filtering values
            
            leader = np.argmin(best_fittnes)


            # Upadating result
            if global_minimum_value > objective_function(self.positions[leader]) and not np.isinf(best_fittnes[leader]):
                global_minimum_position = np.copy(self.positions[leader])
                global_minimum_value = objective_function(self.positions[leader])

            if verbose:
                print("Ітерація %i\nЗначення %.3f\nТочка %s\n" % (i + 1, global_minimum_value, global_minimum_position))


        return np.squeeze(global_minimum_position)
    

In [95]:
var12 = pd.read_excel("DataRegression.xlsx", sheet_name="Var12")
X, y = var12['x'].to_numpy(), var12['y'].to_numpy()

In [98]:
bounds_var12 = np.array([
    (-1, 1), (-1, 1), (1000, 2000), (-200, 200)
])
model = Model(X, y, var12_func)
test = PSO(50, len(bounds_var12), 0.2, 0.4, bounds_var12, -1, 1)
params_var12 = test.main_loop(model, 100, verbose=False)
print("Loss on test set", model.test_loss(params_var12))

Loss on test set 0.19499228405757243
