In [1]:
from abc import ABC, abstractmethod
import numpy as np


class BasePointsGenerator(ABC):
    def __init__(self, n):
        """
        Generates points array on the segment [0.0; 1.0] with shape (n + 2, n)
        Attributes:
            n (int): dimension
        """
        self._n = n
        self._points = None
    
    @abstractmethod
    def generate(self):
        """
        Creates points array on the segment [0.0; 1.0]
        Returns:
            np.ndarray: array of points
        """
        raise NotImplemented()
        
        
class FullFactorialGenerator(BasePointsGenerator):
    
    @staticmethod
    def _gen_binary_arr(dim):
        bin_seed = np.array([[0.], [1.]])
        bin_arr = bin_seed
        for i in range(1, dim):
            bin_arr = np.hstack((np.repeat(bin_seed, bin_arr.shape[0], 0), np.tile(bin_arr, (2, 1))))
        return bin_arr
    
    def _max_permissible_power(self):
        return int(np.log2(self._n + 2.))

    def _fill_horizontal(self, points, fill_arr, start_row, start_col, finish_col):
        n_repeats = (finish_col - start_col) // fill_arr.shape[1]
        last_col = start_col + n_repeats * fill_arr.shape[1]
        points[start_row:(start_row+fill_arr.shape[0]),start_col:last_col] = np.tile(fill_arr, (1, n_repeats))
        return points, last_col
    
    def _fill_vertical(self, points, fill_arr, start_col, start_row, finish_row):
        n_repeats = (finish_row - start_row) // fill_arr.shape[0]
        last_row = start_row + n_repeats * fill_arr.shape[0]
        points[start_row:last_row, start_col:(start_col + fill_arr.shape[1])] = np.tile(fill_arr, (n_repeats, 1))
        return points, last_row
    
    def _generate(self):
        # Special case
        if self._n == 1:
            return np.array([
                [0],
                [0.5],
                [1]
            ])

        points = np.zeros((self._n + 2, self._n))
        max_power = self._max_permissible_power()
        binary_arr = FullFactorialGenerator._gen_binary_arr(max_power)
        # Copy binary array to result matrix and fill at along the cols        
        points, unfilled_col = self._fill_horizontal(points, binary_arr, 0, 0, self._n)
        # Fill right part of array if needed
        if unfilled_col < self._n:
            additional_power = self._n % max_power
            additional_binary_arr = FullFactorialGenerator._gen_binary_arr(additional_power)
            points, _ = self._fill_vertical(points, additional_binary_arr, unfilled_col, 0, binary_arr.shape[0])
            reverse_arr = additional_binary_arr[::-1]
            points, unfilled_row = self._fill_vertical(points, reverse_arr, unfilled_col, binary_arr.shape[0], self._n + 2)
            if unfilled_row < self._n+2:
                rows_to_fill = self._n+2 - unfilled_row
                points[unfilled_row:unfilled_row + rows_to_fill, unfilled_col:] = reverse_arr[:rows_to_fill]

        # Fill bottom of array with shift
        temp_root = 0
        temp_root_shift_idx = temp_root
        for i in range(binary_arr.shape[0], self._n + 2):
            temp_horiz_idx = 0
            while temp_horiz_idx < unfilled_col:
                points[i, temp_horiz_idx:temp_horiz_idx + max_power] = points[temp_root_shift_idx, 0:max_power]
                temp_root_shift_idx += 1
                temp_horiz_idx += max_power
            temp_root += 1
            temp_root_shift_idx = temp_root

        return points
    
    def generate(self):
        if self._points is None:
            self._points = self._generate()
        return self._points.copy()
       
    

In [19]:
import numpy as np
from numpy.linalg import det


