In [1]:
import torch 
import torch.nn as nn 
import torch.functional as F
import torch.optim as optim

import numpy as np

import gymnasium

import random 

from datetime import datetime

ModuleNotFoundError: No module named 'gymnasium'

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display

In [None]:
from collections import namedtuple , deque
# One replay-buffer record: (state, action, next_state, reward).
# `next_state` is None when the step ended the episode (Env.state() returns None once done).
Transition = namedtuple("Transition",["state","action","next_state","reward"])

In [None]:
import warnings
# Silence any warning whose message starts with "To copy construct from a tensor".
warnings.filterwarnings("ignore", message="To copy construct from a tensor")

In [None]:
class ReplayMemory():
    """Bounded FIFO store of transitions used for experience replay.

    Once `maxlen` items are held, pushing another one discards the oldest.
    """

    def __init__(self,maxlen : int):
        # A deque built with a maxlen evicts from the left automatically when full.
        self.memory = deque([], maxlen)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def push(self,x : Transition):
        """Store one transition, evicting the oldest one if the buffer is full."""
        self.memory.append(x)

    def sample(self,batch_size : int) -> list[Transition]:
        """Return `batch_size` distinct stored transitions drawn uniformly at random.

        Raises:
            ValueError: (from random.sample) if fewer than `batch_size` items are stored.
        """
        picked = random.sample(self.memory, k=batch_size)
        return picked

In [None]:
def test_replay():
    """Smoke-check ReplayMemory: push two transitions, then print contents, size and one sample."""
    buffer = ReplayMemory(3)
    for fields in ((1,2,3,4), (11,12,13,14)):
        buffer.push(Transition(*fields))
    print(buffer.memory)
    print(len(buffer))
    print(buffer.sample(1))

#test_replay()

In [None]:
class DQN(nn.Module):
    """Small fully connected Q-network: state features in, one Q-value per action out.

    Args:
        n_observations: size of the input feature vector (default 2).
        n_actions: number of discrete actions, i.e. output width (default 4).
        hidden_size: width of the two hidden layers (default 100).

    The defaults reproduce the original fixed 2 -> 100 -> 100 -> 4 architecture,
    and the layers keep the same names inside `self.network`, so state dicts
    saved by the previous version still load.
    """
    def __init__(self, n_observations : int = 2, n_actions : int = 4, hidden_size : int = 100):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(n_observations, hidden_size),
            nn.ReLU(),

            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),

            nn.Linear(hidden_size, n_actions),
        )

    def forward(self,x):
        """Return the Q-values for a batch `x` of shape (batch, n_observations)."""
        return self.network(x)

    def save(self,filename : "str | None" = None):
        """Write the state dict to `filename`.

        When `filename` is None a timestamp (YYYY-MM-DD_HH:MM:SS) in the current
        working directory is used. Note that this default name contains ':',
        which is not a legal file-name character on Windows.
        """
        # Identity test: `== None` would go through __eq__ of whatever is passed in.
        if filename is None:
            filename = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
        torch.save(self.state_dict(),filename)

    def load(self,filename : str):
        """Load a state dict previously written by `save` into this network."""
        # weights_only=True restricts unpickling to tensors and plain containers.
        self.load_state_dict(torch.load(filename, weights_only=True))

