In [None]:
import os
import gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import pybullet as p
import pybullet_data
import time
from gym import spaces

# Updated Environment: 4-joint robotic arm reaching a fixed object with improved reward and observation
class FourJointArmEnv(gym.Env):
    """Reaching task for the first four joints of a KUKA iiwa arm in PyBullet.

    Observation (7 floats): the four controlled joint angles followed by the
    target's x, y, z position as sampled at reset.
    Action (``Discrete(8)``): ``action // 2`` selects the joint; an even action
    raises that joint's position target by 0.05, an odd action lowers it.
    Reward: ``-distance`` plus ``10 * (previous_distance - distance)`` as
    progress shaping (skipped on the first step of an episode), plus a bonus
    of 10 once the end effector is closer than 0.05 to the target, which also
    terminates the episode.
    """

    NUM_CONTROLLED_JOINTS = 4      # joints 0..3 are observed and actuated
    JOINT_STEP = 0.05              # change of the position target per action
    END_EFFECTOR_LINK = 6          # link whose position is compared to the target
    REACH_THRESHOLD = 0.05         # distance below which the target counts as touched
    REACH_BONUS = 10               # extra reward granted on touching the target
    PROGRESS_SCALE = 10            # weight of the distance-improvement shaping term
    SIM_SUBSTEPS = 10              # physics steps simulated per environment step

    def __init__(self, render=False):
        """Connect to PyBullet and build the initial scene.

        Args:
            render: when true, connect in GUI mode and slow stepping down so
                the motion can be watched; otherwise run headless (DIRECT).
        """
        super().__init__()
        self.render_mode = render
        self.max_steps = 800
        # Two actions (up / down) per controlled joint.
        self.action_space = spaces.Discrete(2 * self.NUM_CONTROLLED_JOINTS)
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.NUM_CONTROLLED_JOINTS + 3,),
            dtype=np.float32,
        )

        # Keep the client id so every call addresses this env's own
        # simulation instead of whichever client happens to be the default.
        self.physicsClient = p.connect(p.GUI if render else p.DIRECT)
        p.setAdditionalSearchPath(
            pybullet_data.getDataPath(), physicsClientId=self.physicsClient
        )
        p.setGravity(0, 0, -9.81, physicsClientId=self.physicsClient)
        self.arm = None
        self.target_uid = None
        self.target_pos = None
        self.prev_distance = None
        self.step_counter = 0

        self.reset()

    def reset(self, seed=None, options=None):
        """Rebuild the scene with a freshly sampled target.

        Args:
            seed: forwarded to ``gym.Env.reset`` to seed ``self.np_random``,
                which is the generator used to place the target.
            options: accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        client = self.physicsClient
        p.resetSimulation(physicsClientId=client)
        p.setGravity(0, 0, -9.81, physicsClientId=client)
        p.loadURDF("plane.urdf", physicsClientId=client)
        self.arm = p.loadURDF(
            "kuka_iiwa/model.urdf", useFixedBase=True, physicsClientId=client
        )

        # Sample from the env's seeded generator (not the global ``random``
        # module) so that ``reset(seed=...)`` actually reproduces episodes.
        x = float(self.np_random.uniform(0.3, 0.7))
        y = float(self.np_random.uniform(-0.2, 0.2))
        z = 0.0
        self.target_pos = [x, y, z]
        self.target_uid = p.loadURDF(
            "sphere2.urdf",
            self.target_pos,
            globalScaling=0.05,
            physicsClientId=client,
        )

        self.step_counter = 0
        # No previous distance yet: the first step gets no shaping term.
        self.prev_distance = None
        return self._get_state(), {}

    def _get_state(self):
        """Return the controlled joint angles followed by the target position."""
        joint_angles = [
            p.getJointState(self.arm, i, physicsClientId=self.physicsClient)[0]
            for i in range(self.NUM_CONTROLLED_JOINTS)
        ]
        return np.array(joint_angles + self.target_pos, dtype=np.float32)

    def _apply_action(self, action):
        """Shift one joint's position-control target by a fixed increment."""
        joint = action // 2
        direction = self.JOINT_STEP if action % 2 == 0 else -self.JOINT_STEP
        current_angle = p.getJointState(
            self.arm, joint, physicsClientId=self.physicsClient
        )[0]
        p.setJointMotorControl2(
            self.arm,
            joint,
            p.POSITION_CONTROL,
            targetPosition=current_angle + direction,
            physicsClientId=self.physicsClient,
        )

    def _compute_reward(self):
        """Score the current pose.

        Returns:
            ``(reward, done)`` as a plain ``float`` and ``bool``.
        """
        end_effector_pos = p.getLinkState(
            self.arm, self.END_EFFECTOR_LINK, physicsClientId=self.physicsClient
        )[0]
        distance = float(
            np.linalg.norm(np.array(end_effector_pos) - np.array(self.target_pos))
        )
        reward = -distance

        if self.prev_distance is not None:
            # Positive when the end effector moved closer since the last step.
            reward += self.PROGRESS_SCALE * (self.prev_distance - distance)
        self.prev_distance = distance

        # Plain bool rather than numpy.bool_, so callers can store it as-is.
        done = distance < self.REACH_THRESHOLD
        if done:
            reward += self.REACH_BONUS
            print("Touch!!!!!!!!!!!!!")
        return reward, done

    def step(self, action):
        """Apply ``action``, advance the simulation and score the result.

        Returns:
            ``(observation, reward, terminated, truncated, info)``;
            ``terminated`` means the target was reached, ``truncated`` means
            ``max_steps`` was hit, and ``info`` is an empty dict.
        """
        self._apply_action(action)
        for _ in range(self.SIM_SUBSTEPS):
            p.stepSimulation(physicsClientId=self.physicsClient)
            if self.render_mode:
                # Slow the GUI down; presumably 1/240 s mirrors PyBullet's
                # default physics time step -- TODO confirm.
                time.sleep(1. / 240.)

        self.step_counter += 1
        obs = self._get_state()
        reward, terminated = self._compute_reward()
        truncated = self.step_counter >= self.max_steps
        return obs, reward, terminated, truncated, {}

    def close(self):
        """Disconnect this env's PyBullet client; safe to call repeatedly."""
        if self.physicsClient is not None and p.isConnected(self.physicsClient):
            p.disconnect(physicsClientId=self.physicsClient)
        self.physicsClient = None

