# Training Custom Simulation using Plato Toolkit

When training your custom simulation using the Ray on AML, your simulation can be wrapped in an environment to allow for easy integration. There are two options to training your custom simulation on AML:

**1. Custom Environment Wrapper**

2. Client API


We will discuss how to use the Custom Environment Wrapper in this notebook. 

*Directions are compatible with Ray Lib 2.3.0 package

# Prerequisites

When creating a Plato Toolkit training job using AML, begin by connecting your notebook to your workspace using your Azure credentials as done below. 

In [None]:
from azureml.core import Workspace

ws = Workspace.from_config()


In [None]:
subscription_id = ws.subscription_id
resource_group = ws.resource_group
workspace = ws.name


In [None]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    DefaultAzureCredential(), subscription_id, resource_group, workspace
)

# Set up Compute Cluster

Set up compute cluster for model training. 

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget

# Choose a name for the Ray cluster
compute_name = 'compute-gpu'
compute_min_nodes = 0
compute_max_nodes = 2

# This example uses GPU VM. For using CPU VM, set SKU to STANDARD_D2_V2
vm_size = 'STANDARD_NC6'

if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        if compute_target.provisioning_state == 'Succeeded':
            print('found compute target. just use it', compute_name)
        else: 
            raise Exception(
                'found compute target but it is in state', compute_target.provisioning_state)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=vm_size,
        min_nodes=compute_min_nodes, 
        max_nodes=compute_max_nodes,
        idle_seconds_before_scaledown=1800, # Shuts down idle compute cluster after 30 minutes
        vm_priority='dedicated' # Choose 'lowpriority' for low priority VMs 
    )

    # Create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    
    # Can poll for a minimum number of nodes and for a specific timeout. 
    # If no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

In [None]:
# Check compute cluster target 
ws.get_default_compute_target(type) # Possible values should be 'CPU' or 'GPU'

# Custom Simulation Environment Wrapper

Custom Simulations can be created using the BaseEnv API on the Ray package. A custom simulation is created into a gymnastics environment that is compatible with RLLib training. The BasicEnv API requires three functions within the larger simulation class:

1. init
2. step
3. reset 


In [2]:
import gymnasium as gym
from collections import deque
import numpy as np
from sim.simulator_model import SimulatorModel
from ray.rllib.env.base_env import BaseEnv
from utils.callbacks import MyCallback

from training_setup.rl_lesson_init import rl_lesson_init
from training_setup.rl_sim_spec import RLSimSpec


class Gym_Wrapper(gym.Env, gym.utils.EzPickle):
 """
 Gymastic wrapper for custom environment calls on Ray gym package and pickles the simulation. Simulation
 episodes are calls to json files with episode information. Make sure Simulation document has the following
 defined:
    - step(action: Dict[str, Any])
    - reset((config: Dict[str, Any])
    - get_state() -> Dict[str, Any]
 """   
    
    def __init__(self,
                 config = {},
                 **kwargs):
        gym.utils.EzPickle.__init__(
            self,
            config,
        )

        super().__init__()
        
        # Save the environmental config parsed from tune.run
        # This includes the following feats: {, worker=1/11, vector_idx=0, remote=False} -
        self.config = config

        # Define episode reset config
        self.rl_lesson_config = self.config.get('rl_lesson_config', {})

        # Initialize sim specification
        self.rl_sim_spec = RLSimSpec()

        # Define the simulator model
        self.sim = SimulatorModel()

        # Dimensions of the grid.
        self.XX = kwargs.get('XX',3)

        # Get specs for states and actions from the simulator
        self.state_dim, self.action_dim = self.rl_sim_spec.get_gym_specs()

        # Configure states
        self.observation_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(self.state_dim,), dtype=np.float32)
        assert np.array([0]*self.state_dim, np.float32) in self.observation_space
    

        # Configure Actions
        self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(self.action_dim,), dtype=np.float32)


    def step(self, action):
        ''' apply the supplied action '''

        # Take the action
        sim_action = self.rl_sim_spec.gym_action_to_sim(action)
        state_dict = self.sim.step(sim_action)

        # Convert the state to a Gym state
        state = self.rl_sim_spec.sim_state_to_gym(state_dict)
        # clip the state to the observation space
        state = np.clip(state, self.observation_space.low, self.observation_space.high)

        # Get -1 reward for each step
        # - Except at the terminal state which has zero reward
        # - Set the 'terminated' flag if we've reached thermal runaway
        terminated = self.sim.termination()
        truncated = self.sim.truncation()
        reward, terminated, truncated = self.rl_sim_spec.compute_reward_term_and_trun(state_dict, terminated, truncated)

        info = {}
        # Add states to track in the info dict (for logging)
        # - This is used by the 'monitor' wrapper to record the states
        for state_name in self.rl_sim_spec.get_states_to_log():
            info["state_" + state_name] = state_dict[state_name]

        return state, reward, terminated, truncated, info


    def reset(self, *, seed=None, options=None):
        # Seed and options needed for ray 2.3.0 compatibility
        super().reset(seed=seed)
        
        # Setup values for sim config whenever "rl_lesson" has been defined.
        reset_config = rl_lesson_init(self.rl_lesson_config)
        state_dict = self.sim.reset(reset_config)
        # Convert the state to a Gym state
        state = self.rl_sim_spec.sim_state_to_gym(state_dict)
        state = np.clip(state, self.observation_space.low, self.observation_space.high)

        info = {}
        return state, info


    def render(self, action=0, reward=0 ):
        states=[]
        print(f"(action: {action}): (states: {states}) reward = {reward}")


