In [None]:
import gym
import numpy as np
import os

import ray._private.utils

from ray.rllib.models.preprocessors import get_preprocessor
from ray.rllib.evaluation.sample_batch_builder import SampleBatchBuilder
from ray.rllib.offline.json_writer import JsonWriter

if __name__ == "__main__":
    # Roll out random actions in a gym env and write the resulting experience
    # to ./demo-out through RLlib's JsonWriter, one batch per episode.
    batch_builder = SampleBatchBuilder()  # or MultiAgentSampleBatchBuilder
    writer = JsonWriter(os.path.join(".", "demo-out"))

    # You normally wouldn't want to manually create sample batches if a
    # simulator is available, but let's do it anyways for example purposes:
    env = gym.make("Pendulum-v1")

    # try/finally guarantees the env is closed even if a rollout step or a
    # write fails part-way through.
    try:
        # RLlib uses preprocessors to implement transforms such as one-hot
        # encoding and flattening of tuple and dict observations. The one
        # selected for this env's observation space is printed below.
        prep = get_preprocessor(env.observation_space)(env.observation_space)
        print("The preprocessor is", prep)

        for eps_id in range(100):
            obs = env.reset()
            # No previous step exists at t=0, so start from an all-zero
            # action of the right shape/dtype and a zero reward.
            prev_action = np.zeros_like(env.action_space.sample())
            prev_reward = 0
            done = False
            t = 0
            while not done:
                action = env.action_space.sample()
                new_obs, rew, done, info = env.step(action)
                batch_builder.add_values(
                    t=t,
                    eps_id=eps_id,
                    agent_index=0,
                    obs=prep.transform(obs),
                    actions=action,
                    action_prob=1.0,  # put the true action probability here
                    action_logp=0.0,
                    rewards=rew,
                    prev_actions=prev_action,
                    prev_rewards=prev_reward,
                    dones=done,
                    infos=info,
                    new_obs=prep.transform(new_obs),
                )
                obs = new_obs
                prev_action = action
                prev_reward = rew
                t += 1
            # Flush the finished episode and clear the builder for the next.
            writer.write(batch_builder.build_and_reset())
    finally:
        env.close()

In [None]:
# Shell out to the `rllib train` CLI: run PG on Pendulum-v1 with the torch
# framework, writing output under /DATA/pendulum-out and stopping once
# timesteps_total reaches 100. The CQL config cell later reads a JSON file
# from that same directory.
# NOTE(review): "output_max_file_size": 100 presumably caps the size of each
# output file before a new one is started — confirm the unit.
!rllib train \
    --run=PG \
    --env=Pendulum-v1 \
     --framework=torch \
    --config='{"output": "/DATA/pendulum-out", "output_max_file_size": 100}' \
    --stop='{"timesteps_total": 100}'

In [None]:
import argparse
import numpy as np
import pytz
import datetime


from ray.rllib.policy.sample_batch import convert_ma_batch_to_sample_batch
from ray.rllib.algorithms import cql as cql
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.execution.rollout_ops import (
    synchronous_parallel_sample,
)
from ray.rllib.offline.estimators import (
    ImportanceSampling,
    WeightedImportanceSampling,
    DirectMethod,
    DoublyRobust,
)
from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel


# Obtain the torch module through RLlib's import helper; the second return
# value is discarded into `_`.
torch, _ = try_import_torch()

# The command-line options below are kept for reference only; this notebook
# sets the equivalent plain variables at the bottom of the cell instead.
# parser = argparse.ArgumentParser()
# parser.add_argument(
#     "--as-test",
#     action="store_true",
#     help="Whether this script should be run as a test: --stop-reward must "
#     "be achieved within --stop-timesteps AND --stop-iters.",
# )
# parser.add_argument(
#     "--stop-iters", type=int, default=5, help="Number of iterations to train."
# )
# parser.add_argument(
#     "--stop-reward", type=float, default=50.0, help="Reward at which we stop training."
# )

# Results directory named after the current Asia/Ho_Chi_Minh wall-clock time,
# formatted as day-month-year_hour-minute-second.
hcmTz = pytz.timezone("Asia/Ho_Chi_Minh")
date = datetime.datetime.now(tz=hcmTz).strftime("%d-%m-%Y_%H-%M-%S")
ray_result_logdir = f"/DATA/l5kit/ray_results/{date}"

# Stand-ins for the CLI flags above.
stop_iters, stop_reward = 5, 50
as_test = True
runs = 'CQL'

In [None]:

import ray
# Bring up Ray for this notebook: advertise 64 CPUs, do not raise if Ray was
# already initialised (e.g. the cell is re-run), and keep worker logs from
# being forwarded to the driver's output.
ray.init(
    log_to_driver=False,
    ignore_reinit_error=True,
    num_cpus=64,
)

In [None]:
# Recorded experience file that feeds both training and evaluation below.
_offline_json = "/DATA/pendulum-out/output-2023-02-11_15-20-26_worker-0_0.json"

# CQL (torch) configuration for Pendulum-v1, trained from the offline file.
config = (
    cql.CQLConfig()
    .framework(framework="torch")
    .rollouts(num_rollout_workers=62)
    .resources(num_gpus=1)
    .training(
        # Batch / schedule settings.
        train_batch_size=256,
        num_steps_sampled_before_learning_starts=256,
        target_network_update_freq=1,
        # Algorithm settings.
        n_step=3,
        bc_iters=0,
        tau=0.005,
        target_entropy="auto",
        clip_actions=False,
        # Q-network and policy network use the same two-layer ReLU MLP shape.
        q_model_config={
            "fcnet_hiddens": [256, 256],
            "fcnet_activation": "relu",
        },
        policy_model_config={
            "fcnet_hiddens": [256, 256],
            "fcnet_activation": "relu",
        },
        # One learning rate for actor, critic and entropy coefficient.
        optimization_config={
            "actor_learning_rate": 3e-4,
            "critic_learning_rate": 3e-4,
            "entropy_learning_rate": 3e-4,
        },
    )
    .reporting(min_train_timesteps_per_iteration=1000)
    .debugging(log_level="INFO")
    .environment(normalize_actions=True, env="Pendulum-v1")
    # Alternative input specification kept for reference:
    # .offline_data(
    #     input_config={
    #         "paths": ["./demo-out/output-2023-02-11_06-03-52_worker-0_0.json"], #tests/data/pendulum/enormous.zip
    #         "format": "json",
    #         #"num_rollout_workers": 63,
    #         #"num_cpus_per_worker": 0.5,
    #     }
    # )
    .offline_data(input_=_offline_json)
    .evaluation(
        evaluation_interval=1,
        evaluation_duration=10,
        evaluation_num_workers=1,
        evaluation_parallel_to_training=False,
        # NOTE(review): evaluation reads the same offline file instead of
        # "sampler"; presumably no live-env episode rewards are produced this
        # way — confirm the "evaluation/episode_reward_mean" stop criterion
        # used later can actually trigger.
        # evaluation_config=cql.CQLConfig.overrides(input_="sampler"),
        evaluation_config={"input": _offline_json},  # sampler
        # Off-policy estimators, currently disabled:
        # off_policy_estimation_methods={
        #     "is": {"type": ImportanceSampling},
        #     "wis": {"type": WeightedImportanceSampling},
        #     "dm_fqe": {
        #         "type": DirectMethod,
        #         "q_model_config": {"type": FQETorchModel, "polyak_coef": 0.05},
        #         "epsilon_greedy": 0.0,
        #     },
        #     "dr_fqe": {
        #         "type": DoublyRobust,
        #         "q_model_config": {"type": FQETorchModel, "polyak_coef": 0.05},
        #         "epsilon_greedy": 0.0,
        #     },
        # },
    )
)
# num_workers: 0
#         num_gpus: 1

In [None]:
from ray.rllib.offline.json_reader import JsonReader
from ray.rllib.offline.estimators import DoublyRobust
from ray.rllib.offline.estimators.fqe_torch_model import FQETorchModel

# NOTE(review): `algo` is not assigned in any cell visible in this notebook;
# it has to exist in the kernel already for this cell to run.
# NOTE(review): both reader paths point at CartPole data while the other
# cells use Pendulum-v1 — confirm these are the intended files.
estimator = DoublyRobust(
    q_model_config=dict(type=FQETorchModel, n_iters=160),
    gamma=0.99,
    policy=algo.get_policy(),
)

# Phase 1: fit the estimator's Q-model on 100 batches of recorded data
# (only required for DM and DR estimators). Each call prints e.g.
# {'loss': ...}
reader = JsonReader("/tmp/cartpole-out")
for batch in (reader.next() for _ in range(100)):
    print(estimator.train(batch))

# Phase 2: compute off-policy estimates on 100 batches from the eval set.
# Each call prints e.g.
# {'v_behavior': ..., 'v_target': ..., 'v_gain': ...,
# 'v_behavior_std': ..., 'v_target_std': ..., 'v_delta': ...}
reader = JsonReader("/tmp/cartpole-eval")
for batch in (reader.next() for _ in range(100)):
    print(estimator.estimate(batch))

In [None]:

# evaluation_parallel_to_training should be False b/c iterations are very long
# and this would cause evaluation to lag one iter behind training.

# Check whether we can learn from the given file within `stop_iters`
# iterations, up to a reward of `stop_reward`.


from ray import air, tune
# Thresholds for the Tune run; these rebind the values set in the earlier
# settings cell.
stop_iters, stop_reward = 1000, -300

# Manual training loop kept for reference (superseded by the Tuner below).
# Test for torch framework (tf not implemented yet).
# cql_algorithm = cql.CQL(config=config)
# learnt = False
# for i in range(num_iterations):
#     print(f"Iter {i}")
#     eval_results = cql_algorithm.train().get("evaluation")
#     if eval_results:
#         print("... R={}".format(eval_results["episode_reward_mean"]))
#         # Learn until some reward is reached on an actual live env.
#         if eval_results["episode_reward_mean"] >= min_reward:
#             # Test passed gracefully.
#             if as_test:
#                 print("Test passed after {} iterations.".format(i))
#                 quit(0)
#             learnt = True
#             break

# Stop criteria handed to Tune: an iteration budget and an evaluation-reward
# threshold.
stop = {
    "training_iteration": stop_iters,
    "evaluation/episode_reward_mean": stop_reward,
}

# Checkpoint every 10 iterations and retain 4 checkpoints.
# NOTE(review): checkpoints are scored on the top-level 'episode_reward_mean'
# while the stop criterion uses 'evaluation/episode_reward_mean' — confirm the
# top-level metric is populated when training from offline data.
checkpointing = air.CheckpointConfig(
    checkpoint_score_attribute='episode_reward_mean',
    checkpoint_frequency=10,
    num_to_keep=4,
)

tuner = tune.Tuner(
    'CQL',
    run_config=air.RunConfig(
        local_dir=ray_result_logdir,
        checkpoint_config=checkpointing,
        stop=stop,
        verbose=1,
    ),
    param_space=config,
)
tuner.fit()


In [None]:
# Scratch Box space of shape (1, 3), float32, with bounds low=0 and high=1.
observation_space = gym.spaces.Box(
    low=0,
    high=1,
    shape=(1, 3),
    dtype=np.float32,
)

In [None]:
# Show the space's `low` attribute as this cell's output.
observation_space.low

In [None]:

# NOTE(review): `cql_algorithm` is only created in commented-out code in the
# cells visible here; it must already exist in the kernel for this cell to run.

# Pull the policy and its underlying model out of the algorithm.
cql_policy = cql_algorithm.get_policy()
cql_model = cql_policy.model

# --- Part 1: query the learnt Q-function for arbitrary (cont.) actions. ---
# Random batch of 5 observations (3 values each) and 5 actions (1 value each).
obs_batch = torch.from_numpy(np.random.random((5, 3)))
action_batch = torch.from_numpy(np.random.random((5, 1)))
q_values = cql_model.get_q_values(obs_batch, action_batch)[0]
# With "twin_q" there are 2 Q-networks, and the min of the 2 outputs is
# usually the value of interest.
twin_q_values = cql_model.get_twin_q_values(obs_batch, action_batch)[0]
# NOTE(review): the trailing [0] indexes the element-wise minimum once more —
# confirm that only its first entry is wanted.
final_q_values = torch.min(q_values, twin_q_values)[0]
print(f"final_q_values={final_q_values.detach().numpy()}")

# --- Part 2: evaluate the trained Algorithm on data from its workers. ---
# Sample a (multi-agent) batch and convert it to a plain sample batch.
batch = convert_ma_batch_to_sample_batch(
    synchronous_parallel_sample(worker_set=cql_algorithm.workers)
)
obs = torch.from_numpy(batch["obs"])
# Run the observations through the model to obtain the features that are
# then fed to the Q-head.
model_out, _ = cql_model(dict(obs=obs))
# Q-value estimates for the (historic) actions stored in the batch.
q_values_old = cql_model.get_q_values(
    model_out,
    torch.from_numpy(batch["actions"]),
)[0]
# Q-value estimates for fresh actions computed by the policy.
actions_new = cql_policy.compute_actions_from_input_dict(dict(obs=obs))[0]
q_values_new = cql_model.get_q_values(
    model_out,
    torch.from_numpy(actions_new),
)[0]
print(f"Q-val batch={q_values_old.detach().numpy()}")
print(f"Q-val policy={q_values_new.detach().numpy()}")

cql_algorithm.stop()

In [None]:
# Execute the cql.py script through the shell.
# NOTE(review): cql.py is not part of this notebook; presumably it lives in
# the working directory — verify.
!python cql.py