In [99]:
import random
import pandas as pd
import numpy as np
from catboost import CatBoostRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from statsmodels.tsa.arima_model import ARIMA
from sklearn.compose import make_column_transformer, make_column_selector
from sklearn.preprocessing import OneHotEncoder
from skforecast.ForecasterAutoreg import ForecasterAutoreg
from skforecast.model_selection import backtesting_forecaster
from skforecast.model_selection import bayesian_search_forecaster
from sklearn.metrics import mean_absolute_error

In [59]:
df = pd.read_excel('../dataset/atm_historical_data_with_features.xlsx')
df['Date'] = pd.to_datetime(df['Date'])

In [60]:
exogenous_variables = ['Holiday']

In [62]:
df_catboost = df[['Date','Soles_Withdrawn'] + exogenous_variables].copy()

In [50]:
df = df[['Date', 'Soles_Withdrawn']]

In [51]:
df = df.set_index('Date')

In [52]:
def evaluate_catboost_model(params, X_train, y_train, X_test, y_test):
    try:
        # Desempaquetamos los parámetros
        depth, learning_rate, iterations = params
        
        # Creamos el modelo CatBoost
        model = CatBoostRegressor(
            depth=int(depth), 
            learning_rate=float(learning_rate),
            iterations=int(iterations),
            verbose=0
        )
        
        # Entrenamos el modelo
        model.fit(X_train, y_train)
        
        # Realizamos predicciones
        y_pred = model.predict(X_test)
        
        # Calculamos el error cuadrático medio
        mse = mean_squared_error(y_test, y_pred)
        return mse
    
    except Exception as e:
        print(f"Error al ajustar el modelo CatBoost con los parámetros {params}: {e}")
        return float('inf')

In [53]:
# # Parámetros de IGAPSOSA
# population_size = 20
# max_iterations = 100
# pso_weight = 0.5  # Peso de PSO
# sa_temp = 100  # Temperatura inicial para Simulated Annealing
# cooling_rate = 0.9  # Tasa de enfriamiento

# # Generar una población inicial aleatoria de soluciones (valores de p, d, q)
# def generate_initial_population():
#     population = []
#     for _ in range(population_size):
#         p = random.randint(0, 5)
#         d = random.randint(0, 2)
#         q = random.randint(0, 5)
#         population.append((p, d, q))
#     return population

# # Función objetivo: minimizar el MSE
# def objective_function(solution, data):
#     return evaluate_arima_model(data, solution)

# # Función de enfriamiento para Simulated Annealing
# def cool_temperature(temp, cooling_rate):
#     return temp * cooling_rate

# # IGAPSOSA: combinación de GA, PSO y SA
# def igapsosa(data):
#     # Generar población inicial
#     population = generate_initial_population()
    
#     # Evaluar aptitud inicial (fitness)
#     fitness = [objective_function(solution, data) for solution in population]
#     best_solution = population[np.argmin(fitness)]
#     best_fitness = min(fitness)
    


In [54]:
#     temperature = sa_temp  # Inicializar temperatura para Simulated Annealing
    
#     for iteration in range(max_iterations):
#         # Paso 1: Algoritmo Genético (Selección y Cruce)
#         selected_population = random.sample(population, population_size // 2)
#         new_population = selected_population.copy()

#         for i in range(0, len(selected_population), 2):
#             parent1 = selected_population[i]
#             parent2 = selected_population[(i + 1) % len(selected_population)]
            
#             # Aplicamos cruce (crossover)
#             child1 = (parent1[0], parent2[1], parent1[2])
#             child2 = (parent2[0], parent1[1], parent2[2])
#             new_population.append(child1)
#             new_population.append(child2)
        
#         # Paso 2: PSO (actualización de velocidad y posición)
#         for i in range(len(new_population)):
#             if random.random() < pso_weight:
#                 new_population[i] = tuple(random.choice(new_population))
        
#         # Paso 3: Recocido Simulado (SA)
#         for i in range(len(new_population)):
#             candidate_fitness = objective_function(new_population[i], data)
#             if candidate_fitness < fitness[i] or random.random() < np.exp((fitness[i] - candidate_fitness) / temperature):
#                 population[i] = new_population[i]
#                 fitness[i] = candidate_fitness
        
#         # Actualizar la mejor solución global
#         current_best = min(fitness)
#         if current_best < best_fitness:
#             best_fitness = current_best
#             best_solution = population[np.argmin(fitness)]
        
#         # Reducir la temperatura
#         temperature = cool_temperature(temperature, cooling_rate)
    