# DQN Network
class DQN(nn.Module):
    """Multilayer perceptron mapping a state vector to one Q-value per action.

    The network has two hidden layers of 128 units with ReLU activations. The
    sub-modules live in ``self.layers`` (an ``nn.Sequential``), which fixes the
    parameter names used by saved checkpoints.
    """

    def __init__(self, state_dim, action_dim):
        """Build the network.

        Args:
            state_dim: number of input features.
            action_dim: number of outputs (one Q-value per action).
        """
        super().__init__()
        hidden_width = 128
        # Linear layers are created in input -> output order.
        input_layer = nn.Linear(state_dim, hidden_width)
        hidden_layer = nn.Linear(hidden_width, hidden_width)
        output_layer = nn.Linear(hidden_width, action_dim)
        self.layers = nn.Sequential(
            input_layer,
            nn.ReLU(),
            hidden_layer,
            nn.ReLU(),
            output_layer,
        )

    def forward(self, x):
        """Return the Q-values produced by the layer stack for input ``x``."""
        q_values = self.layers(x)
        return q_values

# DQN Agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a uniform replay buffer.

    A single ``DQN`` network is used both to pick actions and to compute the
    bootstrap target. ``epsilon`` is only read here; decaying it with
    ``epsilon_decay`` / ``epsilon_min`` is left to the caller.
    """

    def __init__(self, state_dim, action_dim, lr=1e-3, gamma=0.99,
                 epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.986):
        """Create the Q-network, optimizer and an empty replay buffer.

        Args:
            state_dim: length of the observation vector.
            action_dim: number of discrete actions.
            lr: Adam learning rate.
            gamma: discount factor applied to the bootstrap value.
            epsilon: initial probability of taking a random action.
            epsilon_min: lower bound stored for the caller's decay schedule.
            epsilon_decay: multiplicative factor stored for the caller's
                decay schedule.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        # Bounded buffer: the oldest transitions are dropped once it is full.
        self.memory = deque(maxlen=10000)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        self.model = DQN(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Returns:
            A random action index with probability ``epsilon``, otherwise the
            index of the largest predicted Q-value.
        """
        if np.random.rand() < self.epsilon:
            return random.randrange(self.action_dim)
        state_tensor = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
        # Action selection never needs gradients; skip building the graph.
        with torch.no_grad():
            q_values = self.model(state_tensor)
        return torch.argmax(q_values).item()

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer.

        ``done`` disables bootstrapping from ``next_state`` when the
        transition is later replayed.
        """
        self.memory.append((state, action, reward, next_state, done))

    def replay(self, batch_size=64):
        """Run one gradient step on a uniformly sampled minibatch.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # Stack into one contiguous ndarray per field first: building a
        # tensor straight from a tuple of ndarrays is the slow path that
        # PyTorch warns about.
        states = torch.as_tensor(np.asarray(states, dtype=np.float32))
        next_states = torch.as_tensor(np.asarray(next_states, dtype=np.float32))
        actions = torch.as_tensor(np.asarray(actions, dtype=np.int64))
        rewards = torch.as_tensor(np.asarray(rewards, dtype=np.float32))
        # 1.0 where the episode continues, 0.0 where bootstrapping must stop.
        continuing = torch.as_tensor(1.0 - np.asarray(dones, dtype=np.float32))

        # Q-value of the action that was actually taken in each transition.
        q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # The target is a constant with respect to the parameters.
        with torch.no_grad():
            next_q = self.model(next_states).max(1)[0]
            expected_q = rewards + self.gamma * next_q * continuing

        loss = self.loss_fn(q_values, expected_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

# Training function
def train_dqn(render=False, episodes=500):
    """Train a DQN agent on ``FourJointArmEnv`` and save its weights.

    Args:
        render: forwarded to the environment (GUI when true).
        episodes: number of training episodes to run.

    Returns:
        List with the undiscounted return of every episode.

    The trained weights are written to ``dqn_4joint_arm_model.pth`` in the
    current working directory.
    """
    env = FourJointArmEnv(render=render)
    try:
        agent = DQNAgent(env.observation_space.shape[0], env.action_space.n)
        rewards = []

        for ep in range(episodes):
            state, _ = env.reset()
            total_reward = 0
            done = False

            while not done:
                action = agent.act(state)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                # Only a true terminal state may stop bootstrapping. A
                # time-limit truncation ends the rollout, but the state still
                # has future value, so it is stored as not done.
                agent.remember(state, action, reward, next_state, bool(terminated))
                agent.replay()
                state = next_state
                total_reward += reward

            # Decay epsilon once per episode, never below the floor.
            agent.epsilon = max(agent.epsilon * agent.epsilon_decay, agent.epsilon_min)

            rewards.append(total_reward)
            print(f"Episode {ep + 1}: Reward = {total_reward:.2f}, Epsilon = {agent.epsilon:.3f}")

        torch.save(agent.model.state_dict(), "dqn_4joint_arm_model.pth")
    finally:
        # Release the simulator connection even if training is interrupted.
        env.close()
    return rewards

# Test function
def test_trained_agent(model_path="dqn_4joint_arm_model.pth", render=True):
    """Run one greedy episode with saved DQN weights and print its return.

    Args:
        model_path: path of a ``state_dict`` checkpoint for ``DQN``.
        render: forwarded to the environment (GUI when true).
    """
    env = FourJointArmEnv(render=render)
    try:
        model = DQN(env.observation_space.shape[0], env.action_space.n)
        # map_location lets a checkpoint saved on a GPU load on a CPU-only
        # machine.
        model.load_state_dict(torch.load(model_path, map_location="cpu"))
        model.eval()

        state, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                q_values = model(state_tensor)
            # Purely greedy policy: no exploration during evaluation.
            action = torch.argmax(q_values).item()
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

        print(f"\n[TEST] Total reward: {total_reward:.2f}")
    finally:
        # Close the simulator even when the checkpoint is missing or the
        # rollout raises, so no connection or GUI window is left behind.
        env.close()

# Run training (currently disabled). Uncomment the next line to retrain;
# train_dqn saves the weights to "dqn_4joint_arm_model.pth", which is the
# default checkpoint path read by test_trained_agent below.
#rewards = train_dqn(render=False, episodes=80)

# Visual test: run one greedy episode in the PyBullet GUI with the saved weights.
test_trained_agent(render=True)


In [1]:
!conda env export > environment.yml