# Deep Q-Learning Agent

Import des bibliothèques

In [6]:
import torch
import torch.nn as nn
from gym.spaces import Box
import torch.optim as optim
from collections import deque
from citylearn.citylearn import CityLearnEnv
import random
import numpy as np
import torch.nn.functional as F

Définition des constantes utilisées pour instancier l'environnement CityLearn

In [3]:
class Constants:
    """Notebook-wide settings, grouped as class attributes."""

    # Location of the CityLearn schema file, relative to the working directory.
    schema_path = './data/citylearn_challenge_2022_phase_1/schema.json'

    # Presumably the number of episodes to run; not used in the visible cells.
    episodes = 3

Fabrication du réseau de neurones

In [5]:
class DQN(nn.Module):
    """Fully connected Q-network: one observation vector in, one Q-value per discrete action out.

    The input width is read from ``env.observation_space[0].shape[0]`` and the
    output width is ``len(discrete_action_space)``. The module also owns the
    Adam optimizer used to train it (``self.optimizer``).

    Args:
        env: Object exposing ``observation_space``, an indexable collection
            whose first element has a ``shape`` attribute.
        learning_rate: Learning rate handed to ``torch.optim.Adam``.
        discrete_action_space: Sequence of admissible action values. When
            ``None``, 21 evenly spaced values in ``[-1, 1]`` are used.
    """

    def __init__(self, env, learning_rate, discrete_action_space=None):
        super().__init__()

        # Build the default grid per instance instead of sharing a single
        # array created once at function-definition time.
        if discrete_action_space is None:
            discrete_action_space = np.linspace(-1, 1, num=21)

        input_features = env.observation_space[0].shape[0]
        self.discrete_action_space = discrete_action_space

        self.dense1 = nn.Linear(in_features=input_features, out_features=128)
        self.dense2 = nn.Linear(in_features=128, out_features=64)
        self.dense3 = nn.Linear(in_features=64, out_features=32)
        self.dense4 = nn.Linear(in_features=32, out_features=len(discrete_action_space))

        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def forward(self, x):
        """Return the Q-values for ``x``; the last axis indexes the discrete actions."""
        x = torch.tanh(self.dense1(x))
        x = torch.tanh(self.dense2(x))
        x = torch.tanh(self.dense3(x))
        # The output head stays linear: Q-values estimate discounted returns,
        # which are not confined to [-1, 1]. A tanh here would make targets
        # outside that range unreachable.
        return self.dense4(x)

Experience replay

In [26]:
class ExperienceReplay:
    """Bounded replay memory, pre-filled with random transitions at construction.

    Transitions are stored as ``(obs, action, reward, done, new_obs)`` tuples
    in ``replay_buffer`` (at most ``buffer_size`` entries). A separate
    ``reward_buffer`` keeps the last 100 rewards pushed through
    ``add_reward`` and starts with the single value ``-200.0``.

    Args:
        env: Environment exposing ``reset()``, ``step(action)`` returning a
            4-tuple ``(obs, reward, done, info)``, and a sized ``action_space``.
        buffer_size: Maximum number of transitions kept.
        min_replay_size: Number of random transitions collected up front.
        discrete_action_space: Values the random warm-up actions are drawn
            from. When ``None``, 21 evenly spaced values in ``[-1, 1]`` are
            used. Pass the agent's own grid here so the stored actions match
            the actions the Q-network can index.
    """

    def __init__(self, env, buffer_size, min_replay_size=1000, discrete_action_space=None):
        self.env = env
        self.min_replay_size = min_replay_size
        self.replay_buffer = deque(maxlen=buffer_size)
        self.reward_buffer = deque([-200.0], maxlen=100)

        if discrete_action_space is None:
            discrete_action_space = np.linspace(-1, 1, num=21)

        print('Please wait, the experience replay buffer will be filled with random transitions')

        obs = self.env.reset()
        n_actions = len(self.env.action_space)
        for _ in range(self.min_replay_size):
            # One single-element action list per entry of env.action_space.
            action = [[np.random.choice(discrete_action_space)] for _ in range(n_actions)]
            new_obs, rew, done, _info = self.env.step(action)
            self.replay_buffer.append((obs, action, rew, done, new_obs))
            # Start a fresh episode once the current one is over.
            obs = self.env.reset() if done else new_obs

        print('Initialization with random transitions is done!')

    def add_data(self, data):
        """Append one ``(obs, action, reward, done, new_obs)`` transition."""
        self.replay_buffer.append(data)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them as float32 tensors.

        Returns:
            ``(observations, actions, rewards, dones, new_observations)``.
            Each tensor has the batch on axis 0; actions, rewards and dones
            additionally get a trailing singleton axis.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        transitions = random.sample(self.replay_buffer, batch_size)
        observations = np.asarray([t[0] for t in transitions])
        actions = np.asarray([t[1] for t in transitions])
        rewards = np.asarray([t[2] for t in transitions])
        dones = np.asarray([t[3] for t in transitions])
        new_observations = np.asarray([t[4] for t in transitions])

        # Conversion to tensors
        observations_t = torch.as_tensor(observations, dtype=torch.float32)
        actions_t = torch.as_tensor(actions, dtype=torch.float32).unsqueeze(-1)
        rewards_t = torch.as_tensor(rewards, dtype=torch.float32).unsqueeze(-1)
        dones_t = torch.as_tensor(dones, dtype=torch.float32).unsqueeze(-1)
        new_observations_t = torch.as_tensor(new_observations, dtype=torch.float32)

        return observations_t, actions_t, rewards_t, dones_t, new_observations_t

    def add_reward(self, reward):
        """Record a reward in the rolling ``reward_buffer`` (last 100 values)."""
        self.reward_buffer.append(reward)


