<a href="https://colab.research.google.com/github/btlgs2000/Elettric80/blob/main/DQN_cartpole_from_tuple.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import gym
import tensorflow.keras as keras
from collections import deque
import random
import numpy as np
from tqdm import tqdm

In [None]:
from keras import Sequential, Input
from keras.layers import Dense

In [None]:
# Q-network: maps a 4-dimensional observation to 2 outputs, one per action.
# Three identical 100-unit ReLU hidden layers sit between input and output;
# the output layer is linear (no activation).
model = Sequential([
    Input(shape=(4,)),
    *[Dense(100, activation='relu') for _ in range(3)],
    Dense(2),
])

In [None]:
# Print the layer-by-layer architecture together with the parameter counts.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense (Dense)                (None, 100)               500       
_________________________________________________________________
dense_1 (Dense)              (None, 100)               10100     
_________________________________________________________________
dense_2 (Dense)              (None, 100)               10100     
_________________________________________________________________
dense_3 (Dense)              (None, 2)                 202       
Total params: 20,902
Trainable params: 20,902
Non-trainable params: 0
_________________________________________________________________


# Esempio di generazione di un episodio

In [None]:
# Create the CartPole environment; this same instance is reused later by the
# training loop.
cart_pole = gym.make('CartPole-v0')
# NOTE(review): using reset()'s return value directly as the observation and
# unpacking step() into 4 values matches the classic Gym API — confirm the
# installed gym version still behaves this way.
obs = cart_pole.reset()
done = False
# Number of steps taken in this episode.
counter = 0
# Play a single episode with randomly sampled actions until the environment
# reports that the episode is over.
while not done:
    obs, rew, done, info = cart_pole.step(cart_pole.action_space.sample())
    counter += 1

In [None]:
# Number of training episodes (also the length of the epsilon schedule).
NUM_EPISODES = 1_000
# Maximum number of transitions kept in the replay buffer (deque maxlen).
REPLAY_BUFFER_CAPACITY = 10_000
# Network updates only start once the global step counter exceeds this value,
# so the buffer already holds this many transitions at the first update.
MIN_REPLAY_BUFFER_CAPACITY = 1_000
# The target network is re-synchronised with the online network every this
# many global steps.
TARGET_MODEL_UPDATE_PERIOD = 1_000
# Number of transitions sampled from the replay buffer for each update.
BATCH_SIZE = 32
# One optimizer step is taken every this many environment steps.
UPDATE_EVERY_N_STEPS = 10
# Epsilon is annealed from its initial value (EPSILON_0) to its final value
# (EPSILON_N) over NUM_EPISODES episodes. The training loop builds the
# schedule with np.linspace, i.e. the decay is linear, not exponential.
EPSILON_0 = 0.9
EPSILON_N = 0.1
# Learning rate handed to the Adam optimizer.
LR = 1e-3
# Discount factor applied to the bootstrapped next-state value.
GAMMA = 0.9

In [None]:
def choose_action(observation, epsilon, model):
    """Pick an action with the epsilon-greedy rule.

    With probability ``epsilon`` a random action (0 or 1) is returned;
    otherwise the action whose predicted value is highest is returned.

    Args:
        observation: a single (unbatched) environment observation.
        epsilon: exploration probability in [0, 1].
        model: callable mapping a batch of observations to per-action values.

    Returns:
        The index of the selected action.
    """
    explore = random.random() < epsilon
    if explore:
        # Exploration: uniform choice between the two actions.
        return random.randint(0, 1)

    # Exploitation: add a leading batch axis, query the model and take the
    # best-scoring action.
    batched_observation = np.expand_dims(observation, axis=0)
    action_values = model(batched_observation)
    return np.argmax(action_values)

def get_target_value(reward, next_state, target_model, gamma):
    """Return the one-step TD target for a single transition.

    Args:
        reward: reward obtained for the transition.
        next_state: the single observation reached after the transition, or
            ``None`` when the transition ended the episode. Both an
            unbatched state and a state with a leading batch axis of size 1
            are accepted.
        target_model: callable mapping a batch of states to per-action values.
        gamma: discount factor applied to the bootstrapped value.

    Returns:
        ``reward`` for terminal transitions, otherwise
        ``reward + gamma * max_a target_model(next_state)[a]`` as a scalar.
    """
    if next_state is None:
        return reward

    # The model expects a leading batch axis; atleast_2d adds it only when
    # the caller passed an unbatched state.
    next_values = np.asarray(target_model(np.atleast_2d(next_state)))
    # Take the maximum over the action values as a scalar. The builtin max()
    # would iterate over the batch axis and hand back the whole value row.
    return reward + gamma * float(next_values.max())

