In [10]:
import numpy as np
import matplotlib.pyplot as plt
'''
# Defining an environment:
    - 4 DOF robotic arm with arbitrary link lengths
    - Target
    - Obstacles
'''
class MultiArmRoboticEnv:
    def __init__(self, num_links=4, target=(1.0, 1.0, 1.0)):
        self.num_links = num_links
        self.link_length = [0.5, 0.8, 0.8, 0.5]
        self.angles = [0,0,0,0]
        self.target = target
        self.obstacles = [(0.5, 0.5, 0.5), (1.5, 1.5, 1.5)]
        self.steps = 0
        self.reset()

    def reset(self):
        self.angles = [0,0,0,0]
        self.steps = 0
        return self.angles

    def get_end_effector(self, arm_index):
        x = 0
        y = 0
        z = 0
        if arm_index == 1:
            x = 0
            y = 0
            z = self.link_length[0]
        elif arm_index == 2:
            x = np.cos(self.angles[0]) * (self.link_length[1] * np.cos(self.angles[1]))
            y = np.sin(self.angles[0]) * (self.link_length[1] * np.cos(self.angles[1]))
            z = self.link_length[0] + self.link_length[1] * np.sin(self.angles[1])
        elif arm_index == 3:
            x = np.cos(self.angles[0]) * \
            (self.link_length[1] * np.cos(self.angles[1]) + self.link_length[2] * np.cos(self.angles[1] + self.angles[2]))
            y = np.sin(self.angles[0]) * \
            (self.link_length[1] * np.cos(self.angles[1]) + self.link_length[2] * np.cos(self.angles[1] + self.angles[2]))
            z = self.link_length[0] + self.link_length[1] * np.sin(self.angles[1]) + self.link_length[2] * np.sin(self.angles[1] + self.angles[2]) 
        elif arm_index == 4:
            x = np.cos(self.angles[0]) * (self.link_length[1] * np.cos(self.angles[1]) + self.link_length[2] * np.cos(self.angles[1] + self.angles[2]) + self.link_length[3] * np.cos(self.angles[1] + self.angles[2] + self.angles[3]) ) 
            y = np.sin(self.angles[0]) * (self.link_length[1] * np.cos(self.angles[1]) + self.link_length[2] * np.cos(self.angles[1] + self.angles[2]) + self.link_length[3] * np.cos(self.angles[1] + self.angles[2] + self.angles[3])) 
            z = self.link_length[0] + self.link_length[1] * np.sin(self.angles[1]) + self.link_length[2] * np.sin(self.angles[1] + self.angles[2]) + self.link_length[3] * np.sin(self.angles[1] + self.angles[2] + self.angles[3]) 
        return (x, y, z)

    def step(self, actions):
        action_map = {0: -0.1, 1: 0.1}
        rewards = 0
        self.steps += 1

        for j in range(len(self.angles)):
            end_effector = self.get_end_effector(self.num_links-1)
            distance_old = np.linalg.norm(np.array(end_effector) - np.array(self.target))
            self.angles[j] = np.clip(self.angles[j] + action_map[actions[j]], -np.pi, np.pi)
            end_effector = self.get_end_effector(self.num_links-1)
            distance_new = np.linalg.norm(np.array(end_effector) - np.array(self.target))
            
            if distance_old > distance_new:
                rewards += 1
            else:
                rewards -= 1
                
            for obs in self.obstacles:
                if np.linalg.norm(np.array(end_effector) - np.array(obs)) < 0.2:
                    rewards -= 2000           
        
        # rewards -= 1  # Penalize based on the number of steps
        # rewards -= 0.1 * self.steps
        done = False
        if np.linalg.norm(np.array(self.get_end_effector(self.num_links)) - np.array(self.target)) < 0.1:
            rewards += 50000;
            done = True
            
        return self.angles, rewards, done

    def render(self):
        for i in range(self.num_links):
            print(f"Link {i+1} Joint Angle: {self.angles[i]}")
        end_effector = self.get_end_effector(self.num_links)
        print(f"End Effector: {end_effector}")

In [12]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def draw_robot(points, target):

    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.scatter(0, 0, 0, color='blue', s=100)
    
    for i in range(len(points)):
        if i==len(points)-1:
            ax.scatter(points[i][0], points[i][1], points[i][2], color='red', s=100)
        else:
            ax.scatter(points[i][0], points[i][1], points[i][2], color='blue', s=100)

    for i in range(len(points)-1):
        ax.plot([points[i][0], points[i+1][0]], [points[i][1], points[i+1][1]], [points[i][2], points[i+1][2]], color='black')
    ax.plot([0, points[0][0]], [0, points[0][1]], [0, points[0][2]], color='black')

    ax.scatter(target[0], target[1], target[2], color='green', s=50)
    
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')

    ax.set_xlim([-2,2])
    ax.set_ylim([-2,2])
    ax.set_zlim([0,3])
    
    plt.show()

    return fig

