In [260]:
import gym
import numpy as np
import torch 
import torch.nn as nn

In [261]:
import gym
from gym import spaces


class Dynamics(nn.Module):
    def __init__(self, state_dim, act_dim, model_size=[256, 256]):
        super(Dynamics, self).__init__()
        self.state_dim = state_dim
        self.act_dim = act_dim
        self.model_size = model_size
        self.fc1 = nn.Linear(state_dim + act_dim, model_size[0])
        self.fc2 = nn.Linear(model_size[0], model_size[1])
        self.fc3 = nn.Linear(model_size[1], state_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x
    
    def predict(self, obs, act):
        x = torch.cat((obs, act), dim=1)
        return self.forward(x)




class HopperSurrogateEnv(gym.Env):

    def terminate(self, ob):
        unhealthy = False
        healthy_state = ob[:,1:]
        height = ob[:,0]
        angle = ob[:,1]
        
        if np.all((self.healthy_state_range[0] > healthy_state) + (self.healthy_state_range[1] < healthy_state)):
            # print("terminating due to all state limits")
            unhealthy = True

        
        if np.all((self.healthy_z_range[0] > height) + (self.healthy_z_range[1] < height)):
            # print("terminating due to height")
            unhealthy = True 
        
        if np.all((self.healthy_angle_range[0] > angle) + (self.healthy_angle_range[1] < angle)):
            # print("terminating due to angle")
            unhealthy = True  

        return unhealthy

    def __init__(self, render_mode=None, model_path=None):

        self.observation_space = spaces.Box(low= float('-inf'), high= float('inf'), shape=(11,))
        self.action_space = spaces.Box(-1, 1, (3,))


        self.init_qpos = [0, 1.25, 0, 0, 0, 0]
        self.init_qvel = [0, 0,    0, 0, 0, 0]
        self.init_x = 0


        self.forward_reward_weight=1.0
        self.ctrl_cost_weight=1e-3
        self.healthy_reward=1.0
        self.terminate_when_unhealthy=True
        self.healthy_state_range=(-100.0, 100.0)
        self.healthy_z_range=(0.7, float("inf"))
        self.healthy_angle_range=(-0.2, 0.2)
        self.reset_noise_scale=5e-3
        self.exclude_current_positions_from_observation=True
        self.timestep = 1

        self.model = torch.load(model_path)
    
    def reset(self, seed=None, options=None):
        self.qpos = self.init_qpos + self.np_random.uniform(low=-.005, high=.005, size=int(len(self.init_qpos)))
        self.qvel = self.init_qvel + self.np_random.uniform(low=-.005, high=.005, size=int(len(self.init_qvel)))
        self.observation = np.concatenate([self.qpos, self.qvel]).reshape(-1, 12)
        self.init_x = self.observation[:,0]
        self.timestep = 1
        return self.observation[:,1:], {"full_state":self.observation}


    def step(self, action):
        with torch.no_grad():
            action = np.array(action).reshape(-1, 3)
            obs = torch.tensor(self.observation, dtype=torch.float32)
            act = torch.tensor(action, dtype=torch.float32)
            next_obs = self.model.predict(obs, act)
            next_obs = next_obs.detach().numpy()

        final_x = next_obs[:,0]   
        self.observation = next_obs

        healthy_reward = self.healthy_reward
        forward_reward = self.forward_reward_weight * (final_x - self.init_x ) / 0.008 
        self.init_x = final_x
        control_cost = self.ctrl_cost_weight * np.sum(action**2, axis=1)
        reward = healthy_reward + forward_reward - control_cost

        terminated = self.terminate(self.observation[:,1:])
        
        if self.timestep > 1000:
            truncated = True
        else:
            truncated = False
        
        self.timestep += 1
        self.init_x = final_x

        return self.observation[:,1:], reward, terminated, truncated, {"full_state":self.observation}

    def render(self):
        pass

    def close(self):
        pass

In [262]:
env = HopperSurrogateEnv(model_path='/home/prajwal/homework/me592/FinalProject/Deep-Learning-of-Robot-Dynamics/Experiments/Hopper/TrainedModels/vanilla_nn_dynamics_hopper.pth')

In [263]:
env.observation_space.sample()

array([-2.4628992 ,  0.19398974,  1.2791119 ,  0.96002185,  1.0163924 ,
        0.75161767, -0.4426608 , -0.10419376, -1.2021351 ,  0.891419  ,
       -0.8495554 ], dtype=float32)

In [264]:
env.action_space.sample()

array([-0.13817625,  0.19347589, -0.3934131 ], dtype=float32)

In [265]:
env.reset()
done = False
episode_length = 0
while not done:
    action = env.action_space.sample()
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    episode_length += 1
    if done:
        break

print("Episode length:", episode_length)
print("Total rewards:", reward)

Episode length: 47
Total rewards: [-10.479429]
