In [1]:
from collections import deque

import gym
import random
import numpy as np
import torch
from torch import nn
import torch.autograd as autograd
import torch.optim as optim
import torch.nn.functional as F
random.seed(0)

class Cnn(nn.Module):

    def __init__(self, classes_count):
        super().__init__()

        self.conv_net = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, stride=4),  # (b x 96 x 55 x 55)
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2),  # section 3.3
            nn.MaxPool2d(kernel_size=3, stride=2),  # (b x 96 x 27 x 27)
            nn.Conv2d(96, 256, 5, padding=2),  # (b x 256 x 27 x 27)
            nn.ReLU(),
            nn.LocalResponseNorm(size=5, alpha=0.0001, beta=0.75, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),  # (b x 256 x 13 x 13)
            nn.Conv2d(256, 384, 3, padding=1),  # (b x 384 x 13 x 13)
            nn.ReLU(),
            nn.Conv2d(384, 384, 3, padding=1),  # (b x 384 x 13 x 13)
            nn.ReLU(),
            nn.Conv2d(384, 256, 3, padding=1),  # (b x 256 x 13 x 13)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),  # (b x 256 x 6 x 6)
        )

        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5, inplace=True),
            nn.Linear(in_features=(256 * 6 * 6), out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5, inplace=True),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Linear(in_features=4096, out_features=classes_count),
        )

        self.init_bias()

    def init_bias(self):
        for layer in self.conv_net:
            if isinstance(layer, nn.Conv2d):
                nn.init.normal_(layer.weight, mean=0, std=0.01)
                nn.init.constant_(layer.bias, 0)
        nn.init.constant_(self.conv_net[4].bias, 1)
        nn.init.constant_(self.conv_net[10].bias, 1)
        nn.init.constant_(self.conv_net[12].bias, 1)

    def forward(self, x):
        x = self.conv_net(x)
        x = x.view(-1, 256 * 6 * 6)  # reduce the dimensions for linear layer input
        return self.classifier(x)

class ReplayBuffer:

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = deque(maxlen=max_size)

    def add_replay(self, state, action, reward, next_state, done):
        experience = (state, action, np.array([reward]), next_state, done)
        self.buffer.append(experience)

    def sample(self, batch_size):
        state_batch = []
        action_batch = []
        reward_batch = []
        next_state_batch = []
        done_batch = []

        batch = random.sample(self.buffer, batch_size)

        for experience in batch:
            state, action, reward, next_state, done = experience
            state_batch.append(state)
            action_batch.append(action)
            reward_batch.append(reward)
            next_state_batch.append(next_state)
            done_batch.append(done)

        return (state_batch, action_batch, reward_batch, next_state_batch, done_batch)

    def __len__(self):
        return len(self.buffer)

class DQNAgent:

    def __init__(self, actions):
        self.classes = actions
        self.net = Cnn(len(actions))
        self.optimizer = optim.Adam(params=self.net.parameters(), lr=0.0001)
        self.replay_buffer = ReplayBuffer(1000)

    def get_action(self, state):
        state = autograd.Variable(torch.from_numpy(state).float().unsqueeze(0))
        qvals = self.net(state)
        action = np.argmax(qvals.detach().numpy())

        return action

    def compute_loss(self, batch):
        states, actions, rewards, next_states, dones = batch
        # states = [autograd.Variable(torch.from_numpy(state).float().unsqueeze(0)) for state in states]
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        curr_Q = self.net.forward(states).gather(1, actions.unsqueeze(1))
        curr_Q = curr_Q.squeeze(1)
        next_Q = self.net.forward(next_states)
        max_next_Q = torch.max(next_Q, 1)[0]
        expected_Q = rewards.squeeze(1) + (1 - dones) * self.gamma * max_next_Q

        loss = self.MSE_loss(curr_Q, expected_Q.detach())
        return loss

    def train(self, batch_size):

        if (len(self.replay_buffer.buffer) >= batch_size):
            batch = self.replay_buffer.sample(batch_size)
            loss = self.compute_loss(batch)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()


def main():
    env = gym.make('Pong-v0') #, render_mode="human"
    env.seed(0)
    actions = [i for i in range(0, env.action_space.n)]
    print(actions)

    agent = DQNAgent(actions)

    current_state = env.reset()
    # agent.train(current_state)
    episode_reward = 0
    while True:
        action = random.choice(actions)
        old_state = current_state
        current_state, reward, done, _ = env.step(action)
        episode_reward += reward

        agent.replay_buffer.add_replay(old_state, action, reward, current_state, done)
        agent.train(1)

        if done:
            print('Reward: %s' % episode_reward)
            break


if __name__ == '__main__':
    main()

  logger.warn(
A.L.E: Arcade Learning Environment (version 0.7.4+069f8bd)
[Powered by Stella]
  deprecation(
  deprecation(


[0, 1, 2, 3, 4, 5]


  deprecation(
  rewards = torch.FloatTensor(rewards)


RuntimeError: Given groups=1, weight of size [96, 3, 11, 11], expected input[1, 210, 160, 3] to have 3 channels, but got 210 channels instead