In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl

from sklearn.preprocessing import PolynomialFeatures

In [None]:
class LinearRegression:
    """
    Linear Regression.
    
    Parameters
    -------
        data: np.ndarray
            A (N x D) matrix containing data.
        
        target: np.ndarray
            A (N x 1) vector which corresponds to the outcomes of given data.
        
        prior_variance: float
            The variance of the prior distribution.
            
    Methods
    -------
    _validate_params_type():
        Checks data types of the parameters, which must be np.ndarray for 'data' and 'target', float for 'prior_variance'.
    
    _validate_params_dim():
        Checks the dimensions of the parameters, 'data' is a nxm matrix, 'target' is a nx1 vector, 'prior_variance' is a single number.
    
    _validate_pred_method():
        Checks whether the given prediction method is availble, current list of prediction methods is:
            Maximum Likelihood Estimation, or MLE
            Maximum A Posteriori Estimation, or MAP
            Noise Variance Estimation, or NVE
            Root Mean Square Error, or RMSE/RMSD
        
    _select_method(): 
        Selects the preferred prediction method.
    
    MLE():
        Computes the most likely set of parameters given the data and target,
            theta = ( X.T @ X )^-1 @ X.T @ Y
    
    NVE():
        Computes the estimated noise variance,
            sigma = 1/N * sum( y_n - x_n.T . theta )^2
    
    MAP():
        Computes the most likely set of parameters given the data, target and prior distribution,
            theta = ( X.T @ X + sigma^2/b^2 * I )^-1 @ X.T @ Y
    
    RMSE():
        Compute the average squared error (deviation),
            RMSD = sqrt(sigma)
        
        for:
            X := (n x d) data matrix
            Y := (n x 1) target vector
            I := (n x n) identity matrix
            theta := (d x 1) optimal parameter vector
            sigma := estimated noise variance
            RMSD := root mean squared deviation (error)
            x_n := nth data point
            y_n := nth target value
            b := prior variance
    
    predict():
        Produces a prediction based on optimal theta
    """
    def __init__(self, data: np.ndarray, target: np.ndarray, prior_variance=0):
        self._validate_params_type(data, target, prior_variance)
        self._validate_params_dim(data, target, prior_variance)
        self.data = data
        self.target = target
        self.prior_variance = prior_variance
        self.prediction_method = prediction_method
        
    def _validate_params_type(self, data: np.ndarray, target: np.ndarray, prior_variance: float) -> None:
        if type(data) is not np.ndarray:
            raise Exception("Data must be a NumPy array.")
        if type(target) is not np.ndarray:
            raise Exception("Data must be a NumPy array.")
        if prior_variance != 0:
            if type(prior_variance) is not float:
                raise Exception("Prior variance must be a number.")
    
    def _validate_params_dim(self, data: np.ndarray, target: np.ndarray, prior_variance: float) -> None:
        n_features = data.shape[0]
        n_targets = target.shape[0]
        if n_features != n_targets:
            raise Exception(f"Mismatched dimensions between data matrix and target vector. \
                            data_ndim = {data.shape}, target_ndim = {target.shape}")
        if len(prior_variance) > 1:
            raise Exception("Prior variance must be a single number.")
                
    def _validate_pred_method(self, prediction_method: str) -> None:
        method_list = [
            "Maximum Likelihood Estimation", "MLE",
            "Maximum A Posteriori Estimation", "MAP",
            "Noise Variance Estimation", "NVE",
            "Root Mean Square Error", "RMSE",
        ]
        if prediction_method not in method_list:
            raise Exception("Prediction method not available.")
    
    def _select_method(self, prediction_method: str):
        if (prediction_method == "Maximum Likelihood Estimation") or (prediction_method == "MLE"):
            optimal_param = self.MLE()
        if (prediction_method == "Maximum A Posteriori Estimation") or (prediction_method == "MAP"):
            optimal_param = self.MAP()
        
        return optimal_param
    
    def MLE(self) -> np.ndarray:
        data = self.data
        target = self.target
        inv_matrix = np.linalg.inv(data.T @ data)
        optimal_param = inv_matrix @ data.T @ target
        return optimal_param
    
    def NVE(self) -> float:
        n_data = self.data.shape[0]
        prediction = self.predict(prediction_method)
        total_error_squared = np.sum((self.target - prediction)**2)
        mean_square_error = total_error_squared / n_data
        return mean_square_error
    
    def MAP(self) -> np.ndarray:
        if self.prior_variance == 0:
            raise Exception(f"Prior variance must be provided. prior_variance = {prior_variance}")
        
        data = self.data
        target = self.target
        noise_variance = self.NVE()
        diag_value = noise_variance / prior_variance
        diag_matrix = np.diag(np.array([diag_value for _ in range(target.shape[0])]))
        inv_matrix = np.linalg.inv((data.T @ data) + diag_matrix)
        optimal_param = inverse_matrix @ data.T @ target
        return optimal_param
    
    def RMSE(self) -> float:
        mean_square_error = self.NVE()
        root_mse = np.sqrt(mean_square_error)
        return root_mse
    
    def predict(self, prediction_method: str)  -> np.ndarray:
        self._validate_pred_method(prediction_method)
        optimal_param = self._select_method(prediction_method)
        data = self.data
        prediction = np.matmul(self.data, optimal_param)
        return prediction

