<a href="https://colab.research.google.com/github/achanhon/coursdeeplearningcolab/blob/master/rl_sample.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gymnasium
!pip install swig
!pip install "gymnasium[box2d]"

In [None]:
import random
import torch
import gymnasium

class StateSampler:
    """Sample stored states, favouring entries with a larger first field.

    Every memory entry is unpacked as a 7-tuple. The first field ``t`` gives
    the (unnormalised) sampling weight ``t + 1`` and the third field
    (index 2) is the state tensor returned by :meth:`get`.
    """

    def __init__(self, memory):
        self.memory = memory
        # The +1 keeps entries whose first field is 0 drawable.
        w = torch.Tensor([t for t, _, _, _, _, _, _ in memory]) + 1
        self.w = w / float(w.sum())

    def get(self, n):
        """Return ``n`` states drawn with replacement, stacked along dim 0."""
        I = torch.multinomial(self.w, n, replacement=True)
        # A Python list cannot be indexed by a tensor, so gather entry by
        # entry, and read from the instance rather than a global `memory`.
        return torch.stack([self.memory[i][2] for i in I.tolist()], dim=0)

class TransitionSampler:
    """Sample stored transitions, favouring entries with a high second field.

    Every memory entry is unpacked as a 7-tuple. The second field ``totR``
    sets the sampling probability through ``softmax(0.1 * totR)``; the fields
    from index 2 onwards form the transition returned by :meth:`get`.
    """

    def __init__(self, memory):
        self.memory = memory
        w = torch.Tensor([totR for _, totR, _, _, _, _, _ in memory]) * 0.1
        self.w = torch.nn.functional.softmax(w, dim=0)

    def get(self, n):
        """Draw ``n`` transitions with replacement.

        Returns a tuple with one tensor per transition field (entry fields
        2, 3, ... in order), each stacked along dim 0 and cast to float32.
        Fields are stacked separately because they need not share a shape,
        so they cannot be packed into a single tensor.
        """
        I = torch.multinomial(self.w, n, replacement=True)
        # Read from the instance (not a global `memory`) and gather entry by
        # entry: a Python list cannot be indexed by a tensor.
        rows = [self.memory[i][2:] for i in I.tolist()]
        return tuple(
            torch.stack(
                [torch.as_tensor(v, dtype=torch.float32) for v in column],
                dim=0,
            )
            for column in zip(*rows)
        )

def tokenf(f):
    """Encode a scalar as a 7-entry float tensor.

    Layout of the result:
      * ``[0]``    the raw value ``f``;
      * ``[1]``    its sign: 0 inside the dead zone ``[-0.001, 0.001]``,
                   otherwise -1 or +1;
      * ``[2:7]``  the five low-order bits (least significant first) of
                   ``int(abs(f) * 32)``; higher bits are dropped.
    """
    out = torch.zeros(7)
    out[0] = f
    if -0.001 <= f <= 0.001:
        out[1] = 0
    elif f <= 0:
        out[1] = -1
    else:
        out[1] = 1

    magnitude = int(abs(f) * 32)
    for i in range(5):
        out[i + 2] = magnitude % 2
        magnitude //= 2
    # Return the encoding, not the (by now consumed) integer magnitude.
    return out

def tokens(s):
    """Encode an 8-entry observation as one flat tensor.

    Entries 0-5 are each expanded with ``tokenf``; entries 6 and 7 are
    appended as plain floats. The pieces are concatenated along dim 0.
    """
    # The scalar encoder defined in this file is `tokenf`; `token` does not
    # exist and raised NameError.
    out = [tokenf(float(s[i])) for i in range(6)]
    out.append(torch.Tensor([float(s[6]), float(s[7])]))
    return torch.cat(out, dim=0)

