In [21]:
import numpy as np
import pandas as pd
import gym
from gym import spaces
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import ParameterGrid

In [22]:
# Preprocessing: load the dataframe and check its quality

df = pd.read_csv('../data/processed/masked_data.csv').drop(columns='Unnamed: 0')

# Report missing values per column before any derived column is added
null_values = df.isna().sum()
print(null_values)

# Parse the dates, order rows by product and date, then derive calendar features
df = (
    df.assign(date_id=pd.to_datetime(df['date_id']))
    .sort_values(by=['product_id', 'date_id'])
    .assign(
        month=lambda frame: frame['date_id'].dt.month,
        day_of_week=lambda frame: frame['date_id'].dt.dayofweek,  # weekend is 5 and 6
    )
)

date_id        0
product_id     0
category_id    0
sales          0
price          0
units          0
dtype: int64


## Entorno (PricingEnv):

* Recibe el dataframe con price, sales y units.
* Define las acciones: aumentar, disminuir o mantener el precio.
* Calcula la demanda usando un modelo de elasticidad-precio con un coeficiente de -1.5.
* Finaliza el episodio cuando se acaban los datos, es decir, tras tantos pasos como filas tiene el dataframe (la condición de inventario en cero todavía no está implementada).

In [None]:
# Entorno de gym para el problema de pricing


class PricingEnv(gym.Env):
    """Gym environment that simulates pricing decisions.

    The observation is the pair ``[price, sales]``. Every episode starts from
    the first row of ``df`` and lasts ``len(df)`` steps; the remaining rows
    are not read by the environment. The reward of a step is the revenue
    ``new_price * new_sales``.

    Actions:
        0 -- lower the price by 3%
        1 -- keep the price
        2 -- raise the price by 3%

    Args:
        df: DataFrame with at least the columns ``price`` and ``sales``.
        price_elasticity: Exponent of the constant-elasticity demand model.
            Negative values make demand fall when the price rises.
    """

    def __init__(self, df, price_elasticity=-1.5):
        super(PricingEnv, self).__init__()
        self.df = df
        self.price_elasticity = price_elasticity
        self.current_step = 0
        self.max_steps = len(df)
        self.action_space = spaces.Discrete(3)  # 0: lower price, 1: keep, 2: raise price
        # NOTE(review): observations are produced as float64 while the space
        # declares float32 -- confirm which one is intended.
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(2,), dtype=np.float32)

    def reset(self):
        """Restart the episode from the first row and return ``[price, sales]``."""
        self.current_step = 0
        row = self.df.iloc[self.current_step]
        self.state = np.array([row['price'], row['sales']], dtype=np.float64)
        return self.state

    def estimate_demand(self, price, prev_price, prev_sales):
        """Estimate demand at ``price`` with a constant-elasticity model.

        Returns ``prev_sales * (price / prev_price) ** price_elasticity``.
        When ``prev_price`` is zero the price ratio is undefined, so the
        previous sales are returned unchanged instead of propagating NaN.
        """
        if prev_price == 0:
            return prev_sales
        return prev_sales * (price / prev_price) ** self.price_elasticity

    def step(self, action):
        """Apply a price action and advance the episode by one step.

        Args:
            action: 0 lowers the price by 3%, 2 raises it by 3%, any other
                value keeps it unchanged.

        Returns:
            Tuple ``(state, revenue, done, info)`` where ``state`` is the new
            ``[price, sales]`` array, ``revenue`` is ``new_price * new_sales``,
            ``done`` is True once ``len(df)`` steps have been taken, and
            ``info`` is an empty dict.
        """
        price, sales = self.state
        if action == 0:
            factor = 0.97
        elif action == 2:
            factor = 1.03
        else:
            factor = 1.0
        new_price = price * factor
        new_sales = self.estimate_demand(new_price, price, sales)
        revenue = new_price * new_sales

        self.state = np.array([new_price, new_sales])
        self.current_step += 1
        done = self.current_step >= self.max_steps

        return self.state, revenue, done, {}

    def render(self):
        """Print the current step counter, price and sales."""
        print(f"Step: {self.current_step}, Price: {self.state[0]}, Sales: {self.state[1]}")



## Agente (QLearningAgent):

* Usa una tabla Q para almacenar valores de estado-acción.
* Implementa una política epsilon-greedy para la exploración-explotación.
* Actualiza los valores Q con la ecuación de Bellman.

