In [None]:
import time
from sim_class import Simulation
# Define the velocities for moving to each corner
velocities = [
    [-0.1, 0.1, 0.0],  # Move along +Y
    [0.1, 0.0, 0.0],  # Move along +X
    [0.0, 0.0, 0.1],  # Move along +Z
    [-0.1, 0.0, 0.0],  # Move along -X
    [0.0, -0.1, 0.0],  # Move along -Y
    [0.1, 0.0, 0.0],  # Move along +X
    [0.0, 0.0, -0.1],  # Move along -Z
    [-0.1, 0.0, 0.0],  # Move along -X
]

# Initialize the simulation
sim = Simulation(num_agents=1, render=True)

for velocity in velocities:
    actions = [[velocity[0], velocity[1], velocity[2], 0]]  # Define actions for the robot
    
    for _ in range(1300):  # Adjust range for movement duration
        state = sim.run(actions, num_steps=1)  # Run the simulation and get the state
        time.sleep(0.01)  # Delay for visualization

    # Extract and print the pipette position
    pipette_position = state.get('robotId_1', {}).get('pipette_position', None)
    if pipette_position:
        print(pipette_position)

[-0.187, 0.2195, 0.1195]
[0.253, 0.2195, 0.1195]
[0.253, 0.2195, 0.2895]
[-0.187, 0.2195, 0.2894]
[-0.1868, -0.1705, 0.2892]
[0.253, -0.1705, 0.2891]
[0.253, -0.1705, 0.1695]
[-0.187, -0.1705, 0.1695]


: 

In [5]:
import pybullet as p
p.disconnect()  # Disconnect any active PyBullet connections


In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from sim_class import Simulation
import numpy as np
import pybullet as p


class OT2Env(gym.Env):
    def __init__(self, render=False, max_steps=1000):
        super(OT2Env, self).__init__()
        self.render = render
        self.max_steps = max_steps

        # Create the simulation environment
        self.sim = Simulation(num_agents=1)

        # Define action and observation space
        # They must be gym.spaces objects
        self.action_space = spaces.Box(low=(-1, -1, -1) , high=(1, 1, 1), shape=(4,), dtype=np.float32)
        self.observation_space = spaces.Box(low='-inf' , high='inf', shape=(6,), dtype=np.float32)

        # keep track of the number of steps
        self.steps = 0

    def reset(self, seed=None):
        # being able to set a seed is required for reproducibility
        if seed is not None:
            np.random.seed(seed)

        # Reset the state of the environment to an initial state
        # set a random goal position for the agent, consisting of x, y, and z coordinates within the working area (you determined these values in the previous datalab task)
        self.goal_position = np.array([
            np.random.uniform(-0.187, 0.253),
            np.random.uniform(-0.1705, 0.2195),
            np.random.uniform(0.1195, 0.2895)
        ], dtype=np.float32)

        # Call the environment reset function
        observation = self.sim.reset(num_agents=1)
        # now we need to process the observation and extract the relevant information, the pipette position, convert it to a numpy array, and append the goal position and make sure the array is of type np.float32
        pipette_position = np.array(observation[f'robotId_{self.sim.robot_Ids[0]}']['pipette_position'], dtype=np.float32)
        observation = np.concatenate([pipette_position, self.goal_position], axis=0)
        # Reset the number of steps
        self.steps = 0

        return observation

    def step(self, action):
        # Execute one time step within the environment
        # since we are only controlling the pipette position, we accept 3 values for the action and need to append 0 for the drop action
        action = np.append(action, 0)

        # Call the environment step function
        observation = self.sim.run([action]) # Why do we need to pass the action as a list? Think about the simulation class.

        # now we need to process the observation and extract the relevant information, the pipette position, convert it to a numpy array, and append the goal position and make sure the array is of type np.float32
        pipette_position = np.array(observation[f'robotId_{self.sim.robot_Ids[0]}']['pipette_position'], dtype=np.float32)
        observation = np.array(observation[f'robotId_{self.sim.robot_Ids[0]}']['pipette_position'], dtype=np.float32)

        # Calculate the reward, this is something that you will need to experiment with to get the best results
        reward = float(-np.linalg.norm(pipette_position - self.goal_position))
        
        # next we need to check if the if the task has been completed and i the episode should be terminated
        # To do this we need to calculate the distance between the pipette position and the goal position and if it is below a certain threshold, we will consider the task complete. 
        # What is a reasonable threshold? Think about the size of the pipette tip and the size of the plants.
        distance = np.linalg.norm(pipette_position - self.goal_position)
        if distance - self.goal_position < 0.05:
            terminated = True
            # we can also give the agent a positive reward for completing the task
        else:
            terminated = False

        # next we need to check if the episode should be truncated, we can check if the current number of steps is greater than the maximum number of steps
        if self.steps > self.max_steps:
            truncated = True
        else:
            truncated = False

        info = {} # we don't need to return any additional information

        # increment the number of steps
        self.steps += 1

        return observation, reward, terminated, truncated, info

    def render(self, mode='human'):
        pass
    
    def close(self):
        self.sim.close()


In [2]:
import pybullet as p
p.disconnect()  # Disconnect any active PyBullet connections


In [1]:
import gymnasium as gym
import numpy as np
from ot2_env_wrapper import OT2Env 

# Load your custom environment
env = OT2Env(render=True, max_steps=1000)

# Number of episodes
num_episodes = 5

for episode in range(num_episodes):
    obs = env.reset()
    terminated, truncated = False, False
    step = 0

    while not (terminated or truncated):  # Correct condition for Gymnasium environments
        # Take a random action from the environment's action space
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)

        print(f"Episode: {episode + 1}, Step: {step + 1}, Action: {action}, Reward: {reward}")

        step += 1

    print(f"Episode finished after {step} steps. Info: {info}")

Observation: {'robotId_2': {'joint_states': {'joint_0': {'position': 0.0, 'velocity': 0.0, 'reaction_forces': (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), 'motor_torque': 0.0}, 'joint_1': {'position': 0.0, 'velocity': 0.0, 'reaction_forces': (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), 'motor_torque': 0.0}, 'joint_2': {'position': 0.0, 'velocity': 0.0, 'reaction_forces': (0.0, 0.0, 0.0, 0.0, 0.0, 0.0), 'motor_torque': 0.0}}, 'robot_position': [0.0, 0.0, 0.03], 'pipette_position': [0.073, 0.0895, 0.1195]}}
Episode: 1, Step: 1, Action: [ 0.33155861  0.87114066  0.85061955 -0.95268154], Reward: -0.29651448130607605
Episode: 1, Step: 2, Action: [-0.5298414  -0.27832013 -0.6835679  -0.4130298 ], Reward: -0.29651448130607605
Episode: 1, Step: 3, Action: [-0.819646    0.9616249  -0.6895472   0.61566633], Reward: -0.29604026675224304
Episode: 1, Step: 4, Action: [0.34160957 0.5183211  0.20049039 0.49922356], Reward: -0.29551222920417786
Episode: 1, Step: 5, Action: [ 0.8730142   0.18137029  0.67476255 -0.3295513 ], Rew

error: Not connected to physics server.