Agent class

In [48]:
class dqAgent:
    """Epsilon-greedy deep Q-learning agent over a discrete action grid.

    The agent owns an ``ExperienceReplay`` memory and a single ``DQN``
    (``online_network``); no separate target network is used.

    Args:
        env: Environment; ``action_space`` and ``observation_space`` must be
            sized collections with one entry per building.
        device: Torch device the network and training tensors live on.
        epsilon_decay: Step at which epsilon reaches ``epsilon_end``.
        epsilon_start: Epsilon at step 0.
        epsilon_end: Epsilon from ``epsilon_decay`` steps onwards.
        discount_rate: Discount factor applied to the bootstrap term.
        lr: Learning rate of the network's optimizer.
        buffer_size: Capacity of the replay memory.
        discrete_action_space: Sequence of admissible action values; the
            network has one output per value.
    """

    def __init__(self, env, device, epsilon_decay, epsilon_start, epsilon_end, discount_rate, lr, buffer_size, discrete_action_space):
        # Attributes
        self.env = env
        self.device = device
        self.epsilon_decay = epsilon_decay
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.discount_rate = discount_rate
        self.learning_rate = lr
        self.buffer_size = buffer_size
        self.discrete_action_space = discrete_action_space

        # Replay memory (filled with random transitions on creation) and Q-network
        self.replay_memory = ExperienceReplay(self.env, self.buffer_size)
        self.online_network = DQN(self.env, self.learning_rate, self.discrete_action_space).to(self.device)

    def choose_action(self, step, observation, greedy=False):
        """Pick an action epsilon-greedily.

        Epsilon is interpolated linearly from ``epsilon_start`` at step 0 to
        ``epsilon_end`` at ``epsilon_decay`` and held constant afterwards.

        Args:
            step: Current step, used only for the epsilon schedule.
            observation: Either one observation vector (1-D) or one vector
                per building (2-D).
            greedy: When true, never explore.

        Returns:
            ``(action, epsilon)``. When exploring, ``action`` holds one
            single-element list per entry of ``env.action_space``, drawn from
            the discrete grid. When exploiting a 1-D observation it is the
            single-element list ``[value]``; for a 2-D observation it is
            ``[[value], ...]`` with one entry per row.
        """
        epsilon = np.interp(step, [0, self.epsilon_decay], [self.epsilon_start, self.epsilon_end])

        random_sample = random.random()

        if (random_sample <= epsilon) and not greedy:
            # Explore on the same grid the network indexes; a continuous
            # sample would have no matching Q-value output.
            action = [[np.random.choice(self.discrete_action_space)]
                      for _ in range(len(self.env.action_space))]
        else:
            obs_t = torch.as_tensor(np.asarray(observation, dtype=np.float32), device=self.device)
            single = obs_t.dim() == 1
            if single:
                obs_t = obs_t.unsqueeze(0)

            # Action selection needs no gradient.
            with torch.no_grad():
                q_values = self.online_network(obs_t)
            best = torch.argmax(q_values, dim=-1)

            if single:
                action = [self.discrete_action_space[best.item()]]
            else:
                action = [[self.discrete_action_space[i]] for i in best.tolist()]

        return action, epsilon

    def learn(self, batch_size):
        """Run one gradient step on a random batch, for one randomly chosen building.

        Args:
            batch_size: Number of transitions sampled from the replay memory.

        Returns:
            The smooth-L1 loss of the step as a Python float.
        """
        # Sample random transitions and move them next to the network
        observations_t, actions_t, rewards_t, dones_t, new_observations_t = (
            t.to(self.device) for t in self.replay_memory.sample(batch_size)
        )
        building = np.random.randint(0, len(self.env.observation_space))

        # Bootstrap target; computed without gradient so that only the
        # prediction side of the loss is optimised.
        with torch.no_grad():
            next_q_values = self.online_network(new_observations_t[:, building, :])
            max_next_q_values = next_q_values.max(dim=1, keepdim=True)[0]
            targets = rewards_t[:, building] + self.discount_rate * (1 - dones_t) * max_next_q_values

        # Q-values of the actions actually taken
        q_values = self.online_network(observations_t[:, building, :])
        # First action component of the chosen building, one row per sample.
        taken = actions_t[:, building].reshape(actions_t.shape[0], -1)[:, :1]
        grid = torch.as_tensor(self.discrete_action_space, dtype=torch.float32, device=self.device)
        # Nearest grid value: always exactly one index per sample, whatever
        # the batch size and even if a stored action is slightly off-grid.
        a_index = (taken - grid).abs().argmin(dim=1, keepdim=True)
        action_q_values = torch.gather(q_values, dim=1, index=a_index)
        loss = F.smooth_l1_loss(action_q_values, targets)

        # Gradient descent
        self.online_network.optimizer.zero_grad()
        loss.backward()
        self.online_network.optimizer.step()

        return loss.item()