In [None]:
# Import Directory Fix (Since this is in a subdirectory)
import sys
import os
current_notebook_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(current_notebook_dir, '..'))

if project_root not in sys.path:
    sys.path.append(project_root)


# Imports
import numpy as np
import gymnasium as gym
from agents.StoveAgent import StoveAgent


In [70]:
gym.register(
    id = "stove-turnoff-v1",
    entry_point="envs.Untitled:StoveEnv",
    max_episode_steps = 300
)

In [71]:
# Creating the Stove Enviroment
class stove_env(gym.Env):
    def __init___(self, arm_base_pos = 0.6):
        # Initialize StoveAgent
        self.agent = StoveAgent()
        self.agent_arm_base_pos = arm_base_pos
        self.agent_hand_sphere_pos = {"x":0.0, "y":0.0, "z":0.5}
        
        # Observation Space
        self.observation_space = gym.spaces.Dict()
        
        # Action Space & Values
        self.action_space = gym.spaces.Discrete(7)

        self.action_value = {
            0: np.array([0.1, 0, 0]),   # Move arm left
            1: np.array([-0.1, 0, 0]),  # Move arm right
            2: np.array([0, 0.1, 0]),  # Move arm up
            3: np.array([0, -0.1, 0]),  # Move arm down
            4: np.array([0, 0, 0.1]),  # Move arm forward
            5: np.array([0, 0, -0.1]),  # Move arm back
            6: "toggle"
        }
        
    def _get_obs(self):
        MAX_KNOBS = 6
        obs = []
        metadata = self.agent.controller.last_event.metadata['objects']

        stoveknob_list = [obj for obj in metadata if 'StoveKnob' in obj['name']]

        event = self.agent.controller.last_event
        hand_pos = event.metadata["arm"]['handSphereCenter']
        obs.extend([hand_pos['x'], hand_pos['y'], hand_pos['z']])

        knob_pos = [obj['position'] for obj in stoveknob_list]

        for i in range(MAX_KNOBS):
            if i < len(knob_pos):
                k = knob_pos[i]
                obs.extend([
                    abs(k['x'] - hand_pos['x']),
                    abs(k['y'] - hand_pos['y']),
                    abs(k['z'] - hand_pos['z'])
                ])
            else:
                obs.extend([0.0, 0.0, 0.0])

        tot_dist = [((abs(k['x']-hand_pos['x']))**2+(abs(k['y']-hand_pos['y']))**2+(abs(k['z']-hand_pos['z']))**2)**0.5 for k in knob_pos]
        obs.append(min(tot_dist) if tot_dist else 0.0)

        stove_tog = [obj['isToggled'] for obj in stoveknob_list]
        amt = sum(not v for v in stove_tog)
        obs.append(amt)
        obs.append(self.arm_base_y_pos)

        return np.array(obs, dtype=np.float32)
        
    def _get_info(self):
        # Debugging Stuff
        return "Hello"
    
    def _get_reward(self):
        return 100
    
    def reset(self):
        super().reset(seed=seed)
        self.agent.env_randomizer()
        self.agent.man_teleport()
        self.agent.turn_on_stove()
        print("CHECKEREEE1111!")
        
        # Robot Internal States Resets
        self.agent_hand_sphere_pos = {"x": 0.0, "y": 0.0, "z": 0.5}
        self.arm_base_y_pos = 0.6

        # Stuck Counter
        self.stuck_counter = 0
        self.prev_hand_pos = self.curr_hand_pos.copy()
        
        obs = self._get_obs()
        info = {}
        return obs, info
        
    def step(self, action):
        movement = self.action_value[action]

        # Movement is now a random action
        if (action < 5):
            self.agent_hand_sphere_pos += movement
            
        terminated = False
        truncated = False

        reward = self._get_reward()
        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

In [73]:
from gymnasium.utils.env_checker import check_env

env = gym.make("stove-turnoff-v1")

try:
    check_env(env)
    print("Environment passes all checks!")
except Exception as e:
    print(f"Environment has issues: {e}")

ModuleNotFoundError: No module named 'envs.Untitled'