# Utilities Functions

In [None]:
import numpy as np
import pandas as pd

from numpy.linalg import inv, pinv, solve
from scipy.special import expit
from numpy.random import default_rng
from scipy.stats import entropy, zscore, skew, tstd, percentileofscore

from sklearn.metrics.pairwise import polynomial_kernel
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mutual_info_score
from sklearn.feature_selection import mutual_info_regression

from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import cross_val_score
import numpy as np

The `ridge_regression` function is a Ridge Regression implementation that trains a model, predicts, and computes metrics. 

Ridge Regression is a linear regression variant that includes a regularization term, regulated by `lambda_val`, to prevent model overfitting. It accepts four arguments: `X_train`, `Y_train`, `X_test`, and `lambda_val`. `X_train` and `Y_train` are numpy arrays representing the training design matrix and the training response vector respectively, where each row in `X_train` represents an observation and each corresponding element in `Y_train` represents the target value. 

An optional argument is `X_test`, the test design matrix, used for making predictions with the trained model. Another optional argument is `lambda_val`, a float that defaults to 0.001, regulating the strength of the regularization term in the Ridge Regression formula.

The function returns a dictionary containing various elements. The training residuals `e_train` represents the difference between the actual and predicted values. `MSE_emp` denotes the mean squared error on the training data, giving an indication of the model's prediction error. `NMSE`, or Normalized Mean Squared Error, provides a relative measure of the prediction error, allowing comparisons between different models or datasets. `MSE_loo` is the mean squared error computed using leave-one-out cross-validation, a robust way to estimate model performance. 

The function also returns the predicted values for the training data, `Y_train_hat`, and the test data, `Y_test_hat` (which is None if no test data was provided). Lastly, the trained Ridge Regression model itself is returned as `model`.


In [None]:
def ridge_regression(X_train, Y_train, X_test=None, lambda_val=1e-3):
    """
    Perform ridge regression and returns the trained model, predictions, and metrics.

    Args:
        X_train (np.ndarray): The training design matrix.
        Y_train (np.ndarray): The training response vector.
        X_test (np.ndarray, optional): The test design matrix. Defaults to None.
        lambda_val (float, optional): The regularization parameter. Defaults to 1e-3.

    Returns:
        dict: Dictionary containing the trained model, predictions, and computed metrics.
    """
    model = Ridge(alpha=lambda_val)
    model.fit(X_train, Y_train)

    Y_train_hat = model.predict(X_train)
    e_train = Y_train - Y_train_hat
    MSE_emp = mean_squared_error(Y_train, Y_train_hat)
    NMSE = MSE_emp / (np.var(Y_train)**2)

    e_loo = cross_val_score(model, X_train, Y_train, scoring='neg_mean_squared_error', cv=len(X_train))
    MSE_loo = -np.mean(e_loo)

    Y_test_hat = None
    if X_test is not None:
        Y_test_hat = model.predict(X_test)

    return {
        'e_train': e_train,
        'MSE_emp': MSE_emp,
        'NMSE': NMSE,
        'MSE_loo': MSE_loo,
        'Y_train_hat': Y_train_hat,
        'Y_test_hat': Y_test_hat,
        'model': model,
    }


The `column_based_correlation` function computes the correlation between each column in a matrix `X` and a vector `Y`. 


In [None]:
def column_based_correlation(X,Y,verbose=True):
    #TODO: multidimensional Y 
    if verbose: print('column_based_correlation')
    columns_of_X = X.shape[1]  # Number of columns in X

    correlation_vector = np.zeros(columns_of_X)  # Initialize correlation vector

    for i in range(columns_of_X):
        correlation_matrix = np.corrcoef(X.iloc[:, i], Y.iloc[:, 0])
        correlation_value = correlation_matrix[0, 1]
        correlation_vector[i] = correlation_value

    correlation_array = correlation_vector.reshape(1, -1)

    # Print the correlation vector
    return(correlation_array[0])


The `co2i` function converts correlations to mutual information, a quantity which measures the dependency between variables. It calls the `column_based_correlation` function to compute correlation between each column in `X` and `Y`. It squares this correlation and then uses it to compute mutual information. If `Y` is a `pandas.Series`, it converts it to a `DataFrame`. The function also supports a `verbose` flag that, when set to `True`, prints the operations being performed and the computed mutual information.