def trial(env,agent,eps):
    """Run one episode (at most 3000 steps) and record its transitions.

    At each step a uniformly random action among 4 is taken with probability
    ``eps``; otherwise the action with the largest agent output is taken.
    The environment is always reset with ``seed=0``.

    Returns ``(totR, traj)`` where ``totR`` is the summed reward and ``traj``
    is a list of records ``[totR, s, a, r, s_, extra]``: ``s``/``s_`` are the
    tokenised observations, ``a`` is a one-hot action of size 4, and
    ``extra`` is field 3 of the following record (``torch.zeros(4)`` for the
    last record).

    NOTE(review): field 3 of a record is the reward ``r``, while the last
    record gets an action-shaped ``torch.zeros(4)``. This looks like it was
    meant to be the next action — confirm.
    NOTE(review): StateSampler/TransitionSampler unpack 7 fields per entry
    with a leading ``t``; these records have 6 — confirm the intended layout.
    NOTE(review): ``tokens`` builds 6*7+2 = 44 features, but
    ``LunarAgent.forward`` appends 16 zeros ahead of ``Linear(24, ...)``
    layers, i.e. expects 8 — confirm which input the agent should receive.
    """
    s, _ = env.reset(seed=0)
    totR, s, traj = 0, tokens(s), []
    agent.eval().cpu()
    for _ in range(3000):
        if random.random() < eps:
            a = int(random.random() * 4)
        else:
            # Pure action selection: no autograd graph is needed.
            with torch.no_grad():
                _, a = agent(s.view(1, -1)).max(1)
            a = int(a)

        s_, r, terminated, truncated, _ = env.step(a)
        s_, totR, a = tokens(s_), totR + r, torch.eye(4)[a]
        traj.append((s, a, r, s_))

        if terminated or truncated:
            break
        s = s_

    # Finalise after the loop so that hitting the 3000-step horizon also
    # returns (totR, traj) instead of silently returning None.
    records = [[totR, st, act, rew, nxt] for (st, act, rew, nxt) in traj]
    for current, following in zip(records, records[1:]):
        current.append(following[3])
    records[-1].append(torch.zeros(4))
    return totR, records

def train(agent,T,memory,nbstep):
    """Run ``nbstep`` gradient steps on batches drawn from ``memory``.

    Each batch ``(S, A, R, S_, F)`` comes from ``memory.getBatch()``. The loss
    is the mean squared difference between ``GAMMA * V(S_) * F + R`` and the
    Q-value of the taken action, ``(Q(S) * A).sum(1)``.

    Returns the mean of the per-step losses as a float.

    NOTE(review): ``schedulefree`` and ``GAMMA`` are not defined in the
    visible part of the file — they must be provided elsewhere.
    NOTE(review): the bootstrap term ``Q_`` is not detached, so gradients
    also flow through the target — confirm this is intended.
    """
    optimizer = schedulefree.AdamWScheduleFree(agent.parameters(), lr=0.001)
    # Schedule-free optimizers keep separate training and evaluation
    # parameter sequences; they must be put in train mode before stepping.
    optimizer.train()

    meanloss = torch.zeros(nbstep)
    for step in range(nbstep):
        S, A, R, S_, F = memory.getBatch()

        Q = agent.Q(S)
        QA = (Q * A).sum(1)
        Q_ = agent.V(S_, T)
        loss = ((GAMMA * Q_ * F + R - QA) ** 2).mean()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        meanloss[step] = loss.detach()

    # Switch the agent's parameters to the evaluation sequence before the
    # optimizer is discarded, so later rollouts use the right weights.
    optimizer.eval()
    return float(meanloss.mean())


In [None]:
class Block(torch.nn.Module):
    """Residual block over 24 features that only updates the last 16.

    A small MLP (24 -> 8 -> 8 -> 8 -> 16) computes an update that is added to
    features 8..23 of the input; features 0..7 pass through unchanged.
    """

    def __init__(self):
        super().__init__()

        self.strongR = torch.nn.LeakyReLU(negative_slope=0.1)
        self.weakR = torch.nn.LeakyReLU(negative_slope=0.5)
        self.f1 = torch.nn.Linear(24, 8)
        self.f2 = torch.nn.Linear(8, 8)
        self.f3 = torch.nn.Linear(8, 8)
        self.f4 = torch.nn.Linear(8, 16)

    def forward(self, x):
        """Return ``x`` with the MLP update added to its last 16 columns.

        ``x`` is indexed as (batch, 24): ``f1`` consumes 24 features and the
        padded update is concatenated along dim 1.
        """
        f = self.strongR(self.f1(x))
        f = f + self.strongR(self.f2(f))
        f = f + self.strongR(self.f3(f))
        f = self.weakR(self.f4(f))

        # Zero update for the first 8 columns. new_zeros follows x's device
        # and dtype, so the block also works once moved off the CPU.
        pad = x.new_zeros(x.shape[0], 8)
        return x + torch.cat([pad, f], dim=1)