#     return best_solution, best_fitness

# # Aplicamos IGAPSOSA al DataFrame 'df' para optimizar los parámetros ARIMA
# best_params, best_score = igapsosa(df['Soles_Withdrawn'].values)
# print(f"Mejores parámetros: {best_params}")
# print(f"Mejor MSE: {best_score}")

In [46]:
# Función de optimización (IGAPSOSA)
def igapsosa(data, target):
    # Parámetros iniciales del algoritmo IGAPSOSA
    # Ejemplo: Hiperparámetros que queremos optimizar
    param_bounds = {
        'depth': (4, 10),  # Profundidad del árbol
        'learning_rate': (0.01, 0.3),  # Tasa de aprendizaje
        'iterations': (100, 1000)  # Número de iteraciones
    }
    
    # Función para generar la población inicial
    def generate_initial_population(pop_size=10):
        population = []
        for _ in range(pop_size):
            depth = np.random.uniform(*param_bounds['depth'])
            learning_rate = np.random.uniform(*param_bounds['learning_rate'])
            iterations = np.random.uniform(*param_bounds['iterations'])
            population.append([depth, learning_rate, iterations])
        return population

    # Dividimos el dataset en entrenamiento y prueba
    X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=0.2, random_state=42)

    # Generamos la población inicial
    population = generate_initial_population()

    # Evaluamos el fitness inicial (MSE)
    fitness = [evaluate_catboost_model(solution, X_train, y_train, X_test, y_test) for solution in population]
    best_solution = population[np.argmin(fitness)]
    best_fitness = min(fitness)

    # Aquí puedes seguir aplicando los operadores de IGAPSOSA (crossover, mutation, etc.)
    # Para simplificar, solo estamos generando una población inicial y evaluando su aptitud

    return best_solution, best_fitness

# Creamos un DataFrame de ejemplo
df = pd.DataFrame({
    'Date': pd.date_range(start='2020-01-01', periods=100, freq='D'),
    'Soles_Withdrawn': np.random.rand(100) * 1000
})

# Llamamos a IGAPSOSA con los datos
X = df.drop(columns=['Soles_Withdrawn'])
y = df['Soles_Withdrawn'].values

best_params, best_score = igapsosa(X, y)
print(f"Mejores parámetros: {best_params}")
print(f"Mejor MSE: {best_score}")

Mejores parámetros: [9.002376021156365, 0.0688220742357123, 201.51952771548463]
Mejor MSE: 111684.08197728037


In [69]:
import mlforecast
print(mlforecast.__version__)

0.13.4


In [None]:
# Definir las variables exógenas
exog_cols = ['tu_variable_exogena_1', 'tu_variable_exogena_2']  # Ajusta según tu dataset
fin_validacion = int(len(df) * 0.8)  # División 80-20
datos_train = df.iloc[:fin_validacion]
datos_validacion = df.iloc[fin_validacion:]

# One hot encoding
one_hot_encoder = make_column_transformer(
    (OneHotEncoder(sparse_output=False, drop='if_binary'), 
     make_column_selector(dtype_exclude=np.number)),
    remainder='passthrough',
    verbose_feature_names_out=False
).set_output(transform='pandas')

# Crear forecaster
forecaster = ForecasterAutoreg(
    regressor=CatBoostRegressor(
        random_state=123,
        silent=True,
        allow_writing_files=False,
        boosting_type='Plain',
        leaf_estimation_iterations=3,
    ),
    lags=24,  # Ajusta según sea necesario
    transformer_exog=one_hot_encoder
)

# Búsqueda de hiperparámetros
lags_grid = [48, 72, [1, 2, 3, 23, 24, 25, 167, 168, 169]]

def search_space(trial):
    search_space = {
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
        'max_depth': trial.suggest_int('max_depth', 3, 10, step=1),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 1),
        'lags': trial.suggest_categorical('lags', lags_grid)
    }
    return search_space

# Ejecutar búsqueda bayesiana de hiperparámetros
results_search, frozen_trial = bayesian_search_forecaster(
    forecaster=forecaster,
    y=datos_train['Soles_withdrawn'],
    exog=datos_train[exog_cols],
    search_space=search_space,
    steps=36,
    refit=False,
    metric='mean_absolute_error',
    initial_train_size=len(datos_train),
    fixed_train_size=False,
    n_trials=20,
    random_state=123,
    return_best=True,
    n_jobs='auto',
    verbose=False,
    show_progress=True
)

