In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gymnasium as gym
from robot_model import *
# mps_device = torch.device("mps")
# Set up the environment
# env = gym.make("Pendulum-v1", render_mode=None)
# num_states = env.observation_space.shape[0]
# num_actions = env.action_space.shape[0]
# upper_bound = env.action_space.high[0]
# lower_bound = env.action_space.low[0]


In [2]:
# print("Size of State Space ->  {}".format(num_states))
# print("Size of Action Space ->  {}".format(num_actions))

# print("Max Value of Action ->  {}".format(upper_bound))
# print("Min Value of Action ->  {}".format(lower_bound))

In [3]:
from gymnasium import spaces

class CDPR4_env(CDPR4):
    def __init__(self, start_state=np.array([.0, .0, 1.0, .0, .0, .0]), desired_state=np.array([.0, .0, 2.0, .0, .0, .0]), pos=np.array([.0, .0, 1.0]), params=params, approx=1, mass=1):
        super().__init__(pos=np.array([.0, .0, 1.]), params=params, approx=1, mass=1)

        self.start_state = start_state  # start position 1m on Z
        self.cur_state = np.array([.0, .0, 1., .0, .0, .0]) # X=[pos, vel] in control
        self.reward = -1e3 # reward 0 is 0 error in position, closer position to desired -> higher reward 
        self.desired_state = desired_state
    
        self.max_speed = 10
        self.max_force = 45
        
        self.action_space = spaces.Box(
            low=0.01, high=self.max_force, shape=(4,)
        )
        self.observation_space = spaces.Box(low=np.array([-1.154, -1.404, .0, -self.max_speed, -self.max_speed, -self.max_speed]), 
                                            high=np.array([1.154, 1.404, 3.220,  self.max_speed, self.max_speed, self.max_speed]))
        
    def reset(self):
        state = self.start_state
        self.reward = -1e3
        
        return state, {}
    
    def step(self, action):
        pos = self.cur_state[:3].flatten()
        vel = self.cur_state[3:].flatten()
        m = self.m

        dt = self.dt
        u = np.clip(action, 0.01, self.max_force) # shape should be (4,1)
        # print(f'pos {pos.shape}')
        # print(f'self.desired_state[:3] {self.desired_state[:3].shape}')
        costs = np.linalg.norm(pos - self.desired_state[:3].flatten())**2 # reward function includes only position, no velocities
        # print(f'cost {costs}')

        Jt_u = self.jacobian().T@u # torch.from_numpy(self.jacobian().T@u.cpu().detach().numpy())
        new_vel = vel + Jt_u.flatten()*dt # vel + acc*dt, where acc = J.T@u
        new_pos = pos + new_vel*dt

        state = np.hstack((new_pos, new_vel))
        self.cur_state = state
        
        terminated = np.allclose(self.cur_state, self.desired_state, atol=1e-03) # reached desired position
        
        
        return state, -costs, terminated, False, {} #
        
        

In [4]:
env = CDPR4_env()

num_states = env.observation_space.shape[0]
num_actions = env.action_space.shape[0]
upper_bound = env.action_space.high[0]
lower_bound = env.action_space.low[0]

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


In [5]:
# for i in range(10):
#     state = env.step(np.array([20, 20, 20, 20], dtype=np.float32).reshape((4,1)))
#     print(state)

In [6]:
print("Size of State Space ->  {}".format(num_states))
print("Size of Action Space ->  {}".format(num_actions))

print("Max Value of Action ->  {}".format(upper_bound))
print("Min Value of Action ->  {}".format(lower_bound))

Size of State Space ->  6
Size of Action Space ->  4
Max Value of Action ->  45.0
Min Value of Action ->  0.009999999776482582


In [7]:
# Actor Network
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_dim),
            nn.Tanh()
        )
        self.max_action = max_action

    def forward(self, state):
        return self.max_action * self.net(state)


In [8]:
# Critic Network
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.state_net = nn.Sequential(
            nn.Linear(state_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 32),
            nn.ReLU()
        )
        self.action_net = nn.Sequential(
            nn.Linear(action_dim, 32),
            nn.ReLU()
        )
        self.q_net = nn.Sequential(
            nn.Linear(64, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

    def forward(self, state, action):
        state_out = self.state_net(state)
        action_out = self.action_net(action)
        concat = torch.cat([state_out, action_out], dim=1)
        return self.q_net(concat)

In [9]:
# Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity=100000, batch_size=64):
        self.capacity = capacity
        self.batch_size = batch_size
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self):
        indices = np.random.choice(len(self.buffer), self.batch_size)
        # batch = []
        # for i in indices:
        #     state, action, reward, next_state, done = self.buffer[i]
        #     batch.append((state.float(), action, reward, next_state, done))
        batch = [self.buffer[i] for i in indices]
        # print(batch)

        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return (
            torch.FloatTensor(state),
            torch.FloatTensor(action),
            torch.FloatTensor(reward).unsqueeze(1),
            torch.FloatTensor(next_state),
            torch.FloatTensor(done).unsqueeze(1)
        )

    def __len__(self):
        return len(self.buffer)

In [10]:
class DDPGAgent:
    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action)
        # self.actor.to(mps_device)
        self.actor_target = Actor(state_dim, action_dim, max_action)
        # self.actor_target.to(mps_device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-3)

        self.critic = Critic(state_dim, action_dim)
        # self.critic.to(mps_device)
        self.critic_target = Critic(state_dim, action_dim)
        # self.critic_target.to(mps_device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.max_action = max_action
        self.replay_buffer = ReplayBuffer()

    def select_action(self, state):
        state = torch.FloatTensor(state.reshape(1, -1))
        return self.actor(state).cpu().data.numpy().flatten()

    def train(self, gamma=0.99, tau=0.005):
        if len(self.replay_buffer) < self.replay_buffer.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.sample()

        # Compute the target Q value
        target_Q = self.critic_target(next_state, self.actor_target(next_state))
        target_Q = reward + (1 - done) * gamma * target_Q.detach()

        # Get current Q estimate
        current_Q = self.critic(state, action)

        # Compute critic loss
        critic_loss = nn.MSELoss()(current_Q, target_Q)

        # Optimize the critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Compute actor loss
        actor_loss = -self.critic(state, self.actor(state)).mean()

        # Optimize the actor
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update the frozen target models
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

In [11]:
# Training loop
def train_ddpg(env, agent, num_episodes=100):
    # ou_noise = OUActionNoise(mean=np.zeros(1), std_deviation=float(0.2) * np.ones(1))

    for episode in range(num_episodes):
        state, _ = env.reset() 
        episode_reward = 0

        while True:
            action = agent.select_action(state)
            # action = action + ou_noise()
            action = np.clip(action, lower_bound, upper_bound)

            next_state, reward, done, truncated, _ = env.step(action)
            agent.replay_buffer.push(state, action, reward, next_state, done)

            episode_reward += reward
            state = next_state

            agent.train()

            if done or truncated:
                print(f"Episode {episode}: Reward = {episode_reward}")
                break

In [12]:
# Create and train the agent
state_dim = num_states
action_dim = num_actions
max_action = upper_bound
agent = DDPGAgent(state_dim, action_dim, max_action)
train_ddpg(env, agent)

KeyboardInterrupt: 