# Ray RLlib 강화학습 실험

이 노트북은 Ray RLlib을 사용한 강화학습 연구의 기본 템플릿입니다.

In [None]:
# 필수 라이브러리 import
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.connectors.env_to_module import EnvToModulePipeline
from ray.rllib.connectors.module_to_env import ModuleToEnvPipeline
from ray.rllib.core.columns import Columns
from ray.rllib.env.single_agent_episode import SingleAgentEpisode
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from pathlib import Path
import os
import torch

# Directory that receives the result artifacts (CSV, figures) written by later
# cells. The path is relative, so it is resolved against the current working
# directory of the notebook kernel.
RESULTS_DIR = Path("../results")
# Create the directory if it is missing; an already existing one is not an error.
RESULTS_DIR.mkdir(exist_ok=True)

print("라이브러리 import 완료")

## 1. Ray 초기화

In [None]:
# Initialize Ray.
# Local mode (for debugging):
# ray.init(local_mode=True)

# Regular mode (for experiments). ignore_reinit_error=True is passed so that
# re-running this cell while Ray is already initialized does not raise.
ray.init(ignore_reinit_error=True)

# Show the resources Ray reports for the cluster it connected to / started.
print(f"Ray 클러스터 정보: {ray.cluster_resources()}")

## 2. 환경 설정

**참고**: 이 노트북은 별도 프로젝트로 관리되는 `horcrux_env` 패키지의 `horcrux_env/plane-v0` 환경을 사용합니다.
다른 환경을 사용할 때는 해당 환경의 import 경로와 `ENV_NAME`을 수정하세요.

In [None]:
# Environment id used both for gym.make() and for the RLlib env registration.
ENV_NAME = "horcrux_env/plane-v0"

# Inspect the environment.
import gymnasium as gym
# NOTE(review): presumably importing horcrux_env registers its environment ids
# with Gymnasium so that gym.make(ENV_NAME) resolves — confirm in the package.
import horcrux_env
from horcrux_env.envs import PlaneJoyDirWorld

# Build a throwaway instance with the environment's default arguments, print
# its observation/action spaces, then close it again.
env = gym.make(ENV_NAME)
print(f"환경: {ENV_NAME}")
print(f"관찰 공간: {env.observation_space}")
print(f"행동 공간: {env.action_space}")
env.close()

In [None]:
# Constructor arguments for the training environment.
ENV_CONFIG = dict(
    forward_reward_weight=175.0,
    rotation_reward_weight=100.0,
    unhealthy_max_steps=150.0,
    healthy_reward=3.0,
    healthy_roll_range=(-40, 40),
    terminating_roll_range=(-85, 85),
    rotation_norm_cost_weight=14.5,
    termination_reward=0,
    gait_params=(30, 30, 40, 40, 0, -1),
    use_friction_chg=True,
    joy_input_random=True,
    use_imu_window=True,
    use_vels_window=True,
    ctrl_cost_weight=0.05,
)

# Same settings plus rendering options, for visual inspection of rollouts.
RENDER_ENV_CONFIG = {
    **ENV_CONFIG,
    'render_mode': 'rgb_array',
    'render_camera_name': 'ceiling',
}

# env = gym.make(ENV_NAME, **RENDER_ENV_CONFIG)

from ray.tune.registry import register_env

# ENV_CONFIG entries that are handed to the environment constructor.
# NOTE(review): "use_vels_window" is defined in ENV_CONFIG but is not part of
# this list, so it never reaches the environment — confirm that is intended.
_FORWARDED_ENV_KEYS = (
    "forward_reward_weight",
    "rotation_reward_weight",
    "unhealthy_max_steps",
    "healthy_reward",
    "healthy_roll_range",
    "terminating_roll_range",
    "rotation_norm_cost_weight",
    "termination_reward",
    "gait_params",
    "use_friction_chg",
    "joy_input_random",
    "use_imu_window",
    "ctrl_cost_weight",
)


def _make_plane_env(config):
    """Build a PlaneJoyDirWorld from ENV_CONFIG.

    The ``config`` argument supplied by the registry caller is ignored; only
    the ENV_CONFIG values listed in _FORWARDED_ENV_KEYS are used.
    """
    selected = {key: ENV_CONFIG[key] for key in _FORWARDED_ENV_KEYS}
    return PlaneJoyDirWorld(**selected)


# Make the environment available to RLlib under ENV_NAME.
register_env(ENV_NAME, _make_plane_env)

## 3. RLlib 알고리즘 설정