IndentationError: unexpected indent (571552409.py, line 6)

In [None]:
# Setup rl_lesson config

import yaml
with open("training_setup/rl_lesson.yml", "r") as file:
    rl_lesson_config = yaml.safe_load(file)
    config = {"rl_lesson_config": rl_lesson_config}
    print("config: ", config)


    # use the Ray 'check_env' function to check the environment
    # -- requires pip install opencv-python --
    from ray.rllib.utils.pre_checks.env import check_env as ray_check_env
    ray_check_env(env)

In [None]:
# Create an instance of our custom environment

env = Gym_Wrapper(config)

print(env.action_space)
print(env.observation_space)
print(env.reset())
print(env.step([1]))

In [None]:
# Use the Gymnasium 'check_env' function to check the environment
# - returns nothing if the environment is verified as ok

from gymnasium.utils.env_checker import check_env
check_env(env)

In [None]:
# Use the Ray 'check_env' function to check the environment
# -- requires pip install opencv-python --

from ray.rllib.utils.pre_checks.env import check_env as ray_check_env
ray_check_env(env)

## Create Ray Environment on Azure 

In [None]:
# Run Environment 
import os
from azureml.core import Environment

ray_docker_path = 'Dockerfile'

ray_environment = Environment. \
    from_dockerfile(name = 'CSTR', dockerfile= ray_docker_path). \
    register(workspace=ws)

ray_build = ray_environment.build(workspace=ws)

ray_build.wait_for_completion(show_output = True)

In [None]:
from ray.tune import register_env

# Initiate training algorithm, call on rl environment, and collect Wrapped Simulation
training_algorithm = "PPO"
rl_environment = "CTSR"
script_name = 'custom_rllib_run.py'


In [None]:
from azureml.core import RunConfiguration, ScriptRunConfig
from azureml.core.experiment import Experiment

Experiment(worspace=ws, "CSTR-Experiment")


aml_run_config_ml = RunConfiguration(communicator='OpenMpi')
aml_run_config_ml.target = compute_target
aml_run_config_ml.node_count = 2
aml_run_config_ml.environment = ray_environment

command=[
    'python', script_name,
    '--run', training_algorithm,
    '--env', rl_environment,
    '--config', '\'{"num_gpus": 4, "num_workers": 11}\'',
]

config = ScriptRunConfig(source_directory='./files',
                    command=command,
                    run_config = aml_run_config_ml
                   )
training_run = Experiment.submit(config)

training_run.wait_for_completion()

In [None]:
from azureml.widgets import RunDetails

RunDetails(training_run).show()

# Conclusion 

Now that you have created a custom Simulation using Gymnasium, you can train your model using Ray. Do not forget to shut down your compute instance and archive your training run. 

In [None]:
# To archive the created experiment:
Experiment.archive()

# To delete created compute instance
if not current_compute_instance:
    compute_target.delete()

#