In [1]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions.categorical import Categorical
from stable_baselines3.common.buffers import ReplayBuffer
torch.set_printoptions(linewidth=120, precision=2, sci_mode=False, profile="short")
import cv2 as cv

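# NOTE: Doesn't support more than 1 env yet.
# TODO: I don't think sampledExperiences.dones works at all -- where do the field names come from?
#       (Presumably from stable_baselines3's ReplayBufferSamples NamedTuple; confirm, or remake it with a namedtuple.)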

# ---- Run / environment ------------------------------------------------------
seed = 1                   # random seed (passed to envs.reset)
nEnvs = 1                  # number of parallel environments in the vector env
torch_deterministic = True
env_id = "LunarLander-v2"
totalTimesteps = 10        # environment steps executed by the training loop

# ---- Replay buffer / optimisation -------------------------------------------
buffer_size = 1_000_000    # replay buffer capacity
batch_size = 256           # upper bound on the sampled minibatch size
gamma = 0.99               # discount applied to the bootstrapped next-state value
tau = 0.005                # mixing weight of the soft target-network update
policy_lr = 3e-4           # actor learning rate
q_lr = 1e-3                # learning rate of the Q-networks (also used for the entropy coefficient)

# ---- Update schedule --------------------------------------------------------
policyFrequency = 2        # actor is updated every `policyFrequency` steps, that many times
QNetworkFrequency = 1      # Denis Yarats' implementation delays this by 2.
noise_clip = 0.5           # NOTE(review): not referenced in the visible code

# ---- Entropy regularisation -------------------------------------------------
alpha = 0.2                # entropy coefficient; replaced by exp(logAlpha) when autoEntropy is on
autoEntropy = True         # learn the entropy coefficient instead of keeping it fixed
targetEntropyScale = 0.89  # target entropy = scale * log(number of actions)

In [2]:
def linearInitialize(layer, std=np.sqrt(2), bias_const=0.0):
    """Initialise a layer in place: orthogonal weights and a constant bias.

    Args:
        layer: module exposing a ``weight`` tensor and, optionally, a ``bias``
            tensor (e.g. ``nn.Linear``).
        std: gain handed to ``torch.nn.init.orthogonal_``.
        bias_const: value written into every bias entry.

    Returns:
        The same ``layer`` object, so the call can be nested directly inside
        ``nn.Sequential(...)``.
    """
    torch.nn.init.orthogonal_(layer.weight, std)
    # Layers created with ``bias=False`` expose ``bias = None``; there is nothing to fill then.
    if getattr(layer, "bias", None) is not None:
        torch.nn.init.constant_(layer.bias, bias_const)
    return layer

# ALGO LOGIC: initialize agent here:
class SoftQNetwork(nn.Module):
    """Critic for discrete-action SAC: maps an observation to one Q-value per action.

    The training code indexes the output with ``.gather(1, actions)`` and weights
    it by the per-action probabilities of the policy, so the last layer must have
    ``env.single_action_space.n`` outputs rather than a single scalar.

    Args:
        env: vector environment exposing ``single_observation_space`` and a
            discrete ``single_action_space`` (with attribute ``n``).
    """

    def __init__(self, env):
        super().__init__()
        obsDim = int(np.prod(env.single_observation_space.shape))
        nActions = int(env.single_action_space.n)
        self.critic = nn.Sequential(
            linearInitialize(nn.Linear(obsDim, 256)),
            nn.Tanh(),
            linearInitialize(nn.Linear(256, 128)),
            nn.Tanh(),
            linearInitialize(nn.Linear(128, 64)),
            nn.Tanh(),
            # One output per discrete action (was a single output, which breaks gather()).
            linearInitialize(nn.Linear(64, nActions), std=1.0))

    def forward(self, x):
        """Return the Q-values of every action for the observations ``x``."""
        return self.critic(x)

# NOTE(review): neither constant is referenced in the visible code (the Actor
# below is categorical). The names suggest log-std clamping bounds left over
# from a continuous-action SAC -- confirm before removing.
LOG_STD_MAX = 2
LOG_STD_MIN = -5

class Actor(nn.Module):
    """Categorical policy network: maps an observation to one logit per discrete action.

    Args:
        env: vector environment exposing ``single_observation_space`` and a
            discrete ``single_action_space`` (with attribute ``n``).
    """

    def __init__(self, env):
        super().__init__()
        obsDim = int(np.prod(env.single_observation_space.shape))
        nActions = int(env.single_action_space.n)
        self.actor = nn.Sequential(
            linearInitialize(nn.Linear(obsDim, 256)),
            nn.Tanh(),
            linearInitialize(nn.Linear(256, 128)),
            nn.Tanh(),
            linearInitialize(nn.Linear(128, 64)),
            nn.Tanh(),
            # Small gain on the last layer keeps the initial logits close to each other.
            linearInitialize(nn.Linear(64, nActions), std=0.01))

    def forward(self, x):
        """Return the unnormalised action logits for the observations ``x``."""
        return self.actor(x)

    def get_action(self, x):
        """Sample an action and return the full action distribution.

        Args:
            x: observation tensor; the last axis is the feature axis. Both batched
                and unbatched inputs are accepted.

        Returns:
            Tuple ``(action, logActionProbabilities, actionProbabilities)``: the
            sampled action index, the log-probability of every action, and the
            probability of every action.
        """
        logits = self(x)
        policyDistribution = Categorical(logits=logits)
        action = policyDistribution.sample()
        actionProbabilities = policyDistribution.probs
        # Normalise over the last (action) axis, as Categorical does, so that
        # unbatched 1-D inputs work too (a hard-coded dim=1 fails on them).
        logActionProbabilities = F.log_softmax(logits, dim=-1)
        return action, logActionProbabilities, actionProbabilities


# Run on the GPU when one is visible to torch, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Vector environment holding `nEnvs` copies of `env_id`.
envs = gym.vector.make(env_id, num_envs=nEnvs)
# Overwrite the reported observation dtype with float32 (the replay buffer is
# later built from this space).
envs.single_observation_space.dtype = np.float32

# Running reward sum of the episode currently in progress, one slot per env.
totalEpisodicRewards = torch.zeros(nEnvs, device=device)

  gym.logger.warn(


In [3]:
# Discrete-action Soft Actor-Critic training script.
#
# Builds the actor, two critics and their target copies, then alternates between
# stepping the vector environment and updating the networks from a replay buffer.

# Seed the RNGs before any network is built so that weight initialisation,
# action sampling and replay sampling are repeatable.
np.random.seed(seed)
torch.manual_seed(seed)
torch.backends.cudnn.deterministic = torch_deterministic


def softUpdate(onlineNet, targetNet, mix):
    """Blend ``onlineNet``'s parameters into ``targetNet`` in place with weight ``mix``."""
    for param, targetParam in zip(onlineNet.parameters(), targetNet.parameters()):
        targetParam.data.copy_(mix*param.data + (1 - mix)*targetParam.data)


actor = Actor(envs).to(device)
QNet1 = SoftQNetwork(envs).to(device)
QNet2 = SoftQNetwork(envs).to(device)
QNet1_target = SoftQNetwork(envs).to(device)
QNet2_target = SoftQNetwork(envs).to(device)
# Target critics start as exact copies of the online critics.
QNet1_target.load_state_dict(QNet1.state_dict())
QNet2_target.load_state_dict(QNet2.state_dict())
QNetsOptimizer = optim.Adam(list(QNet1.parameters()) + list(QNet2.parameters()), lr=q_lr)
actorOptimizer = optim.Adam(list(actor.parameters()), lr=policy_lr)

# Automatic entropy tuning
if autoEntropy:
    # Target entropy is a fraction of the maximum entropy log(n_actions).
    targetEntropy = -targetEntropyScale * torch.log(1 / torch.tensor(envs.single_action_space.n))
    targetEntropy = targetEntropy.to(device)
    logAlpha = torch.zeros(1, requires_grad=True, device=device)
    alpha = logAlpha.exp().item()
    alphaOptimizer = optim.Adam([logAlpha], lr=q_lr)

experiences = ReplayBuffer(
    buffer_size,
    envs.single_observation_space,
    envs.single_action_space,
    device,
    handle_timeout_termination=False,
)

allScores = []
obs, _ = envs.reset(seed=seed)
try:
    for globalStep in range(totalTimesteps):
        # ---- Interact with the environment ----
        with torch.no_grad():
            actions, _, _ = actor.get_action(torch.Tensor(obs).to(device))
        actions = actions.cpu().numpy()

        next_obs, rewards, terminations, truncations, infos = envs.step(actions)

        # The vector env resets finished sub-envs automatically, so `next_obs`
        # then holds the first observation of the *next* episode. For truncated
        # episodes (which still bootstrap) store the true last observation.
        real_next_obs = next_obs.copy()
        if "final_observation" in infos:
            for envIdx, truncated in enumerate(truncations):
                if truncated:
                    real_next_obs[envIdx] = infos["final_observation"][envIdx]
        # Only real terminations are stored as `dones`; truncations keep bootstrapping.
        experiences.add(obs, real_next_obs, actions, rewards, terminations, infos)
        obs = next_obs

        # ---- Episode score bookkeeping ----
        done = np.logical_or(terminations, truncations)
        doneMask = torch.as_tensor(done, device=device)
        totalEpisodicRewards += torch.as_tensor(rewards, dtype=totalEpisodicRewards.dtype, device=device)
        for finalScore in totalEpisodicRewards[doneMask]:
            print(f"Score: {finalScore:>8.2f}")
            allScores.append(finalScore.item())
        totalEpisodicRewards[doneMask] = 0

        # ---- Critic update ----
        sampledExperiences = experiences.sample(min(batch_size, experiences.size()))
        with torch.no_grad():
            _, nextStateLogProbs, nextStateActionprobs = actor.get_action(sampledExperiences.next_observations)
            QFunction1NextTarget = QNet1_target(sampledExperiences.next_observations)
            QFunction2NextTarget = QNet2_target(sampledExperiences.next_observations)
            # Expected soft value of the next state under the current policy.
            minQNextTarget = nextStateActionprobs * (torch.min(QFunction1NextTarget, QFunction2NextTarget) - alpha * nextStateLogProbs)
            minQNextTarget = minQNextTarget.sum(dim=1)
            nextQValue = sampledExperiences.rewards.flatten() + (1 - sampledExperiences.dones.flatten()) * gamma * (minQNextTarget)

        # use Q-values only for the taken actions
        takenActions = sampledExperiences.actions.long()
        QFunction1ActionValues = QNet1(sampledExperiences.observations).gather(1, takenActions).view(-1)
        QFunction2ActionValues = QNet2(sampledExperiences.observations).gather(1, takenActions).view(-1)
        QFunction1Loss = F.mse_loss(QFunction1ActionValues, nextQValue)
        QFunction2Loss = F.mse_loss(QFunction2ActionValues, nextQValue)
        QFunctionsTotalLoss = QFunction1Loss + QFunction2Loss

        QNetsOptimizer.zero_grad()
        QFunctionsTotalLoss.backward()
        QNetsOptimizer.step()

        # ---- Actor (and entropy coefficient) update ----
        if globalStep % policyFrequency == 0:
            for i in range(policyFrequency):
                if i > 0:
                    # Sample new experiences if we make multiple updates
                    sampledExperiences = experiences.sample(min(batch_size, experiences.size()))

                _, logProbabilities, actionProbabilities = actor.get_action(sampledExperiences.observations)
                with torch.no_grad():
                    batchStateValues1 = QNet1(sampledExperiences.observations)
                    batchStateValues2 = QNet2(sampledExperiences.observations)
                    batchStateValues = torch.min(batchStateValues1, batchStateValues2)

                # The Q term must sit INSIDE the probability weighting: the Q-values
                # are detached, so outside of it they are a constant and the policy
                # would receive no gradient from the critics at all.
                actorLoss = (actionProbabilities * (alpha * logProbabilities - batchStateValues)).mean()

                actorOptimizer.zero_grad()
                actorLoss.backward()
                actorOptimizer.step()

                if autoEntropy:
                    with torch.no_grad():
                        _, logProbabilities, actionProbabilities = actor.get_action(sampledExperiences.observations)
                    # Weight by the action probabilities so the term is the policy's
                    # (negative) entropy plus the target, not an unweighted log-prob mean.
                    alphaLoss = (actionProbabilities * (-logAlpha.exp()*(logProbabilities + targetEntropy))).mean()

                    alphaOptimizer.zero_grad()
                    alphaLoss.backward()
                    alphaOptimizer.step()
                    alpha = logAlpha.exp().item()

        # ---- Soft update of the target critics ----
        if globalStep % QNetworkFrequency == 0:
            softUpdate(QNet1, QNet1_target, tau)
            softUpdate(QNet2, QNet2_target, tau)
finally:
    # Release the environments even when a step above raises.
    envs.close()

TypeError: 'Tensor' object is not callable