In [None]:
# PPO configuration (other algorithms, e.g. DQN or SAC, can be swapped in by
# using their config class instead).
#
# The GPU is requested through `.learners(num_gpus_per_learner=1)`. The former
# `.resources(num_gpus=1)` call was dropped: it is the old-API-stack way of
# requesting a GPU and duplicated the learner setting below. Set
# `num_gpus_per_learner=0` to train on CPU.
config = (
    PPOConfig()
    .environment(env=ENV_NAME)
    .training(
        lr=3e-4,
        train_batch_size=65536,
        minibatch_size=16384,
        gamma=0.95,
        # `num_epochs` is the current name of the deprecated `num_sgd_iter`:
        # the number of passes over each train batch.
        num_epochs=60,
    )
    .framework("torch")  # "tf2" or "torch"
    .env_runners(
        num_env_runners=16,
        num_envs_per_env_runner=4,
    )
    .learners(
        num_learners=1,
        num_gpus_per_learner=1,
    )
    .rl_module(
        model_config={
            # Hidden layer sizes of the fully connected network.
            "fcnet_hiddens": [512, 512, 512, 512, 512, 64],
            # Do not share layers between the policy and the value function.
            "vf_share_layers": False,
        }
    )
)

print("알고리즘 설정 완료")
print(config)

## 4. 알고리즘 인스턴스 생성 및 학습

In [None]:
# Build the algorithm instance from the configuration defined above.
algo = config.build_algo()

print("알고리즘 인스턴스 생성 완료")

In [None]:
# Run training and record per-iteration metrics.
NUM_ITERATIONS = 100  # number of training iterations

# One row per iteration. The visualization cell builds a DataFrame from this
# list and expects the columns 'iteration', 'episode_reward_mean' and
# 'episode_len_mean'; previously the list was never created, so that cell
# failed with a NameError.
training_history = []

for i in range(NUM_ITERATIONS):
    result = algo.train()

    # On the new API stack the sampling metrics are nested under "env_runners".
    # Fall back to NaN when a metric is missing (e.g. no episode has finished
    # yet) so that the history keeps one row per iteration.
    env_runner_metrics = result.get("env_runners", {})
    training_history.append({
        'iteration': i + 1,
        'episode_reward_mean': env_runner_metrics.get("episode_return_mean", float("nan")),
        'episode_len_mean': env_runner_metrics.get("episode_len_mean", float("nan")),
    })

    # Progress message every 10 iterations.
    if (i + 1) % 10 == 0:
        print(f"Iteration {i + 1}")

print("학습 완료")

## 5. 학습 결과 시각화

In [None]:
# Turn the collected per-iteration records into a table.
df = pd.DataFrame(training_history)

# Persist the raw numbers next to the other result artifacts.
history_csv = RESULTS_DIR / "training_history.csv"
df.to_csv(history_csv, index=False)
print(f"결과 저장 완료: {history_csv}")

# One panel per metric: (DataFrame column, y-axis label, panel title).
panels = (
    ('episode_reward_mean', 'Mean Episode Reward', 'Training Progress: Mean Reward'),
    ('episode_len_mean', 'Mean Episode Length', 'Training Progress: Mean Episode Length'),
)

fig, axes = plt.subplots(1, 2, figsize=(15, 5))
for ax, (column, y_label, title) in zip(axes, panels):
    ax.plot(df['iteration'], df[column])
    ax.set_xlabel('Iteration')
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True)

# Write the figure to disk before showing it.
curves_png = RESULTS_DIR / "training_curves.png"
plt.tight_layout()
plt.savefig(curves_png, dpi=150)
plt.show()

print(f"시각화 저장 완료: {curves_png}")

## 6. 학습된 정책 평가

In [None]:
from ray.rllib.connectors.env_to_module import EnvToModulePipeline
from ray.rllib.connectors.module_to_env import ModuleToEnvPipeline

# Evaluate the trained policy (new API stack).
NUM_EVAL_EPISODES = 10
eval_results = []

# The trained RLModule used for inference.
rl_module = algo.get_module()

# Try to obtain the env-to-module / module-to-env connector pipelines from the
# algorithm's env runner. Both stay None when they cannot be found, which makes
# the evaluation code fall back to calling the RLModule directly.
# NOTE(review): `_connectors` is a private attribute and nothing in this
# notebook shows that the env runner defines it — verify against the installed
# RLlib version; if it is absent the fallback path is always taken.
env_to_module = None
module_to_env = None
try:
    env_runner = algo.env_runner
    connectors = getattr(env_runner, '_connectors', None)
    if connectors is not None:
        env_to_module = connectors[0]
        if len(connectors) > 1:
            module_to_env = connectors[1]
