## Testing off-policy estimation (OPE) with a trained algorithm

In [29]:
import os
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Container,
    Dict,
    List,
    Mapping,
    Optional,
    Tuple,
    Type,
    Union,
)

import numpy as np
from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.offline import JsonReader
from ray.rllib.offline.estimators import WeightedImportanceSampling
from ray.rllib.utils.typing import (
    TensorType,
)

In [10]:
# Directory holding the trained checkpoint and the logged experience file.
# Set the OPE_DATA_DIR environment variable to run the notebook on another
# machine; the fallback is the location used when the notebook was written.
DATA_DIR = Path(
    os.environ.get("OPE_DATA_DIR", "/Users/jk1/temp/ope_tests/custom_data_out")
)

# Kept as plain strings, the same type the cells below were originally given.
algo_path = str(DATA_DIR / "crr_model")
data_path = str(DATA_DIR / "output-2023-12-10_21-01-48_worker-0_0.json")

In [3]:
# Restore the trained algorithm from the checkpoint located at `algo_path`.
trained_algo = Algorithm.from_checkpoint(algo_path)

`UnifiedLogger` will be removed in Ray 2.7.
  return UnifiedLogger(config, logdir, loggers=None)
The `JsonLogger interface is deprecated in favor of the `ray.tune.json.JsonLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `CSVLogger interface is deprecated in favor of the `ray.tune.csv.CSVLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
The `TBXLogger interface is deprecated in favor of the `ray.tune.tensorboardx.TBXLoggerCallback` interface and will be removed in Ray 2.7.
  self._loggers.append(cls(self.config, self.logdir, self.trial))
2023-12-11 09:27:19,328	INFO worker.py:1673 -- Started a local Ray instance.
[33m(raylet)[0m [2023-12-11 09:27:29,318 E 9261 301110] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-12-11_09-27-16_893782_9230 is over 95% full, available space: 4492726272; capacity: 499963174912. Object crea

In [5]:
# Policy of the restored algorithm; it is handed to the estimator below.
algo_policy = trained_algo.get_policy()
# Bare expression so the notebook displays the policy object.
algo_policy

CRRTorchPolicy

In [14]:
# Weighted importance sampling estimator for the trained policy (discount factor 0.99).
estimator = WeightedImportanceSampling(policy=algo_policy, gamma=0.99)

TypeError: WeightedImportanceSampling.__init__() missing 1 required positional argument: 'policy'

In [11]:
# Reader over the logged experience stored in the JSON file at `data_path`.
reader = JsonReader(data_path)

[33m(raylet)[0m [2023-12-11 09:38:43,410 E 9261 301110] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2023-12-11_09-27-16_893782_9230 is over 95% full, available space: 4418088960; capacity: 499963174912. Object creation will fail if spilling is required.


`estimator.estimate(batch)` returns a dict with the following keys:

- `v_behavior`: The discounted return averaged over episodes in the batch.
- `v_behavior_std`: The standard deviation corresponding to `v_behavior`.
- `v_target`: The estimated discounted return for `self.policy`, averaged over episodes in the batch.
- `v_target_std`: The standard deviation corresponding to `v_target`.
- `v_gain`: `v_target / max(v_behavior, 1e-8)`.
- `v_delta`: The difference between `v_target` and `v_behavior`.

In [16]:
# Run the estimator on two consecutive batches from the reader and print
# the resulting metrics dict for each.
for _ in range(2):
    batch = reader.next()
    ope_metrics = estimator.estimate(batch)
    print(ope_metrics)

{'v_behavior': 0.0, 'v_behavior_std': 0.0, 'v_target': 0.0, 'v_target_std': 0.0, 'v_gain': 0.0, 'v_delta': 0.0}


## Create a custom policy

#### Custom constant policy
- Always returns the action 7 for every observation (the action is not limited by, or checked against, any action space)

In [30]:
from ray.rllib import Policy


class CustomPolicy(Policy):
    """Constant policy that always picks the same action (7 by default).

    The policy is deterministic: it assigns probability 1 to
    `CONSTANT_ACTION` and probability 0 to every other action, whatever the
    observation is. Only `compute_actions` and `compute_log_likelihoods` are
    overridden; there is no learning code.
    """

    # The single action this policy ever takes. Defined once so that
    # `compute_actions` and `compute_log_likelihoods` cannot drift apart.
    CONSTANT_ACTION = 7

    def __init__(self, observation_space, action_space, config):
        """Initializes the base `Policy` and the example parameter `w`.

        Args:
            observation_space: Observation space passed on to `Policy.__init__`.
            action_space: Action space passed on to `Policy.__init__`.
            config: Policy config passed on to `Policy.__init__`.
        """
        Policy.__init__(self, observation_space, action_space, config)
        # example parameter; only referenced by the commented-out weight
        # accessors at the bottom of the class
        self.w = 1.0

    def compute_actions(self,
                        obs_batch,
                        state_batches,
                        prev_action_batch=None,
                        prev_reward_batch=None,
                        info_batch=None,
                        episodes=None,
                        **kwargs):
        """Returns `CONSTANT_ACTION` once for every entry of `obs_batch`.

        Only the length of `obs_batch` matters; its contents and all other
        arguments are accepted for interface compatibility and ignored.

        Returns:
            A 3-tuple: the list of actions (one per observation), an empty
            list of RNN states, and an empty dict of extra values.
        """
        # return action batch, RNN states, extra values to include in batch
        return [self.CONSTANT_ACTION for _ in obs_batch], [], {}

    def compute_log_likelihoods(
        self,
        actions: Union[List[TensorType], TensorType],
        obs_batch: Union[List[TensorType], TensorType],
        state_batches: Optional[List[TensorType]] = None,
        prev_action_batch: Optional[Union[List[TensorType], TensorType]] = None,
        prev_reward_batch: Optional[Union[List[TensorType], TensorType]] = None,
        actions_normalized: bool = True,
        in_training: bool = True,
    ) -> TensorType:
        """Computes the log-probability of each given action under this policy.

        No action distribution class is involved: an action equal to
        `CONSTANT_ACTION` has probability 1 (log-prob 0.0), any other action
        has probability 0 (log-prob -inf).

        NOTE(review): because other actions get probability exactly 0, an
        importance-sampling estimator can end up dividing 0 by 0 (nan) for
        episodes in which no logged action equals `CONSTANT_ACTION`; confirm
        that this is acceptable for the evaluation at hand.

        Args:
            actions: Batch of actions for which to retrieve the log-probs.
            obs_batch: Batch of observations. Ignored.
            state_batches: List of RNN state input batches, if any. Ignored.
            prev_action_batch: Batch of previous action values. Ignored.
            prev_reward_batch: Batch of previous rewards. Ignored.
            actions_normalized: Ignored; actions are compared as given.
            in_training: Ignored.

        Returns:
            NumPy array of log-probs with the same shape as `actions`
            ([BATCH_SIZE] for a flat batch of actions).
        """
        # Vectorised comparison; the result is an array, matching the
        # `TensorType` return annotation.
        is_constant_action = np.asarray(actions) == self.CONSTANT_ACTION
        return np.where(is_constant_action, 0.0, -np.inf)

    # Optional hooks, left unimplemented because this policy is not trained:
    #
    # def learn_on_batch(self, samples):
    #     # implement your learning code here
    #     return {}  # return stats
    # 
    # def get_weights(self):
    #     return {"w": self.w}
    # 
    # def set_weights(self, weights):
    #     self.w = weights["w"]

In [31]:
# Instantiate the custom policy. Both spaces are passed as None and the
# config as an empty dict.
custom_policy = CustomPolicy(observation_space=None, action_space=None, config={})

In [32]:
# Sanity check on two batches: print what the custom policy returns for the
# logged observations (no RNN state is passed).
for _ in range(2):
    batch = reader.next()
    action_outputs = custom_policy.compute_actions(batch['obs'], [])
    print(action_outputs)

([7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7], [], {})
([7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7], [], {})


In [33]:
# Rebind `estimator` to a weighted importance sampling estimator that
# evaluates `custom_policy` (discount factor 0.99).
estimator = WeightedImportanceSampling(policy=custom_policy, gamma=0.99)

In [34]:
# Run the current estimator on the next two batches from the reader and
# print the metrics dict for each.
for _ in range(2):
    batch = reader.next()
    ope_metrics = estimator.estimate(batch)
    print(ope_metrics)

{'v_behavior': 0.49483865960020695, 'v_behavior_std': 0.0, 'v_target': 0.49483865960020695, 'v_target_std': 0.0, 'v_gain': 1.0, 'v_delta': 0.0}
{'v_behavior': 0.0, 'v_behavior_std': 0.0, 'v_target': nan, 'v_target_std': nan, 'v_gain': nan, 'v_delta': nan}


  v_target += episode_p[t] / w_t * rewards[t] * self.gamma**t