In [None]:
class Env():
    """Wrapper bundling a gymnasium environment, a DQN and a replay buffer.

    It converts gymnasium's integer observation into an (x, y) float tensor,
    replaces the environment reward with a shaped one, and records every step
    in `self.replay`. The coordinate conversion, the start cell and the goal
    cell are hard-wired for a grid that is 12 cells wide (CliffWalking).

    Attributes:
        env: the underlying gymnasium environment (render_mode="rgb_array").
        done: True once the current episode is over.
        state_gym: last raw observation returned by gymnasium.
        model: the DQN used by `policy`.
        replay: ReplayMemory holding the transitions produced by `step`.
    """

    GRID_WIDTH = 12    # columns in the grid: observation = y * GRID_WIDTH + x
    START_STATE = 36   # raw observation of the start cell (x=0, y=3)

    def __init__(self,env_name):
        # Further gymnasium.make options (e.g. `continuous`) could be passed here.
        self.env = gymnasium.make(env_name,render_mode="rgb_array")
        self.done = False
        self.state_gym,_ = self.env.reset()
        self.model = DQN()
        self.replay = ReplayMemory(10000)

    def state(self):
        """Return the current state as a 1x2 float tensor [[x, y]] (a batch of size 1).

        Returns None when the episode is over or no observation is available.
        """
        if self.state_gym is None or self.done:
            return None
        y, x = divmod(int(self.state_gym), self.GRID_WIDTH)
        return torch.tensor([[x,y]],dtype=torch.float)

    def show_state(self):
        """Render the current frame inline, replacing the previous cell output."""
        img = self.env.render()
        plt.imshow(img)
        plt.axis("off")
        display.clear_output(wait=True)
        plt.show()

    def reset(self):
        """Start a new episode and refresh the cached observation."""
        self.done = False
        # Keep the observation returned by reset(): discarding it left `state_gym`
        # (and therefore `state()`) pointing at the last cell of the previous episode.
        self.state_gym,_ = self.env.reset()

    @staticmethod
    def dist(state):
        """Heuristic distance from `state` (1x2 tensor [[x, y]]) to the goal cell (11, 3).

        Returns a 0-dim tensor: the Manhattan distance to the goal, except for
        the start cell (0, 3), which is scored as 13.
        """
        # Compare in one dtype: states are float32 while the constants are float64,
        # and torch.equal on mixed dtypes is not reliable across PyTorch versions.
        state = state.to(torch.float64)
        goal = torch.tensor([[11,3]],dtype=torch.float64)
        start = torch.tensor([[0,3]],dtype=torch.float64)
        if torch.equal(state,start):
            # Presumably 13 = up + 11 right + down, since the straight path along
            # the bottom row is not walkable — TODO confirm.
            return torch.tensor(13)
        return torch.sum(torch.abs(state-goal))

    # 0: Move up
    # 1: Move right
    # 2: Move down
    # 3: Move left
    def step(self,action : torch.Tensor) :
        """Apply `action`, store the resulting transition in the replay buffer and return it.

        Args:
            action: single-element tensor holding the action index (0-3).

        Returns:
            Transition(state, action, next_state, reward) where `state` is the 1x2
            state before the move, `action` a 1x1 tensor, `next_state` the 1x2
            state after the move or None if the episode ended, and `reward` a
            float tensor of shape (1,).

        Raises:
            ValueError: if the episode is already over.
        """
        if self.done:
            raise ValueError("Trying to move from a final state")

        prev_state = self.state()
        # Do the step and cache the new raw observation.
        gym_next_state,_,terminated,truncated,_ = self.env.step(action.item())
        self.state_gym = gym_next_state
        back_at_start = gym_next_state == self.START_STATE
        self.done = bool(terminated or truncated or back_at_start)

        next_state = self.state()

        # Falling into the hole or coming back to the start cell counts as a lost game.
        if truncated or back_at_start:
            reward = -10
        # Reaching the cookie counts as a win and ends the game.
        elif terminated:
            reward = 10
        # Not getting closer to the goal is penalised.
        elif Env.dist(prev_state) <= Env.dist(next_state):
            reward = -2
        else:
            reward = 1

        reward = torch.tensor(reward,dtype=torch.float).unsqueeze(0)
        action = torch.tensor(action.item()).reshape((1,1))

        transition = Transition(prev_state, action, next_state , reward)
        self.replay.push(transition)

        return transition

    def policy(self):
        """Return the greedy action for the current state as a 1x1 index tensor.

        Raises:
            ValueError: if the episode is already over.
        """
        if self.done:
            raise ValueError("Trying to predict a move from a final state")
        # Only the argmax is needed, so no autograd graph has to be built.
        with torch.no_grad():
            q_values = self.model(self.state())
        return q_values.max(1).indices.reshape((1,1))

    def random_action(self) -> torch.Tensor :
        """Sample a uniformly random action as a 1x1 tensor.

        Raises:
            ValueError: if the episode is already over.
        """
        if self.done:
            raise ValueError("Trying to sample a move from a final state")
        gym_action = self.env.action_space.sample()
        return torch.tensor(gym_action).reshape((1,1))