except Exception:
    # Best-effort lookup: any failure means "no pipelines available".
    # `Exception` (instead of a bare `except:`) keeps KeyboardInterrupt and
    # SystemExit from being swallowed.
    env_to_module = None
    module_to_env = None

# Roll out NUM_EVAL_EPISODES evaluation episodes and append one record
# ({'episode', 'reward', 'length'}) per episode to `eval_results`.
#
# Two paths exist:
#   * no connector pipelines available -> query the RLModule directly and
#     post-process its output by hand;
#   * pipelines available -> run observations/actions through them.
from torch.distributions import Normal

# Upper bound on steps per episode in the manual path. The check is
# `episode_length > MAX_EVAL_STEPS`, so an episode is cut off after
# MAX_EVAL_STEPS + 1 steps.
MAX_EVAL_STEPS = 6000

# True draws the action from Normal(mean, exp(log_std)) (the behaviour this
# cell has always had). Set to False to act on the mean for a deterministic
# evaluation.
EVAL_SAMPLE_ACTIONS = True


def _action_from_dist_inputs(dist_inputs, action_dim, low, high, sample=True):
    """Convert raw action-distribution inputs into an action within [low, high].

    Args:
        dist_inputs: torch tensor produced by the RLModule. If it has more
            than one dimension only its first entry is used.
        action_dim: number of action components.
        low: tensor with the lower action bounds.
        high: tensor with the upper action bounds.
        sample: when the inputs hold ``2 * action_dim`` values they are split
            into mean and log-std; ``True`` samples from the resulting normal
            distribution, ``False`` uses the mean.

    Returns:
        NumPy array with the first ``action_dim`` values squashed by tanh and
        rescaled linearly to ``[low, high]``.
    """
    params = dist_inputs[0] if dist_inputs.dim() > 1 else dist_inputs
    if params.dim() == 0:
        # A scalar cannot be sliced; treat it as a one-element vector.
        params = params.reshape(1)

    if params.shape[0] == 2 * action_dim:
        # Layout: mean (action_dim values) followed by log_std (action_dim).
        mean, log_std = params[:action_dim], params[action_dim:]
        if sample:
            # Clamp log_std before exponentiating to keep std finite.
            raw = Normal(mean, torch.exp(log_std.clamp(-20, 2))).sample()
        else:
            raw = mean
    else:
        # Anything else is used as the raw action values directly.
        raw = params

    # NOTE(review): tanh squashing followed by rescaling is applied here by
    # hand — confirm it matches the action post-processing used in training.
    squashed = torch.tanh(raw[:action_dim])
    low = low.to(dtype=raw.dtype)
    high = high.to(dtype=raw.dtype)
    scaled = low + (0.5 * (squashed + 1.0) * (high - low))
    return scaled.detach().cpu().numpy()


if env_to_module is None or module_to_env is None:
    # Manual path: no connector pipelines, call the RLModule directly.
    from ray.rllib.core.columns import Columns

    # NOTE(review): this environment is created without the ENV_CONFIG values
    # that the RLlib registration passes to PlaneJoyDirWorld — confirm that
    # evaluating with the environment's default arguments is intended.
    env = gym.make(ENV_NAME, render_mode=None)
    device = getattr(rl_module, "device", next(rl_module.parameters()).device)

    # Loop invariants: action dimensionality and bounds as tensors.
    action_dim = env.action_space.shape[0]
    action_low = torch.as_tensor(env.action_space.low, device=device)
    action_high = torch.as_tensor(env.action_space.high, device=device)

    for episode in range(NUM_EVAL_EPISODES):
        obs, info = env.reset()
        episode_reward = 0
        episode_length = 0
        done = False

        # Episode object that records the trajectory.
        episode_obj = SingleAgentEpisode(
            observations=[obs],
            observation_space=env.observation_space,
            action_space=env.action_space,
        )

        while not done:
            # Add a batch dimension and move the observation to the module's device.
            obs_batch = np.expand_dims(obs, 0).astype(np.float32)
            obs_tensor = torch.from_numpy(obs_batch).to(device)

            rl_module_out = rl_module.forward_inference({"obs": obs_tensor})

            # Locate the action-distribution inputs in the module output;
            # fall back to the first output value if neither key is present.
            if Columns.ACTION_DIST_INPUTS in rl_module_out:
                action_dist_inputs = rl_module_out[Columns.ACTION_DIST_INPUTS]
            elif "action_dist_inputs" in rl_module_out:
                action_dist_inputs = rl_module_out["action_dist_inputs"]
            else:
                action_dist_inputs = next(iter(rl_module_out.values()))

            if isinstance(action_dist_inputs, torch.Tensor):
                action = _action_from_dist_inputs(
                    action_dist_inputs,
                    action_dim,
                    action_low,
                    action_high,
                    sample=EVAL_SAMPLE_ACTIONS,
                )
            else:
                # Non-tensor output: take the first batch entry and clip it to
                # the action bounds.
                action = np.array(action_dist_inputs)
                if action.ndim > 1:
                    action = action[0]
                action = np.clip(action, env.action_space.low, env.action_space.high)

            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            episode_reward += reward
            episode_length += 1

            # Stop overly long episodes even if the env has not ended them.
            if episode_length > MAX_EVAL_STEPS:
                done = True

            # Record the transition.
            episode_obj.add_env_step(
                obs,
                action,
                reward,
                terminated=terminated,
                truncated=truncated,
            )

        print(episode_obj.get_return())
        eval_results.append({
            'episode': episode + 1,
            'reward': episode_reward,
            'length': episode_length,
        })
        # No reset is needed here: the next loop iteration resets the
        # environment and creates a fresh episode object.

    env.close()
