### Imports

In [23]:
import pandas as pd
import numpy as np



from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import StandardScaler

from scipy.stats import ttest_ind

from common_code import test_hopkins


### Loading the data

In [5]:
countries_df = pd.read_csv("data/countries.csv").set_index("country")

### Small Feature Engineering

In [6]:
class ColumnsCombinator(TransformerMixin, BaseEstimator):
    """
    Combines columns of a pandas DataFrame using arithmetic operators and creates a new column.

    Parameters:
        arguments (list of str): The column names to use as operands in the arithmetic operations.
        operators (list of str): The arithmetic operators to use between the columns. Must be one of ['+', '-', '*', '/'].
        new_column_name (str): The name of the new column to create.

    Raises:
        AssertionError: If the number of arguments is not equal to the number of operators plus one.
                        If any of the operators is not one of ['+', '-', '*', '/'].

    Methods:
        fit(X, y=None)
            Stores a copy of the input DataFrame and returns the instance.

        transform(X)
            Applies the arithmetic operations to the stored copy of the input DataFrame and returns the result.

    Attributes:
        arguments (list of str): The column names to use as operands in the arithmetic operations.
        operators (list of str): The arithmetic operators to use between the columns. Must be one of ['+', '-', '*', '/'].
        new_column_name (str): The name of the new column to create.
        X_new (pandas DataFrame): The copy of the input DataFrame used to store the result.
    """
    def __init__(self, arguments, operators, new_column_name):
        assert len(arguments) == len(operators) + 1, "number of arguments must be equal to number of operators + 1"
        assert all(op in ['+', '-', '*', '/'] for op in operators), "operators must be +, -, * or /"
        self.arguments = arguments
        self.operators = operators
        self.new_column_name = new_column_name

    def fit(self, X, y=None):
        self.X_new = X.copy()
        return self

    def transform(self, X):
        for i in range(len(self.operators)):
            if self.operators[i] == "+":
                self.X_new[self.new_column_name] = X[self.arguments[i]] + X[self.arguments[i+1]]
            elif self.operators[i] == "-":
                self.X_new[self.new_column_name] = X[self.arguments[i]] - X[self.arguments[i+1]]
            elif self.operators[i] == "*":
                self.X_new[self.new_column_name] = X[self.arguments[i]] * X[self.arguments[i+1]]
            elif self.operators[i] == "/":
                self.X_new[self.new_column_name] = X[self.arguments[i]] / X[self.arguments[i+1]]
                
        return self.X_new
    
saldo_creator = ColumnsCombinator(arguments=["exports", "imports"],
                                  operators=["-"],
                                  new_column_name="saldo")
total_health_exp_est_creator= ColumnsCombinator(arguments=["health", "gdpp", "income"],
                                                operators=["*", "+"],
                                                new_column_name="total_health_exp")
# add scaling
scaler = StandardScaler()

feature_pipeline = make_pipeline(saldo_creator, total_health_exp_est_creator, scaler)
feature_pipeline.fit(countries_df)
# add scaling here
X_extended = pd.DataFrame(feature_pipeline.transform(countries_df),
                          columns=feature_pipeline[-1].get_feature_names_out(),
                          index=countries_df.index)

In [7]:
from visualization_functions import plot_corr

plot_corr(X_extended)

### PCA

In [12]:
def de_correlate_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Creates a de-correlated version of the input DataFrame.

    Parameters:
        df (pd.DataFrame): The input DataFrame.

    Returns:
        A de-correlated version of the input DataFrame.
    """
    X_aux = df.copy()
    for col in df.columns:
        X_aux[col] = df[col].sample(frac=1).values
    return X_aux

def determin_n_pca_by_permutations(X: np.ndarray, n_permutations: int = 1000) -> np.ndarray:
    """
    Performs a permutation test to determine the significance of the explained variance of PCA.
    Determines the number of componenets that are significant.

    Parameters:
        X (np.ndarray): The input data of shape (n_samples, n_features).
        n_permutations (int): The number of permutations to perform. Defaults to 1000.

    Returns:
        A numpy array containing the p-values for each principal component.
    """
    pca = PCA()
    pca.fit(X)
    original_variance = pca.explained_variance_ratio_

    variance = np.zeros((n_permutations, X.shape[1]))
    for i in range(n_permutations):
        X_aux = de_correlate_df(pd.DataFrame(X))
        pca.fit(X_aux)
        variance[i, :] = pca.explained_variance_ratio_

    p_val = np.sum(variance > original_variance, axis=0) / n_permutations
    return np.argmax(p_val > 0.05) + 1

best_componenets_n = determin_n_pca_by_permutations(X_extended)
print(f"Best number of componenets: {best_componenets_n}")

best number of componenets: 4


In [22]:


pca = PCA(n_components=best_componenets_n)

pca_df = pd.DataFrame(pca.fit_transform(X_extended),
                      index=countries_df.index,
                      columns=[f"PC{i}" for i in range(1, best_componenets_n+1)])

# no correlation between PCs
(pca_df.corr() > 0.05)

Unnamed: 0,PC1,PC2,PC3,PC4
PC1,True,False,False,False
PC2,False,True,False,False
PC3,False,False,True,False
PC4,False,False,False,True


In [32]:
# strangely, it decreased:)
print(f"hopkins score: {test_hopkins(pca_df)}")

hopkins score: 0.8263252902564866
