In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.distributions import Normal

In [None]:
class Actor(nn.Module):
    """Gaussian policy network producing tanh-squashed, scaled actions.

    Two hidden ReLU layers of width 128 feed two linear heads: ``mu`` (mean)
    and ``sigma`` (standard deviation) of a diagonal Normal distribution
    over pre-squash actions.

    Args:
        in_dims: Size of the state vector.
        action_dim: Number of action dimensions.
        max_action: Scalar or per-dimension bound; ``tanh`` output is
            multiplied by it. Accepts a float, sequence, NumPy array or tensor.
    """

    def __init__(self, in_dims, action_dim, max_action):
        super().__init__()
        self.fc1 = nn.Linear(in_dims, 128)
        self.fc2 = nn.Linear(128, 128)
        self.mu = nn.Linear(128, action_dim)
        self.sigma = nn.Linear(128, action_dim)

        # Keep the bound as a float32 tensor so it never has to be mixed with
        # NumPy arrays inside autograd code. Non-persistent: the state_dict
        # layout stays exactly as it was before.
        if not torch.is_tensor(max_action):
            max_action = torch.tensor(max_action, dtype=torch.float32)
        self.register_buffer(
            "max_action",
            max_action.detach().clone().to(torch.float32),
            persistent=False,
        )

        # Lower bound for sigma and epsilon inside the log of the tanh correction.
        self.noise = 1e-6

    def forward(self, x):
        """Return ``(mu, sigma)`` for a batch of states.

        ``sigma`` is clamped to ``[self.noise, 1]`` so the Normal
        distribution always receives a strictly positive scale.
        """
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        mu = self.mu(x)
        sigma = torch.clamp(self.sigma(x), self.noise, 1)

        return mu, sigma

    def sample_normal(self, x, reparameterize=True):
        """Sample a bounded action and its log-probability.

        Args:
            x: Batch of states.
            reparameterize: When True (default) use ``rsample`` so gradients
                flow from the returned action back to the policy parameters;
                when False use plain ``sample``.

        Returns:
            ``(action, log_prob)`` where ``action`` is
            ``tanh(raw) * max_action`` and ``log_prob`` is summed over the
            last (action) dimension.
        """
        mu, sigma = self.forward(x)
        distribution = Normal(mu, sigma)

        raw_action = distribution.rsample() if reparameterize else distribution.sample()
        squashed = torch.tanh(raw_action)
        action = squashed * self.max_action

        # The density must be evaluated at the pre-tanh sample; the second
        # term is the change-of-variables correction for the tanh squash.
        log_action = distribution.log_prob(raw_action)
        log_action = log_action - torch.log(1 - squashed.pow(2) + self.noise)
        log_action = log_action.sum(dim=-1)

        return action, log_action

In [None]:
class Critic(nn.Module):
    """State-action value network.

    The state and action batches are concatenated along dimension 1 and
    passed through two 128-unit ReLU layers followed by a linear layer with
    a single output feature.
    """

    def __init__(self, in_dims, action_dim):
        super().__init__()
        width = 128
        self.fc1 = nn.Linear(in_dims + action_dim, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, 1)

    def forward(self, state, action):
        """Return the network output for each ``(state, action)`` row."""
        joint = torch.cat((state, action), dim=1)
        hidden = self.fc1(joint).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

In [None]:
class Value(nn.Module):
    """State value network.

    Two 128-unit ReLU layers followed by a linear layer with a single
    output feature.
    """

    def __init__(self, in_dims):
        super().__init__()
        width = 128
        self.fc1 = nn.Linear(in_dims, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, 1)

    def forward(self, state):
        """Return the network output for each row of ``state``."""
        hidden = self.fc1(state).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

