### 1. Imports

In [1]:
from stable_baselines3 import A2C
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env

import gymnasium as gym
from gymnasium import spaces
from stable_baselines3.common.envs import SimpleMultiObsEnv

import os
import random
import numpy as np

2024-04-15 13:39:41.679841: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-04-15 13:39:41.681943: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-15 13:39:41.728313: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


### 2. Building an environment

In [2]:
# function for loading the point cloud as numpy array and adding a batch dimension
# by copying the point cloud batch_size times
def load_point_cloud():
    point_cloud_np = np.load('point_clouds/box_point_cloud.npy').astype(np.float32)
    #print reshape to (3, 4096) and copy it batch_size times
    point_cloud = np.transpose(point_cloud_np, (1, 0))
    # point_cloud = np.expand_dims(point_cloud, axis=0)
    # point_cloud_batch = np.repeat(point_cloud, batch_size, axis=0)
    return point_cloud

def get_point_cloud():
    #print(f'Getting point cloud with shape {point_cloud.shape}')
    return point_cloud


point_cloud = load_point_cloud()

test = get_point_cloud()

print(test.shape)

(3, 4096)


In [3]:
class GraspEnv(gym.Env):
    def __init__(self, pc_shape=(3, 4096)):
        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`}^2, i.e. MultiDiscrete([size, size]).
        # self.observation_space = spaces.Box(0, 1, shape=(3,), dtype=np.float32)
        
        super().__init__()
        
        self.vector_size = 3
        
        if pc_shape == (3, 4096):
            self.pc_shape = pc_shape
        elif pc_shape == (4096, 3):
            self.pc_shape = (3, 4096)
            print("Transposing point cloud shape to (3, 4096)")
        else:
            raise ValueError("Invalid shape for point cloud. Expected shape: {}, Got shape: {}".format((3, 4096), pc_shape))
            
        
        self.observation_space = spaces.Dict(
            spaces={
                "pos": spaces.Box(-1, 1, (self.vector_size,), dtype=np.float32),
                "point_cloud": spaces.Box(-1, 1, self.pc_shape, dtype=np.float32),
            }
        )
        # 1 action including (x, y, z) floats
        self.action_space = spaces.Box(0, 1, shape=(3,), dtype=np.float32)

        self._reward_distance = 200.000
        self.max_steps = 1000
        self.current_step = 0


    # translates the environment’s state into an observation
    def _get_obs(self):
        return {
            "pos": np.array([0.45, 0.48, 0.042], dtype=np.float32),
            "point_cloud": get_point_cloud()
        }

    # provide the manhattan distance between the agent and the target
    def _get_info(self):
        return {
            "distance": self._reward_distance
        }

    # contains most of the logic of your environment. It accepts an action, 
    # computes the state of the environment after applying that action and 
    # returns the 5-tuple (observation, reward, terminated, truncated, info). 
    # See gymnasium.Env.step(). Once the new state of the environment has been 
    # computed, we can check whether it is a terminal state and we set done accordingly.
    def step(self, action):
        terminated, truncated = False, False
        self.current_step += 1


        # episode is done when distance between agent and target is less than 0.1
        observation = self._get_obs()
        reward, terminated = self.reward(action, observation['pos'])
        
        info = self._get_info()

        # The # episode is done when the maximum number of steps is reached
        if self.current_step >= self.max_steps:
            truncated = True
            self.current_step = 0

        # cast reward to float
        reward = float(reward)
        
        # Return step information
        return observation, reward, terminated, truncated, info
    

    # will be called to initiate a new episode. You may assume that the step method will not 
    # be called before reset has been called. Moreover, reset should be called whenever 
    # a done signal has been issued.
    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        if seed is not None:
            super().reset(seed=seed)

        # reset step
        self.current_step = 0
        
        observation = self._get_obs()
        info = self._get_info()

        return observation, info
    
    def reward(self, action, target):
        # Calculate reward
        # reward gets higher if the distance between the starting pose and the target is smaller
        reward = -np.linalg.norm(target - action, ord=1)
        terminated = True if reward > -0.0001 else False
        self._reward_distance = reward
        return reward, terminated

### 3. Test the environment

In [4]:
def get_observation_from_mock_camera():
    # return a 3D numpy array with float values between 0 and 1
    return np.array([0.5, 0.5, 0.5], dtype=np.float32)

In [5]:
env = GraspEnv()

In [6]:
env.observation_space.sample()

OrderedDict([('point_cloud',
              array([[-0.5451536 , -0.6845142 ,  0.40941507, ...,  0.11080492,
                      -0.1039127 , -0.47109625],
                     [-0.49234003, -0.01258129, -0.9657791 , ...,  0.20113094,
                      -0.5018218 ,  0.88164747],
                     [ 0.23991002,  0.59572756,  0.7954208 , ...,  0.99156994,
                       0.9539799 , -0.3241455 ]], dtype=float32)),
             ('pos',
              array([0.2686469 , 0.24840914, 0.01785683], dtype=float32))])

In [7]:
env.reset()
print("observation, reward, terminated, truncated, info")
env.step(np.array([0.45, 0.48, 0.042], dtype=np.float16))

observation, reward, terminated, truncated, info


({'pos': array([0.45 , 0.48 , 0.042], dtype=float32),
  'point_cloud': array([[ 0.05473996,  0.05051422,  0.04627302, ...,  0.04201763,
           0.03775097,  0.05065032],
         [-0.04402218, -0.0440405 , -0.04405328, ...,  0.04440148,
           0.0459224 ,  0.0445084 ],
         [-0.04448913, -0.04448615, -0.04447395, ...,  0.03967373,
          -0.00336128,  0.03963831]], dtype=float32)},
 -7.614865899085999e-05,
 True,
 False,
 {'distance': -7.614866e-05})

In [8]:
check_env(env, warn=True)



In [9]:
from gymnasium.envs.registration import register
# Example for the CartPole environment
register(
    # unique identifier for the env `name-version`
    id="GraspEnv-v1",
    # path to the class for creating the env
    # Note: entry_point also accept a class as input (and not only a string)
    entry_point="gym.envs.classic_control:GraspEnv",
    # Max number of steps per episode, using a `TimeLimitWrapper`
    max_episode_steps=500,
)

### 4. Test Training loop

In [10]:
episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    
    while not done:
        action = env.action_space.sample()
        observation, reward, done, truncated, info = env.step(action)
        if truncated:
            break
    print('Episode:{} Reward:{}'.format(episode, reward))


Episode:1 Reward:-1.3074209690093994
Episode:2 Reward:-1.1922063827514648
Episode:3 Reward:-1.1615262031555176
Episode:4 Reward:-1.0072764158248901
Episode:5 Reward:-0.6937386989593506


### 5. Train the model

In [11]:
from stable_baselines3 import PPO, A2C, DQN, DDPG, SAC
from stable_baselines3.dqn.policies import MlpPolicy
from stable_baselines3.sac.policies import CnnPolicy
from stable_baselines3.common.env_util import make_vec_env

import torch

from networks.point_net import GraspInputExtractor
from networks.test_networks import CustomCNN


Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [12]:
# Instantiate the env
vec_env = make_vec_env(GraspEnv, n_envs=1)

In [13]:
vec_env.observation_space

Dict('point_cloud': Box(-1.0, 1.0, (3, 4096), float32), 'pos': Box(-1.0, 1.0, (3,), float32))

In [15]:
log_path = os.path.join('Training', 'Logs')

# model = DDPG("MlpPolicy", vec_env, gamma=0.0, verbose=1, tensorboard_log=log_path)
#model = DQN(DQNMlpPolicy, vec_env, verbose=2, gamma=0.0, tensorboard_log=log_path)


#policy_kwargs = {"layers": [64, 64], "cnn_extractor": create_augmented_nature_cnn(1)}
#model = PPO("MultiInputPolicy", vec_env, policy_kwargs=policy_kwargs, verbose=2, tensorboard_log=log_path)

policy_kwargs = dict(
    features_extractor_class=GraspInputExtractor,
    features_extractor_kwargs=dict(features_dim=4099),
)
model = PPO("MultiInputPolicy", vec_env, policy_kwargs=policy_kwargs, verbose=1)
model.learn(1000)

Using cpu device


AssertionError: No position vector in observation space

In [None]:
model.learn(total_timesteps=20000)

: 

: 