In [1]:
pip install gymnasium

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install gymnasium[box2d]

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install torchsummary

Note: you may need to restart the kernel to use updated packages.


In [4]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torchsummary import summary
from torch.distributions.normal import Normal
import gymnasium as gym

from itertools import count
from collections import deque
from collections import namedtuple
import random
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create environment
env = gym.make("CarRacing-v2", domain_randomize=True, continuous=False, render_mode='human')

In [5]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))
n_actions = env.action_space.n
state,_ = env.reset()
n_obj = (len(state),len(state[0][0]))
class CarAgent():
    def __init__(self, batch, learning_rate, epsilon, discount):
        self.batch_size = batch
        self.lr = learning_rate
        self.epsilon = epsilon
        self.discount = discount
        self.policy_net = carNet(n_obj, n_actions)
        self.target_net = carNet(n_obj, n_actions)
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=self.lr, amsgrad=True)
        self.memory = ReplayMemory(10000)
        self.steps_done = 0

    def select_action(self, state):
        sample = random.random()
        self.steps_done += 1
        if sample > self.epsilon:
            with torch.no_grad():
                return self.policy_net(state).max(1).indices.view(1,1)
        else:
            return torch.tensor([[env.action_space.sample()]], device=device)

    def optimize_model(self):
        if len(self.memory) < self.batch_size:
            return
        transitions = self.memory.sample(self.batch_size)

        batch = Transition(zip(transitions))

        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None, batch.next_state)), device=device, dtype=torch.bool)

        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        state_action_values = self.policy_net(state_batch).gather(1, action_batch)

        next_state_values = torch.zeros(self.batch_size, device=device)
        with torch.no_grad():
            next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1).values
        
        expected_states_action_values = (next_state_values * self.epsilon)

        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_states_action_values.unsqueeze(1))

        self.optimizer.zero_grad()
        loss.backward()

        torch.nn.utils.clip_grad_value_(self.policy_net.parameters(), 100)
        self.optimizer.step()


In [6]:
#Comparing current and previous frames
class carNet(nn.Module):
    def __init__(self, n_observations, n_acts):
        super(carNet, self).__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(3, 32, 3, 1, 1),  nn.ReLU(), nn.MaxPool2d(2, 2), nn.Dropout2d())
        self.layer2 = nn.Sequential(nn.Conv2d(32, 64, 3, 1, 1),  nn.ReLU(), nn.MaxPool2d(2, 2), nn.Dropout2d())
        self.flat = nn.Flatten()
        self.fc1 = nn.Linear(36864, 1024)
        self.fc2 = nn.Linear(1024, 128)
        self.fc3 = nn.Linear(128,n_acts)

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.flat(x)
        x = self.fc1(x)
        x = self.fc2(x)
        #print(x.size())
        return self.fc3(x)

In [7]:
class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [32]:
def train(agent):
    episode_durations = []
    tau = 0.005
    num_episodes = 50
    if torch.cuda.is_available():
        num_episodes = 600
        
    for i in range(num_episodes):
        print(i)
        state, info = env.reset()
        state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        state = state.permute(0,3,1,2)
        for t in count():
            print(t)
            action = agent.select_action(state)
            observation, reward, terminated, truncated, _ = env.step(action.item())
            reward = torch.tensor([reward], device=device)
            done = terminated or truncated
            next_state= None
            if not terminated:
                next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
                next_state = next_state.permute(0,3,1,2)
    
            agent.memory.push(state, action, next_state, reward)
            state = next_state
            agent.optimize_model()
            
            target_net_state_dict = agent.target_net.state_dict()
            policy_net_state_dict = agent.policy_net.state_dict()
            
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key]*tau + target_net_state_dict[key]*(1-tau)
            agent.target_net.load_state_dict(target_net_state_dict)
            
            if done:
                episode_durations.append(t + 1)
                break

In [33]:
def main():
    agent = CarAgent(128,0.001,0.1,0.95)
    train(agent)
    '''state, info = env.reset()
    print(state)
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    print(state)
    p = carNet(0,5)
    summary(p,(3,96,96))
    s = state.permute(0,3,1,2)
    print(s.size())'''
    
if __name__  == '__main__':
    main()

0
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127


TypeError: Transition.__new__() missing 3 required positional arguments: 'action', 'next_state', and 'reward'