class Model:
    EPS = 1e-11
    
    class Result:
        
        def __init__(self, weights, error, is_linear):
            self.weights = weights
            self.error = error
            self.is_linear = is_linear
        
        def __str__(self):
            return "Results\n" +\
                 "Weights: {}\n".format(self.weights) +\
                 "RMSE error: {}\n".format(self.error) +\
                 "Is linear: {}".format(self.is_linear)
            
        def __repr__(self):
            return self.__str__()
    
    def __init__(self, f, generator, boundaries, max_error=1e-5):
        self._f = f
        self._generator = generator
        self._boundaries = np.array(boundaries, copy=False, dtype=float, ndmin=2)
        self._max_error = max_error
        
        assert self._boundaries.shape[1] == 2
    
    @staticmethod
    def _rcondh(xtx):
        """
        Calculate reciprocal condition number for the symmetrical, positive semi-definite matrix xtx
        """
        ev = np.linalg.eigvalsh(xtx) 
        return max(0., ev.min() / max(ev.max(), np.finfo(float).eps))
    
    @staticmethod
    def _min_rcond_xtx_submatrices(x):
        """
        Calculate minimal reciprocal condition number of the matrix x'x
        and all matrices y_i'y_i where y_i is matrix x without i-th row
        """
        xtx = np.dot(x.T, x)
        min_rcond = Model._rcondh(xtx)
      
        for i in range(x.shape[0]):
            # we don't need to recalculate the whole matrix, we can just update it
            yty = xtx - np.dot(x[i].reshape(-1, 1), x[i].reshape(1, -1))
            min_rcond = min(min_rcond, Model._rcondh(yty))
            if not min_rcond:
                return min_rcond # zero is the absolute minimum
      
        return min_rcond
      
    def _regularize_matrix(self, matrix, bounds_threshold=0.1, rcond_threshold=1.e-5, seed=65521):
        """
        Regularize matrix by stepping out of boundaries
        """
        if Model._min_rcond_xtx_submatrices(matrix) >= rcond_threshold:
            return matrix # the initial matrix is well-conditioned
      
        rnd_state = np.random.get_state() # save global random generator state
        try:
            np.random.seed(seed) # let's make it deterministic
            indices_lo = matrix == self._boundaries[:, 0].reshape(1, -1) # points at the lower bound
            indices_up = matrix == self._boundaries[:, 1].reshape(1, -1) # points at the upper bound
            n_indices_lo, n_indices_up = np.count_nonzero(indices_lo), np.count_nonzero(indices_up) # cache number of lower and upper bounds
        
            # We modify zeros and ones only while in general case 
            # the matrix can have linearly dependent "fixed" columns and/or rows
            # Such a problem is unsolvable. So, let's limit the number of trials.
        
            for _ in range(100):
                update_matrix = np.random.uniform(0, bounds_threshold, size=matrix.shape) * np.ptp(self._boundaries, axis=1).reshape(1, -1)
          
                reg_matrix = matrix.copy()
                reg_matrix[indices_lo] += update_matrix[indices_lo]
                reg_matrix[indices_up] -= update_matrix[indices_up]
                if Model._min_rcond_xtx_submatrices(reg_matrix) >= rcond_threshold:
                    return reg_matrix
          
            return None
        finally:
            np.random.set_state(rnd_state) # restore global random generator state
    
    def _map_boundaries(self, matrix):
        indices_hi = matrix == 1.
      
        matrix = matrix * np.ptp(self._boundaries, axis=1).reshape(1, -1) + self._boundaries[:, 0].reshape(1, -1)
      
        # due to numerical issues the upper bounds may be out of upper boundaries 
        # but the _regularize_matrix method expects exact positions
        for j, ub in enumerate(self._boundaries[:, 1]):
            matrix[indices_hi[:, j], j] = ub
        
        return matrix
    
    def _fit(self, user_points, bounds_threshold=0.1, rcond_threshold=1.e-5):
        gen_points = self._generator.generate()
        gen_points = self._map_boundaries(gen_points)
        gen_points = self._regularize_matrix(gen_points, bounds_threshold=bounds_threshold, rcond_threshold=rcond_threshold)
        if user_points is None: 
            return gen_points
        
        n = self._boundaries.shape[0]
        if (n == user_points.shape[1]) and (user_points.shape[0] == n + 2):
            if Model._min_rcond_xtx_submatrices(user_points) > rcond_threshold:
                return
        
        ind_to_exclude = []
        
        for i in range(user_points.shape[0]):
            determinant = 0
            index = None
            for j in range(gen_points.shape[0]):
                if j in ind_to_exclude:
                    continue
                temp_points = gen_points[j].copy()
                gen_points[j] = user_points[i]
                ptp = np.dot(gen_points.T, gen_points)
                if Model._min_rcond_xtx_submatrices(gen_points) > rcond_threshold:
                    temp_det = np.abs(det(ptp))
                    if temp_det > determinant:
                        determinant = temp_det
                        index = j
                gen_points[j] = temp_points
            if index is not None:
                gen_points[index] = user_points[i]
                ind_to_exclude.append(index)
        return gen_points

    @staticmethod
    def _predict_weights(X, y):
        X = np.hstack((np.ones((X.shape[0], 1)), X))
        weights = np.linalg.lstsq(X, y, rcond=Model.EPS)[0]
        return weights
        
    def predict(self, points=None, bounds_threshold=0.1, rcond_threshold=1.e-5, verbose=False):
        points = self._fit(points, bounds_threshold, rcond_threshold)
        if points is None:
            raise Exception("Regularization failed")
        n_points = points.shape[0]
        
        y = np.array([self._f(x_0) for x_0 in points]).reshape(n_points, -1)
        y_norm = np.std(y, axis=0)
        y_norm[y_norm > 0.] = 1. / y_norm[y_norm > 0.]
        
        # Array of residuals on cross-validation.
        residuals = np.zeros(y.shape)

        # Leave-One cross validation
        idx_vec = np.ones(n_points, dtype=bool)
        for i in range(n_points):
            idx_vec[i] = False # mark i-th vector to exclude
            weights = Model._predict_weights(points[idx_vec], y[idx_vec]) # calculate weights
            
            # reshapes are requered to handle 1D as well as ND
            y_predicted = np.dot(np.hstack((1., points[i])).reshape(1, -1), weights).reshape(-1)
            residuals[i] = (y_predicted - y[i]) * y_norm # note the sign does not matter because we're going to calculate squared errors
        
            if verbose:
                print("rcond = %g" % self._rcondh(np.dot(points[idx_vec].T, points[idx_vec])))
                print("resid = %s" % residuals[i])
                print(weights)
                print("-"*10)
            
            idx_vec[i] = True # restore i-th vector state
          
        # calculate componentwise normalized root-mean-squared error
        RRMSE = np.hypot.reduce(residuals) / np.sqrt(y.shape[0])
        
        # now reduce RRMSE to a single value
        RRMSE = np.hypot.reduce(RRMSE) / np.sqrt(y.shape[1])
        
        weights = Model._predict_weights(points, y)
        
        return Model.Result(weights, RRMSE, RRMSE <= self._max_error)