class LunarAgent(torch.nn.Module):
    """Q-network: nine residual ``Block``s followed by a linear head.

    The input is extended with 16 zero columns to reach the 24 features the
    blocks operate on; the head maps those 24 features to 4 action values.
    """

    def __init__(self):
        super().__init__()

        self.b1 = Block()
        self.b2 = Block()
        self.b3 = Block()
        self.b4 = Block()
        self.b5 = Block()
        self.b6 = Block()
        self.b7 = Block()
        self.b8 = Block()
        self.b9 = Block()

        self.A = torch.nn.Linear(24, 4)

    def forward(self, x):
        """Return the 4 action values for each row of ``x``."""
        # Scratch columns for the blocks to write to. new_zeros follows x's
        # device and dtype, so the agent also works once moved off the CPU.
        code = x.new_zeros(x.shape[0], 16)
        x = torch.cat([x, code], dim=1)

        blocks = (
            self.b1, self.b2, self.b3, self.b4, self.b5,
            self.b6, self.b7, self.b8, self.b9,
        )
        for block in blocks:
            x = block(x)

        return self.A(x)

    def Q(self, x):
        """Alias of :meth:`forward`."""
        return self.forward(x)

    def pi(self, Q, T):
        """Softmax policy over ``T * Q`` (larger ``T`` gives a sharper policy)."""
        # Centring each row leaves the softmax unchanged mathematically but
        # is numerically better.
        Q = Q - Q.mean(1).view(-1, 1)
        return torch.nn.functional.softmax(T * Q, dim=1)

    def V(self, x, T):
        """Expected action value of ``x`` under the softmax policy."""
        Q = self.Q(x)
        pi = self.pi(Q, T)
        return (Q * pi).sum(1)

    def sample(self, x, T):
        """Draw one action index for a single state from the softmax policy."""
        with torch.no_grad():
            pi = self.pi(self.Q(x.view(1, -1)), T)
            return int(torch.multinomial(pi, num_samples=1))

In [None]:
# Driver: alternate data collection, training and evaluation for 20 rounds,
# raising T whenever the mean evaluation score beats the best one so far.
#
# NOTE(review): the calls below do not match the `trial` defined in this
# file, which takes (env, agent, eps) and returns a (totR, traj) tuple.
# Several calls pass `memory` and `seed`, and the result is accumulated as
# a number. `MemoryBuffer` is not defined in the visible part of the file.
# Confirm the intended API before running this cell.
env = gymnasium.make("LunarLander-v3", continuous=False, gravity=-8.0,enable_wind=False)

# T is handed to both `trial` (third argument) and `train`.
T = 0.3
agent = LunarAgent()
# SEUIL ("threshold"): mean result of 100 trials of the untrained agent.
SEUIL = 0.
for _ in range(100):
    SEUIL+=trial(env,agent,T)
SEUIL = SEUIL/100

for j in range(20):
    # A fresh buffer every round, filled by 5 + 10 + 35 trials.
    memory = MemoryBuffer()
    for _ in range(5):
        trial(env,agent,T,memory,seed=42)
    for i in range(10):
        trial(env,agent,T,memory,seed=i)
    for _ in range(35):
        trial(env,agent,T,memory)

    # Interleave one extra trial with one training phase; both the number
    # of phases and the steps per phase grow with the round index j.
    for _ in range(10+j*2):
        v = trial(env,agent,T,memory)
        l = train(agent,T,memory, nbstep=100+10*j)
        print("\t",v,l)

    # Evaluate: mean result over 100 trials.
    tot = 0
    for _ in range(100):
        tot+=trial(env,agent,T)
    tot = tot/100
    print(j,T,tot)
    # On improvement, scale T by 1.2 and raise the threshold.
    if SEUIL<tot:
        T = 1.2*T
        SEUIL = tot
