In [35]:
import torch
import gym

acrobotenv = gym.make('Acrobot-v1')
assaultenv = gym.make("ALE/Assault-v5")
print(acrobotenv.action_space)
print(acrobotenv.observation_space)
print(assaultenv.action_space)
print(assaultenv.observation_space)


Discrete(3)
Box([ -1.        -1.        -1.        -1.       -12.566371 -28.274334], [ 1.        1.        1.        1.       12.566371 28.274334], (6,), float32)
Discrete(7)
Box(0, 255, (210, 160, 3), uint8)


In [61]:
import matplotlib.pyplot as plt
def Graphs(rewardsQ, rewardsExpS, env, param):
    meanrewsQ = rewardsQ.mean(axis=0)
    meanrewsExpS = rewardsExpS.mean(axis=0)
    Qstd = rewardsQ.std(axis=0)
    ExpSstd = rewardsExpS.std(axis=0)
    plt.errorbar(range(len(meanrewsQ)), meanrewsQ, yerr=Qstd, label='Q-learning')
    plt.errorbar(range(len(meanrewsExpS)), meanrewsExpS, yerr=ExpSstd, label='Expected Sarsa')
    plt.xlabel('Episodes')
    plt.ylabel('Rewards')
    plt.title('Rewards vs Episodes for ' + env + ' with ' + param)
    plt.legend()
    plt.show()
            


In [55]:
class QvalueNet:
    def __init__(self, input_shape, output_shape, nlayers, learning_rate):
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.nlayers = nlayers
        layers = []
        for i in range(nlayers):
            if i == 0:
                a = torch.nn.Linear(input_shape, 256)
            elif i == nlayers - 1:
                a = torch.nn.Linear(256, output_shape)
            else:
                a = torch.nn.Linear(256, 256)
            torch.nn.init.uniform_(a.weight, -0.001, 0.001)
            torch.nn.init.uniform_(a.bias, -0.001, 0.001)
            layers.append(a)
        self.model = torch.nn.Sequential(*layers)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
        assert self.model[-1].out_features == output_shape, "Last layer of model must have output_shape equal to the action space"
        assert self.model[0].in_features == input_shape, "First layer of model must have input_shape equal to the observation space"
        assert len([i for i in (self.model.children())]) == nlayers, "Model must have nlayers layers"
    
    def pred(self, x):
        return self.model(torch.tensor(x, dtype=torch.float32))

    def grad(self, loss):
        loss.backward()
        # GRADIENT MEAN OVER ALL LAYERS        
        self.optimizer.step()
        self.optimizer.zero_grad()