In [2]:
class LinearRegression:
    """
    """
    def __init__(self, data, n_features, target, n_targets, \
                 pred_method=None, prior_variance=None):
        self.data = np.array(data)
        assert type(n_features) == int
        self.n_features = n_features
        
        self.target = np.array(target)
        assert type(n_targets) == int
        self.n_targets = n_targets
        
        self.method_dict = {
                "Maximum Likelihood Estimation": self.MLE,
                "Noise Variance Estimation": self.NVE,
                "Maximum A Posteriori Estimation": self.MAP,
            }
        
        if (pred_method is not None) \
            and (pred_method not in self.method_dict.keys()):
            raise Exception("There's no prediction method as " + pred_method + "\n" + \
                            "The only methods available are: \n" + self.method_dict_keys())
        elif (pred_method == "Maximum A Posteriori Estimation") \
            and (prior_variance is None):
            raise Exception("Please provide the prior variance if you wish to choose" + \
                            "this method of prediction")
        else:
            self.pred_method = pred_method
            self.prior_variance = prior_variance
    
    # Check if data and target matrix are in correct shapes
    def _check_ndim(method):
        def exe_method_wcond(self):
            data = self.data
            target = self.target

            if (data.shape[-1] == self.n_features) \
                and (target.shape[0] == self.n_targets):
                try:
                    return method()
                except Exception as error:
                    print(error)
            if data.shape[-1] != self.n_features:
                raise Exception("Incorrect dimensions for input.")
            if target.shape[-1] != self.n_targets:
                raise Exception("Incorrect dimensions for target.")
            if (self.pred_method == "Maximum A Posteriori Estimation") \
                and (self.prior_variance.ndim > 1):
                raise Exception("Incorrect dimensions for prior variance.")                    

        return exe_method_wcond

    # Compute the parameter values using log-likelihood technique
    @_check_ndim
    def MLE(self):
        data_matrix = self.data
        trans_matrix = np.tranpose(data_matrix)
        inv_matrix = np.linalg.inv(np.matmul(trans_matrix, data_matrix))
        pred_parameter = inv_matrix
        for operand in [trans_matrix, self.target]:
            pred_parameter = np.matmul(pred_parameter, operand)

        return pred_parameter
    
    # Compute the noise variance using log-likelihood technique
    @_check_ndim
    def NVE(self):
        pred_target = self.predict("Noise Variance Estimation")
        total_error = np.sum( (self.target - pred_target)**2 )
        mean_squared_error = total_error / self.n_targets

        return mean_squared_error

    # Compute the parameter values using log-posterior technique,
    # which requires knowdledge about the prior distribution and noise variance
    @_check_ndim
    def MAP(self):
        noise_variance = self.NVE()
        diag_value = noise_variance / self.prior_variance
        temp_matrix = np.array([diag_value for num in range(self.n_targets)])
        diag_matrix = np.diag(temp_matrix)

        data_matrix = self.data
        trans_matrix = np.tranpose(data_matrix)
        inv_matrix = np.linalg.inv(np.matmul(trans_matrix, data_matrix) + \
                                   diag_matrix)
        pred_parameter = inv_matrix
        for operand in [trans_matrix, self.target]:
            pred_paramater = np.matmul(pred_parameter, operand)

        return pred_parameter
            
    # Compute the predictive values
    def predict(self, pred_method=None):
        if pred_method is None:
            pred_method = self.pred_method
        trans_matrix = np.transpose(self.data)
        parameter = self.method_dict[pred_method]()
        pred_target = np.matmul(trans_matrix, parameter)
        
        return pred_target
    
    # Compute the standard deviation of the predictive errors/noise
    def RMSE(self):
        mean_squared_error = self.NVE()
        rmse = np.sqrt(mean_squared_error)
        
        return rmse

