In [2]:
%load_ext autoreload
%autoreload 2

from tqdm import tqdm
from typing import List

import numpy as np
import cv2

import torch
import torch.linalg as LA

from camera_transition import CameraTransition
from models import (
    ActorImprovedValue,
    CriticTD,
    ActorModel,
    CriticModel)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
def torch_rodrigues(mat, eps=1e-6):
    """Convert a batch of (near-)rotation matrices to axis-angle vectors.

    Every matrix is first projected onto an orthogonal matrix with an SVD
    (``U @ V_T``). The result is then turned into a rotation vector whose
    direction is the rotation axis and whose norm is the rotation angle in
    radians, within ``[0, pi]``.

    The textbook formula ``r * theta / (2 * sin(theta))`` divides by zero for
    the identity and for half turns, so these cases are handled separately:

    * ``theta ~ 0``: ``theta / (2 * sin(theta)) -> 1/2`` is used directly.
    * ``theta ~ pi``: the antisymmetric part of ``R`` vanishes, so the axis is
      recovered from the symmetric part (``R ~ 2 * n n^T - I``), with the same
      sign convention OpenCV's ``Rodrigues`` uses.

    Args:
        mat: tensor of shape ``(B, 3, 3)``.
        eps: threshold on ``sin(theta)`` below which a rotation is treated as
            degenerate (identity or half turn).

    Returns:
        Tensor of shape ``(B, 3)`` holding one rotation vector per matrix.
    """
    U, _, V_T = LA.svd(mat)
    R = U @ V_T

    # Antisymmetric part of R; equals 2 * sin(theta) * axis.
    r = torch.stack((
        R[:, 2, 1] - R[:, 1, 2],
        R[:, 0, 2] - R[:, 2, 0],
        R[:, 1, 0] - R[:, 0, 1],
    ), 1)
    s = LA.norm(r, dim=1) / 2                            # sin(theta), >= 0
    c = (R[:, 0, 0] + R[:, 1, 1] + R[:, 2, 2] - 1) / 2   # cos(theta)
    c = torch.clip(c, -1., 1.)
    # atan2 stays accurate near 0 and pi, where acos(c) loses precision.
    theta = torch.atan2(s, c)

    degenerate = s < eps

    # Regular case. The denominator is replaced by 1 on degenerate rows so
    # that the unused branch never produces inf/NaN.
    safe_s = torch.where(degenerate, torch.ones_like(s), s)
    regular = r * (theta / (2 * safe_s)).unsqueeze(1)

    # theta ~ 0: limit of theta / (2 * sin(theta)) is 1/2.
    small_angle = r * 0.5

    # theta ~ pi: R ~ 2 * n n^T - I, hence n_i^2 = (R_ii + 1) / 2.
    diag = torch.diagonal(R, dim1=1, dim2=2)
    n = torch.sqrt(torch.clamp((diag + 1) / 2, min=0.))
    nx = n[:, 0]
    # Signs of n_y and n_z relative to n_x come from R_01 = 2 n_x n_y and
    # R_02 = 2 n_x n_z.
    ny = n[:, 1] * (1 - 2 * (R[:, 0, 1] < 0).to(R.dtype))
    nz = n[:, 2] * (1 - 2 * (R[:, 0, 2] < 0).to(R.dtype))
    # When n_x is the smallest component those signs are unreliable, so the
    # relative sign of n_y and n_z is taken from R_12 = 2 n_y n_z instead.
    ambiguous = (
        (nx.abs() < ny.abs())
        & (nx.abs() < nz.abs())
        & ((R[:, 1, 2] > 0) != (ny * nz > 0))
    )
    nz = torch.where(ambiguous, -nz, nz)
    n = torch.stack((nx, ny, nz), 1)
    half_turn = n * (theta / LA.norm(n, dim=1).clamp_min(eps)).unsqueeze(1)

    near_identity = (degenerate & (c > 0)).unsqueeze(1)
    fallback = torch.where(near_identity, small_angle, half_turn)
    return torch.where(degenerate.unsqueeze(1), fallback, regular)