In [24]:
# Agente de Q-Learning para el problema de pricing
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    Q-values live in a dict keyed by ``tuple(state)``. Each entry is an array
    with one value per action, created as zeros the first time a state is
    looked up.

    Args:
        state_space: Stored on the instance; not used by the agent itself.
        action_space: Number of discrete actions.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Probability of picking a random action.
    """

    def __init__(self, state_space, action_space, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.q_table = {}
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.state_space = state_space
        self.action_space = action_space

    def get_q_values(self, state):
        """Return the mutable Q-value array for ``state``, creating it if needed."""
        key = tuple(state)
        q_values = self.q_table.get(key)
        if q_values is None:
            # Allocate only for unseen states instead of on every lookup.
            q_values = np.zeros(self.action_space)
            self.q_table[key] = q_values
        return q_values

    def choose_action(self, state):
        """Pick an action epsilon-greedily and return it as a plain ``int``.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest Q-value (ties resolve to the
        lowest action index).
        """
        if np.random.rand() < self.epsilon:
            return int(np.random.choice(self.action_space))
        return int(np.argmax(self.get_q_values(state)))

    def update(self, state, action, reward, next_state, done=False):
        """Apply one Q-learning update for the observed transition.

        Args:
            state: State in which ``action`` was taken.
            action: Index of the action taken.
            reward: Reward received.
            next_state: State reached after the action.
            done: When True the transition is terminal and the target is the
                reward alone, with no bootstrapping from ``next_state``.
        """
        q_values = self.get_q_values(state)
        if done:
            future_value = 0.0
        else:
            future_value = np.max(self.get_q_values(next_state))
        q_values[action] += self.alpha * (reward + self.gamma * future_value - q_values[action])



## Entrenamiento:

* Se ejecutan múltiples episodios para aprender la mejor política de precios.

## Optimización de hiperparámetros:

* Usa una búsqueda en cuadrícula (GridSearch) probando diferentes valores de alpha, gamma y epsilon.
* Evalúa el rendimiento midiendo la recompensa promedio en varias simulaciones.

In [25]:
# Entrenamiento del agente de Q-Learning

def train_q_learning(env, agent, episodes=500):
    """Train ``agent`` on ``env`` for ``episodes`` complete episodes.

    Each episode starts with ``env.reset()`` and runs until ``env.step``
    reports ``done``. After every step the agent is updated with the
    transition ``(state, action, reward, next_state)``.

    Returns:
        The same ``agent`` object, after training.
    """
    for _episode in range(episodes):
        observation = env.reset()
        while True:
            chosen = agent.choose_action(observation)
            successor, reward, finished, _info = env.step(chosen)
            agent.update(observation, chosen, reward, successor)
            if finished:
                break
            observation = successor
    return agent


# Optimización de hiperparámetros

def _evaluate_episode(env, agent):
    """Play one full episode with the agent's policy and return its total reward."""
    state = env.reset()
    done = False
    total_reward = 0.0
    while not done:
        state, reward, done, _ = env.step(agent.choose_action(state))
        total_reward += reward
    return total_reward


def optimize_hyperparams(df, episodes=500, eval_episodes=10):
    """Grid-search ``alpha``, ``gamma`` and ``epsilon`` for the Q-learning agent.

    For every combination a fresh agent is trained on a ``PricingEnv`` built
    from ``df`` and then scored by the mean total reward over
    ``eval_episodes`` complete episodes played with ``agent.choose_action``.

    Args:
        df: DataFrame passed to ``PricingEnv``.
        episodes: Training episodes per parameter combination.
        eval_episodes: Full episodes averaged to score each combination.

    Returns:
        Dict with the keys ``alpha``, ``gamma`` and ``epsilon`` of the
        best-scoring combination.
    """
    env = PricingEnv(df)
    param_grid = {
        'alpha': [0.1, 0.5, 0.9],
        'gamma': [0.7, 0.9, 0.99],
        'epsilon': [0.1, 0.2, 0.3]
    }

    best_score = -np.inf
    best_params = {}

    for alpha in param_grid['alpha']:
        for gamma in param_grid['gamma']:
            for epsilon in param_grid['epsilon']:
                agent = QLearningAgent(env.observation_space.shape[0], env.action_space.n, alpha, gamma, epsilon)
                trained_agent = train_q_learning(env, agent, episodes)
                score = np.mean([_evaluate_episode(env, trained_agent) for _ in range(eval_episodes)])
                # A NaN score never compares greater; rank it last so the
                # result is still a complete parameter dict.
                if np.isnan(score):
                    score = -np.inf
                if not best_params or score > best_score:
                    best_score, best_params = score, {'alpha': alpha, 'gamma': gamma, 'epsilon': epsilon}

    return best_params



## Ejecutar el modelo

In [None]:
def main():
    """Run the pipeline: load data, tune, train and replay one episode.

    Reads the processed CSV, parses and sorts it, searches hyperparameters,
    trains an agent with the best ones, then plays one episode with
    ``choose_action`` while rendering every step, and prints the total reward.
    """
    df = (
        pd.read_csv('../data/processed/masked_data.csv')  # load data
        .assign(date_id=lambda frame: pd.to_datetime(frame['date_id']))
        .sort_values(by=['product_id', 'date_id'])
        .assign(
            month=lambda frame: frame['date_id'].dt.month,
            day_of_week=lambda frame: frame['date_id'].dt.dayofweek,
        )
    )

    env = PricingEnv(df)
    best_params = optimize_hyperparams(df)

    agent = QLearningAgent(
        env.observation_space.shape[0],
        env.action_space.n,
        best_params['alpha'],
        best_params['gamma'],
        best_params['epsilon'],
    )
    trained_agent = train_q_learning(env, agent)

    # Model evaluation: accumulate the reward of a single episode
    state = env.reset()
    total_reward = 0
    while True:
        action = trained_agent.choose_action(state)
        state, reward, done, _ = env.step(action)
        total_reward += reward
        env.render()
        if done:
            break

    print(f"Total reward: {total_reward}")


if __name__ == "__main__":
    main()