In [None]:
class Buffer():
    """Fixed-size ring buffer of transitions backed by NumPy arrays.

    Once ``mem_size`` transitions have been written, new ones overwrite the
    oldest slots. ``counter`` counts every transition ever stored.
    """

    def __init__(self, mem_size, in_dims, action_dims):
        self.mem_size = mem_size
        self.state = np.zeros((mem_size, in_dims))
        self.next_state = np.zeros((mem_size, in_dims))
        self.action = np.zeros((mem_size, action_dims))
        self.reward = np.zeros(mem_size)
        self.done = np.zeros(mem_size)
        self.counter = 0

    def store_transition(self, state, action, reward, next_state, done):
        """Write one transition into the next slot, wrapping around when full."""
        slot = self.counter % self.mem_size

        self.state[slot] = state
        self.next_state[slot] = next_state
        self.action[slot] = action
        self.reward[slot] = reward
        self.done[slot] = done

        self.counter = self.counter + 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` stored transitions uniformly, with replacement.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)``.
        """
        filled = min(self.counter, self.mem_size)
        picks = np.random.choice(filled, size = batch_size)

        fields = (self.state, self.action, self.reward, self.next_state, self.done)
        return tuple(field[picks] for field in fields)

In [None]:
class Agent():
    """Soft Actor-Critic agent with twin critics and a separate value network.

    Holds the policy (``actor``), two Q-networks (``critic1``, ``critic2``),
    a value network and its slowly-tracking copy (``target_value``), one
    Adam optimizer per trained network, and a replay ``memory``.

    Args:
        input_dims: Size of the state vector.
        action_dims: Number of action dimensions.
        max_action: Bound(s) applied to the tanh-squashed action.
        tau: Interpolation factor for the soft target-network update.
        mem_size: Replay buffer capacity.
        gamma: Discount factor.
        lr: Learning rate shared by all optimizers.
        batch_size: Number of transitions sampled per ``learn`` call.
        reward_scale: Multiplier applied to rewards in the critic target.
    """

    def __init__(self, input_dims, action_dims, max_action, tau = 0.005, mem_size = 10000, gamma = 0.99,
                 lr = 0.0003, batch_size = 256, reward_scale = 1.0):
        self.tau = tau
        self.gamma = gamma
        self.batch_size = batch_size
        self.reward_scale = reward_scale

        # Hand the policy a float32 tensor so the action bound is never a
        # NumPy array mixed into tensor arithmetic.
        max_action = torch.as_tensor(max_action, dtype = torch.float32)

        self.actor = Actor(input_dims, action_dims, max_action)
        self.critic1 = Critic(input_dims, action_dims)
        self.critic2 = Critic(input_dims, action_dims)
        self.value = Value(input_dims)
        self.target_value = Value(input_dims)

        self.critic1_opt = torch.optim.Adam(self.critic1.parameters(), lr = lr)
        self.critic2_opt = torch.optim.Adam(self.critic2.parameters(), lr = lr)
        self.actor_opt = torch.optim.Adam(self.actor.parameters(), lr = lr)
        self.value_opt = torch.optim.Adam(self.value.parameters(), lr = lr)
        self.memory = Buffer(mem_size, input_dims, action_dims)
        self.criterion = nn.MSELoss()

        # Start the target network as an exact copy of the value network.
        self.update_parameters(tau = 1)

    def update_parameters(self, tau = None):
        """Soft-update the target value network towards the value network.

        Each target parameter becomes ``(1 - tau) * target + tau * value``.
        ``tau = 1`` performs a hard copy; ``None`` uses ``self.tau``.
        """
        if tau is None:
            tau = self.tau

        with torch.no_grad():
            for source, target in zip(self.value.parameters(), self.target_value.parameters()):
                target.lerp_(source, tau)

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, next_state, done)

    def choose_action(self, state):
        """Sample an action for a batched state and return the first row as NumPy."""
        state = torch.as_tensor(state, dtype = torch.float32)

        with torch.no_grad():
            action, _ = self.actor.sample_normal(state)

        return action.cpu().numpy()[0]

    def learn(self):
        """Run one update of the value, actor and critic networks.

        Does nothing until the buffer holds at least one batch worth of
        transitions (or is full, if its capacity is below ``batch_size``).
        """
        if self.memory.counter < min(self.batch_size, self.memory.mem_size):
            return

        state, action, reward, next_state, done = self.memory.sample_buffer(self.batch_size)

        state = torch.as_tensor(state, dtype = torch.float32)
        action = torch.as_tensor(action, dtype = torch.float32)
        next_state = torch.as_tensor(next_state, dtype = torch.float32)
        # Everything that enters a loss is flattened to shape (batch,) so that
        # (batch,) and (batch, 1) tensors can never broadcast to (batch, batch).
        reward = torch.as_tensor(reward, dtype = torch.float32).view(-1)
        done = torch.as_tensor(done, dtype = torch.float32).view(-1)

        # ---- value network: V(s) -> min Q(s, a~pi) - log pi(a|s) ----
        with torch.no_grad():
            sampled_action, log_prob = self.actor.sample_normal(state)
            q_min = torch.min(self.critic1(state, sampled_action),
                              self.critic2(state, sampled_action)).view(-1)
            value_target = q_min - log_prob.view(-1)

        value_pred = self.value(state).view(-1)
        loss_value = self.criterion(value_pred, value_target)

        self.value_opt.zero_grad()
        loss_value.backward()
        self.value_opt.step()

        # ---- actor: minimise E[log pi(a|s) - min Q(s, a)] ----
        sampled_action, log_prob = self.actor.sample_normal(state)
        q_min = torch.min(self.critic1(state, sampled_action),
                          self.critic2(state, sampled_action)).view(-1)
        actor_loss = (log_prob.view(-1) - q_min).mean()

        self.actor_opt.zero_grad()
        actor_loss.backward()
        self.actor_opt.step()

        # ---- critics: Q(s, a) -> r + gamma * V_target(s') * (1 - done) ----
        with torch.no_grad():
            next_value = self.target_value(next_state).view(-1)
            q_hat = self.reward_scale * reward + self.gamma * next_value * (1 - done)

        q_pred1 = self.critic1(state, action).view(-1)
        q_pred2 = self.critic2(state, action).view(-1)
        critic_loss = self.criterion(q_pred1, q_hat) + self.criterion(q_pred2, q_hat)

        # Zeroing here also discards critic gradients left by the actor step.
        self.critic1_opt.zero_grad()
        self.critic2_opt.zero_grad()
        critic_loss.backward()
        self.critic1_opt.step()
        self.critic2_opt.step()

        self.update_parameters()
    

In [None]:
import gymnasium as gym

In [None]:
# Create the Gymnasium environment that the agent below is trained on.
env = gym.make("Pendulum-v1")

In [None]:
# First entry of the action-space shape; the same expression is passed to
# Agent as `action_dims` in the training cell.
print(env.action_space.shape[0])

In [None]:
# Upper bound of the action space; the same value is passed to Agent as
# `max_action` in the training cell.
print(env.action_space.high)

In [None]:
# NOTE(review): torch is already imported in the first cell, so this import
# is redundant.
import torch
# Debugging aid: anomaly detection adds extra checks to every backward pass
# and slows training; consider turning it off once the NaN/in-place issue
# under investigation is resolved.
torch.autograd.set_detect_anomaly(True)


In [None]:
agent = Agent(input_dims = env.observation_space.shape[0], action_dims = env.action_space.shape[0], max_action= env.action_space.high)
n_games = 1500
score_history = []

for i in range(n_games):
    # reset() returns (observation, info)
    observation, _ = env.reset()
    score = 0
    done = False

    while not done:
        state_batch = torch.tensor(observation, dtype = torch.float32).unsqueeze(0)
        action = agent.choose_action(state_batch)
        observation_, reward, terminated, truncated, info = env.step(action)

        # The episode loop stops on either signal ...
        done = terminated or truncated

        # ... but the buffer gets the FULL next observation (not just its
        # first component) and only the true termination flag, so that a
        # time-limit truncation does not zero out the bootstrapped value.
        agent.remember(observation, action, reward, observation_, terminated)

        score += reward
        agent.learn()
        observation = observation_

    score_history.append(score)
    avg_score = np.mean(score_history[-100:])

    print(f"epoch : {i} :: score : {score} :: avg_score : {avg_score}")

In [None]:
# Quick look at the space shapes. Only the last expression of a cell is
# displayed, so the observation-space shape on the first line is not shown.
env.observation_space.shape
env.action_space.shape

In [None]:
# Scratch check of in-place row copying: indexing a 2-D tensor gives a view
# of that row, so copy_ on it writes through to model1.
model1 = torch.tensor([[1, 2, 3], [2, 1, 5], [5, 6, 7]])
model2 = torch.tensor([[45, 6, 77], [11, 22, 33], [66, 77, 88]])

for row in range(min(len(model1), len(model2))):
    model1[row].copy_(model2[row])

print(model1)
print(model2)