# Backtesting con datos de test
metrica_catboost, predicciones = backtesting_forecaster(
    forecaster=forecaster,
    y=df['Soles_withdrawn'],
    exog=df[exog_cols],
    initial_train_size=len(datos_train),
    fixed_train_size=False,
    steps=36,
    refit=False,
    metric='mean_absolute_error',
    n_jobs='auto',
    verbose=False
)

print("Mejores parámetros:", frozen_trial.params)
print("Mejor MAE:", metrica_catboost)

In [None]:
exog_cols = ['Holiday']  # Ajusta según tu dataset
fin_validacion = int(len(df_catboost) * 0.8)  # División 80-20
datos_train = df_catboost.iloc[:fin_validacion]
datos_validacion = df_catboost.iloc[fin_validacion:]

# One hot encoding
one_hot_encoder = make_column_transformer(
    (OneHotEncoder(sparse_output=False, drop='if_binary'), 
     make_column_selector(dtype_exclude=np.number)),
    remainder='passthrough',
    verbose_feature_names_out=False
).set_output(transform='pandas')

# Crear forecaster
forecaster = ForecasterAutoreg(
    regressor=CatBoostRegressor(
        random_state=123,
        silent=True,
        allow_writing_files=False,
        boosting_type='Plain',
        leaf_estimation_iterations=3,
    ),
    lags=24,  # Ajusta según sea necesario
)

# Búsqueda de hiperparámetros
search_space = {
    'n_estimators': (100, 1000),
    'max_depth': (3, 10),
    'learning_rate': (0.01, 1),
}

initial_train_size = len(datos_train)

def search_space(trial):
    return {
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
        'max_depth': trial.suggest_int('max_depth', 3, 10, step=1),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 1),
    }

# Ejecutar búsqueda bayesiana de hiperparámetros
results_search = bayesian_search_forecaster(
    forecaster=forecaster,
    y=datos_train['Soles_Withdrawn'],
    exog=datos_train[exog_cols],
    search_space=search_space,  # Ahora es una función
    steps=36,
    refit=False,
    metric='mean_absolute_error',
    n_trials=20,
    random_state=123,
    verbose=False,
    initial_train_size=initial_train_size
)

# Backtesting con datos de test
metrica_catboost, predicciones = backtesting_forecaster(
    forecaster=forecaster,
    y=df_catboost['Soles_Withdrawn'],
    exog=df_catboost[exog_cols],
    initial_train_size=initial_train_size,
    fixed_train_size=False,
    steps=36,
    refit=False,
    metric='mean_absolute_error',
    verbose=False
)

print("Mejores parámetros:", results_search['best_params'])
print("Mejor MAE:", metrica_catboost)

In [83]:
# Supongamos que df_catboost es tu DataFrame con los datos
exog_cols = ['Holiday']  # Usar directamente si ya está codificada en 0 y 1
fin_validacion = int(len(df_catboost) * 0.8)  # División 80-20
datos_train = df_catboost.iloc[:fin_validacion]
datos_validacion = df_catboost.iloc[fin_validacion:]

# Crear forecaster
forecaster = ForecasterAutoreg(
    regressor=CatBoostRegressor(
        random_state=123,
        silent=True,
        allow_writing_files=False,
        boosting_type='Plain',
        leaf_estimation_iterations=3,
    ),
    lags=24,  # Ajusta según sea necesario
)

# Búsqueda de hiperparámetros
search_space = {
    'n_estimators': (100, 1000),
    'max_depth': (3, 10),
    'learning_rate': (0.01, 1),
}

# Asegúrate de que initial_train_size sea menor que la longitud de datos_train
initial_train_size = int(len(datos_train) * 0.8)  # Por ejemplo, 80% de datos_train

def search_space_function(trial):
    return {
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
        'max_depth': trial.suggest_int('max_depth', 3, 10, step=1),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 1),
    }

# Ejecutar búsqueda bayesiana de hiperparámetros
results_search = bayesian_search_forecaster(
    forecaster=forecaster,
    y=datos_train['Soles_Withdrawn'],
    exog=datos_train[exog_cols],
    search_space=search_space_function,  # Usar la función de búsqueda
    steps=36,
    refit=False,
    metric='mean_absolute_error',
    n_trials=20,
    random_state=123,
    verbose=False,
    initial_train_size=initial_train_size
)

# Backtesting con datos de test
metrica_catboost, predicciones = backtesting_forecaster(
    forecaster=forecaster,
    y=df_catboost['Soles_Withdrawn'],
    exog=df_catboost[exog_cols],
    initial_train_size=initial_train_size,
    fixed_train_size=False,
    steps=36,
    refit=False,
    metric='mean_absolute_error',
    verbose=False
)