In [None]:
def test_state() :
    """Drive a fresh Env through a fixed action sequence, printing the transitions.

    Prints the initial state, four individual steps (up, right, up, down), then
    moves right eleven times silently and prints the final step down.
    """
    env = Env("CliffWalking-v0")
    print(env.state())
    for move in (0, 1, 0, 2):
        print(env.step(torch.tensor(move)))
    print()
    for _ in range(11):
        env.step(torch.tensor(1))
    print(env.step(torch.tensor(2)))

# Testing 
# state, step, computation of reward , type of the state, size of the state, type of the action
test_state()

tensor([[0., 3.]])
Transition(state=tensor([[0., 3.]]), action=tensor([[0]]), next_state=tensor([[0., 2.]]), reward=tensor([1.]))
Transition(state=tensor([[0., 2.]]), action=tensor([[1]]), next_state=tensor([[1., 2.]]), reward=tensor([1.]))
Transition(state=tensor([[1., 2.]]), action=tensor([[0]]), next_state=tensor([[1., 1.]]), reward=tensor([-2.]))
Transition(state=tensor([[1., 1.]]), action=tensor([[2]]), next_state=tensor([[1., 2.]]), reward=tensor([1.]))

Transition(state=tensor([[11.,  2.]]), action=tensor([[2]]), next_state=None, reward=tensor([10.]))


In [None]:
def test_sample_rand():
    """Print one random action and its transition, then play 1000 fully random episodes."""
    env = Env("CliffWalking-v0")
    first_action = env.random_action()
    print(first_action)
    print(env.step(first_action))

    episodes_left = 1000
    while episodes_left > 0:
        env.reset()
        # Keep sampling random moves until the episode ends.
        while not env.done:
            env.step(env.random_action())
        episodes_left -= 1

# test random policy 
test_sample_rand()

tensor([[2]])
Transition(state=tensor([[0., 3.]]), action=tensor([[2]]), next_state=None, reward=tensor([-10.]))


In [None]:
def test_policy():
    """Print the model's Q-values and greedy action for the initial state,
    then play 1000 episodes with random actions."""
    env = Env("CliffWalking-v0")
    initial_q = env.model(env.state())
    print(initial_q)
    print(env.policy())
    for _ in range(1000):
        env.reset()
        while not env.done:
            env.step(env.random_action())

#test_policy()

In [None]:
def optimize(env : "Env",optimizer,criterion,batch_size,discount_factor):
    """Run one Q-learning gradient step on a random minibatch drawn from `env.replay`.

    Does nothing (returns None) while the buffer holds fewer than `batch_size`
    transitions.

    Args:
        env: object exposing `replay` (with `sample` and `__len__`) and `model`.
        optimizer: torch optimizer over `env.model`'s parameters.
        criterion: loss called as `criterion(q_values, target)`.
        batch_size: number of transitions per update.
        discount_factor: gamma in `target = r + gamma * max_a Q(s', a)`.
    """
    if len(env.replay) < batch_size:
        return

    transitions = env.replay.sample(batch_size)
    # Transpose the list of (state, action, next_state, reward) records into four tuples.
    states, actions, next_states, rewards = zip(*transitions)

    state_batch = torch.cat(states)
    action_batch = torch.cat(actions)
    # Rewards are stored with shape (1,), so cat() gives (B,). Make it a (B, 1) column:
    # adding a (B,) vector to the (B, 1) next-state values would broadcast to (B, B)
    # and the loss would be computed over every pair of samples.
    reward_batch = torch.cat(rewards).reshape(-1, 1)

    # Q(s, a) of the action actually taken, shape (B, 1).
    q_values = env.model(state_batch).gather(1,action_batch)

    # max_a Q(s', a); stays 0 for transitions that ended the episode (next_state is None).
    next_state_value = torch.zeros((batch_size,1), dtype=q_values.dtype, device=q_values.device)
    non_final_mask = torch.tensor([s is not None for s in next_states], dtype=torch.bool)
    if non_final_mask.any():
        non_final_next_state = torch.cat([s for s in next_states if s is not None])
        with torch.no_grad():
            next_state_value[non_final_mask] = env.model(non_final_next_state).max(1).values.unsqueeze(1)

    target = reward_batch + discount_factor * next_state_value

    optimizer.zero_grad()
    # torch losses take (input, target): the differentiable prediction goes first.
    loss = criterion(q_values, target)
    loss.backward()
    torch.nn.utils.clip_grad_value_(env.model.parameters(), 100)
    optimizer.step()