In [None]:
def co2i(X,Y, verbose=True):

    # check if Y is a pd.series and make it dataframe
    if isinstance(Y, pd.Series):
        Y = pd.DataFrame(Y)

    if verbose: print('co2i')

    correlation_vector = column_based_correlation(X,Y, verbose=verbose)
    corr_sq = np.square(correlation_vector)

    I = -0.5 * np.log(1 - corr_sq)
    if verbose: print('I: ', I)

    return I


This function calculates the mutual information between each column of `X` and `Y`, and returns the indices of the columns in `X` that have the highest mutual information with `Y`. The number of indices returned is determined by thenmax parameter.

If the variance of `Y` is less than 0.01, the function returns a list of indices ranging from 1 to nmax. If regr is False, the function uses the co2i function to calculate the mutual information. If regr is True, ridge regression is performed for each column of X with Y as the target variable, and the maximum coefficient value is
used as the mutual information.

The input arrays X and Y are expected to have compatible shapes, where X has shape (N, n) and Y has shape (N,). The function assumes that the columns of X and Y correspond to the same samples.

Example:
```
X =    [[1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]]

Y = [10, 20, 30]

top_features = rankrho(X, Y, nmax=2, regr=True)

# Output: [3, 2]
# The third column of X has the highest mutual information with Y, followed by the second column.
```

In [None]:
def rankrho(X, Y, nmax=5, regr=False, verbose=True):
    """
    Perform mutual information ranking between two arrays.

    Parameters:
        X (array-like): Input array with shape (N, n), representing N samples and n features.
        Y (array-like): Input array with shape (N,). Target variable.
        nmax (int, optional): Number of top-ranked features to return. Defaults to 5.
        regr (bool, optional): Flag indicating whether to use ridge regression for ranking. Defaults to False.
        verbose (bool, optional): Flag indicating whether to display progress information. Defaults to True.

    Returns:
        list: Indices of the top-ranked features in X based on mutual information with Y.


       
    """
    if verbose: print('rankrho')
    # Number of columns in X and Y
    n = X.shape[1]
    # m = Y.shape[1] #TODO: handle the multivariate case
    N = X.shape[0]

    if np.var(Y) < 0.01:
        if verbose: print('np.var(Y) < 0.01')
        return list(range(1, nmax + 1))
    
    # Scaling X
    X = (X - np.mean(X, axis=0)) / np.std(X, axis=0)

    Iy = np.zeros(n)

    if not regr:
        if verbose: print('not regr')
        Iy = co2i(X, Y, verbose=verbose)
        if verbose: print(Iy)
    else:
        if verbose: print('regr')
        for i in range(n):
            Iy[i] = abs(ridge_regression(X[:, i], Y)['beta_hat'][1])

    # if m > 1:
    #     Iy = np.mean(Iy, axis=1)

    return (np.argsort(Iy)[::-1] + 1)[:nmax]

The `mRMR` function implements the Maximum Relevance Minimum Redundancy (mRMR) feature selection method. It calculates the mutual information between each feature in `X` and `Y`, then iteratively selects the features with high relevance to `Y` and low redundancy with the already selected features. 

The function starts by selecting the feature with the maximum mutual information with `Y`. Then, for `nmax - 1` iterations, it calculates the mutual information between each remaining feature and the currently selected features, computes the mRMR score for each remaining feature (the difference between its mutual information with `Y` and the average mutual information with the selected features), and selects the feature with the maximum mRMR score.

It returns the indices of the selected features in the order they were selected. The function supports a `verbose` flag that, when set to `True`, prints the operations being performed and intermediate results.


In [None]:
def mRMR(X, Y, nmax, verbose=True):

    if verbose: print('mRMR')
    num_features = X.shape[1]
    
    # Calculate mutual information between each feature in X and Y
    mi_XY = mutual_info_regression(X, Y)
    if verbose: print("mi_XY: ", mi_XY)

    # Start with the feature with maximum MI with Y
    indices = [np.argmax(mi_XY)]
    
    for _ in range(nmax - 1):
        remaining_indices = list(set(range(num_features)) - set(indices))
        if verbose: print("remaining_indices: ", remaining_indices)
        mi_XX = np.zeros(len(remaining_indices))
        
        # Calculate mutual information between selected features and remaining features
        for i in range(len(remaining_indices)):
            mi_XX[i] = mutual_info_regression(X.iloc[:, indices], X.iloc[:, remaining_indices[i]])[0]
        
        # Calculate MRMR score for each remaining feature
        mrmr_scores = mi_XY[remaining_indices] - np.mean(mi_XX)
        if verbose: print("mrmr_scores: ", mrmr_scores)
        # Select feature with maximum MRMR score
        indices.append(remaining_indices[np.argmax(mrmr_scores)])
    
    return indices

