In [1]:
import base64
import json
from typing import Optional

import requests

In [2]:
# Root of the REST service; each API family lives under its own path prefix.
base_url = "http://localhost:8000/"
task_api = f"{base_url}task/"
agent_api = f"{base_url}agent/"
cluster_api = f"{base_url}cluster/"

In [3]:
def do_request(url, params=None):
    """Issue a GET request and return the JSON-decoded response body.

    Args:
        url: Full URL of the endpoint to call.
        params: Optional query parameters, forwarded to ``requests.get``.

    Returns:
        The value produced by ``response.json()`` for the reply.
    """
    return requests.get(url, params=params).json()

def b64encode(data: str, encoding="utf-8") -> str:
    """Return the standard Base64 text representation of ``data``.

    Args:
        data: Text to encode.
        encoding: Codec used to turn ``data`` into bytes before Base64 encoding.

    Returns:
        The Base64 string (standard alphabet, so it may contain ``+``, ``/``
        and ``=``).
    """
    raw = data.encode(encoding)
    # Base64 output is always ASCII. Decoding it with the caller's codec would
    # corrupt the result for codecs that are not ASCII-compatible (e.g. utf-16).
    return base64.b64encode(raw).decode("ascii")

### Submit job

In [4]:
def submit_task(
    stage: str,
    base_model_dir: Optional[str] = None,
    resume: bool = False,
    training_iteration: int = 100,
) -> str:
    """Load the example config for ``stage`` and submit (or resume) it as a task.

    The config is read from ``../configs/{stage}_example.json``, adjusted,
    serialised to JSON, Base64-encoded and appended to the task API URL.

    Args:
        stage: Stage name; selects both the config file and the
            ``task_config`` section that gets adjusted.
        base_model_dir: Checkpoint directory to start from. When ``None`` any
            ``base_model`` entry already in the config is removed.
        resume: Call the ``resume/`` endpoint instead of ``submit/``.
        training_iteration: Value written to ``stop.training_iteration`` when
            the stage config has a ``stop`` section.

    Returns:
        The decoded JSON response of the task API; the rest of the notebook
        uses it as the job id.
    """
    # JSON files are UTF-8; do not depend on the platform's default encoding.
    with open(f"../configs/{stage}_example.json", "r", encoding="utf-8") as f:
        config = json.load(f)

    stage_config = config["task_config"][stage]

    # Only override the stop criterion if the stage defines one at all.
    if "stop" in stage_config:
        stage_config["stop"]["training_iteration"] = training_iteration

    if base_model_dir is not None:
        stage_config["base_model"] = {"checkpoint_dir": base_model_dir}
    else:
        stage_config.pop("base_model", None)

    endpoint = "resume/" if resume else "submit/"
    # NOTE(review): standard Base64 can contain '/', which lands in the URL
    # path here — confirm the server route accepts that.
    payload = b64encode(json.dumps(config))
    return do_request(task_api + endpoint + payload)

#### Offline RL

In [5]:
# Submit the "pretrain" example config without a base model.
pretrain_job_id = submit_task(stage="pretrain")

#### Online RL

In [6]:
# Checkpoint directory under the pretrain_example results, used as the base model.
ckpt_dir = (
    "/mnt/ray/ray_results/pretrain_example/"
    "DQN_LocalPathPlanning_4965b_00000_0_2024-09-10_03-56-09/"
    "checkpoint_000004"
)
evolve_job_id = submit_task("evolve", base_model_dir=ckpt_dir)
# `job_id` is the id the status/stop/delete/log/progress cells operate on.
job_id = evolve_job_id

#### Online with different env_config

In [7]:
# Same base checkpoint as above, submitted with the "transfer" example config.
ckpt_dir = (
    "/mnt/ray/ray_results/pretrain_example/"
    "DQN_LocalPathPlanning_4965b_00000_0_2024-09-10_03-56-09/"
    "checkpoint_000004"
)
transfer_job_id = submit_task("transfer", base_model_dir=ckpt_dir)

### Inference