In [None]:
# testing optimize



In [None]:
def optimizer_one_by_one(env,transition,optimizer,criterion,discount_factor):
    """Run one Q-learning gradient step on a single transition (no replay sampling).

    Args:
        env: object exposing `model`.
        transition: record with `state` (1 x n tensor), `action` (1x1 index
            tensor), `next_state` (1 x n tensor, or None if terminal) and
            `reward` (single-element tensor).
        optimizer: torch optimizer over `env.model`'s parameters.
        criterion: loss called as `criterion(predicted, target)`.
        discount_factor: gamma in `target = r + gamma * max_a Q(s', a)`.
    """
    # Q(s, a) as a 1-element tensor so prediction and target have the same shape;
    # a 0-dim prediction against a (1,) target makes the loss broadcast (with a warning).
    predicted_value = env.model(transition.state)[0][transition.action[0][0]].reshape(1)
    with torch.no_grad():
        expected_value = transition.reward.reshape(1)
        # `is not None`: comparing a tensor with `==` is not an identity test.
        if transition.next_state is not None:
            best_next = env.model(transition.next_state).max(1).values.reshape(1)
            expected_value = expected_value + discount_factor * best_next

    optimizer.zero_grad()
    loss = criterion(predicted_value,expected_value)
    loss.backward()
    torch.nn.utils.clip_grad_value_(env.model.parameters(), 100)
    optimizer.step()

In [None]:
def training(env, epochs=10000, batch_size=32, epsilon_max=1, epsilon_min=0.02,
             epsilon_decay=1000., lr=1e-3, discount_factor=0.9, render=True,
             checkpoint_path=None, checkpoint_every=100):
    """Train `env.model` with epsilon-greedy Q-learning and print the mean episode reward.

    Every default reproduces the previously hard-coded value, so `training(env)`
    behaves as before, except that the printed and returned mean is now a plain
    float instead of a 1-element tensor.

    Args:
        env: Env whose model is trained and whose replay buffer is filled.
        epochs: number of episodes to play (must be positive).
        batch_size: minibatch size handed to `optimize`.
        epsilon_max, epsilon_min, epsilon_decay: exploration schedule
            `eps = min + (max - min) * exp(-episode / decay)`.
        lr: AdamW learning rate.
        discount_factor: gamma handed to `optimize`.
        render: draw the environment after every step. Rendering through
            matplotlib at every step is slow; pass False for long runs.
        checkpoint_path: if given, `env.model.save(checkpoint_path)` is called
            every `checkpoint_every` episodes and once at the end. The parent
            directory must already exist.
        checkpoint_every: episode interval between checkpoints.

    Returns:
        The sum of all step rewards divided by `epochs`, as a float.

    Raises:
        ValueError: if `epochs` is not positive.
    """
    if epochs <= 0:
        raise ValueError("epochs must be a positive integer")

    optimizer = optim.AdamW(env.model.parameters(), lr=lr, amsgrad=True)
    criterion = nn.SmoothL1Loss()

    # Accumulate a Python float: adding the reward tensors themselves turned the
    # running total (and the printed mean) into a tensor.
    total_reward = 0.0

    for i in range(epochs):
        env.reset()
        epsilon = epsilon_min + (epsilon_max-epsilon_min)*np.exp(-i/epsilon_decay)
        while not env.done:
            if random.random() <= epsilon:
                action = env.random_action()
            else:
                with torch.no_grad():
                    action = env.policy()
            transition = env.step(action)
            total_reward += transition.reward.item()

            optimize(env,optimizer,criterion,batch_size,discount_factor)

            if render:
                env.show_state()

        if checkpoint_path is not None and i % checkpoint_every == 0:
            env.model.save(checkpoint_path)

    if checkpoint_path is not None:
        env.model.save(checkpoint_path)

    mean_reward = total_reward/epochs
    print(mean_reward)
    return mean_reward