The `normalized_prediction` function computes the normalized mean squared error of the dependency, which is a measure of the quality of a predictive model. It performs data preprocessing on `X` to remove constant and `NaN`-containing columns, and normalizes `X` by subtracting the mean and dividing by the standard deviation.

The function begins by ensuring `X` is a DataFrame if it is initially a `pd.Series`. It checks if `X` has more than one feature. If so, it removes constant and `NaN`-containing columns. If `X` only contains one feature or less, it checks for `NaN` values and, if found, it returns 1.

Once `X` is processed, it's standardized to have zero mean and unit variance for each feature. The function checks if there are too few samples (`N < 5`) or if `NaN` values exist after normalization, in which case it returns the variance of `Y`.

If the `lin` argument is `True`, which is the default, the function calculates the normalized mean squared error using `ridge_regression` function and returns it. In case a nonlinear prediction method is desired, it's flagged with a `TODO` comment indicating it has not been implemented yet.

Verbose mode is supported which, when set to `True`, prints the operations being performed.


In [None]:

def normalized_prediction(X, Y, lin=True, verbose=True):
    """
    Normalized mean squared error of the dependency
    """

    if verbose: print('normalized_prediction')
    if isinstance(X, pd.Series):
        X = pd.DataFrame(X)
    if verbose: print(X.shape)    
    N, n = X.shape

    if n > 1: # TODO: check the case if all columns are constant, return 1
        if verbose: print('n > 1')
        X = np.delete(X, np.where(np.std(X, axis=0) < 0.01)[0], axis=1) # if there is any constant column, remove it
        X = np.delete(X, np.where(np.isnan(np.sum(X, axis=0)))[0], axis=1) # if there is any nan, remove it
    else:
        if verbose: print('n <= 1')
        if np.any(np.isnan(X)): return 1 # TODO: check this
            
    XX = (X - np.mean(X, axis=0)) / np.std(X, axis=0)

    if N < 5 or np.any(np.isnan(XX)): 
        if verbose: print('N < 5 or np.any(np.isnan(XX))')
        return np.var(Y)
    if lin: 
        if verbose: print('lin')
        return max(1e-3, ridge_regression(XX, Y)['MSE_loo'] / (1e-3 + np.var(Y)))

    #TODO: implement the nonlinear case


The `normalized_conditional_information` function computes the normalized conditional information between two datasets `x1` and `y`, given a dataset `x2`. This is a measure of the amount of information that `x1` provides about `y` that is not already provided by `x2`. 

The function checks if `x2` is provided. If not, it computes the normalized prediction of `y` given `x1` (which serves as an estimate of the entropy of `y` given `x1`), and returns the quantity `1 - entropy_y_given_x1` as the normalized information between `x1` and `y`. 

If `x2` is provided, it computes the normalized prediction of `y` given `x2` and `x1` concatenated with `x2`, and returns the difference between `entropy_y_given_x2` and `entropy_y_given_x1_x2`, normalized by `entropy_y_given_x2`. 

This method essentially quantifies how much `x1` can further explain `y` even when `x2` is already known. The function supports a `verbose` mode, which, when set to `True`, prints the operations being performed and intermediate results.


In [None]:
def normalized_conditional_information(y, x1, x2=None, lin=True, verbose=True):
        """
        Normalized conditional information of x1 to y given x2
        I(x1;y| x2)= (H(y|x2)-H(y | x1,x2))/H(y|x2)
        """
        if verbose: print('normalized_conditional_information')

        if x2 is None:  # I(x1;y)= (H(y)-H(y | x1))/H(y)
            if verbose: print('x2 is None')
            entropy_y_given_x1 = normalized_prediction(x1, y, verbose=verbose)
            return max(0, 1 - entropy_y_given_x1)

        if verbose: print('x2 is not None')
        entropy_y_given_x2 = normalized_prediction(x2, y, verbose=verbose)
        x1_x2 = pd.concat([x1, x2],axis=1)
        entropy_y_given_x1_x2 = normalized_prediction(x1_x2, y, verbose=verbose)
        if verbose: print('entropy_y_given_x2: ', entropy_y_given_x2)
        return max(0, entropy_y_given_x2 - entropy_y_given_x1_x2 ) / (entropy_y_given_x2 + 0.01)
