In [None]:
import pandas as pd
import numpy as np

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
from gymnasium import Env
from gymnasium.spaces import MultiDiscrete, Discrete

In [None]:
class RecommenderSysEnv(Env):

    def __init__(self, df: pd.DataFrame):
        super(RecommenderSysEnv, self).__init__()
        # Carico il dataset della sperimentazione
        self.__dataset = df
        # Definisco spazio stato-azione
        self.observation_space = MultiDiscrete([5, 5, 5, 5, 5, 5])
        self.action_space = Discrete(5, start = 1)

    def reset(self):
        # Genero uno stato iniziale casuale
        self.__n_steps = 0
        self.__stato_attuale = np.random.randint(1, 6, size=6)

        return self.__stato_attuale.tolist(), {}
    
    def step(self, action):
        # Aggiorno il numero di step intrapresi
        self.__n_steps += 1
        # Calcolo la ricompensa
        reward = self.__reward_function(action)
        # Verifico la terminazione dell'epoca
        done = True if self.__n_steps >= 500 else False
        # Calcolo lo stato Successivo
        self.__stato_attuale = np.append(self.__stato_attuale, action)[1:]
        
        return self.__stato_attuale, reward, done, {}
    
    def __reward_function(self, action):
        match_state_action = self.__dataset['bmi_variation'].loc[
            (self.__dataset['sb1'] == self.__stato_attuale[0]) &
            (self.__dataset['sb2'] == self.__stato_attuale[1]) &
            (self.__dataset['sb3'] == self.__stato_attuale[2]) &
            (self.__dataset['sb4'] == self.__stato_attuale[3]) &
            (self.__dataset['sb5'] == self.__stato_attuale[4]) &
            (self.__dataset['sb6'] == self.__stato_attuale[5]) &
            (self.__dataset['action_bin'] == action)
        ]

        bmi_variaton = match_state_action.min()

        if bmi_variaton > 0:
            return 1
        elif bmi_variaton < 0:
            return -1
        else:
            return 0

In [None]:
class QLearningAgent():

    def __init__(self, alpha = 0.3, gamma = 0.9):
        self.__alpha = alpha
        self.__gamma = gamma
        self.__Q = {}

        # Inizializzo la Q_table
        for i in range(111111, 555556):
            self.__Q[str(i)] = np.zeros(5, dtype=float)

    def get_q_table(self):
        return self.__Q

    def scegli_azione(self, stato):
        q_key = self.__converti_stato_in_chiave(stato)
        return np.argmax(self.__Q[q_key]) + 1
    
    def aggiorna_q_table(self, stato, azione, ricompensa, stato_successivo):
        self.__Q[self.__converti_stato_in_chiave(stato)][azione - 1] += self.__alpha * (ricompensa + self.__gamma * np.max(self.__Q[self.__converti_stato_in_chiave(stato_successivo)]) - self.__Q[self.__converti_stato_in_chiave(stato)][azione - 1])

    @staticmethod
    def __converti_stato_in_chiave(stato):
        return ''.join(map(str, stato))

In [None]:
def training_phase(env: RecommenderSysEnv, agent: QLearningAgent):
    for episode in range(1000):
        print('Episodio {}'.format(episode + 1))
        state, _ = env.reset()
        done = False

        while not done:
            action = agent.scegli_azione(state)
            next_state, reward, done, _ = env.step(action)
            agent.aggiorna_q_table(state, action, reward, next_state)
            state = next_state

In [None]:
def test_phase(X_test: pd.DataFrame, agent: QLearningAgent):
    w_match_gain, w_match_loss, w_miss_gain, w_miss_loss = 0, 0, 0, 0

    for _, row in X_test.iterrows():
        stato = map(int, row.iloc[:6])
        azione_predetta = agent.scegli_azione(stato)
        azione_effettiva = row.iloc[-1]
        variazione_bmi = row.iloc[-2]

        # Verifica validità predizione
        if (azione_predetta == azione_effettiva) and (variazione_bmi > 0):
            w_match_gain += 1
        elif (azione_predetta != azione_effettiva) and (variazione_bmi > 0):
            w_miss_gain += 1
        elif (azione_predetta == azione_effettiva) and (variazione_bmi <= 0):
            w_match_loss += 1
        elif (azione_predetta != azione_effettiva) and (variazione_bmi <= 0):
            w_miss_loss += 1
    
    # Calcolo la precisione del modello
    precisione = w_match_loss / (w_match_loss + w_miss_loss)

    # Calcolo l'accuratezza del modello
    accuratezza = (w_match_gain + w_match_loss) / (w_match_loss + w_match_gain + w_miss_loss + w_miss_gain)

    return precisione, accuratezza


In [None]:
def main():
    df = pd.read_csv('../../dataset/dataset_cbr.csv')

    X_train, X_test = train_test_split(df, test_size=0.1, random_state=46)

    env = RecommenderSysEnv(X_train)
    agent = QLearningAgent()

    training_phase(env, agent)

    pr, acc = test_phase(X_test, agent)

    display('Accuratezza: {:.2f}'.format(acc))
    display('Precisione: {:.2f}'.format(pr))

In [None]:
main()