In [None]:
# Build a fresh environment (Env.__init__ creates a new DQN and an empty replay buffer) and train it.
game_name = "CliffWalking-v0"
env= Env(game_name)
training(env)

KeyboardInterrupt: 

In [None]:
# Show the Q-values the model assigns to the four actions for the state reported after a reset.
env.reset()
env.model(env.state())

tensor([[-18.9484,  -9.9799,  -9.9577,  -9.9794]], grad_fn=<AddmmBackward0>)

In [None]:
def evaluate(env : Env, try_n : int = 100, maxlen : int = 100):
    """Average summed reward of the greedy policy over `try_n` episodes.

    Each episode starts with `env.reset()` and stops when the environment
    reports `done` or after `maxlen` steps, whichever comes first. The result
    is the sum of every step reward divided by `try_n`; because the rewards
    are tensors, so is the returned value.
    """
    total = .0
    episode = 0
    while episode < try_n:
        env.reset()
        steps_left = maxlen
        while steps_left > 0:
            outcome = env.step(env.policy())
            total += outcome.reward
            steps_left -= 1
            if env.done:
                break
        episode += 1
    return total/try_n

# NOTE(review): Env(...) builds a new DQN, so this scores a freshly initialised
# (untrained) model rather than the `env` trained earlier — confirm that is intended.
game_name = "CliffWalking-v0"
env= Env(game_name)
evaluate(env,1000)


tensor([-10.])

In [None]:
# game_name = "CliffWalking-v0"
# env= Env(game_name)

# env.step(torch.tensor(0))
# env.step(torch.tensor(1))
# env.step(torch.tensor(2))
# print(env.state())



# batch = env.replay.sample(3)
# tbatch = Transition(*zip(*batch))
# print(tbatch[0])
# print(tbatch[1])
# print(tbatch[2])
# print(tbatch[3])

# state_batch = torch.cat(tbatch.state)
# action_batch = torch.cat(tbatch.action)
# reward_state_batch = torch.cat(tbatch.reward)

# non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, tbatch.next_state)) )
# non_final_next_state = torch.cat([s for s in tbatch.next_state if s is not None])
# nxt = torch.zeros((3,1))
# nxt[non_final_mask] = env.model(non_final_next_state).max(1).values.unsqueeze(1)


# t = env.model(state_batch)
# print(t)
# print(action_batch)

# print(t.gather(1,action_batch))

# #t2 = env.model()
# # res[non_final_mask] = 



In [None]:
#print(evaluate(env))

In [None]:
# game_name = "CliffWalking-v0"
# env= Env(game_name)

# env.step(torch.tensor(0))
# env.step(torch.tensor(1))
# env.step(torch.tensor(2))

# epsilon_max = 1
# epsilon_min = 0.05
# epsilon_decay = 1000.0

# lr = 1e-3
# discount_factor = 0.9

# optimizer = optim.AdamW(env.model.parameters(), lr=lr, amsgrad=True)
# criterion = nn.MSELoss()

# optimize(env,optimizer,criterion,3,discount_factor)

In [None]:
# Create a fresh environment; the training call below is left disabled.
game_name = "CliffWalking-v0"
env= Env(game_name)
#training(env)