# run.py
import os
import gym
import hydra
import torch
from omegaconf import DictConfig
from omniisaacgymenvs.envs.vec_env_rlgames import VecEnvRLGames
from omniisaacgymenvs.utils.config_utils.path_utils import (
retrieve_checkpoint_path,
get_experience,
)
from omniisaacgymenvs.utils.hydra_cfg.hydra_utils import *
from omniisaacgymenvs.utils.hydra_cfg.reformat import omegaconf_to_dict, print_dict
from omniisaacgymenvs.utils.rlgames.rlgames_utils import RLGPUAlgoObserver, RLGPUEnv
from omniisaacgymenvs.utils.task_util import initialize_task
from rl_games.common import env_configurations, vecenv
from rl_games.torch_runner import Runner
class RLGTrainer:
    """Glue between an already-built simulation env and an rl_games ``Runner``.

    Call order matters: :meth:`launch_rlg_hydra` registers the environment
    with rl_games and builds the runner config; :meth:`run` then consumes
    that config.
    """

    def __init__(self, cfg, cfg_dict):
        """Keep both representations of the configuration.

        Args:
            cfg: Hydra/OmegaConf config. ``cfg.test``, ``cfg.train`` and
                ``cfg.checkpoint`` are read by the other methods.
            cfg_dict: Dict form of ``cfg`` (the caller in this file builds it
                with ``omegaconf_to_dict``). ``launch_rlg_hydra`` writes
                ``cfg_dict["task"]["test"]`` into it.
        """
        self.cfg = cfg
        self.cfg_dict = cfg_dict
        # Populated by launch_rlg_hydra(); None means "not launched yet".
        self.rlg_config_dict = None

    def launch_rlg_hydra(self, env):
        """Register ``env`` with rl_games and prepare the runner config.

        Args:
            env: The environment instance to train on. rl_games invokes an
                env-creator callback internally; the creator registered here
                ignores its keyword arguments and always returns this instance.
        """
        # Mirror the train/test switch into the task section of the dict config.
        self.cfg_dict["task"]["test"] = self.cfg.test

        # Register the rl-games adapter to use inside the runner.
        vecenv.register(
            "RLGPU",
            lambda config_name, num_actors, **kwargs: RLGPUEnv(
                config_name, num_actors, **kwargs
            ),
        )
        env_configurations.register(
            "rlgpu",
            {"vecenv_type": "RLGPU", "env_creator": lambda **kwargs: env},
        )

        self.rlg_config_dict = omegaconf_to_dict(self.cfg.train)

    def run(self, module_path, experiment_dir):
        """Dump the config to disk, then train or play with rl_games.

        Args:
            module_path: Base directory; the rl_games ``train_dir`` is set to
                ``<module_path>/runs``.
            experiment_dir: Directory (created if missing) that receives
                ``config.yaml``.

        Raises:
            RuntimeError: If :meth:`launch_rlg_hydra` has not been called yet.
        """
        # getattr keeps the check meaningful even for instances whose
        # __init__ was bypassed or overridden without calling this one.
        if getattr(self, "rlg_config_dict", None) is None:
            raise RuntimeError("launch_rlg_hydra() must be called before run()")

        # Imported explicitly so this method does not depend on the name
        # leaking in through a wildcard import elsewhere in the file.
        from omegaconf import OmegaConf

        self.rlg_config_dict["params"]["config"]["train_dir"] = os.path.join(
            module_path, "runs"
        )

        # Create runner and set the settings.
        runner = Runner(RLGPUAlgoObserver())
        runner.load(self.rlg_config_dict)
        runner.reset()

        # Dump config dict. UTF-8 is pinned so the file does not depend on
        # the platform's default encoding.
        os.makedirs(experiment_dir, exist_ok=True)
        config_path = os.path.join(experiment_dir, "config.yaml")
        with open(config_path, "w", encoding="utf-8") as f:
            f.write(OmegaConf.to_yaml(self.cfg))

        # cfg.test selects play mode; otherwise train.
        runner.run(
            {
                "train": not self.cfg.test,
                "play": self.cfg.test,
                "checkpoint": self.cfg.checkpoint,
                "sigma": None,
            }
        )
