In [2]:
from feature_engine.selection import SmartCorrelatedSelection, DropConstantFeatures, SelectByShuffling
from sklearn.feature_selection import mutual_info_regression
from collections import defaultdict
from copy import deepcopy


## LECTURA DE DATOS

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import optuna

def clean_value(x):
    if isinstance(x, str):
        x = x.replace('%', '').replace('$', '').replace(',', '')
        x = x.replace(' ', '')
    try:
        return float(x)
    except:
        return None

plt.style.use("seaborn-v0_8")
sns.set_context("talk")

# Ruta del archivo
ruta = r"C:\Users\ASUS\Documents\GitHub\tuya\BASE_DS.xlsx"

# Cargar BASE
base = pd.read_excel(ruta, sheet_name="BASE")
base = base[["Fecha", "facturacion", "saldo"]]

# Cargar MACROS
macros = pd.read_excel(ruta, sheet_name="MACROS")
# macros.columns = macros.columns.str.strip()
macros.columns = (
    macros.columns
    .str.replace('\n', ' ', regex=True)  # Reemplaza saltos de línea por espacio
    .str.replace('\s+', ' ', regex=True) # Colapsa espacios múltiples en uno solo
    .str.strip()                          # Quita espacios al inicio y fin
)
# Apply cleaning to all except the first column (Fecha)
for col in macros.columns[1:]:
    macros[col] = macros[col].apply(clean_value)

# Convertir fecha 
macros['Fecha'] = pd.to_datetime(macros['Fecha'], format="%b-%y", errors='coerce')


# Convertir fecha 
base['Fecha'] = pd.to_datetime(base['Fecha'], format="%b-%y", errors='coerce')
base.set_index('Fecha', inplace=True)

# ===============================
# 1. Cargar datos
# ===============================
df = pd.merge(macros, base, left_on='Fecha', right_index=True, how='inner')
df = df.set_index('Fecha')
df = df.sort_index()

# ===============================
# 2. Crear variables target como cambio porcentual (0-1)
# ===============================
df['facturacion_change'] = df['facturacion'].pct_change(fill_method=None)
df['saldo_change'] = df['saldo'].pct_change(fill_method=None)

# ===============================
# 3. Features y targets
# ===============================
feature_cols = [col for col in df.columns if col not in ['facturacion','saldo','facturacion_change','saldo_change']]
target_fact = ['facturacion_change']
target_sald = ['saldo_change']

escenario_keywords = ['pesimista', 'alterno', 'optimista']
feature_cols = [
    col for col in feature_cols 
    if not any(keyword.lower() in col.lower() for keyword in escenario_keywords)
]

data_fact = df[feature_cols + target_fact].dropna(subset=feature_cols).dropna(how='any')
data_sald = df[feature_cols + target_sald].dropna(subset=feature_cols).dropna(how='any')

  from .autonotebook import tqdm as notebook_tqdm


### Estrategia utilizada
El procedimiento seguido se centró en:

1. **Evaluación de la multicolinealidad:**  
   Se analizaron las correlaciones entre variables para detectar redundancias.  
   Cuando dos o más variables mostraron una correlación muy alta, se consideró que aportaban información similar.

2. **Selección basada en información mutua:**  
   Para cada grupo de variables altamente correlacionadas, se utilizó la **información mutua** con la variable objetivo como criterio de selección.  
   La información mutua mide la dependencia entre dos variables y nos indica cuánto conocimiento adicional sobre la variable objetivo aporta una variable específica.  
   De este modo, se conservó la variable que maximiza la información mutua con el target, garantizando mayor relevancia predictiva.

## FUNCIONES