In [21]:
def noisy_linear(x):
    # Тут шум будет стохастический, поэтому RMSE будет порядка 1e-2
    return 3 * x[0] - 2 * x[1] + 5 * x[2] + 5 + np.random.normal(scale=0.01)
  
def quadratic(x):
    return 3 * x[0] - 2 * x[1] * x[2] + 5 * x[2] + 5.

borders = np.array([
                    [2, 4],
                    [5, 6],
                    [7, 8]
                   ])

gen = FullFactorialGenerator(3)
print("Noisy linear model 5 + 3 * x0 - 2 * x1 + 5 * x2 + eps")
m = Model(noisy_linear, gen, borders, max_error=0.1)
print(m.predict(bounds_threshold=0.1, rcond_threshold=1.e-8, verbose=False))
print("-"*40)
print(m.predict(bounds_threshold=0.1, rcond_threshold=1.e-5, verbose=False))
print("-"*40)
print(m.predict(bounds_threshold=0.1, rcond_threshold=1.e-4, verbose=False))
print("-"*40)
m = Model(noisy_linear, gen, borders, max_error=0.1)
m.predict(points=np.array([
    [2.5, 5.5, 8],
    [2.8, 5.2, 7.6]
]))
print(m.predict())
print("Qaudratic model 5 + 3 * x0 - 2 * x1 * x2 + 5 * x2")
m = Model(quadratic, gen, borders, max_error=0.1)
print(m.predict(bounds_threshold=0.1, rcond_threshold=1.e-5, verbose=False))


Noisy linear model 5 + 3 * x0 - 2 * x1 + 5 * x2 + eps
Results
Weights: [[ 5.05256907]
 [ 2.99901179]
 [-1.98683505]
 [ 4.98415707]]
RMSE error: 0.07867046689102043
Is linear: True
----------------------------------------
Results
Weights: [[ 5.02286826]
 [ 2.99891798]
 [-1.99811325]
 [ 4.99517225]]
RMSE error: 0.07572613038460857
Is linear: True
----------------------------------------
Results
Weights: [[ 4.80983152]
 [ 3.00282172]
 [-2.00648437]
 [ 5.02734062]]
RMSE error: 0.06400495569995628
Is linear: True
----------------------------------------
Results
Weights: [[ 5.02837195]
 [ 3.00622423]
 [-2.01686151]
 [ 5.0063991 ]]
RMSE error: 0.07519750358994039
Is linear: True
Qaudratic model 5 + 3 * x0 - 2 * x1 * x2 + 5 * x2
Results
Weights: [[ 85.]
 [  3.]
 [-16.]
 [ -5.]]
RMSE error: 1.2070419244868311
Is linear: False