In [10]:
# Run on the GPU when CUDA is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# 3x3 camera matrix; presumably pinhole intrinsics (fx, fy on the diagonal,
# cx, cy in the last column) -- confirm against CameraTransition.
camera_mat = torch.tensor([
    [633.09029639, 0.0, 629.06462963],
    [0.0, 638.7544391, 362.74013262],
    [0.0, 0.0, 1.0],
])

# Six 2-D target points; presumably pixel coordinates in the image -- confirm.
target_points = torch.tensor([
    [755.25, 476.25],
    [822.75, 362.25],
    [624.75, 476.25],
    [554.25, 366.75],
    [755.25, 270.75],
    [618.75, 273.75],
])

# Six 3-D points handed to CameraTransition alongside target_points.
# NOTE(review): rows 1/2 and rows 4/5 are identical, so only four distinct
# points are listed -- confirm this is intended and not a copy-paste slip.
points_env = torch.tensor([
    [2.0, 0.0, 4.0],
    [1.0, 1.66, 4.0],
    [1.0, 1.66, 4.0],
    [-2.0, 0.0, 4.0],
    [-1.0, -1.66, 4.0],
    [-1.0, -1.66, 4.0],
])

# The meaning of the trailing scalar (1.0) is defined by CameraTransition,
# which is not visible here -- TODO document it once confirmed.
camera_transition = CameraTransition(
    device, camera_mat, target_points, points_env, 1.0)

In [12]:
# --- Critic optimisation settings -------------------------------------------
optimizer_critic_kind = torch.optim.Adam
optimizer_critic_parameters = dict(lr=1e-5, weight_decay=0.0)

# --- Actor optimisation settings --------------------------------------------
optimizer_actor_kind = torch.optim.Adam
optimizer_actor_parameters = dict(lr=0.001, weight_decay=0.0)

# Gradient steps per epoch and number of sampled states per step.
critic_iterations, critic_batch_size = 2000, 4000
actor_iterations, actor_batch_size = 400, 4000

# Number of alternating critic/actor rounds.
epochs = 10

In [13]:
# Instantiate the two networks and move them to the selected device.
actor = ActorModel().to(device)
critic = CriticModel().to(device)

# Both training objectives are built from the same actor, critic and
# camera transition objects.
critic_temporal_difference = CriticTD(
    actor,
    critic,
    camera_transition,
).to(device)
actor_improved_value = ActorImprovedValue(
    actor,
    critic,
    camera_transition,
).to(device)

In [39]:
def critic_epoch(optimizer: torch.optim.Optimizer,
                 model: CriticTD,
                 iterations: int,
                 batch_size: int) -> List[float]:
    """Run one critic training epoch on uniformly sampled inputs.

    Every iteration draws ``batch_size`` random 6-D inputs from a symmetric
    box, evaluates ``model`` on them to obtain a scalar loss, and performs
    one optimizer step.

    Args:
        optimizer: optimizer whose ``zero_grad``/``step`` are called once per
            iteration.
        model: callable mapping a ``(batch_size, 6)`` tensor to a scalar loss.
        iterations: number of optimisation steps to run.
        batch_size: number of inputs sampled per step.

    Returns:
        The per-iteration loss values as Python floats, in order.
    """
    # TODO: replace with our actual position constraints.
    # Half-widths of the sampling box [-scale, +scale] for each of the six
    # input components. Built once because it never changes inside the loop.
    scales = torch.tensor([10, 10, 10, 0.15, 0.15, 0.15])

    losses = []
    for _ in tqdm(range(iterations), "Critic epoch"):
        # Uniform samples in [-scales, +scales], moved to the training device.
        X = (torch.rand((batch_size, 6)) * scales * 2 - scales).to(device)
        optimizer.zero_grad()
        loss = model(X)
        loss.backward()
        optimizer.step()
        # .item() yields a plain float, matching the declared return type.
        losses.append(loss.item())

    # Skip the summary when nothing ran; np.mean([]) would only emit NaN.
    if losses:
        print("Critic mean loss:", np.mean(losses),
              "[%f --> %f]" % (np.mean(losses[:10]), np.mean(losses[-10:])))
    return losses