In [4]:
def remove_correlated_features(df, numvars, target, corr_limit=0.85, selection_method="variance"):
    """   
    Parameters:
    df (pd.DataFrame): The dataframe containing the dataset from which features will be selected and reduced.
    numvars (list): A list of numeric variable names (columns in the dataframe) that are to be considered for the feature selection process.
    target (str or list): The name of the target variable column in the dataframe. If a list is provided, the first element is considered as the target variable.
    corr_limit (float, optional): The correlation threshold used to identify highly correlated pairs of variables. Default is 0.90.
    selection_method (str): Takes the values “missing_values”, “cardinality”, “variance” and "mutual_information_score".
        “missing_values”: keeps the feature from the correlated group with the least missing observations. Developed by feature_engine library.
        “cardinality”: keeps the feature from the correlated group with the highest cardinality. Developed by feature_engine library.
        “variance”: keeps the feature from the correlated group with the highest variance. Developed by feature_engine library.
        "mutual_information_score": keeps the feature from the correlated group that has the highest mutual information score with the target. Developed by featurewiz library.


    Returns:
    list: A list of selected features that are uncorrelated and have high mutual information scores with the target.
    """
    def remove_correlated_features_simple(df, numvars, threshold, selection_method):
        """
        Removes correlated features using smartcorrelatedselection from feature_engine
        """
        scs = SmartCorrelatedSelection(threshold=threshold, selection_method=selection_method)
        no_correlated_features = scs.fit_transform(df[numvars])
        no_correlated_features_ = no_correlated_features.columns
        return no_correlated_features_

    def find_remove_duplicates(list_of_values):
        """
        Removes duplicates from a list to return unique values
        """
        output = []
        seen = set()
        for value in list_of_values:
            if value not in seen:
                output.append(value)
                seen.add(value)
        return output
    
    def return_dictionary_list(lst_of_tuples):
        """ 
        Returns a dictionary of lists if you send in a list of Tuples
        """
        orDict = defaultdict(list)
        # iterating over list of tuples
        for key, val in lst_of_tuples:
            orDict[key].append(val)
        return orDict

    def left_subtract(l1, l2):
        """
        Returns a list of elements in l1 that are not in l2
        """
        lst = []
        for i in l1:
            if i not in l2:
                lst.append(i)
        return lst
    
    # Deep copy of the dataframe to avoid modifying the original one
    df = deepcopy(df)
    # Extract the target column
    print('#COLUMNS')
    print(df.columns)
    df_target = df[target]

    # Deep copy of the target to avoid modifying the original one
    target = deepcopy(target)

    if selection_method == "mutual_information_score":

        # Calculate the absolute correlation matrix, sort it, and remove duplicates
        correlation_dataframe = df.corr().abs().unstack().sort_values().round(7).drop_duplicates()

        # Convert the correlation data to a dataframe
        corrdf = pd.DataFrame(correlation_dataframe[:].reset_index())
        corrdf.columns = ['var1', 'var2', 'coeff']

        # Filter the correlation dataframe to get pairs with correlation above the threshold
        corrdf1 = corrdf[corrdf['coeff'] >= corr_limit]

        # Remove self-correlations
        corrdf1 = corrdf1[corrdf1['var1'] != corrdf1['var2']]
        
        # Create a list of correlated pairs
        correlated_pair = list(zip(corrdf1['var1'].values.tolist(), corrdf1['var2'].values.tolist()))
        
        # Convert the list of correlated pairs to a dictionary
        corr_pair_dict = dict(return_dictionary_list(correlated_pair))
        
        # Get a list of unique variables that are part of correlated pairs
        corr_list = find_remove_duplicates(corrdf1['var1'].values.tolist() + corrdf1['var2'].values.tolist())
        
        # Get keys from the correlation dictionary
        keys_in_dict = list(corr_pair_dict.keys())
        
        # Create a reverse mapping of correlated pairs
        reverse_correlated_pair = [(y, x) for (x, y) in correlated_pair]
        reverse_corr_pair_dict = dict(return_dictionary_list(reverse_correlated_pair))
        
        # Merge the original and reverse correlation dictionaries
        for key, val in reverse_corr_pair_dict.items():
            if key in keys_in_dict:
                if len(key) > 1:
                    corr_pair_dict[key] += val
            else:
                corr_pair_dict[key] = val
        
        # Check if there are no correlated variables
        if len(corr_list) == 0:
            final_list = list(correlation_dataframe)
            print('    Selecting all (%d) variables since none of numeric vars are highly correlated...' % len(numvars))
            return numvars
        
        # Handle target variable if it is provided as a list
        if isinstance(target, list):
            target = target[0]
        
        max_feats = len(corr_list)
        sel_function = mutual_info_regression

        # Ensure there are no infinite or null values in the dataframe
        df_fit = df[corr_list]
        
        # Drop rows with NaN values if present
        if df_fit.isnull().sum().sum() > 0:
            df_fit = df_fit.dropna()
        else:
            print('    there are no null values in dataset...')
        
        # Check for null values in the target column
        if df_target.isnull().sum().sum() > 0:
            print('    there are null values in target. Returning with all vars...')
            return numvars
        else:
            print('    there are no null values in target column...')
        
        try:
            # Calculate mutual information scores
            fs = mutual_info_regression(df_fit, df_target, n_neighbors=5, discrete_features=False, random_state=42)
        except:
            print('    function is erroring. Returning with all %s variables...' % len(numvars))
            return numvars
        
        try:
            #################################################################################
            #######   This is the main section where we use mutual info score to select vars        
            #################################################################################
            mutual_info = dict(zip(corr_list, fs))
            
            # Sort variables by mutual information score in descending order
            sorted_by_mutual_info = [key for (key, val) in sorted(mutual_info.items(), key=lambda kv: kv[1], reverse=True)]
            
            selected_corr_list = []
            
            # Make multiple copies of the sorted list since it is iterated many times
            orig_sorted = deepcopy(sorted_by_mutual_info)
            copy_sorted = deepcopy(sorted_by_mutual_info)
            copy_pair = deepcopy(corr_pair_dict)
            
            # Select each variable by the highest mutual info and see what vars are correlated to it
            for each_corr_name in copy_sorted:
                selected_corr_list.append(each_corr_name)
                for each_remove in copy_pair[each_corr_name]:
                    if each_remove in copy_sorted:
                        copy_sorted.remove(each_remove)
            
            # Combine the uncorrelated list to the selected correlated list above
            rem_col_list = left_subtract(numvars, corr_list)
            final_list = rem_col_list + selected_corr_list
            removed_cols = left_subtract(numvars, final_list)
        except Exception as e:
            print('    SULOV Method crashing due to %s' % e)
            removed_cols = remove_highly_correlated_vars_fast(df, corr_limit)
            final_list = left_subtract(numvars, removed_cols)
        print(f'Completed using {selection_method}. Features selected: {len(final_list)}')

    elif selection_method in ["missing_values", "cardinality", "variance", "model_performance"]:    
        final_list = list(remove_correlated_features_simple(df, numvars, threshold=corr_limit, selection_method=selection_method))
        print(f'Completed using {selection_method}. Features selected: {len(final_list)}')
    return final_list