else:
    # Connector path: let the pipelines build the module input and turn the
    # module output into an environment action.
    env = gym.make(ENV_NAME, render_mode=None)

    obs, _ = env.reset()
    episode_obj = SingleAgentEpisode(
        observations=[obs],
        observation_space=env.observation_space,
        action_space=env.action_space,
    )

    num_episodes = 0
    while num_episodes < NUM_EVAL_EPISODES:
        # Scratch dict handed to both pipelines of the same step.
        shared_data = {}
        input_dict = env_to_module(
            episodes=[episode_obj],
            rl_module=rl_module,
            explore=False,
            shared_data=shared_data,
        )

        rl_module_out = rl_module.forward_inference(input_dict)

        to_env = module_to_env(
            batch=rl_module_out,
            episodes=[episode_obj],
            rl_module=rl_module,
            explore=False,
            shared_data=shared_data,
        )

        # Single-env batch: take the first (only) action.
        action = to_env.pop(Columns.ACTIONS)[0]
        obs, reward, terminated, truncated, _ = env.step(action)

        # Record the transition together with the remaining pipeline outputs.
        episode_obj.add_env_step(
            obs,
            action,
            reward,
            terminated=terminated,
            truncated=truncated,
            extra_model_outputs={k: v[0] for k, v in to_env.items()},
        )

        if episode_obj.is_done:
            eval_results.append({
                'episode': num_episodes + 1,
                'reward': episode_obj.get_return(),
                'length': len(episode_obj),
            })
            # Start the next episode.
            obs, info = env.reset()
            episode_obj = SingleAgentEpisode(
                observations=[obs],
                observation_space=env.observation_space,
                action_space=env.action_space,
            )
            num_episodes += 1

    env.close()


# Tabulate the per-episode evaluation records and print summary statistics.
eval_df = pd.DataFrame(eval_results)
reward_col = eval_df['reward']
length_col = eval_df['length']

print("\n평가 결과:")
print(eval_df)
print(f"\n평균 보상: {reward_col.mean():.2f} ± {reward_col.std():.2f}")
print(f"평균 에피소드 길이: {length_col.mean():.2f} ± {length_col.std():.2f}")

# # Save the evaluation results
# eval_df.to_csv(RESULTS_DIR / "evaluation_results.csv", index=False)

In [None]:
# Display the RLModule obtained from the algorithm (notebook cell output).
rl_module

## 7. 모델 저장 및 로드

In [None]:
# Save a checkpoint into the algorithm's own log directory (algo.logdir).
algo.save(algo.logdir)

In [None]:
# Save a checkpoint and report where it was written.
save_result = algo.save()

# Depending on the Ray version, `save()` returns either the checkpoint path
# itself or a result object that exposes the location as `.checkpoint.path`.
# Resolve both cases so that `checkpoint_dir` is something that can be passed
# to `Algorithm.from_checkpoint`.
checkpoint = getattr(save_result, "checkpoint", None)
checkpoint_dir = getattr(checkpoint, "path", save_result)
print(f"모델 저장 완료: {checkpoint_dir}")

# Example of loading the model again (commented out):
# loaded_algo = Algorithm.from_checkpoint(checkpoint_dir)

## 8. 정리

In [None]:
# Stop the algorithm.
algo.stop()

# Shut Ray down as well (if needed):
# ray.shutdown()

print("정리 완료")