# IMPLEMENTAZIONE

In [1]:
import sys
sys.path.insert(0, '../')


import gymnasium as gym
print(f"Gym Version = {gym.__version__}")

import AddictiveReward

import numpy as np
import numba as nb
from numba.experimental import jitclass

from gymnasium.wrappers import RecordEpisodeStatistics
from gymnasium import spaces
import random
from numpy.random import choice
#from tqdm.notebook import tqdm
# from tqdm import tqdm, tqdm_notebook
from tqdm.notebook import tqdm

import matplotlib.pyplot as plt
from typing import Optional

#serializer
from EnvSerializer import *

Gym Version = 0.29.1


## IMPLEMENTAZIONE AMBIENTE

In [2]:
# Build the environment shared by the whole notebook. The agent classes below
# read this module-level `env` directly (table sizes, random action sampling),
# so it has to exist before any agent is instantiated.
# The commented-out lines are alternative environment ids for quick switching.
# env = gym.make('AddictiveEnv_Avanzato')
env = gym.make('AddictiveEnv_Semplificato')
# env = gym.make('AddictiveEnv_Raffinato')

## AGENTE MF

### IMPLEMENTAZIONE AGENTE MF
L'agente MF è basato su Q-Learning ed è volto a stimare il valore di una Q(s, a) ottimale (dalla quale estrarre poi la politica ottimale) senza avere conoscenza diretta del modello di transizione di stato o delle ricompense dell'ambiente.

In [3]:
class MFLearningAgent:
    """Model-free agent: tabular Q-learning with an epsilon-greedy policy.

    The Q-table shape and the random exploratory actions both come from the
    module-level ``env``, which therefore must be defined before an instance
    is created.
    """

    def __init__(self,
                 learning_rate: float,
                 initial_epsilon: float,
                 epsilon_decay: float,
                 final_epsilon: float,
                 discount_factor: float = 0.9):
        """Store the hyper-parameters and allocate a zero-filled Q-table.

        Args:
            learning_rate: Step size applied to each temporal-difference error.
            initial_epsilon: Starting exploration probability.
            epsilon_decay: Amount subtracted from epsilon by ``decay_epsilon``.
            final_epsilon: Lower bound that epsilon never goes below.
            discount_factor: Weight of the bootstrapped next-state value.
        """
        # One row per observation, one column per action.
        n_states = env.observation_space.n
        n_actions = env.action_space.n
        self.q_values = np.zeros([n_states, n_actions])

        # Learning parameters.
        self.lr = learning_rate
        self.discount_factor = discount_factor

        # Exploration schedule.
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Every temporal-difference error seen by ``update``, in order.
        self.training_error = []

    def get_action(self, state):
        """Return an epsilon-greedy action for ``state``."""
        exploit = random.uniform(0, 1) >= self.epsilon
        if exploit:
            # Greedy choice: first action holding the highest Q-value.
            return np.argmax(self.q_values[state])
        # Otherwise explore with a uniformly sampled action.
        return env.action_space.sample()

    def update(self, obs: int, action: int, reward: float, terminated: bool, next_obs: int):
        """Apply one Q-learning backup for ``(obs, action, reward, next_obs)``.

        The bootstrap term is zeroed when ``terminated`` is true. The
        temporal-difference error is appended to ``training_error``.
        """
        # Multiplying by the boolean switches the bootstrap off on termination.
        bootstrap = np.max(self.q_values[next_obs]) * (not terminated)
        target = reward + self.discount_factor * bootstrap
        td_error = target - self.q_values[obs][action]

        self.q_values[obs][action] += self.lr * td_error
        self.training_error.append(td_error)

    def decay_epsilon(self):
        """Lower epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""
        lowered = self.epsilon - self.epsilon_decay
        self.epsilon = lowered if lowered > self.final_epsilon else self.final_epsilon

## AGENTE MB

##### JIT OPTIMIZED FUNCTIONS ####

In [4]:
def transition_model_to_numpy(transition_model, obs_space, action_space):
    """Convert the nested-dict transition model into a dense float array.

    Args:
        transition_model: Mapping ``state -> action -> next_state -> info``
            where ``info`` is a dict that may hold the keys ``'count'``,
            ``'probability'`` and ``'reward'``.
        obs_space: Number of states; states are indexed ``0 .. obs_space - 1``.
        action_space: Number of actions, indexed ``0 .. action_space - 1``.

    Returns:
        Array of shape ``(obs_space, action_space, obs_space, 3)`` whose last
        axis is ``(count, probability, reward)``. Missing ``next_state``
        entries and missing keys are left at 0.
    """
    transition_array = np.zeros((obs_space, action_space, obs_space, 3))
    # Every index is covered, state 0 included. Skipping it would drop all
    # transitions from or into state 0, so the dense rows would no longer sum
    # to 1 and the sweeps (which do iterate over state 0) would see an empty
    # model there.
    for state in range(obs_space):
        for action in range(action_space):
            successors = transition_model[state][action]
            for next_state in range(obs_space):
                info = successors.get(next_state)
                if info is None:
                    continue  # never recorded: keep the zero entry
                transition_array[state, action, next_state] = [
                    info.get('count', 0),
                    info.get('probability', 0),
                    info.get('reward', 0),
                ]
    return transition_array



@nb.jit
def value_iteration_gpu(iterations, obs_space, action_space, transition_model, discount_factor, q_values):
    """Run in-place value-iteration sweeps over the dense transition array.

    Despite the name, the only acceleration visible here is the ``nb.jit``
    decorator.

    Args:
        iterations: Maximum number of full sweeps (the MBUS budget).
        obs_space: Number of states.
        action_space: Number of actions.
        transition_model: Array indexed ``[state, action, next_state, k]`` with
            ``k = 1`` the transition probability and ``k = 2`` the reward.
        discount_factor: Weight of the best next-state Q-value.
        q_values: Q-table of shape ``(obs_space, action_space)``; overwritten.

    Returns:
        ``q_values`` after the sweeps.
    """
    tolerance = 0.0000001
    largest_change = 1.0  # above the tolerance so the first sweep always runs

    # Stop when the sweep budget is spent or a sweep changed nothing notable.
    while iterations > 0 and largest_change > tolerance:
        largest_change = 0.0

        for state in range(obs_space):
            # Snapshot of this row, used to measure how far the sweep moved it.
            before = q_values[state] * 1.0

            for action in range(action_space):
                backup = 0.0
                for next_state in range(obs_space):
                    probability = transition_model[state, action, next_state, 1]
                    reward = transition_model[state, action, next_state, 2]
                    backup += probability * (reward + discount_factor * np.max(q_values[next_state]))
                # Written immediately, so later backups in this sweep see it.
                q_values[state, action] = backup

            row_change = np.max(np.abs(before - q_values[state]))
            largest_change = max(largest_change, row_change)

        iterations -= 1
    return q_values

@nb.jit
def softmax(x, temperature):
    """Return the softmax of ``x / temperature`` as a probability vector.

    Args:
        x: 1-D array of scores.
        temperature: Positive scale; smaller values make the result peakier.

    Returns:
        Array with the same length as ``x`` whose entries sum to 1. An empty
        input yields an empty array.
    """
    scaled = x / temperature
    if scaled.size == 0:
        return scaled
    # Subtracting the maximum leaves the result mathematically unchanged but
    # keeps every exponent <= 0, so large scores or a small temperature can no
    # longer overflow to inf and turn the whole vector into NaN.
    weights = np.exp(scaled - np.max(scaled))
    return weights / np.sum(weights)

# Prioritized sweeping test
@nb.jit
def prioritized_sweeping(state, transition_model, q_values, mbus, T_MB, observation_space, action_space, discount_factor):
    """Refresh ``q_values`` with ``mbus`` softmax-prioritised state backups.

    Each step samples one state from a softmax over the priority vector,
    recomputes all of its Q-values from the model, and then raises the
    priority of every state that can reach it in proportion to how much its
    value just changed.

    Args:
        state: Accepted for interface compatibility.
            NOTE(review): never read here, so sampling always starts from
            uniform priorities. Confirm this is intended.
        transition_model: Array indexed ``[state, action, next_state, k]`` with
            ``k = 1`` the transition probability and ``k = 2`` the reward.
        q_values: Q-table of shape ``(observation_space, action_space)``;
            overwritten in place.
        mbus: Number of single-state backups to perform.
        T_MB: Softmax temperature used when sampling the state to back up.
        observation_space: Number of states.
        action_space: Number of actions.
        discount_factor: Weight of the best next-state Q-value.

    Returns:
        ``q_values`` after the backups.
    """
    # Float arrays from the start: both vectors receive float values below,
    # and a container first typed as integer is fragile under numba typing.
    priorities = np.zeros(observation_space)   # H: urgency of each state
    values = np.zeros(observation_space)       # V: last max-Q seen per state
    last_index = observation_space - 1

    steps = 0
    while steps < mbus:
        steps += 1

        # Inverse-CDF sampling of the state to back up.
        state_probs = softmax(priorities, T_MB)
        drawn = np.searchsorted(np.cumsum(state_probs), np.random.random(), side="right")
        # Rounding can leave the cumulative sum just below 1, in which case
        # the search returns `observation_space`; clamp to the last state.
        s = min(drawn, last_index)

        # Full backup of every action of the sampled state.
        for action in range(action_space):
            backup = 0.0
            for next_state in range(observation_space):
                probability = transition_model[s, action, next_state, 1]
                reward = transition_model[s, action, next_state, 2]
                backup += probability * (reward + discount_factor * np.max(q_values[next_state]))
            q_values[s, action] = backup

        # Size of the change in the state's value drives the new priorities.
        best = -1e100
        for a in range(action_space):
            best = max(q_values[s, a], best)
        delta = abs(values[s] - best)
        values[s] = best

        # A predecessor's priority is delta times its best chance of reaching s.
        for s_prime in range(observation_space):
            reach = 0.0
            for a in range(action_space):
                reach = max(reach, transition_model[s_prime, a, s, 1])
            h_s_prime = delta * reach
            if s_prime != s:
                priorities[s_prime] = max(h_s_prime, priorities[s_prime])
            else:
                # The state just backed up is reset rather than maxed.
                priorities[s] = h_s_prime
    return q_values

### IMPLEMENTAZIONE AGENTE MB
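L'agente MB apprende un modello di transizione dell'ambiente e ne ricava la politica ottimale tramite value iteration oppure prioritized sweeping (quest'ultimo è il metodo attivo di default nel codice).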

In [5]:
class MBLearningAgent:
    """Model-based agent that learns a tabular transition model and plans on it.

    Every observed transition updates a count-based model
    (``state -> action -> next_state -> {'count', 'probability', 'reward'}``)
    and is followed by a planning pass that refreshes ``q_values``. Table
    sizes and random exploratory actions come from the module-level ``env``.
    """

    def __init__(self, learning_rate: float, initial_epsilon: float, epsilon_decay: float, final_epsilon: float, discount_factor: float = 0.9, mbus=50, theta=0.01):
        """Store the hyper-parameters and build an empty model and Q-table.

        Args:
            learning_rate: Stored as ``lr``; not read by this class.
            initial_epsilon: Starting exploration probability.
            epsilon_decay: Amount subtracted from epsilon by ``decay_epsilon``.
            final_epsilon: Lower bound for epsilon.
            discount_factor: Weight of the best next-state Q-value.
            mbus: Planning budget handed to the planner after each observation.
            theta: Stored as-is; not read by this class.
        """
        # MB params
        self.q_values = np.zeros([env.observation_space.n, env.action_space.n])

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.mbus = mbus
        self.transition_model = {}
        self.initialize_transition_model()

        self.training_error = []

        self.theta = theta
        self.upd = 0

        # Softmax temperature used by prioritized sweeping.
        self.T_MB = 1.0

    def get_action(self, state):
        """Return an epsilon-greedy action for ``state``."""
        if random.uniform(0, 1) < self.epsilon:
            return env.action_space.sample()  # explore
        return np.argmax(self.q_values[state])  # exploit

    def value_iteration(self, iter):
        """Pure-Python value iteration over the dict model (slow fallback).

        Runs at most ``iter`` in-place sweeps over every state of the model
        and stops early once the largest Q-value change in a sweep drops
        below 0.1. (The parameter name shadows the builtin but is kept so
        existing callers keep working.)
        """
        epsilon = 0.1
        # `range(iter)` gives exactly `iter` sweeps, not one fewer.
        for _ in range(iter):
            delta = 0.0
            # Iterating the model itself covers every state, including 0.
            for state, actions in self.transition_model.items():
                for action, successors in actions.items():
                    previous = self.q_values[state][action]
                    t = 0
                    for next_state, info in successors.items():
                        t += info['probability'] * (
                            info['reward']
                            + self.discount_factor * np.max(self.q_values[next_state])
                        )
                    self.q_values[state][action] = t
                    # Convergence is measured by how much this entry moved.
                    delta = max(delta, abs(t - previous))

            if delta < epsilon:  # converged
                break

    def initialize_transition_model(self):
        """Fill the model with zeroed entries for every (s, a, s') triple."""
        n_states = env.observation_space.n
        n_actions = env.action_space.n
        for state in range(n_states):
            self.transition_model[state] = {
                action: {
                    next_state: {'count': 0, 'probability': 0, 'reward': 0}
                    for next_state in range(n_states)
                }
                for action in range(n_actions)
            }

    # Facade for MF compatibility
    def update(self, obs, action, reward, terminated, next_obs):
        """Same signature as the MF agent's ``update``; ``terminated`` is ignored."""
        self.update_transition_model(obs, action, next_obs, reward)

    def update_transition_model(self, state, action, next_state, reward):
        """Record one transition, then re-plan ``q_values`` on the new model.

        The stored reward is the most recent one observed for the triple.
        """
        entry = self.transition_model[state][action][next_state]
        entry['count'] += 1
        entry['reward'] = reward
        # Only this (state, action) row changed, so only it needs new
        # probabilities; all other rows are already normalised.
        self._normalize_row(state, action)

        # To switch planning method, uncomment one option and comment the other.
        #1. prioritized sweeping
        transition_array = transition_model_to_numpy(self.transition_model, env.observation_space.n, env.action_space.n)
        self.q_values = prioritized_sweeping(state, transition_array, self.q_values, self.mbus, self.T_MB, env.observation_space.n, env.action_space.n, self.discount_factor)

        #2. value iteration JIT optimized (FAST)
        #transition_array = transition_model_to_numpy(self.transition_model, env.observation_space.n, env.action_space.n)
        #self.q_values = value_iteration_gpu(self.mbus, env.observation_space.n, env.action_space.n, transition_array, self.discount_factor, self.q_values)

        #3. value iteration (SLOW)
        #self.value_iteration(self.mbus)

    def calculate_transition_probabilities(self):
        """Recompute ``'probability'`` for every row from its counts."""
        for state, actions in self.transition_model.items():
            for action in actions:
                self._normalize_row(state, action)

    def _normalize_row(self, state, action):
        """Set the probabilities of one (state, action) row to count / total."""
        successors = self.transition_model[state][action]
        total_count = sum(info['count'] for info in successors.values())
        if total_count == 0:
            total_count = 1  # unseen row: all probabilities stay 0
        for info in successors.values():
            info['probability'] = info['count'] / total_count

    def decay_epsilon(self):
        """Lower epsilon by ``epsilon_decay``, never going below ``final_epsilon``."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)
    

## AGENTE MX

### IMPLEMENTAZIONE AGENTE MX

In [6]:
class MXLearningAgent:
    """Mixed agent that blends a model-free and a model-based Q-table.

    Both sub-agents learn from every transition; actions are chosen from
    ``q_valuesMX``, the ``beta``-weighted combination of their tables.
    """

    def __init__(self, lr, ie, ed, fe, Beta, Epsilon, MBUS):
        """Create both sub-agents and an all-zero mixed Q-table.

        Args:
            lr: Learning rate forwarded to both sub-agents.
            ie: Initial epsilon forwarded to both sub-agents.
            ed: Epsilon decay forwarded to both sub-agents.
            fe: Final epsilon forwarded to both sub-agents.
            Beta: Weight of the model-based table in the mix (1 - Beta goes
                to the model-free table).
            Epsilon: Exploration probability used by this agent's own policy.
            MBUS: Planning budget forwarded to the model-based agent.
        """
        shared = dict(
            learning_rate=lr,
            initial_epsilon=ie,
            epsilon_decay=ed,
            final_epsilon=fe,
        )
        self.MF_agent = MFLearningAgent(**shared)
        self.MB_agent = MBLearningAgent(mbus=MBUS, **shared)

        self.beta = Beta  # balance between model-based and model-free
        self.epsilon = Epsilon
        self.q_valuesMX = np.zeros([env.observation_space.n, env.action_space.n])

    def get_action(self, state):
        """Return an epsilon-greedy action, sampling randomly on a full tie."""
        if random.uniform(0, 1) <= self.epsilon:
            return env.action_space.sample()

        row = self.q_valuesMX[state]
        # When all actions share one value there is nothing to exploit.
        if len(set(row)) == 1:
            return env.action_space.sample()
        return np.argmax(row)

    def update(self, obs, action, reward, terminated, next_obs):
        """Feed the transition to both sub-agents and rebuild the mixed table."""
        mf, mb = self.MF_agent, self.MB_agent

        mf.update(obs, action, reward, terminated, next_obs)
        mf.decay_epsilon()
        mb.update_transition_model(obs, action, next_obs, reward)
        mb.decay_epsilon()

        # Element-wise blend written into the existing array.
        self.q_valuesMX[:, :] = self.beta * mb.q_values + (1 - self.beta) * mf.q_values
        

### TEST MX

In [7]:
# Hyper-parameters for the MX experiment run in the next cell.
# lr: learning rate passed to both sub-agents of the MX agent.
lr = 0.1
# NOTE(review): n_episodes is not read anywhere in the visible cells.
n_episodes = 1

# Epsilon schedule handed to the sub-agents: start value, per-update decrement
# and floor.
ie = 0.4
# Linear decay: epsilon would reach 0 after half of `get_iter()` decrements
# (before the `fe` floor is applied). Presumably `get_iter()` is the number of
# steps in an episode; verify against the environment.
ed = ie / (env.unwrapped.get_iter() / 2)  # reduce the exploration over time
fe = 0.1

# Weight of the model-based Q-table in the MX mix; with 1.0 the mixed table
# equals the model-based one.
beta = 1.0



In [8]:
import copy  # needed for the per-step snapshot of the transition model

# Number of independent simulations; each one trains a fresh MX agent.
SIMULATION = 900

# Serialize info
# (presumably buffers per-step records and writes them once per simulation;
# see EnvSerializer for the details)
envSerializerObject = EnvSerializer()

# The context manager closes the progress bar even when the run is interrupted.
with tqdm(total=SIMULATION) as pbar:
    for simulation_index in range(SIMULATION):
        envSerializerObject.clean_value_at_step()
        agentMX = MXLearningAgent(
            lr=lr,
            ie=ie,
            ed=ed,
            fe=fe,
            Beta=beta,
            Epsilon=0.1,
            MBUS=50,
        )

        obs, info = env.reset()
        done = False

        # play one episode
        while not done:
            action = agentMX.get_action(obs)
            next_obs, agentReward, terminated, truncated, info = env.step(action)

            # update the agent
            agentMX.update(obs, action, agentReward, terminated, next_obs)

            done = terminated or truncated
            state = obs
            obs = next_obs

            recommender = env.unwrapped.get_recommender()
            q_valuesRecommender = recommender.get_qValues()
            # Cumulative recommender rewards
            rewards_recommender = recommender.get_rewards()

            selected_arm = recommender.get_arm()
            transition_model = agentMX.MB_agent.transition_model
            value_at_step_structure = [
                state,
                action,
                agentReward,
                agentMX.q_valuesMX.copy(),
                selected_arm,
                q_valuesRecommender.copy(),
                rewards_recommender.copy(),
                # The model is a dict of dicts of dicts that the agent keeps
                # mutating. A shallow `.copy()` would share the inner dicts,
                # so every recorded step would end up showing the final
                # model; deepcopy freezes the state as of this step.
                copy.deepcopy(transition_model),
            ]

            envSerializerObject.add_value_at_step(value_at_step_structure)
        envSerializerObject.serialize_data()
        pbar.update(1)

  0%|          | 0/900 [00:00<?, ?it/s]

  logger.warn(f"{pre} should be an int or np.int64, actual type: {type(obs)}")


KeyboardInterrupt: 