def actor_epoch(optimizer: torch.optim.Optimizer,
                model: torch.nn.Module,
                iterations: int,
                batch_size: int) -> List[float]:
    """Run one actor training epoch on uniformly sampled inputs.

    Every iteration draws ``batch_size`` random 6-D inputs from a symmetric
    box, evaluates ``model`` on them to obtain a scalar objective, and
    performs one optimizer step on it.

    Args:
        optimizer: optimizer whose ``zero_grad``/``step`` are called once per
            iteration.
        model: callable mapping a ``(batch_size, 6)`` tensor to a scalar
            objective (this notebook passes an ``ActorImprovedValue``).
        iterations: number of optimisation steps to run.
        batch_size: number of inputs sampled per step.

    Returns:
        The per-iteration objective values as Python floats, in order.
    """
    # TODO: replace with our actual position constraints.
    # Half-widths of the sampling box [-scale, +scale] for each of the six
    # input components. Built once because it never changes inside the loop.
    scales = torch.tensor([10, 10, 10, 0.15, 0.15, 0.15])

    values = []
    for _ in tqdm(range(iterations), "Actor epoch"):
        # Uniform samples in [-scales, +scales], moved to the training device.
        X = (torch.rand((batch_size, 6)) * scales * 2 - scales).to(device)
        optimizer.zero_grad()
        improved_value = model(X)
        # NOTE(review): optimizer.step() *minimises* this quantity; presumably
        # the model already returns something to be minimised (e.g. a negated
        # value) -- confirm against ActorImprovedValue.
        improved_value.backward()
        optimizer.step()
        # .item() yields a plain float, matching the declared return type.
        values.append(improved_value.item())

    # Skip the summary when nothing ran; np.mean([]) would only emit NaN.
    if values:
        print("Actor mean value:", np.mean(values),
              "[%f --> %f]" % (np.mean(values[:10]), np.mean(values[-10:])))
    return values

In [40]:
# The actor optimizer is built a single time and reused across all epochs.
# NOTE(review): .parameters() is taken from the ActorImprovedValue wrapper,
# which is constructed from both the actor and the critic -- confirm it only
# exposes the parameters this optimizer is meant to update.
optimizer_actor = optimizer_actor_kind(
    actor_improved_value.parameters(),
    **optimizer_actor_parameters,
)

In [42]:
# Alternate critic and actor training for the configured number of epochs.
for _ in tqdm(range(epochs), "Actor-Critic learning"):
    # It is important to re-create the critic optimizer every epoch, to erase
    # momenta and adaptive statistics that no longer match the updated actor.
    optimizer_critic = optimizer_critic_kind(
        critic_temporal_difference.parameters(),
        **optimizer_critic_parameters)
    # The critic must be trained with its own batch size; the original passed
    # actor_batch_size here, which only worked because both were 4000.
    # `values` holds the critic's per-iteration losses for this epoch.
    values = np.array(critic_epoch(optimizer_critic,
                                   critic_temporal_difference,
                                   critic_iterations,
                                   critic_batch_size))
    actor_epoch(optimizer_actor,
                actor_improved_value,
                actor_iterations,
                actor_batch_size)

Actor-Critic learning:   0%|          | 0/10 [00:00<?, ?it/s]

Critic epoch:   0%|          | 0/2000 [00:00<?, ?it/s]

tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0')
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0', grad_fn=<TanhBackward0>)
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0')
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0', grad_fn=<TanhBackward0>)
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0')
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0', grad_fn=<TanhBackward0>)
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0')
tensor([[1.],
        [1.],
        [1.],
        ...,
        [1.],
        [1.],
        [1.]], device='cuda:0', grad_

KeyboardInterrupt: 

In [None]:
# Quick sanity check: query the actor on one hand-picked 6-D input,
# with the tensor created directly on the target device.
actor(torch.tensor([0.0, 0.0, 1.0, 0.1, 0.1, 0.0], device=device))

tensor([ 0.7029, -0.6145, -0.3581, -0.3022,  0.0942,  0.3870], device='cuda:0',
       grad_fn=<CatBackward0>)