Best trial: 11. Best value: 8247.27: 100%|██████████| 20/20 [03:04<00:00,  9.23s/it]


`Forecaster` refitted using the best-found lags and parameters, and the whole data set: 
  Lags: [ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24] 
  Parameters: {'n_estimators': 100, 'max_depth': 10, 'learning_rate': 0.016388840477631006}
  Backtesting metric: 8247.274671481195



100%|██████████| 36/36 [00:03<00:00, 11.61it/s]


In [84]:
print("Resultados de búsqueda:", results_search)

Resultados de búsqueda: (                                                 lags  \
11  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
10  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
14  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
12  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
17  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
15  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
16  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
18  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
5   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
2   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
8   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
4   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
1   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
0   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
6   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
7   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
9   [1

In [85]:
# Suponiendo que el primer elemento del tuple es un dict con los mejores parámetros
best_params = results_search[0]  # Acceder al primer elemento (ajusta según sea necesario)
best_mae = results_search[1]  # Acceder al segundo elemento (ajusta según sea necesario)

print("Mejores parámetros:", best_params)
print("Mejor MAE:", best_mae)

Mejores parámetros:                                                  lags  \
11  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
10  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
14  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
12  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
17  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
15  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
16  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
18  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
5   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
2   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
8   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
4   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
1   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
0   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
6   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
7   [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14...   
9   [1, 2, 

# IGAPSOSA - Catboost

In [96]:
class Particle:
    def __init__(self, bounds):
        self.position = np.random.uniform(bounds[:, 0], bounds[:, 1])
        self.velocity = np.random.uniform(-1, 1, size=len(bounds))
        self.best_position = self.position.copy()
        self.best_score = float('inf')

class IGAPSOSA:
    def __init__(self, objective_function, bounds, num_particles=30, max_iterations=100, w=0.5, c1=1.5, c2=1.5):
        self.objective_function = objective_function
        self.bounds = bounds
        self.num_particles = num_particles
        self.max_iterations = max_iterations
        self.w = w
        self.c1 = c1
        self.c2 = c2

    def optimize(self):
        particles = [Particle(self.bounds) for _ in range(self.num_particles)]
        global_best_position = None
        global_best_score = float('inf')

        for iteration in range(self.max_iterations):
            for particle in particles:
                score = self.objective_function(particle.position)

                if score < particle.best_score:
                    particle.best_score = score
                    particle.best_position = particle.position.copy()

                if score < global_best_score:
                    global_best_score = score
                    global_best_position = particle.position.copy()

            for particle in particles:
                r1, r2 = np.random.rand(2)
                particle.velocity = (self.w * particle.velocity +
                                     self.c1 * r1 * (particle.best_position - particle.position) +
                                     self.c2 * r2 * (global_best_position - particle.position))
                
                particle.position += particle.velocity
                particle.position = np.clip(particle.position, self.bounds[:, 0], self.bounds[:, 1])

        return global_best_position, global_best_score

In [97]:
def objective_function(params):
    n_estimators = int(params[0])
    max_depth = int(params[1])
    learning_rate = params[2]  # Se mantiene como float
    
    model = CatBoostRegressor(
        n_estimators=n_estimators,
        max_depth=max_depth,
        learning_rate=learning_rate,
        random_state=123,
        silent=True,
        allow_writing_files=False
    )
    
    model.fit(datos_train[exog_cols], datos_train['Soles_Withdrawn'])
    predictions = model.predict(datos_validacion[exog_cols])
    mae = mean_absolute_error(datos_validacion['Soles_Withdrawn'], predictions)
    
    return mae

In [100]:
exog_cols = ['Holiday']  # Ajusta según tu dataset
fin_validacion = int(len(df_catboost) * 0.8)  # División 80-20
datos_train = df_catboost.iloc[:fin_validacion]
datos_validacion = df_catboost.iloc[fin_validacion:]

# Definir los límites de los hiperparámetros
bounds = np.array([[100, 1000],  # n_estimators
                   [3, 10],     # max_depth
                   [0.01, 1]])  # learning_rate

# Ejecutar el optimizador IGAPSOSA
optimizer = IGAPSOSA(objective_function, bounds, num_particles=30, max_iterations=100)
best_position, best_score = optimizer.optimize()

print("Mejores hiperparámetros encontrados:")
print("n_estimators:", int(best_position[0]))
print("max_depth:", int(best_position[1]))
print("learning_rate:", best_position[2])
print("Mejor MAE:", best_score)