In [24]:
# Checkpoint to load for the "inference" example config.
ckpt_dir = (
    "/mnt/ray/ray_results/fe6503f9-602d-11ef-9344-c85ea95f6813/"
    "DQN_LocalPathPlanning_fc8b1_00000_0_2024-09-14_05-00-50/"
    "checkpoint_000004"
)
infer_job_id = submit_task("inference", base_model_dir=ckpt_dir)

### Job status

In [None]:
# Ask the task API for the status of `job_id`.
status = do_request(f"{task_api}status/{job_id}")
status

### Abort Job

In [None]:
# Call the stop endpoint for `job_id` and show the reply.
stopped = do_request(f"{task_api}stop/{job_id}")
stopped

### Resume Job

In [None]:
# Send the "evolve" example config to the resume endpoint (no base model given).
resumed_id = submit_task(stage="evolve", base_model_dir=None, resume=True)
resumed_id

### Delete Job

In [None]:
# Only jobs submitted through JobSubmissionClient can be deleted.
# A job that is still running cannot be deleted.
deleted = do_request(f"{task_api}delete/{job_id}")
deleted

### Job logs

In [None]:
# Fetch the log for `job_id` and print only its last 20000 characters.
log = do_request(f"{task_api}log/{job_id}")
print(log[-20000:])

### Job Progress

In [None]:
# Query the progress endpoint for `job_id`.
progress = do_request(f"{task_api}progress/{job_id}")
progress

### Job Result

In [11]:
# Load the pretrain example config; later cells read its task_id / task_name.
# JSON files are UTF-8, so pin the encoding instead of using the platform default.
with open("../configs/pretrain_example.json", "r", encoding="utf-8") as f:
    config = json.load(f)

In [None]:
# Fetch the stored results for the task id named in the loaded config.
# NOTE(review): the next cell passes this value to json.loads, so the endpoint
# presumably returns a JSON-encoded string — confirm against the server.
result = do_request(f"{task_api}result/{config['task_id']}")
result

In [None]:
import pandas as pd

# `do_request` already JSON-decodes the response, so `result` is only a string
# when the server double-encodes it. Decode in that case and use it as-is
# otherwise, instead of letting json.loads raise TypeError on a dict/list.
records = json.loads(result) if isinstance(result, (str, bytes, bytearray)) else result
df = pd.DataFrame.from_dict(records)

# Mean of the chosen metric per checkpoint directory.
metric = "episode_reward_mean"
df.filter([metric, "checkpoint_dir_name"]).groupby("checkpoint_dir_name").mean()

### Visualization (Tensorboard)

In [None]:
# Call the visualize endpoint for the configured task and print its reply.
tensorboard_url = do_request(f"{task_api}visualize/{config['task_id']}")
print(tensorboard_url)

### List all checkpoints

In [None]:
# List the checkpoints recorded for the configured task id.
checkpoints = do_request(f"{task_api}checkpoint/{config['task_id']}")
checkpoints

### Export model

In [None]:
# Export the first listed checkpoint; both paths are sent Base64-encoded.
checkpoint_dir = checkpoints[0]
checkpoint_dir = b64encode(checkpoint_dir)
save_dir = b64encode(f"/mnt/ray/export_models/{config['task_name']}_{config['task_id']}")
onnx_version = 17  # a falsy value leaves the `onnx` parameter out of the request

url = agent_api + f"export/{checkpoint_dir}"

# Send the query values via `params` so requests percent-encodes them. Pasted
# raw into the URL, a '+' in the Base64 text is decoded by the server as a
# space and corrupts `save_dir`.
export_params = {"save_dir": save_dir}
if onnx_version:
    export_params["onnx"] = onnx_version

saved = do_request(url, params=export_params)
saved

### Job list (Dashboard)

In [None]:
# Retrieve the task list from the task API.
task_list = do_request(f"{task_api}list")
task_list

### Node info (Dashboard)

In [None]:
# IP of the cluster node to look up; adjust to match your deployment.
node_ip = "172.19.0.3"
node_url = do_request(f"{cluster_api}node/{node_ip}")
node_url

### Cluster info

In [None]:
# Query the cluster API's info endpoint.
cluster_info = do_request(f"{cluster_api}info")
cluster_info

### Cluster resources

In [None]:
# Query the cluster API's resources endpoint.
cluster_resources = do_request(f"{cluster_api}resources")
cluster_resources