@hydra.main(version_base=None, config_name="config", config_path="./cfg")
def parse_hydra_configs(cfg: DictConfig):
    """Build the simulation env from the Hydra config and run rl_games on it.

    Steps, in order: resolve per-rank device settings, create the
    ``VecEnvRLGames`` environment, optionally wrap it for video recording,
    resolve the checkpoint path, seed, initialise the task, then hand over
    to ``RLGTrainer``.

    Args:
        cfg: Config composed by Hydra from ``./cfg/config``. It is mutated in
            place (``device_id``, ``rl_device``, ``checkpoint``, ``seed``).

    Raises:
        SystemExit: With a non-zero status if a checkpoint was requested but
            ``retrieve_checkpoint_path`` returned ``None``.
    """
    headless = cfg.headless

    # local rank (GPU id) in a current multi-gpu mode
    local_rank = int(os.getenv("LOCAL_RANK", "0"))
    # global rank (GPU id) in multi-gpu multi-node mode
    global_rank = int(os.getenv("RANK", "0"))
    if cfg.multi_gpu:
        cfg.device_id = local_rank
        cfg.rl_device = f"cuda:{local_rank}"
    enable_viewport = "enable_cameras" in cfg.task.sim and cfg.task.sim.enable_cameras

    # select kit app file
    experience = get_experience(
        headless,
        cfg.enable_livestream,
        enable_viewport,
        cfg.enable_recording,
        cfg.kit_app,
    )

    env = VecEnvRLGames(
        headless=headless,
        sim_device=cfg.device_id,
        enable_livestream=cfg.enable_livestream,
        enable_viewport=enable_viewport or cfg.enable_recording,
        experience=experience,
    )

    # parse experiment directory
    module_path = os.path.abspath(os.curdir)
    experiment_dir = os.path.join(module_path, "runs", cfg.train.params.config.name)

    # use gym RecordVideo wrapper for viewport recording
    if cfg.enable_recording:
        if cfg.recording_dir == "":
            videos_dir = os.path.join(experiment_dir, "videos")
        else:
            videos_dir = cfg.recording_dir

        def video_interval(step):
            # Trigger a recording every `recording_interval` steps.
            return step % cfg.recording_interval == 0

        video_length = cfg.recording_length
        env.is_vector_env = True
        if env.metadata is None:
            env.metadata = {
                "render_modes": ["rgb_array"],
                "render_fps": cfg.recording_fps,
            }
        else:
            env.metadata["render_modes"] = ["rgb_array"]
            env.metadata["render_fps"] = cfg.recording_fps
        env = gym.wrappers.RecordVideo(
            env,
            video_folder=videos_dir,
            step_trigger=video_interval,
            video_length=video_length,
        )

    # ensure checkpoints can be specified as relative paths
    if cfg.checkpoint:
        requested_checkpoint = cfg.checkpoint
        cfg.checkpoint = retrieve_checkpoint_path(requested_checkpoint)
        if cfg.checkpoint is None:
            # SystemExit with a message exits non-zero and reports the cause;
            # the interactive-only `quit()` helper exited with status 0.
            # NOTE(review): as before, `env` is not closed on this path.
            raise SystemExit(
                f"Could not resolve checkpoint path: {requested_checkpoint!r}"
            )

    cfg_dict = omegaconf_to_dict(cfg)
    print_dict(cfg_dict)

    # Imported here rather than at module top (as in the original);
    # presumably it needs the simulation app created above — TODO confirm.
    from omni.isaac.core.utils.torch.maths import set_seed

    # Offset the seed by the global rank so each process differs; -1 is passed
    # through untouched (presumably set_seed then picks one — TODO confirm).
    cfg.seed = cfg.seed + global_rank if cfg.seed != -1 else cfg.seed
    # set_seed's return value is stored back as the seed.
    cfg.seed = set_seed(cfg.seed, torch_deterministic=cfg.torch_deterministic)
    cfg_dict["seed"] = cfg.seed

    initialize_task(cfg_dict, env)

    torch.cuda.set_device(local_rank)
    rlg_trainer = RLGTrainer(cfg, cfg_dict)
    rlg_trainer.launch_rlg_hydra(env)
    rlg_trainer.run(module_path, experiment_dir)
    env.close()
# Script entry point: called with no arguments because the @hydra.main
# decorator on parse_hydra_configs supplies `cfg`.
if __name__ == "__main__":
    parse_hydra_configs()