In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from sim_class import Simulation

# Class for a PID controller with proportional, integral, and derivative gains
class PIDController:
    def __init__(self, Kp, Ki, Kd, setpoint=0):
        """
        Initializes a PID controller.
        :param Kp: Proportional gain.
        :param Ki: Integral gain.
        :param Kd: Derivative gain.
        :param setpoint: Target value for the controller to achieve.
        """
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.setpoint = setpoint
        self.integral = 0
        self.previous_error = 0

    # Updates the control signal based on the current value
    def update(self, current_value):
        """
        Calculates the control signal based on the current value.
        :param current_value: Current state variable value.
        :return: Control signal to adjust the variable towards the setpoint.
        """
        error = self.setpoint - current_value
        self.integral += error
        derivative = error - self.previous_error
        self.previous_error = error
        
        # PID control formula
        return self.Kp * error + self.Ki * self.integral + self.Kd * derivative

# Custom Gym environment for controlling a robotic pipette
class OT2Env(gym.Env):
    def __init__(self, render=False, max_steps=10000):
        """
        Custom Gym environment for controlling a robotic pipette using PID controllers.
        :param render: Boolean flag to enable/disable rendering.
        :param max_steps: Maximum number of steps per episode.
        """
        super(OT2Env, self).__init__()
        self.render_enabled = render
        self.max_steps = max_steps

        # Initialize the simulation environment
        self.sim = Simulation(num_agents=1, render=self.render_enabled)

        # Define action and observation spaces
        self.action_space = spaces.Box(
            low=np.array([-1, -1, -1, 0]),
            high=np.array([1, 1, 1, 1]),
            shape=(4,),
            dtype=np.float32
        )
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(6,),
            dtype=np.float32
        )

        # Initialize PID controllers for X, Y, and Z axes
        self.pid_x = PIDController(Kp=1.0, Ki=0.1, Kd=0.01)
        self.pid_y = PIDController(Kp=1.0, Ki=0.1, Kd=0.01)
        self.pid_z = PIDController(Kp=1.0, Ki=0.1, Kd=0.01)

        # Track the number of steps and goal position
        self.steps = 0
        self.goal_position = None

    # Resets the environment to its initial state
    def reset(self, seed=None):
        """
        Resets the environment to its initial state.
        :param seed: Random seed for reproducibility.
        :return: Initial observation and metadata.
        """
        if seed is not None:
            np.random.seed(seed)

        # Set a random goal position
        self.goal_position = np.array([
            np.random.uniform(-0.18700, 0.25300),
            np.random.uniform(-0.17050, 0.21950),
            np.random.uniform(0.16940, 0.28950)
        ], dtype=np.float32)

        # Update PID controller setpoints
        self.pid_x.setpoint = self.goal_position[0]
        self.pid_y.setpoint = self.goal_position[1]
        self.pid_z.setpoint = self.goal_position[2]

        # Reset the simulation environment
        observation = self.sim.reset(num_agents=1)

        # Combine current pipette position with the goal position
        pipette_position = np.array(
            observation[f'robotId_{self.sim.robotIds[0]}']['pipette_position'],
            dtype=np.float32
        )
        observation = np.concatenate([pipette_position, self.goal_position], axis=0)

        self.steps = 0
        return observation, {}

    # Executes one step in the environment
    def step(self, action):
        """
        Executes one step in the environment.
        :param action: Action to be executed.
        :return: Tuple (observation, reward, terminated, truncated, info).
        """
        # Run the action in the simulation
        observation = self.sim.run([action])
        pipette_position = np.array(
            observation[f'robotId_{self.sim.robotIds[0]}']['pipette_position'],
            dtype=np.float32
        )

        # Compute control signals using PID controllers
        control_signal_x = self.pid_x.update(pipette_position[0])
        control_signal_y = self.pid_y.update(pipette_position[1])
        control_signal_z = self.pid_z.update(pipette_position[2])

        # Construct the control action
        control_action = np.array([
            control_signal_x,
            control_signal_y,
            control_signal_z,
            0.0
        ], dtype=np.float32)

        # Execute control action in the simulation
        self.sim.run([control_action])

        # Combine current position with goal position for the observation
        observation = np.concatenate([pipette_position, self.goal_position], axis=0)

        # Calculate reward as negative distance to the goal
        distance = np.linalg.norm(pipette_position - self.goal_position)
        reward = -distance

        # Check if the goal is reached (10 mm accuracy)
        terminated = distance <= 0.001  # 1 mm threshold
        if terminated:
            reward = 100  # Reward bonus for achieving the goal

        # Check if maximum steps have been reached
        truncated = self.steps >= self.max_steps

        # Print debug information
        distance_mm = distance * 1000  # Convert distance to mm
        print(f"Step {self.steps}:")
        print(f"  Current Position: {pipette_position}")
        print(f"  Goal Position: {self.goal_position}")
        print(f"  Distance to Goal: {distance_mm:.2f} mm")

        self.steps += 1
        return observation, reward, terminated, truncated, {}

    # Renders the environment
    def render(self):
        """
        Renders the environment.
        """
        if self.render_enabled:
            self.sim.render()

    # Closes the environment and releases resources
    def close(self):
        """
        Closes the environment and releases resources.
        """
        self.sim.close()

# Main entry point for testing the environment
if __name__ == "__main__":
    # Example usage of the environment
    env = OT2Env(render=True)
    observation, _ = env.reset()

    print(f"Goal Position: {env.goal_position}")

    for _ in range(10000):
        action = env.action_space.sample()  # Sample a random action
        observation, reward, terminated, truncated, _ = env.step(action)

        if terminated or truncated:
            print("Goal Reached!" if terminated else "Simulation Truncated.")
            break

    env.close()


pybullet build time: Jan 20 2025 10:18:19


Version = 4.1 Metal - 86
Vendor = Apple
Renderer = Apple M1
b3Printf: Selected demo: Physics Server
startThreads creating 1 threads.
starting thread 0
started thread 0 
MotionThreadFunc thread started
Goal Position: [-0.18003549  0.05123494  0.18420415]
Step 0:
  Current Position: [0.0735 0.0889 0.1195]
  Goal Position: [-0.18003549  0.05123494  0.18420415]
  Distance to Goal: 264.36 mm
Step 1:
  Current Position: [0.0739 0.088  0.1211]
  Goal Position: [-0.18003549  0.05123494  0.18420415]
  Distance to Goal: 264.23 mm
Step 2:
  Current Position: [0.0743 0.0871 0.1223]
  Goal Position: [-0.18003549  0.05123494  0.18420415]
  Distance to Goal: 264.21 mm
Step 3:
  Current Position: [0.0747 0.0873 0.1239]
  Goal Position: [-0.18003549  0.05123494  0.18420415]
  Distance to Goal: 264.25 mm
Step 4:
  Current Position: [0.0741 0.0863 0.1249]
  Goal Position: [-0.18003549  0.05123494  0.18420415]
  Distance to Goal: 263.31 mm
Step 5:
  Current Position: [0.0722 0.0864 0.1252]
  Goal Position

: 