### FEATURES SELECCIONADAS PARA FACTURACION

In [5]:
res_fact = remove_correlated_features(data_fact, feature_cols, target_fact[0], corr_limit=0.8, selection_method="mutual_information_score")

#COLUMNS
Index(['PIB (var. % anual, nominal)', 'CH2', 'Desempleo', 'Inflacion', 'DTF',
       'IBR', 'Tasa_Cambio', 'TASA REPO', 'facturacion_change'],
      dtype='object')
    there are no null values in dataset...
    there are no null values in target column...
Completed using mutual_information_score. Features selected: 4


In [6]:
res_fact

['Desempleo', 'Tasa_Cambio', 'TASA REPO', 'PIB (var. % anual, nominal)']

### FEATURES SELECCIONADAS PARA SALDO

In [7]:
res_sald = remove_correlated_features(data_sald, feature_cols, target_sald[0], corr_limit=0.8, selection_method="mutual_information_score")

#COLUMNS
Index(['PIB (var. % anual, nominal)', 'CH2', 'Desempleo', 'Inflacion', 'DTF',
       'IBR', 'Tasa_Cambio', 'TASA REPO', 'saldo_change'],
      dtype='object')
    there are no null values in dataset...
    there are no null values in target column...
Completed using mutual_information_score. Features selected: 4


In [8]:
res_sald

['Desempleo', 'Tasa_Cambio', 'TASA REPO', 'CH2']

### Resultados del proceso
Tras aplicar este método, se definieron los conjuntos óptimos de variables para cada cuenta a pronosticar:

- **Para Facturación:**  
  - `Desempleo`  
  - `Tasa_Cambio`  
  - `TASA REPO`  
  - `PIB (var. % anual, nominal)`

- **Para Saldo:**  
  - `Desempleo`  
  - `Tasa_Cambio`  
  - `TASA REPO`  
  - `CH2`

---

### Beneficios obtenidos
Este proceso permitió:
- Reducir la **complejidad del modelo** al trabajar solo con las variables más relevantes.
- Disminuir el riesgo de **sobreajuste** (overfitting).
- Mejorar la **capacidad de generalización** de los modelos y aumentar la precisión de los pronósticos.