In [9]:
class BayesianLinearRegression(LinearRegression):
    def __init__(self, data, n_features, target, n_targets, \
                 prior_mean, ndim_mean, prior_variance, ndim_variance):    
        super().__init__(data, n_features, target, n_targets, 
                         pred_method="Noise Variance Estimation")
        self.prior_mean = np.array(prior_mean)
        self.ndim_mean = ndim_mean
        self.prior_variance = prior_variance
        self.ndim_variance = ndim_variance
              
        self.method_dict = {
                "Prior Predictions": self.prior_prediction,
                "Posterior Distribution": self.posterior_distribution,
                "Posterior Predictions": self.posterior_prediction,
            }
    
    def _check_ndim_prior(method):
        def exe_method_wcond(self, *args, **kwargs):
            n_features = self.n_features
            ndim_mean = self.ndim_mean
            ndim_variance = self.ndim_variance
            
            if ((n_features, 0) == ndim_mean) \
                and ((n_features, n_features) == ndim_variance):
                try:
                    return method(*args, **kwargs)
                except Exception as error:
                    print(error)
            if ndim_mean != (n_features, 0):
                raise Exception("Incorrect dimensions for prior mean.")
            if ndim_variance != (n_features, n_features):
                raise Exception("Incorrect dimensions for prior variance.")

        return exe_method_wcond
    
    def _draw_distribution(self, mean, std):
        fig = plt.figure()
        ax = plt.axes()
        
        x = np.linspace(mean - 4*std, mean + 4*std, 1000)
        y = (1 / np.sqrt( 2*np.pi*(std**2) ) ) * \
         np.exp( -(x - mean)**2 / 2*(std**2) )
        ax.plot(x, y, color="royalblue")
    
        xticks = ax.get_xticks()
        ax.xaxis.set_major_locator(plt.FixedLocator(xticks))
        ax.set_xticklabels(xticks, rotation=-90)
        ax.set_xlabel("Predictive values")
        ax.set_ylabel("Prob. density")
        
        return fig, ax
    
    # Compute a prediction based on the knownledge about 
    # the prior distribution of parameter
    @_check_ndim_prior
    def prior_prediction(self, data):
        data_matrix = np.array(data)
        trans_matrix = np.tranpose(data_matrix)
        likelihood_variance = trans_matrix
        for operand in [self.prior_variance, data_matrix]:
            likelihood_variance = np.matmul(likelihood_variance, 
                                            operand)
        noise_variance = self.NVE()
        
        marginal_mean = np.dot(trans_matrix, self.prior_mean)
        marginal_variance = likelihood_variance + noise_variance
        fig, ax = self._draw_distribution(marginal_mean, np.sqrt(marginal_variance))

        return fig, ax, marginal_mean
    
    # Compute the mean and variance of the posterior distribution
    @_check_ndim_prior
    def posterior_distribution(self):
        data_matrix = self.data
        target = self.target
        noise_variance = self.NVE()
        prior_mean = self.prior_mean
        prior_variance = self.prior_variance
        
        # Posterior variance calculation prep
        trans_matrix = np.tranpose(data_matrix)
        inv_prior_var = np.linalg.inv(prior_variance)
        matrix_product = 1 / noise_variance
        for operand in [trans_matrix, data_matrix]:
            matrix_product = np.matmul(matrix_product, operand)
        
        # Posterior mean calculation prep
        product1 = np.matmul(inv_prior_var, prior_mean)
        product2 = 1 / noise_variance
        for operand in [trans_matrix, target]:
            product2 = np.matmul(product2, operand)
        total = product1 + product2
        
        posterior_variance = np.linalg.inv(inv_prior_var + matrix_product)
        posterior_mean = np.matmul(posterior_variance, total)
        
        return posterior_variance, posterior_mean, noise_variance
    
    # Compute a prediction based on the knownledge about
    # the posterior distribution of parameter
    @_check_ndim_prior
    def posterior_prediction(self, data):
        data_matrix = np.array(data)
        trans_matrix = np.transpose(data_matrix)
        posterior_variance, posterior_mean, noise_variance = self.posterior_distribution()
        
        likelihood_variance = trans_matrix
        for operand in [posterior_variance, data_matrix]:
            likelihood_variance = np.matmul(likelihood_variance, 
                                            operand)
        
        marginal_mean = np.dot(trans_matrix, posterior_mean)
        marginal_variance = likelihood_variance + noise_variance
        fig, ax = self._draw_distribution(marginal_mean, np.sqrt(marginal_variance))
        
        return fig, ax, marginal_mean
    
    # Sample parameters from either the prior or posterior distribution
    # within a specified confidence interval
    def sample(self):
        pass
    

In [10]:
blr = BayesianLinearRegression([1, 1], 2, [1, 1], 2, 0, (2, 0), 0, (2, 2))

In [11]:
blr.prior_prediction()

BayesianLinearRegression.prior_prediction() missing 2 required positional arguments: 'self' and 'data'
