In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np

from custom_env import CustomDrivingEnv

# class DQNNetwork(nn.Module):
#     def __init__(self, action_space):
#         super(DQNNetwork, self).__init__()
#         self.conv_layers = nn.Sequential(
#             nn.Conv2d(1, 32, kernel_size=8, stride=4),
#             nn.ReLU(),
#             nn.Conv2d(32, 64, kernel_size=4, stride=2),
#             nn.ReLU(),
#             nn.Conv2d(64, 64, kernel_size=3, stride=1),
#             nn.ReLU()
#         )
#         self.fc_layers = nn.Sequential(
#             nn.Linear(64 * 20 * 26, 512),
#             nn.ReLU(),
#             nn.Linear(512, action_space)
#         )
        
#     def forward(self, x):
#         x = x / 255.0  # Normalize input
#         x = self.conv_layers(x)
#         x = x.view(x.size(0), -1)  # Flatten the output
#         return self.fc_layers(x)


class DQNNetwork(nn.Module):
    def __init__(self, action_space):
        super(DQNNetwork, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # # Calculate the size of the flattened features after the last conv layer
        # def conv2d_size_out(size, kernel_size = 3, stride = 1):
        #     return (size - (kernel_size - 1) - 1) // stride + 1
        
        # # convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(320, 8, 4), 4, 2), 3, 1)
        # # convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(240, 8, 4), 4, 2), 3, 1)
        # # linear_input_size = convw * convh * 64

        # linear_input_size = 9 * 12 * 64
        # self.head = nn.Linear(linear_input_size, action_space)

        # def conv2d_size_out(size, kernel_size, stride, padding=0):
        #     return (size - (kernel_size - 1) - 1 + padding * 2) // stride + 1

        def conv2d_size_out(size, kernel_size, stride, padding=0):
            return (size + 2 * padding - (kernel_size - 1) - 1) // stride + 1

        # Initial size
        input_width, input_height = 320, 240

        # After first conv layer
        conv1_size_out_width = conv2d_size_out(input_width, 8, 4)  # kernel size=8, stride=4
        conv1_size_out_height = conv2d_size_out(input_height, 8, 4)

        # After second conv layer
        conv2_size_out_width = conv2d_size_out(conv1_size_out_width, 4, 2)  # kernel size=4, stride=2
        conv2_size_out_height = conv2d_size_out(conv1_size_out_height, 4, 2)

        # After third conv layer
        conv3_size_out_width = conv2d_size_out(conv2_size_out_width, 3, 1)  # kernel size=3, stride=1
        conv3_size_out_height = conv2d_size_out(conv2_size_out_height, 3, 1)

        linear_input_size = conv3_size_out_width * conv3_size_out_height * 64  # 64 is the number of output channels from the last conv layer

        self.head = nn.Linear(linear_input_size, action_space)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        print(x.shape)  # Should match the calculated flat size
        x = x.view(x.size(0), -1)
        return self.head(x)



        # # Assuming the input image is grayscale and has dimensions (1, 240, 320)
        # # After the first convolutional layer
        # convw = conv2d_size_out(320, 8, 4)  # Width after first conv layer
        # convh = conv2d_size_out(240, 8, 4)  # Height after first conv layer

        # # After the second convolutional layer
        # convw = conv2d_size_out(convw, 4, 2)
        # convh = conv2d_size_out(convh, 4, 2)

        # # After the third convolutional layer
        # convw = conv2d_size_out(convw, 3, 1)
        # convh = conv2d_size_out(convh, 3, 1)

        # # Calculate total number of features to feed into the linear layer
        # linear_input_size = convw * convh * 64  # 64 is the number of output channels from the last conv layer

        # self.head = nn.Linear(linear_input_size, action_space)


    # def forward(self, x):
    #     x = F.relu(self.conv1(x))
    #     x = F.relu(self.conv2(x))
    #     x = F.relu(self.conv3(x))
    #     print("Shape before flattening:", x.shape)  # Diagnostic print statement
    #     x = x.view(x.size(0), -1)
    #     return self.head(x)

    # def forward(self, x):
    #     x = F.relu(self.conv1(x))
    #     print("Shape after conv1:", x.shape)

    #     x = F.relu(self.conv2(x))
    #     print("Shape after conv2:", x.shape)

    #     x = F.relu(self.conv3(x))
    #     print("Shape after conv3:", x.shape)

    #     x = x.view(x.size(0), -1)  # Flatten the output


    #     return self.head(x)

class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0
    
    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity
    
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done
    
    def __len__(self):
        return len(self.buffer)

class DQNAgent:
    def __init__(self, state_dim, action_dim, lr=0.001):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.memory = ReplayBuffer(10000)
        self.model = DQNNetwork(action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()
        
    def update(self, batch_size):
        if len(self.memory) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.memory.sample(batch_size)
        states = torch.FloatTensor(states)
        next_states = torch.FloatTensor(next_states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        dones = torch.FloatTensor(dones)

        q_values = self.model(states)
        next_q_values = self.model(next_states)
        q_value = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_value = next_q_values.max(1)[0]
        expected_q_value = rewards + 0.99 * next_q_value * (1 - dones)
        
        loss = self.criterion(q_value, expected_q_value.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def act(self, state, epsilon):
        if random.random() > epsilon:
            state = torch.FloatTensor(state).unsqueeze(0)
            q_value = self.model(state)
            action = q_value.max(1)[1].item()
        else:
            action = random.randrange(self.action_dim)
        return action
    

env = CustomDrivingEnv()
agent = DQNAgent(state_dim=(1, 240, 320), action_dim=env.action_space.n)
num_episodes = 1000
batch_size = 32

for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    while True:
        epsilon = max(0.01, 0.08 - 0.01 * (episode / 200))  # Linearly decreasing epsilon
        action = agent.act(state, epsilon)
        next_state, reward, done, _ = env.step(action)
        agent.memory.push(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        
        if done:
            print('Episode: {}, Total reward: {}'.format(episode, total_reward))
            break
        
        agent.update(batch_size)



torch.Size([64, 26, 36])


RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x936 and 59904x9)