In [42]:
from time import sleep

import torch

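# Prefer CUDA, then Apple MPS, and fall back to CPU when neither is available
if torch.cuda.is_available():
    device = torch.device('cuda')
elif torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')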

In [43]:
torch.cuda.is_available()

False

# Env

In [44]:
import gymnasium as gym
import gym_pusht
from time import sleep

env = gym.make("gym_pusht/PushT-v0", render_mode="human")
observation, info = env.reset()

for _ in range(1000):
    sleep(0.2)
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    image = env.render()

    if terminated or truncated:
        observation, info = env.reset()

env.close()

KeyboardInterrupt: 

Implement deep Q-learning to complete the task

# Part 1: Numerical PushT state

## Observation Space

If obs_type is set to state, the observation space is a 5-dimensional vector representing the state of the environment: [agent_x, agent_y, block_x, block_y, block_angle]. The values are in the range [0, 512] for the agent and block positions and [0, 2*pi] for the block angle.

If obs_type is set to environment_state_agent_pos, the observation space is a dictionary with:

- environment_state: 16-dimensional vector representing the keypoint locations of the T (in [x0, y0, x1, y1, ...] format). The values are in the range [0, 512].
- agent_pos: A 2-dimensional vector representing the position of the robot end-effector.

If obs_type is set to pixels, the observation space is a 96x96 RGB image of the environment.

In [7]:
env.observation_space

Box(0.0, [512.         512.         512.         512.           6.28318531], (5,), float64)
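
Positions span [0, 512] while the angle spans [0, 2*pi], so the five inputs live on quite different scales. One optional preprocessing step, which the cells in this notebook do not apply, is to rescale each component to [0, 1] before feeding it to a network. A minimal sketch in plain Python, where `normalize_state` and `STATE_HIGH` are hypothetical names introduced only for illustration:

```python
import math

# Upper bounds taken from the observation space printed above:
# four positions in [0, 512] and one angle in [0, 2*pi].
STATE_HIGH = [512.0, 512.0, 512.0, 512.0, 2 * math.pi]


def normalize_state(state):
    """Rescale [agent_x, agent_y, block_x, block_y, block_angle] to [0, 1]."""
    if len(state) != len(STATE_HIGH):
        raise ValueError("expected a 5-dimensional state")
    return [value / high for value, high in zip(state, STATE_HIGH)]


example = normalize_state([256.0, 128.0, 512.0, 0.0, math.pi])
print(example)  # [0.5, 0.25, 1.0, 0.0, 0.5]
```

Whether this helps on PushT is not tested here; it is only a common way to keep the inputs of a small MLP in a comparable range.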

The Policy model is designed to give the action $a$ given the state $s$.

input:

$$
[agent_x, agent_y, t_x, t_y, t_{angle}]
$$

output:

$$
[move_x, move_y]
$$


So the model computes:

$$
a = P(s)
$$

In [45]:
from torch import nn
import torch.nn.functional as F


class Policy(nn.Module):
    def __init__(self):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(env.observation_space.shape[0], 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        return x

In [46]:
model = Policy()

In [47]:
input = torch.rand((4, 5))

input

tensor([[0.8921, 0.7441, 0.9367, 0.9744, 0.6661],
        [0.0690, 0.1201, 0.0467, 0.2564, 0.5554],
        [0.7746, 0.5310, 0.0090, 0.2866, 0.5752],
        [0.3824, 0.0491, 0.6623, 0.8789, 0.7061]])

In [48]:
output = model(input)


print(output.shape)
output.detach().numpy()

torch.Size([4, 2])


array([[-0.17463213, -0.11809652],
       [-0.0337225 , -0.07954362],
       [-0.10974984, -0.05514188],
       [-0.1263717 , -0.13616867]], dtype=float32)

In [51]:
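class Critic(nn.Module):
    def __init__(self):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(env.observation_space.shape[0] + env.action_space.shape[0], 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # No activation on the output layer: a final ReLU clamps every estimate
        # to be >= 0 and passes no gradient when the pre-activation is negative
        # (the all-zero output recorded below came from a version with that ReLU)
        x = self.fc3(x)

        return x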

In [52]:
model = Critic()
input = torch.rand((4, 7))

input

tensor([[0.0183, 0.4395, 0.6245, 0.7507, 0.4421, 0.0535, 0.4591],
        [0.6714, 0.7539, 0.2639, 0.4286, 0.1981, 0.0199, 0.7007],
        [0.3981, 0.7796, 0.4714, 0.0915, 0.3333, 0.9992, 0.0658],
        [0.6996, 0.4717, 0.0114, 0.6784, 0.1693, 0.4736, 0.8691]])

In [53]:
output = model(input)


print(output.shape)
output.detach().numpy()

torch.Size([4, 1])


array([[0.],
       [0.],
       [0.],
       [0.]], dtype=float32)

The Critic model aims to estimate

$$
Q(s,a)
$$

It takes the concatenated input $[s,a]$. In more detail:

input:

$$
[agent_x, agent_y, t_x, t_y, t_{angle}, move_x, move_y]
$$

output:

$$
score
$$


So it gives a single estimate of the $Q$ value:

$$
score = Q(s,a)
$$

The Policy model is trained using the Critic model.

Since we can't directly access the action value $Q(s,a)$, we use the critic model, which gives an estimate $Q'(s,a)$ of the value of this state-action pair. We can then use this estimate to compute the loss:


$$
loss = -Q'(s,a)
$$

The minus sign is used because optimizers minimize their objective, and minimizing $-f$ is equivalent to maximizing $f$.
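
As a toy illustration of that sign flip, the sketch below runs plain gradient descent on the loss $-f$ for a made-up one-dimensional "value" function `f` whose maximum is at `a = 3`. The function, the starting point, and the learning rate are all assumptions chosen for the example; nothing here comes from the PushT environment.

```python
def f(a):
    """Toy 'value' of an action: highest at a = 3."""
    return -(a - 3.0) ** 2


def grad_neg_f(a):
    """Derivative of the loss -f(a) = (a - 3)^2 with respect to a."""
    return 2.0 * (a - 3.0)


a = 0.0              # initial action
learning_rate = 0.1  # assumed step size
for _ in range(100):
    a -= learning_rate * grad_neg_f(a)  # gradient descent step on the loss -f

print(a, f(a))
```

Descending on $-f$ drives `a` toward 3, which is exactly where `f` is largest.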

In [None]:
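def train_policy(policy_model, critic_model, input, optimizer):
    # `optimizer` should hold only the policy's parameters, so the critic is not updated here
    optimizer.zero_grad()

    policy_model.train()

    input = input.to(device)

    # Action proposed by the policy for each state in the batch
    output = policy_model(input)

    critic_input = torch.cat((input, output), dim=1)

    score = critic_model(critic_input)

    # Average over the batch: backward() and item() need a scalar loss
    loss = -score.mean()

    loss.backward()
    optimizer.step()

    return loss.item()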
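
To see how the critic's score can train the policy, the sketch below uses two tiny linear layers as stand-ins for the Policy and Critic networks (an assumption made to keep the example short; the sizes match the 5-dimensional state and 2-dimensional action). It checks that one step on the loss $-Q'(s,a)$ changes the policy's weights while leaving the critic's weights untouched, because the optimizer only holds the policy's parameters.

```python
import torch
from torch import nn

torch.manual_seed(0)

# Tiny stand-ins for the Policy and Critic networks (5-d state, 2-d action)
policy = nn.Linear(5, 2)
critic = nn.Linear(5 + 2, 1)
policy_optimizer = torch.optim.SGD(policy.parameters(), lr=0.1)

states = torch.rand(4, 5)
policy_before = policy.weight.detach().clone()
critic_before = critic.weight.detach().clone()

policy_optimizer.zero_grad()
actions = policy(states)
loss = -critic(torch.cat((states, actions), dim=1)).mean()
loss.backward()          # gradients reach the policy through the critic
policy_optimizer.step()  # only the policy's parameters are updated

policy_changed = not torch.equal(policy.weight, policy_before)
critic_unchanged = torch.equal(critic.weight, critic_before)
print(policy_changed, critic_unchanged)  # True True
```

The backward pass also leaves gradients on the critic's parameters; they are simply never applied here, and a critic update that calls `zero_grad()` first discards them.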

To train the critic, we use the actual reward received from the env, bootstrap the next-state value with the critic model itself, and fit it with a basic MSE loss.

In [55]:
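def train_critic(model, input, target, optimizer):
    optimizer.zero_grad()
    criterion = nn.MSELoss()

    model.train()

    # Inputs and targets must live on the same device as the model
    input = input.to(device)
    target = target.to(device)

    # Predicted Q(s,a) for each [state, action] row of the batch
    output = model(input)

    loss = criterion(output, target)

    loss.backward()
    optimizer.step()

    return loss.item()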

In my approach the policy changes during training, but I can ignore the fact that the stored actions were recorded under a different (older) action distribution. The reason is that I am learning $Q$, which provides a level of abstraction over the environment: it scores a state-action pair regardless of which policy chose the action, and the optimal $Q$ function is the same whichever optimal policy is followed. The agent is then just another function trained on top of this representation of the environment. If the training process had no $Q$, so that the policy itself had to absorb the knowledge of the environment, I would also need to think about sampling, because the recorded actions were taken under the policy's perception of the environment at that time, and that perception changes as the policy changes.

This is the idea behind the model-based versus model-free distinction, and particularly off-policy versus on-policy: the environment is not encoded in the policy model.

This is where the problem comes in, since the update rule is based on the MSE between the two values in the pair
$$
(Q(s,a), R(s,a))
$$

where the target $R(s,a)$ is calculated as

$$
R(s,a) = r + \gamma \cdot Q(s', \pi(s'))
$$

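then the loss minimized for the critic is:

$$
L = \left(Q(s,a) - (r + \gamma \cdot Q(s', \pi(s')))\right)^2
$$

Treating the target as a constant, a gradient step with learning rate $\alpha$ on this loss moves $Q(s,a)$ toward the target (the constant factor 2 is absorbed into $\alpha$):

$$
Q(s,a) \leftarrow Q(s,a) + \alpha \cdot \left(r + \gamma \cdot Q(s', \pi(s')) - Q(s,a)\right)
$$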
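
Plugging numbers into the target and the squared error above may make the quantities easier to follow. All values below are invented for illustration, and the last line assumes the update is meant as a step of size $\alpha$ from the current estimate toward the target.

```python
gamma = 0.9      # discount factor (assumed value)
alpha = 0.1      # step size (assumed value)

q_sa = 1.0       # current estimate Q(s, a)
reward = 0.5     # reward r observed after taking a in s
q_next = 2.0     # critic's estimate Q(s', pi(s'))

target = reward + gamma * q_next        # R(s, a) = r + gamma * Q(s', pi(s'))
td_error = target - q_sa                # how far the estimate is from the target
loss = td_error ** 2                    # squared error used by the MSE loss
q_sa_updated = q_sa + alpha * td_error  # one step of size alpha toward the target

print(f"target={target:.2f} loss={loss:.2f} updated Q={q_sa_updated:.2f}")
# target=2.30 loss=1.69 updated Q=1.13
```

With a neural critic the step is taken on the network weights rather than on a table entry, but the target and the error are computed the same way.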

In [56]:
import random

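class Memory:
    def __init__(self, batch_size = 128):
        self.items = []
        self.batch_size = batch_size

    def __len__(self):
        # Lets callers write len(memory) to check how many transitions are stored
        return len(self.items)

    def add(self, item):
        self.items.append(item)

    def sample(self):
        # Uniform sample without replacement; needs at least batch_size stored items
        return random.sample(self.items, self.batch_size)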
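
The list-backed memory above grows without bound over a long run. One possible variation, sketched here under the assumption that dropping the oldest transitions is acceptable, caps the buffer with `collections.deque`. `BoundedMemory` and its `capacity` parameter are hypothetical names, not part of the notebook.

```python
import random
from collections import deque


class BoundedMemory:
    """Replay buffer that keeps only the most recent `capacity` transitions."""

    def __init__(self, capacity=100_000, batch_size=128):
        # A deque with maxlen silently drops the oldest item once it is full
        self.items = deque(maxlen=capacity)
        self.batch_size = batch_size

    def __len__(self):
        return len(self.items)

    def add(self, item):
        self.items.append(item)

    def sample(self):
        # Never ask for more items than are stored
        k = min(self.batch_size, len(self.items))
        return random.sample(list(self.items), k)


memory = BoundedMemory(capacity=5, batch_size=3)
for step in range(8):
    memory.add(("state", "action", float(step), "next_state"))

print(len(memory))           # 5
print(len(memory.sample()))  # 3
```

Copying the deque to a list on every sample is simple but linear in the buffer size, so a larger buffer might prefer a preallocated array with a write index.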

In [24]:
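def train(policy_model, value_model, memory, policy_optimizer, value_optimizer,
          episodes = 10000, max_steps = 1000, gamma = 0.99):
    env = gym.make("gym_pusht/PushT-v0", obs_type="state", render_mode="rgb_array")
    policy_model.to(device)
    value_model.to(device)

    for ep in range(episodes):
        state, info = env.reset()
        state = torch.from_numpy(state).float()

        for t in range(max_steps):
            # Act with the current policy; no gradients are needed while collecting data
            with torch.no_grad():
                action = policy_model(state.unsqueeze(0).to(device))[0].cpu()
            new_state, reward, terminated, truncated, info = env.step(action.numpy())
            new_state = torch.from_numpy(new_state).float()

            # Store the transition (s, a, r, s') and advance to the next state
            memory.add((state, action, float(reward), new_state))
            state = new_state

            if len(memory) > memory.batch_size:
                states, actions, rewards, new_states = zip(*memory.sample())
                states = torch.stack(states).to(device)
                actions = torch.stack(actions).to(device)
                new_states = torch.stack(new_states).to(device)
                rewards = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
                # Bootstrapped target R(s,a) = r + gamma * Q(s', pi(s')), held constant
                with torch.no_grad():
                    next_input = torch.cat((new_states, policy_model(new_states)), dim=1)
                    target = rewards + gamma * value_model(next_input)

                critic_loss = train_critic(value_model, torch.cat((states, actions), dim=1), target, value_optimizer)
                policy_loss = train_policy(policy_model, value_model, states, policy_optimizer)

            if terminated or truncated:
                break

    env.close()

# gamma and the learning rates are illustrative choices, not tuned values
policy_model, value_model = Policy(), Critic()
policy_optimizer = torch.optim.Adam(policy_model.parameters(), lr=1e-4)
value_optimizer = torch.optim.Adam(value_model.parameters(), lr=1e-3)

train(policy_model, value_model, Memory(), policy_optimizer, value_optimizer)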


KeyboardInterrupt: 