In [39]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# Define the Q-Network
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Define the DQN Agent
class DQNAgent:
    def __init__(self, env, state_size, action_size, gamma=0.99, lr=0.001, batch_size=64, epsilon=0.2, epsilon_decay=0.995, min_epsilon=0.01, memory_size=10000):
        self.env = env
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.lr = lr
        self.batch_size = batch_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.memory = deque(maxlen=memory_size)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.q_network = QNetwork(state_size, action_size).to(self.device)
        self.target_network = QNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.update_target_network()

    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        if random.uniform(0, 1) < self.epsilon:
            return [random.choice([0, 1]) for _ in range(self.env.num_links)]
        else:
            state = torch.FloatTensor(state).to(self.device)
            with torch.no_grad():
                q_values = self.q_network(state).cpu().numpy()
            return np.argmax(q_values, axis=1).tolist()

    def replay(self):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        q_values = self.q_network(states)
        next_q_values = self.target_network(next_states)
        q_target = q_values.clone()

        for i in range(self.batch_size):
            for j in range(self.env.num_links):
                q_target[i][actions[i][j]] = rewards[i] + (1 - dones[i]) * self.gamma * torch.max(next_q_values[i])

        loss = nn.MSELoss()(q_values, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, episodes=1000, target_update=10):
        for episode in range(episodes):
            # print(1)
            state = self.env.reset()
            total_reward = 0
            done = False

            # while not done:
            for t in range (100):
                # print(t)
                action = self.choose_action(state)
                next_state, reward, done = self.env.step(action)
                self.remember(state, action, reward, next_state, done)
                self.replay()
                state = next_state
                total_reward += reward
                if done:
                    break

            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

            if episode % target_update == 0:
                self.update_target_network()
            print(f"Episode {episode+1}/{episodes}, Total Reward: {total_reward}, Epsilon: {self.epsilon}")
            env.render()
            draw_robot([env.get_end_effector(1),env.get_end_effector(2),env.get_end_effector(3),env.get_end_effector(4)], env.target)

env = MultiArmRoboticEnv()
state_size = env.num_links
action_size = 2
agent = DQNAgent(env, state_size, action_size)
agent.train(episodes=1000)

env.render()


AxisError: axis 1 is out of bounds for array of dimension 1

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# Define the Q-Network
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size, num_links):
        super(QNetwork, self).__init__()
        self.num_links = num_links
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size * num_links)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x).view(-1, self.num_links, 2)

# Define the DQN Agent
class DQNAgent:
    def __init__(self, env, state_size, action_size, gamma=0.99, lr=0.001, batch_size=64, epsilon=0.3, epsilon_decay=0.995, min_epsilon=0.01, memory_size=10000):
        self.env = env
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.lr = lr
        self.batch_size = batch_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.memory = deque(maxlen=memory_size)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.q_network = QNetwork(state_size, action_size, env.num_links).to(self.device)
        self.target_network = QNetwork(state_size, action_size, env.num_links).to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.update_target_network()

    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, state):
        if random.uniform(0, 1) < self.epsilon:
            return [random.choice([0, 1]) for _ in range(self.env.num_links)]
        else:
            state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            with torch.no_grad():
                q_values = self.q_network(state).cpu().numpy()
            return [np.argmax(q_values[0][i]) for i in range(self.env.num_links)]

    def replay(self):
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        q_values = self.q_network(states)
        next_q_values = self.target_network(next_states)
        
        q_target = q_values.clone()

        for i in range(self.batch_size):
            for j in range(self.env.num_links):
                q_target[i][j][actions[i][j]] = rewards[i] + (1 - dones[i]) * self.gamma * torch.max(next_q_values[i][j])

        loss = nn.MSELoss()(q_values, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, episodes=1000, target_update=10):
        for episode in range(episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            # while not done:
            for t in range (1000):
                # print(t)
                action = self.choose_action(state)
                next_state, reward, done = self.env.step(action)
                self.remember(state, action, reward, next_state, done)
                self.replay()
                state = next_state
                total_reward += reward
                if done:
                    break

            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

            if episode % target_update == 0:
                self.update_target_network()

            print(f"Episode {episode+1}/{episodes}, Total Reward: {total_reward}, Epsilon: {self.epsilon}")
            env.render()
            draw_robot([env.get_end_effector(1),env.get_end_effector(2),env.get_end_effector(3),env.get_end_effector(4)], env.target)

env = MultiArmRoboticEnv()
state_size = env.num_links
action_size = 2  # Increase or decrease the joint angle
agent = DQNAgent(env, state_size, action_size)
agent.train(episodes=1000)

env.render()