def clone_model(model):
    """Return a copy of ``model`` that carries the same weights.

    The architecture is duplicated with ``keras.models.clone_model`` and the
    source model's weights are then copied onto the duplicate explicitly.
    """
    source_weights = model.get_weights()
    replica = keras.models.clone_model(model)
    replica.set_weights(source_weights)
    return replica

In [None]:
# Adam optimizer configured with the learning rate LR.
optimizer = keras.optimizers.Adam(learning_rate=LR)
# Huber loss, used by the training loop to compare target values with the
# online network's predictions.
loss = keras.losses.Huber()

In [None]:
# Start the target network as a copy (architecture and weights) of the
# online network.
target_model = clone_model(model)

# Replay buffer, stored as four parallel deques: the entries at index i of
# each deque together describe one transition. maxlen makes every deque drop
# its oldest entry once REPLAY_BUFFER_CAPACITY is reached, so they stay aligned.
observations = deque(maxlen=REPLAY_BUFFER_CAPACITY)
actions = deque(maxlen=REPLAY_BUFFER_CAPACITY)
next_observations = deque(maxlen=REPLAY_BUFFER_CAPACITY)
rewards = deque(maxlen=REPLAY_BUFFER_CAPACITY)

# Total number of environment steps taken across all episodes.
global_step_counter = 0

with tqdm(total=NUM_EPISODES) as tqdm_bar:
    # Epsilon goes linearly from EPSILON_0 to EPSILON_N, one value per
    # episode. For a geometric decay use
    # np.logspace(np.log10(EPSILON_0), np.log10(EPSILON_N), NUM_EPISODES) instead.
    for num_episode, epsilon in zip(range(1, NUM_EPISODES+1), np.linspace(EPSILON_0, EPSILON_N, NUM_EPISODES)):
        # Generate one episode.
        obs = cart_pole.reset()
        done = False
        episode_counter = 0
        while not done:
            # Epsilon-greedy action selection with the online network.
            action = choose_action(obs, epsilon, model)

            # Apply the action to the environment.
            next_obs, rew, done, info = cart_pole.step(action)

            # Store the transition; a terminal transition is marked by
            # storing None as its next observation.
            observations.append(obs)
            actions.append(action)
            next_observations.append(next_obs if not done else None)
            rewards.append(rew)

            # Advance the rollout state and the counters.
            obs = next_obs
            episode_counter += 1
            global_step_counter += 1

            # Network update: every UPDATE_EVERY_N_STEPS steps once more than
            # MIN_REPLAY_BUFFER_CAPACITY steps have been collected. The length
            # check keeps random.sample from failing if the constants are
            # changed so that the buffer is still smaller than one batch.
            if (
                global_step_counter > MIN_REPLAY_BUFFER_CAPACITY
                and global_step_counter % UPDATE_EVERY_N_STEPS == 0
                and len(observations) >= BATCH_SIZE
            ):
                # Draw BATCH_SIZE distinct transitions from the replay buffer.
                batch_idxs = random.sample(range(len(observations)), BATCH_SIZE)

                # Every sampled quantity uses a batch_-prefixed name so that
                # the rollout variables (obs, rew, ...) are never overwritten;
                # the next action must be chosen from the real current state.
                batch_observations = np.stack([observations[idx] for idx in batch_idxs])
                batch_actions = np.array([actions[idx] for idx in batch_idxs])
                batch_rewards = np.array([rewards[idx] for idx in batch_idxs], dtype=np.float32)
                batch_next = [next_observations[idx] for idx in batch_idxs]
                batch_non_terminal = np.array([nxt is not None for nxt in batch_next])

                # Bootstrapped value max_a' Q_target(s', a'); it stays 0 for
                # terminal transitions, whose target is the reward alone.
                batch_bootstrap = np.zeros(BATCH_SIZE, dtype=np.float32)
                if batch_non_terminal.any():
                    batch_next_states = np.stack([nxt for nxt in batch_next if nxt is not None])
                    batch_bootstrap[batch_non_terminal] = (
                        target_model(batch_next_states).numpy().max(axis=1)
                    )

                # Start from the online network's current predictions and
                # overwrite only the entry of the action actually taken, so
                # the other actions contribute zero error to the loss.
                batch_targets = model(batch_observations).numpy()
                batch_targets[np.arange(BATCH_SIZE), batch_actions] = (
                    batch_rewards + GAMMA * batch_bootstrap
                )

                # Take one optimizer step on the sampled batch.
                optimizer.minimize(lambda: loss(batch_targets, model(batch_observations)), model.trainable_weights)

            if global_step_counter % TARGET_MODEL_UPDATE_PERIOD == 0:
                # Align the target network with the online network.
                target_model.set_weights(model.get_weights())

        # Episode finished: advance the progress bar and report its length.
        tqdm_bar.update()
        tqdm_bar.set_postfix(dict(lunhezza_episodio=episode_counter, epsilon=epsilon))