In [1]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import cobot_ai4robotics
import gym

In [2]:
class DeepQNetwork(nn.Module):
    """Three-layer fully connected network bundled with its optimizer and loss.

    Args:
        lr: Learning rate handed to the Adam optimizer.
        input_dims: Iterable unpacked into the leading argument(s) of the first
            ``nn.Linear`` (e.g. ``(obs_size,)``).
        fc1_dims: Width of the first hidden layer.
        fc2_dims: Width of the second hidden layer.
        action_dim: Number of output units.

    Attributes:
        fc1, fc2, fc3: The linear layers, applied in that order.
        optimizer: Adam over all parameters of this module.
        loss: Mean-squared-error criterion.
        device: ``cuda:0`` when CUDA is available, otherwise ``cpu``; the
            module is moved there at construction time.
    """

    def __init__(self, lr, input_dims, fc1_dims, fc2_dims, action_dim):
        super().__init__()

        # Layers are created in fc1 -> fc2 -> fc3 order.
        self.fc1 = nn.Linear(*input_dims, fc1_dims)
        self.fc2 = nn.Linear(fc1_dims, fc2_dims)
        self.fc3 = nn.Linear(fc2_dims, action_dim)

        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()

        # Prefer the first CUDA device, fall back to CPU.
        if T.cuda.is_available():
            device_name = 'cuda:0'
        else:
            device_name = 'cpu'
        self.device = T.device(device_name)
        self.to(self.device)

    def forward(self, state):
        """Return the raw (un-activated) output of the final layer for ``state``."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)
    
class Agent():
    """Epsilon-greedy agent that trains a ``DeepQNetwork`` from a replay buffer.

    Transitions are written into fixed-size NumPy arrays used as a ring buffer
    and sampled uniformly, without replacement, on every learning step.

    NOTE(review): the network's ``action_dim`` outputs are returned directly as
    the continuous action in ``choose_action`` and are also the values regressed
    toward ``reward + gamma * Q_eval(next_state)`` in ``learn``; the stored
    actions are never read while learning. Confirm this is the intended
    algorithm for a continuous action space.

    Args:
        gamma: Discount factor applied to the bootstrapped next-state output.
        epsilon: Initial probability of taking a random action.
        lr: Learning rate forwarded to ``DeepQNetwork``.
        input_dims: Observation shape (tuple); also the per-row shape of the
            state buffers.
        batch_size: Number of transitions sampled per learning step.
        action_dim: Length of the action vector.
        max_mem_size: Capacity of the replay buffer.
        eps_end: Lower bound for ``epsilon``.
        eps_dec: Amount subtracted from ``epsilon`` after each learning step.
    """

    def __init__(self, gamma, epsilon, lr, input_dims, batch_size, action_dim, max_mem_size=100000, eps_end=0.0001, eps_dec=5e-4):
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.batch_size = batch_size
        self.action_dim = action_dim
        self.mem_size = max_mem_size
        self.eps_end = eps_end
        self.eps_dec = eps_dec
        # Hidden layer widths are fixed at 256 here.
        self.Q_eval = DeepQNetwork(lr, input_dims=input_dims, fc1_dims=256, fc2_dims=256, action_dim=action_dim)
        # Replay buffer: one pre-allocated array per transition field.
        self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
        self.action_memory = np.zeros((self.mem_size, action_dim), dtype=np.float32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=bool)
        # Total number of transitions ever stored (not capped at mem_size).
        self.mem_cntr = 0

    def choose_action(self, observation):
        """Pick an action for ``observation`` using an epsilon-greedy rule.

        With probability ``1 - epsilon`` the network output for the observation
        is returned; otherwise an action is drawn uniformly from ``[-1, 1)``
        per dimension.

        Args:
            observation: A single observation (array-like) matching
                ``input_dims``.

        Returns:
            A 1-D NumPy array of length ``action_dim``.
        """
        if np.random.random() > self.epsilon:
            # Convert once to a float32 ndarray with a leading batch axis;
            # building a tensor from a Python list of ndarrays is slow.
            obs = np.asarray(observation, dtype=np.float32)[np.newaxis, ...]
            state = T.tensor(obs, dtype=T.float32, device=self.Q_eval.device)
            # Pure inference: no autograd graph is needed for action selection.
            with T.no_grad():
                actions = self.Q_eval(state)
            # NOTE(review): the network output is not clipped to the [-1, 1]
            # range used by the random branch below - confirm the environment
            # tolerates out-of-range actions.
            action = actions.cpu().numpy()[0]
        else:
            action = np.random.uniform(-1, 1, self.action_dim)  # Sample random action within bounds
        return action

    def learn(self):
        """Run one gradient step on a random minibatch from the replay buffer.

        Does nothing until at least ``batch_size`` transitions have been
        stored. After each update ``epsilon`` is reduced by ``eps_dec`` and
        floored at ``eps_end``.
        """
        if self.mem_cntr < self.batch_size:
            return

        self.Q_eval.optimizer.zero_grad()
        # Only the filled part of the buffer may be sampled.
        max_mem = min(self.mem_cntr, self.mem_size)
        batch = np.random.choice(max_mem, self.batch_size, replace=False)

        device = self.Q_eval.device
        state_batch = T.tensor(self.state_memory[batch]).to(device)
        new_state_batch = T.tensor(self.new_state_memory[batch]).to(device)
        reward_batch = T.tensor(self.reward_memory[batch]).to(device)
        terminal_batch = T.tensor(self.terminal_memory[batch]).to(device)
        # The stored actions are intentionally not loaded: nothing below reads them.

        predicted_actions = self.Q_eval(state_batch)
        # Bootstrapped target is detached so gradients flow only through the prediction.
        target_actions = self.Q_eval(new_state_batch).detach()
        # No bootstrapping past a terminal transition.
        target_actions[terminal_batch] = 0.0

        # reward_batch is (batch,), so add a trailing axis to broadcast over action_dim.
        q_target = reward_batch.unsqueeze(1) + self.gamma * target_actions

        # Update network weights
        loss = self.Q_eval.loss(predicted_actions, q_target)
        loss.backward()
        self.Q_eval.optimizer.step()

        self.epsilon = max(self.epsilon - self.eps_dec, self.eps_end)

    def store_transition(self, state, action, reward, state_, done):
        """Write one transition into the replay buffer.

        Once ``mem_size`` transitions have been stored, the oldest entries are
        overwritten.

        Args:
            state: Observation before the step.
            action: Action taken.
            reward: Scalar reward received.
            state_: Observation after the step.
            done: Whether the step ended the episode.
        """
        index = self.mem_cntr % self.mem_size
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.terminal_memory[index] = done
        self.mem_cntr += 1


In [3]:
import torch
import gym
import numpy as np

# Define your network and agent as previously done

# Hyperparameters
lr = 0.001
# NOTE: fc1_dims / fc2_dims are not passed to Agent below; Agent builds its
# network with hidden layers of width 256 regardless of these values.
fc1_dims = 256
fc2_dims = 256
gamma = 0.99
epsilon = 1.0
batch_size = 64
max_mem_size = 100000
eps_end = 0.0001
eps_dec = 5e-4

EPISODES = 5000
CHECKPOINT_EVERY = 500  # Save the model every 500 episodes

# Setup your environment and agent
env = gym.make("cobot_ai4robotics", renders=False, isDiscrete=False)
try:
    input_dims = env.observation_space.shape
    action_dim = env.action_space.shape[0]

    agent = Agent(gamma, epsilon, lr, input_dims, batch_size, action_dim, max_mem_size, eps_end, eps_dec)

    # Training Loop
    # NOTE(review): reset() returning only the observation and step() returning
    # a 4-tuple presumably matches the pre-0.26 Gym API - confirm against the
    # installed gym version.
    for episode in range(EPISODES):
        state = env.reset()
        done = False
        score = 0

        while not done:
            action = agent.choose_action(state)
            next_state, reward, done, info = env.step(action)
            score += reward

            # Store transition and perform a learning step
            agent.store_transition(state, action, reward, next_state, done)
            agent.learn()

            state = next_state

        # Epsilon is already annealed inside agent.learn() on every learning
        # step, so it is not decayed a second time here.

        # Output the score at the end of each episode
        print(f'Episode {episode + 1}/{EPISODES}, Score: {score}')

        # Save the model at specified checkpoint intervals
        if (episode + 1) % CHECKPOINT_EVERY == 0:
            torch.save(agent.Q_eval.state_dict(), f'model_weights_{episode + 1}.pth')
            print(f"Saved model checkpoint at episode {episode + 1}")

    # Save final model (only reached when training completes without error)
    torch.save(agent.Q_eval.state_dict(), 'final_model_weights.pth')
    print("Saved final model weights")
finally:
    # Close the environment even if training raised, so the simulator is
    # released before the exception propagates.
    env.close()


  logger.warn(
  logger.warn(
  logger.warn(
  logger.deprecation(
  if not isinstance(done, (bool, np.bool8)):
  logger.warn(
  logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")


Contact by ball no. 3 at point (0.12 -0.09 1.36) on KUKA.
Contact by ball no. 3 at point (0.12 -0.12 1.35) on KUKA.
Contact by ball no. 3 at point (0.11 -0.12 1.35) on KUKA.
Episode 1/5000, Score: -150
Episode 2/5000, Score: 0
Episode 3/5000, Score: 0
Episode 4/5000, Score: 0
Episode 5/5000, Score: 0
Episode 6/5000, Score: 0
Episode 7/5000, Score: 0
Episode 8/5000, Score: 0
Episode 9/5000, Score: 0
Episode 10/5000, Score: 0


error: GetBasePositionAndOrientation failed.