In [60]:
class ModelTrial:
    def __init__(self, env, epsilon, step_size, batch_size, loss, episodes, buffer_size=None, max_steps=100, model="Q", nlayers=3):
        self.env = env
        # INIT QVALUE NET WITH NORMAL SHAPES, + OPTIMIZER WITH DESIRED STEP SIZE
        self.net = QvalueNet(input_shape=env.observation_space.shape[0], output_shape=env.action_space.n, nlayers=nlayers, learning_rate=step_size)
        self.epsilon = epsilon
        self.episodes = episodes
        self.max_steps = max_steps
        if buffer_size == None:
            self.buffer = None
        else:
            self.buffer = []
            self.buffer_size = 1000000
        self.batch_size = batch_size
        if loss=="MSE":
            self.loss = torch.nn.MSELoss()
        elif loss=="L1":
            self.loss = torch.nn.L1Loss()
        else:
            raise ValueError("Loss must be either 'MSE' or 'L1'")
        self.model = model
        assert self.model in ["Q", "ExpS"], "Model must be either 'Q' or 'ExpS'"

    def train(self):
        # FOR ONE TRIAL
        i=0
        obs = self.env.reset()[0]
        rewards = []
        while i<self.episodes:
            # FOR EACH EPISODE
            step = 0
            done = False
            rew = 0
            while not done or step < self.max_steps:
                # FOR EACH STEP IN THE EPSIODE
                if torch.rand(1) < self.epsilon:
                    #RANDOM ACTION WITH PROB EPSILON
                    action = torch.randint(0, self.env.action_space.n, (1,))
                else:
                    # GREEDY ACTION
                    action = torch.argmax(self.net.pred(obs))
                new_obs, reward, done, truncated, info = self.env.step(action)
                # REW = CUMULATIVE REWARD FOR EPISODE
                rew += reward
                if self.buffer != None:
                    # IF BUFFER, APPEND TRANSITION TO BUFFER
                    self.buffer.append((obs, action, reward, new_obs, done))

                    # IF STEPS % BATCH_SIZE == 0, RANDOM SAMPLE + MSE LOSS
                    if (i+1)%self.batch_size == 0:
                        targets = []
                        preds = []
                        # GET RANDOM SAMPLE OF BATCH_SIZE
                        transitions = torch.randint(0, len(self.buffer), (self.batch_size,))
                        for t in transitions:
                            obs, action, reward, new_obs, done = self.buffer[t]
                            if done:
                                targ = reward
                            else:
                                if self.model == "Q":
                                    # IF Q THEN TARGET = REWARD + MAX Q VALUE OF NEXT STATE
                                    targ = reward + torch.max(self.net.pred(new_obs)).detach()
                                else:
                                    # IF EXPS THEN TARGET = REWARD + SUM OF (PROBABILITY OF ACTION * Q VALUE OF ACTION) over actions
                                    nextq = 0
                                    for i in range(self.env.action_space.n):
                                        #IN ST+1 PROBABILITY OF PICKING ACTION I * QVALUE OF ACTION I
                                        prob = 1-self.epsilon if i == torch.argmax(self.net.pred(new_obs)) else self.epsilon/(self.env.action_space.n)
                                        nextq += self.net.pred(new_obs)[i]*prob
                                    targ = reward + nextq.detach()
                            preds.append(self.net.pred(obs)[action])
                            targets.append(targ)
                        preds = torch.tensor(preds)
                        preds.requires_grad = True
                        loss = self.loss(preds, torch.tensor(targets))
                        print("LOSS:", loss)
                        self.net.grad(loss)
                            
                else:
                    if self.model == "ExpS":
                        if done:
                            targ = reward
                        else:
                            target = 0
                            for i in range(self.env.action_space.n):
                                #PROBABILITY OF PICKING ACTION I * QVALUE OF ACTION I
                                prob = 1-self.epsilon if i == torch.argmax(self.net.pred(new_obs)) else self.epsilon/(self.env.action_space.n)
                                target += self.net.pred(new_obs)[i]*prob
                            targ = reward + target.detach()
                    else:
                        if done:
                            targ = reward
                        else:
                            targ = reward + torch.max(self.net.pred(new_obs)).detach()

                    loss = -self.loss(self.net.pred(obs)[action], targ)
                    print("LOSS:", loss)
                    self.net.grad(loss)
                # NEW OBS, NEXT STEP
                obs = new_obs
                step += 1
            # END OF EPSIODE
            i += 1
            obs = self.env.reset()[0]
            rewards.append(rew)
            print("Episode #", i)
        return rewards



In [58]:
def main(params, envs, episodes, trials, models="Q"):
    # PARAMS = [epsilon, step_size, batch_size, loss, max_steps, buffer_size, nlayers]
    # ENV = [env1, env2, ...]
    # MODELS = ["Q", "ExpS"]

    for env in envs:
        if env == "Acrobot-v1":
            env = gym.make(env)
        else:
            env = gym.make("ALE/" + env)
        for param in params:
            rewardsQ = []
            rewardsExpS = []
            for modeltype in models:
                for trial in range(trials):
                    print("TRIAL #", trial)
                    model = ModelTrial(env, param[0], param[1], param[2], param[3], episodes, param[5], param[4], modeltype, param[6])
                    if modeltype == "Q":
                        rewardsQ.append(model.train())
                    else:
                        rewardsExpS.append(model.train())
            Graphs(torch.tensor(rewardsQ), torch.tensor(rewardsExpS), envs, param)

In [62]:
main([[0.2, 0.001, 10, "MSE", 100, 1000000, 3]], ["Acrobot-v1"], 100, 1, "Q")

TRIAL # 0


  if not isinstance(terminated, (bool, np.bool8)):


Episode # 1
Episode # 2
Episode # 3
Episode # 4
Episode # 5
Episode # 6
Episode # 7
Episode # 8
Episode # 9
LOSS: tensor(0.9998, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9997, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9997, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9997, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9996, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9998, grad_fn=<MseLossBackward0>)
LOSS: tensor(1., grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(1., grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9996, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9998, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9997, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.9999, grad_fn=<MseLossBackward0>)
LOSS: tensor(0.999

KeyboardInterrupt: 