Create the trainer

In [96]:
import inspect
import time
from statistics import mean, stdev
from CybORG import CybORG
from CybORG.Agents import B_lineAgent, SleepAgent, GreenAgent
from CybORG.Agents.SimpleAgents.BaseAgent import BaseAgent
from CybORG.Agents.SimpleAgents.BlueReactAgent import BlueReactRemoveAgent
from CybORG.Agents.SimpleAgents.Meander import RedMeanderAgent
from CybORG.Agents.Wrappers.FixedFlatWrapper import FixedFlatWrapper
from CybORG.Agents.Wrappers.OpenAIGymWrapper import OpenAIGymWrapper
from CybORG.Agents.Wrappers import ChallengeWrapper
import os
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.agents import ppo
from ray.tune.registry import register_env
from CybORG.Agents.Wrappers.rllib_wrapper import RLlibWrapper
import warnings
import numpy as np
import random
import scipy
from collections import deque
warnings.filterwarnings('ignore')

In [97]:
# Number of episodes `evaluate` plays for each (steps, red agent) pair.
MAX_EPS = 20
# Name of the agent controlled by the learned policy.
agent_name = 'Blue'


def wrap(env):
    """Return ``env`` wrapped in an ``RLlibWrapper`` for the Blue agent."""
    wrapped = RLlibWrapper(env=env, agent_name="Blue")
    return wrapped


def evaluate(steps, agent=None):
    """Evaluate an agent on CybORG Scenario2 and print reward statistics.

    For every episode length in ``steps`` (and every red agent in the list,
    currently only ``B_lineAgent``), plays ``MAX_EPS`` episodes of exactly
    that many steps. It then prints the mean and standard deviation of the
    total episode reward.

    Args:
        steps: Iterable of episode lengths. Must contain at least one value.
        agent: Object whose ``compute_actions(observation)`` returns a
            3-tuple whose first element is the action to take. Defaults to
            the module-level ``trainer`` (looked up at call time).

    Returns:
        Mean total reward of the *last* (steps, red agent) combination.

    Raises:
        ValueError: if ``steps`` is empty.
    """
    steps = list(steps)
    if not steps:
        raise ValueError("evaluate() needs at least one episode length in `steps`")
    if agent is None:
        # Fall back to the notebook-level ``trainer`` built by the training cell.
        agent = trainer
    # NOTE(review): if ``trainer`` is an RLlib Trainer, its inherited
    # ``compute_actions`` may expect a dict of observations -- confirm.

    path = str(inspect.getfile(CybORG))
    # NOTE(review): drops the last 10 characters of the module path,
    # presumably "/CybORG.py", to reach the package directory -- confirm.
    path = path[:-10] + '/Shared/Scenarios/Scenario2.yaml'

    total_reward = []
    for num_steps in steps:
        for red_agent in [B_lineAgent]:
            cyborg = CybORG(path, 'sim', agents={'Red': red_agent})
            wrapped_cyborg = wrap(cyborg)
            observation = wrapped_cyborg.reset()

            total_reward = []
            for episode in range(MAX_EPS):
                episode_rewards = []
                # Fixed-length episodes: the env's `done` flag is not checked.
                for step in range(num_steps):
                    action, _, _ = agent.compute_actions(observation)
                    observation, rew, _, _ = wrapped_cyborg.step(action)
                    episode_rewards.append(rew)
                total_reward.append(sum(episode_rewards))
                observation = wrapped_cyborg.reset()
            print(f'Average reward for red agent {red_agent.__name__} and steps {num_steps} is: {mean(total_reward):.1f} with a standard deviation of {stdev(total_reward):.1f}')
    return mean(total_reward)

In [117]:
import gym
import numpy as np
import tree  # pip install dm_tree
from typing import Optional

import ray
import ray.experimental.tf_utils
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.annotations import override
from ray.rllib.utils.filter import get_filter
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.spaces.space_utils import get_base_struct_from_space, unbatch
from ray.rllib.execution.replay_ops import Replay, StoreToReplayBuffer
from decimal import Decimal
tf1, tf, tfv = try_import_tf()

def rollout(
    policy: Policy,
    env: gym.Env,
    novelty_archive,
    timestep_limit: Optional[int] = None,
    add_noise: bool = False,
    offset: float = 0.0,
    novelty_type: str = "action"
):
    """Run one episode and collect rewards, a behaviour descriptor and a batch.

    Args:
        policy: RLlib Policy from which to draw actions.
        env: Environment from which to draw rewards, done, and next state.
        novelty_archive: Accepted for API compatibility; not used here.
        timestep_limit: Steps after which to end the rollout. If None, use
            ``env.spec.max_episode_steps`` (when set) or 999999.
        add_noise: Forwarded to ``policy.compute_actions``.
        offset: ``abs(offset)`` is subtracted from every step reward before it
            is recorded (e.g. to remove a survival bonus).
        novelty_type: ``"action"`` describes the episode by the mean one-hot
            action vector; any other value uses the mean observation.

    Returns:
        ``(returns, t, novel, sample)``:
        - ``returns``: float64 array of per-step rewards.
        - ``t``: number of steps taken.
        - ``novel``: mean behaviour descriptor over the episode.
        - ``sample``: the episode's SampleBatch, with
          ``Postprocessing.ADVANTAGES`` set to the 0.9-discounted
          reward-to-go.
    """
    import scipy.signal  # `import scipy` alone does not guarantee the submodule is loaded

    max_timestep_limit = 999999
    env_timestep_limit = (
        getattr(getattr(env, "spec", None), "max_episode_steps", None)
        or max_timestep_limit
    )
    if timestep_limit is None:
        timestep_limit = env_timestep_limit
    else:
        timestep_limit = min(timestep_limit, env_timestep_limit)

    # Size of the one-hot action descriptor; 145 was the original hard-coded value.
    num_actions = getattr(getattr(env, "action_space", None), "n", 145)

    t = 0
    cur_obs = env.reset()
    novel = []
    returns = []
    batch = SampleBatchBuilder()
    for _ in range(timestep_limit or max_timestep_limit):
        action, _, _ = policy.compute_actions([cur_obs], add_noise=add_noise, update=True)
        new_obs, r, done, _ = env.step(action[0])
        # Apply the offset *before* recording, otherwise it has no effect.
        if offset != 0.0:
            r -= np.abs(offset)
        if novelty_type == 'action':
            action_vector = np.zeros(num_actions)
            action_vector[action] = 1
            novel.append(action_vector)
        else:
            novel.append(cur_obs)
        returns.append(r)
        batch.add_values(
            obs=cur_obs,
            actions=action[0],
            rewards=r,
            dones=done,
            new_obs=new_obs,
        )
        cur_obs = new_obs
        t += 1
        if done:
            break

    returns = np.array(returns, dtype=np.float64)
    # Build the batch even when the step limit (not `done`) ended the episode;
    # previously `sample` was unbound in that case.
    sample = batch.build_and_reset()
    # Discounted reward-to-go (gamma = 0.9) computed with a reversed IIR filter.
    sample[Postprocessing.ADVANTAGES] = scipy.signal.lfilter(
        [1], [1, -0.9], returns[::-1], axis=0
    )[::-1]
    novel = np.mean(np.array(novel), axis=0)
    return returns, t, novel, sample

def eval_rollout(
    policy: Policy,
    env: gym.Env,
):
    """Play one evaluation episode and return its per-step rewards.

    Actions come from ``policy.compute_actions`` with ``add_noise=False`` and
    ``update=False``. The episode ends at the first ``done`` or after 999999
    steps.

    Returns:
        float64 numpy array with one reward per step taken.
    """
    step_cap = 999999
    rewards = []
    obs = env.reset()
    for _ in range(step_cap):
        acts, _, _ = policy.compute_actions([obs], add_noise=False, update=False)
        obs, reward, finished, _ = env.step(acts[0])
        rewards.append(reward)
        if finished:
            break
    return np.array(rewards, dtype=np.float64)

def make_session(single_threaded):
    """Create a TF1 session, limited to one inter-/intra-op thread if requested."""
    if single_threaded:
        one_thread = tf1.ConfigProto(
            inter_op_parallelism_threads=1,
            intra_op_parallelism_threads=1,
        )
        return tf1.Session(config=one_thread)
    return tf1.Session()


class GATFPolicy(Policy):
    """TensorFlow policy whose weights are set externally as one flat vector.

    The GA never trains this network with gradients; callers overwrite its
    weights through ``set_flat_weights`` and only query it for actions.
    """
    def __init__(self, obs_space, action_space, config):
        super().__init__(obs_space, action_space, config)
        self.action_space_struct = get_base_struct_from_space(action_space)
        # Stored for `_add_noise`, which nothing in this class calls.
        self.action_noise_std = self.config["action_noise_std"]
        self.preprocessor = ModelCatalog.get_preprocessor_for_space(obs_space)
        self.observation_filter = get_filter(
            self.config["observation_filter"], self.preprocessor.shape
        )
        self.single_threaded = self.config.get("single_threaded", False)
        # NOTE(review): forcing "tfe" makes the `== "tf2"` test below always
        # False, so the session/placeholder branch is dead code. This also
        # writes into self.config -- presumably the same dict the caller
        # passed in (GATrainer passes its own config); verify.
        self.config["framework"] = "tfe"
        if self.config["framework"] == "tf2":
            self.sess = make_session(single_threaded=self.single_threaded)

            # Set graph-level seed.
            if config.get("seed") is not None:
                with self.sess.as_default():
                    tf1.set_random_seed(config["seed"])

            self.inputs = tf1.placeholder(
                tf.float32, [None] + list(self.preprocessor.shape)
            )
        else:
            # Eager path (always taken, see note above); `compute_actions`
            # relies on eager tensors via `.numpy()`.
            if not tf1.executing_eagerly():
                tf1.enable_eager_execution()
            self.sess = self.inputs = None
            if config.get("seed") is not None:
                # Tf2.x.
                if config.get("framework") == "tf2":
                    tf.random.set_seed(config["seed"])
                # Tf-eager.
                elif tf1 and config.get("framework") == "tfe":
                    tf1.set_random_seed(config["seed"])

        # Policy network.
        self.dist_class, dist_dim = ModelCatalog.get_action_dist(
            self.action_space, self.config["model"], dist_type="deterministic"
        )

        self.model = ModelCatalog.get_model_v2(
            obs_space=self.preprocessor.observation_space,
            action_space=action_space,
            num_outputs=dist_dim,
            model_config=self.config["model"],
        )

        self.sampler = None
        if self.sess:
            dist_inputs, _ = self.model({SampleBatch.CUR_OBS: self.inputs})
            dist = self.dist_class(dist_inputs, self.model)
            self.sampler = dist.sample()
            self.variables = ray.experimental.tf_utils.TensorFlowVariables(
                dist_inputs, self.sess
            )
            self.sess.run(tf1.global_variables_initializer())
        else:
            self.variables = ray.experimental.tf_utils.TensorFlowVariables(
                [], None, self.model.variables()
            )

        # Total number of scalar weights; this is the length of the flat
        # vectors accepted by `set_flat_weights`.
        self.num_params = sum(
            np.prod(variable.shape.as_list())
            for _, variable in self.variables.variables.items()
        )

    @override(Policy)
    def compute_actions(self, observation, add_noise=False, update=True, **kwargs):
        """Compute the action for ``observation[0]`` only.

        ``update`` is forwarded to the observation filter. ``add_noise`` is
        accepted but currently ignored.

        Returns:
            ``(actions, dist_inputs[0], {})`` where ``actions`` holds numpy
            arrays with a leading batch dimension of 1.
        """
        # Squeeze batch dimension (we always calculate actions for only a
        # single obs).
        observation = observation[0]
        observation = self.preprocessor.transform(observation)
        observation = self.observation_filter(observation[None], update=update)
        # `actions` is a list of (component) batches.
        # Eager mode.
        dist_inputs, _ = self.model({SampleBatch.CUR_OBS: observation})
        dist = self.dist_class(dist_inputs, self.model)
        actions = dist.sample()
        actions = tree.map_structure(lambda a: a.numpy(), actions)
        return actions, dist_inputs.numpy()[0], {}

    def compute_single_action(
        self, observation, add_noise=False, update=True, **kwargs
    ):
        """Single-observation wrapper around ``compute_actions``.

        The second returned value is the distribution inputs from
        ``compute_actions``, not an RNN state.
        """
        action, state_outs, extra_fetches = self.compute_actions(
            [observation], add_noise=add_noise, update=update, **kwargs
        )
        return action[0], state_outs, extra_fetches

    def _add_noise(self, single_action, single_action_space):
        """Add Gaussian noise (std ``action_noise_std``) to float Box actions in place."""
        if isinstance(
            single_action_space, gym.spaces.Box
        ) and single_action_space.dtype.name.startswith("float"):
            single_action += (
                np.random.randn(*single_action.shape) * self.action_noise_std
            )
        return single_action

    def get_state(self):
        """Return ``{"state": <flat weight vector>}``."""
        return {"state": self.get_flat_weights()}

    def set_state(self, state):
        """Load weights from a dict produced by ``get_state``."""
        return self.set_flat_weights(state["state"])

    def set_flat_weights(self, x):
        """Overwrite all model weights from the flat vector ``x``."""
        self.variables.set_flat(x)

    def get_flat_weights(self):
        """Return all model weights concatenated into one flat vector."""
        return self.variables.get_flat()

In [118]:

from collections import namedtuple
import logging
import numpy as np
import random
import time
from typing import Optional

import ray
from ray.rllib.agents import Trainer
from CybORG.Agents.ES.RLLibFiles.trainer_config import TrainerConfig
from CybORG.Agents.ES import optimizers, utils
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.agents.dqn import DQNTrainer
from ray.rllib.env.env_context import EnvContext
from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID
from ray.rllib.utils import FilterManager
from ray.rllib.utils.annotations import override
from ray.rllib.utils.deprecation import Deprecated
from ray.rllib.utils.metrics import (
    NUM_AGENT_STEPS_SAMPLED,
    NUM_AGENT_STEPS_TRAINED,
    NUM_ENV_STEPS_SAMPLED,
    NUM_ENV_STEPS_TRAINED,
)
from ray.rllib.utils.torch_utils import set_torch_seed
from ray.rllib.utils.typing import TrainerConfigDict
from ray.rllib.evaluation.sample_batch_builder import SampleBatchBuilder
from ray.rllib.evaluation.postprocessing import Postprocessing

logger = logging.getLogger(__name__)

# Per-worker output of `Worker.do_rollouts`: the evaluated individuals (noise
# index lists), their mean returns, behaviour descriptors, novelty scores and
# a (possibly empty) experience batch.
Result = namedtuple(
    "Result",
    "noise_indices eval_returns observations novelties experience",
)


class GAConfig(TrainerConfig):
    """Builder-style configuration for ``GATrainer``.

    Defaults are set in ``__init__``. Override them with ``training()`` (and
    the other inherited ``TrainerConfig`` setters), then call
    ``.build(env=...)``.
    """

    def __init__(self):
        super().__init__(trainer_class=GATrainer)

        # GA specific settings:
        self.action_noise_std = 0.0
        self.l2_coeff = 0.005
        self.noise_stdev = 0.02
        self.noise_decay = 0.995
        self.episodes_per_batch = 100
        self.reward_coefficient = 0.5
        self.store_novelty_probs = 0.03
        self.stepsize = 0.01
        self.noise_size = 250000000
        self.num_workers = 30
        self.individuals_per_worker = 1
        self.observation_filter = "MeanStdFilter"
        self.framework = "tf"
        self.evaluation_config["num_envs_per_worker"] = 1
        self.evaluation_config["observation_filter"] = "NoFilter"
        self.experience_sample_rate = 0.1
        self.experience_store_dir = "tmp"
        self.tourney_size = 12
        self.novelty_max_size = 150
        self.novelty_k = 25
        self.novelty_type = 'action'

    @override(TrainerConfig)
    def training(
        self,
        *,
        action_noise_std: Optional[float] = None,
        noise_stdev: Optional[float] = None,
        noise_decay: Optional[float] = None,
        episodes_per_batch: Optional[int] = None,
        reward_coefficient: Optional[float] = None,
        stepsize: Optional[float] = None,
        noise_size: Optional[int] = None,
        tourney_size: Optional[int] = None,
        individuals_per_worker: Optional[int] = None,
        store_novelty_probs: Optional[float] = None,
        experience_sample_rate: Optional[float] = None,
        experience_store_dir: Optional[str] = None,
        novelty_max_size: Optional[int] = None,
        novelty_k: Optional[int] = None,
        novelty_type: Optional[str] = None,
        l2_coeff: Optional[float] = None,
        **kwargs,
    ) -> "GAConfig":
        """Set the GA training options; ``None`` leaves a setting unchanged.

        Args:
            action_noise_std: Std. deviation for action noise. Stored for the
                policy; ``GATFPolicy.compute_actions`` currently ignores it.
            noise_stdev: Scale applied to noise-table slices, both when
                rebuilding an individual's weights and for each new mutation.
            noise_decay: The i-th noise index of an individual is scaled by
                ``noise_stdev * noise_decay**i``.
            episodes_per_batch: Rollouts per individual per generation; their
                total rewards are averaged.
            reward_coefficient: Weight ``c`` of the normalised return in the
                fitness; normalised novelty gets weight ``1 - c``.
            stepsize: Stored only; not read by the GA code in this file.
            noise_size: Length of the flat float32 noise table shared by all
                workers.
            tourney_size: Number of contenders drawn per tournament.
            individuals_per_worker: Population members handled by each worker.
            store_novelty_probs: Per worker result, probability that its
                behaviour descriptors are added to the novelty archive.
            experience_sample_rate: Per episode, probability that its sample
                batch is written to ``experience_store_dir``.
            experience_store_dir: Directory, relative to Ray's user temp dir,
                where sampled experience is written.
            novelty_max_size: Maximum length of the novelty archive (a deque).
            novelty_k: Number of nearest archive entries averaged for novelty;
                novelty is 0 until the archive holds at least this many.
            novelty_type: ``"action"`` (mean one-hot action) or anything else
                (mean observation) as the behaviour descriptor.
            l2_coeff: Stored only; not read by the GA code in this file.
            **kwargs: Forwarded to ``TrainerConfig.training``.

        Returns:
            This updated config object.
        """
        # Pass kwargs onto super's `training()` method.
        super().training(**kwargs)

        overrides = {
            "action_noise_std": action_noise_std,
            "noise_stdev": noise_stdev,
            "noise_decay": noise_decay,
            "episodes_per_batch": episodes_per_batch,
            "reward_coefficient": reward_coefficient,
            "individuals_per_worker": individuals_per_worker,
            "stepsize": stepsize,
            "noise_size": noise_size,
            "store_novelty_probs": store_novelty_probs,
            "experience_sample_rate": experience_sample_rate,
            "experience_store_dir": experience_store_dir,
            "tourney_size": tourney_size,
            "novelty_max_size": novelty_max_size,
            "novelty_k": novelty_k,
            "novelty_type": novelty_type,
            "l2_coeff": l2_coeff,
        }
        for name, value in overrides.items():
            if value is not None:
                setattr(self, name, value)
        return self


@ray.remote
def create_shared_noise(count):
    """Create a large array of noise to be shared by all workers.

    A fixed seed (123) is used, so every call returns the same ``count``
    standard-normal samples, cast to float32.
    """
    rng = np.random.RandomState(123)
    samples = rng.randn(count)
    return samples.astype(np.float32)


class SharedNoiseTable:
    """Read-only view over a flat float32 noise array shared between workers.

    Args:
        noise: float32 numpy array (checked with an assertion).
    """

    def __init__(self, noise):
        self.noise = noise
        assert self.noise.dtype == np.float32

    def get(self, i, dim):
        """Return the ``dim`` noise values starting at offset ``i``."""
        end = i + dim
        return self.noise[i:end]

    def sample_index(self, dim):
        """Draw a random start offset such that a ``dim``-long slice fits."""
        exclusive_upper = len(self.noise) - dim + 1
        return np.random.randint(0, exclusive_upper)


@ray.remote
class Worker:
    """Ray actor that rolls out and evaluates a slice of the GA population.

    Each worker builds its own environment with ``env_creator`` and its own
    policy from ``get_policy_class``. It reads noise from a
    ``SharedNoiseTable``. An individual is a list of noise-table indices; its
    weights are rebuilt locally by ``calculate_model_weights``.
    """

    def __init__(
        self,
        config,
        policy_params,
        env_creator,
        noise,
        worker_index,
        min_task_runtime=0.2,
    ):
        # Set Python random, numpy, env, and torch/tf seeds.
        seed = config.get("seed")
        if seed is not None:
            # Python random module.
            random.seed(seed)
            # Numpy.
            np.random.seed(seed)
            # Torch.
            if config.get("framework") == "torch":
                set_torch_seed(seed)

        self.min_task_runtime = min_task_runtime
        self.config = config
        self.config.update(policy_params)
        self.config["single_threaded"] = False
        self.noise = SharedNoiseTable(noise)

        env_context = EnvContext(config["env_config"] or {}, worker_index)
        self.env = env_creator(env_context)
        # Seed the env, if it supports seeding.
        if not hasattr(self.env, "seed"):
            logger.info("Env doesn't support env.seed(): {}".format(self.env))
        else:
            self.env.seed(seed)

        from ray.rllib import models  # kept from the original; unused below

        _policy_class = get_policy_class(config)
        self.policy = _policy_class(
            self.env.observation_space, self.env.action_space, config
        )

    def rollout(self, timestep_limit, novelty_archive, add_noise=True):
        """Run one training episode.

        Returns ``(rewards, behaviour_descriptor, sample_batch)``.
        """
        rollout_reward, rollout_fragment_length, obs, batch = rollout(
            self.policy, self.env, novelty_archive, timestep_limit=timestep_limit, add_noise=add_noise, novelty_type=self.config["novelty_type"]
        )
        return rollout_reward, obs, batch

    def evaluation_rollout(self):
        """Run one evaluation episode (no action noise, filter not updated)."""
        rollout_reward = eval_rollout(
            self.policy, self.env)
        return rollout_reward

    def calculate_model_weights(self, noise_indexes):
        """Rebuild flat weights from an individual's list of noise indices.

        weights = sum_i noise_stdev * noise_decay**i * noise[idx_i : idx_i + num_params]
        """
        weights = self.config["noise_stdev"] * self.noise.get(noise_indexes[0], self.policy.num_params)
        for i in range(1, len(noise_indexes)):
            weights += (self.config["noise_stdev"] * self.config["noise_decay"]**i) * self.noise.get(noise_indexes[i], self.policy.num_params)
        return weights

    def euclidean_distance(self, x, y):
        """Euclidean distance where the shorter vector is padded with its last element."""
        n, m = len(x), len(y)
        if n > m:
            a = np.linalg.norm(y - x[:m])
            b = np.linalg.norm(y[-1] - x[m:])
        else:
            a = np.linalg.norm(x - y[:n])
            b = np.linalg.norm(x[-1] - y[n:])
        return np.sqrt(a**2 + b**2)

    def compute_novelty_vs_archive(self, archive, novelty_vector, k):
        """Mean distance to the ``k`` nearest archive entries (0 if archive < k)."""
        if len(archive) < k:
            return 0
        nov = novelty_vector.astype(np.float64)
        distances = np.array(
            [self.euclidean_distance(point.astype(np.float64), nov) for point in archive]
        )
        # Pick k nearest neighbors.
        top_k = np.sort(distances)[:k]
        return top_k.mean()

    def do_evaluate(self, noise_indices, num_episodes=100):
        """Load an individual's weights and return its mean evaluation return.

        Args:
            noise_indices: The individual (list of noise-table indices).
            num_episodes: Evaluation episodes to average (default 100, as before).
        """
        weights = self.calculate_model_weights(noise_indices)
        self.policy.set_flat_weights(weights)
        reward = [np.sum(self.evaluation_rollout()) for _ in range(num_episodes)]
        return mean(reward)

    def do_rollouts(self, noise_indices, novelty_archive, timestep_limit=None):
        """Mutate and roll out each individual in ``noise_indices``.

        Each individual gets one freshly sampled noise index appended (its
        mutation). The mutated policy plays ``episodes_per_batch`` episodes.
        The returned ``Result`` holds, per mutated individual:
        - the child's index list;
        - its mean episode return;
        - its mean behaviour descriptor;
        - its novelty against ``novelty_archive``.
        """
        pop = []
        rewards = []
        observations = []
        novelties = []
        batch_builder = SampleBatchBuilder()
        writer = JsonWriterEdit(os.path.join(ray._private.utils.get_user_temp_dir(), self.config["experience_store_dir"]))
        episodes = self.config["episodes_per_batch"]
        for ni in noise_indices:
            weights = self.calculate_model_weights(ni)

            # Mutation: perturb the parent's weights with a new noise slice.
            noise_index = self.noise.sample_index(self.policy.num_params)
            perturbation = self.config["noise_stdev"] * self.noise.get(
                noise_index, self.policy.num_params
            )
            self.policy.set_flat_weights(weights + perturbation)

            reward = 0
            obs = []
            for _ in range(episodes):
                r, o, batch = self.rollout(timestep_limit, novelty_archive)
                if np.random.rand() <= self.config["experience_sample_rate"]:
                    writer.write(batch)
                reward += np.sum(r)
                obs.append(o)
            rewards.append(reward / episodes)
            descriptor = np.mean(obs, axis=0)
            observations.append(descriptor)
            novelties.append(self.compute_novelty_vs_archive(novelty_archive, descriptor, self.config["novelty_k"]))
            # Build the child without mutating the caller's list.
            pop.append(list(ni) + [noise_index])

        return Result(
            noise_indices=pop,
            eval_returns=rewards,
            observations=observations,
            novelties=novelties,
            experience=batch_builder.build_and_reset(),
        )


def get_policy_class(config):
    """Return the policy class for the GA; ``config`` is accepted but ignored."""
    policy_cls = GATFPolicy
    return policy_cls

class GATrainer(Trainer):
    """Genetic-algorithm trainer with novelty + reward tournament selection.

    Individuals are lists of indices into a shared noise table; their weights
    are rebuilt by ``calculate_model_weights``. Each iteration the ``Worker``
    actors mutate and roll out their slice of the population. The next
    population is then chosen by tournament selection on a fitness mixing
    min-max normalised return and normalised novelty.
    """

    @classmethod
    @override(Trainer)
    def get_default_config(cls) -> TrainerConfigDict:
        return GAConfig().to_dict()

    @override(Trainer)
    def validate_config(self, config: TrainerConfigDict) -> None:
        """Reject configurations this trainer cannot run with."""
        # Call super's validation method.
        super().validate_config(config)

        if config["num_gpus"] > 1:
            raise ValueError("`num_gpus` > 1 not yet supported for ES!")
        if config["num_workers"] <= 0:
            raise ValueError("`num_workers` must be > 0 for ES!")
        if config["evaluation_config"]["num_envs_per_worker"] != 1:
            raise ValueError(
                "`evaluation_config.num_envs_per_worker` must always be 1 for "
                "ES! To parallelize evaluation, increase "
                "`evaluation_num_workers` to > 1."
            )

    @override(Trainer)
    def setup(self, config):
        """Build the local policy, shared noise table, worker actors and GA state."""
        # Merge a (possibly partial) user config dict with the class defaults.
        if isinstance(config, dict):
            self.config = self.merge_trainer_configs(
                self.get_default_config(), config, self._allow_unknown_configs
            )
        else:
            self.config = config.to_dict()

        self.validate_config(self.config)

        # A local env is created only to read the observation/action spaces.
        self.env_creator = self._get_env_creator_from_env_id(self._env_id)
        env_context = EnvContext(self.config["env_config"] or {}, worker_index=0)
        env = self.env_creator(env_context)

        self.callbacks = self.config["callbacks"]()

        self._policy_class = get_policy_class(self.config)
        self.policy = self._policy_class(
            obs_space=env.observation_space,
            action_space=env.action_space,
            config=self.config,
        )

        # Create the shared noise table.
        logger.info("Creating shared noise table.")
        noise_id = create_shared_noise.remote(self.config["noise_size"])
        self.noise = SharedNoiseTable(ray.get(noise_id))

        # Create the actors.
        logger.info("Creating actors.")
        self.workers = [
            Worker.remote(self.config, {}, self.env_creator, noise_id, idx + 1)
            for idx in range(self.config["num_workers"])
        ]

        # Initial population: individual i is just the single noise index [i].
        self.population = [[i] for i in range(int(self.config["num_workers"] * self.config["individuals_per_worker"]))]
        self.novelty_archive = deque([], maxlen=self.config["novelty_max_size"])
        self.novelty_history = []
        # Counts training iterations (one generation each), not env episodes.
        self.episodes_so_far = 0
        self.reward_list = []
        self.tstart = time.time()
        self.elite = [0]

    @override(Trainer)
    def get_policy(self, policy=DEFAULT_POLICY_ID):
        if policy != DEFAULT_POLICY_ID:
            raise ValueError(
                "ES has no policy '{}'! Use {} "
                "instead.".format(policy, DEFAULT_POLICY_ID)
            )
        return self.policy

    @override(Trainer)
    def step_attempt(self):
        """Run one generation: rollouts, archive update, evaluation, selection."""
        results = self._collect_results(self.population, self.novelty_archive)
        self.episodes_so_far += 1

        returns = []
        novelty = []
        individuals = []
        for result in results:
            returns.extend(result.eval_returns)
            novelty.extend(result.novelties)
            individuals.extend(result.noise_indices)
            if np.random.rand() <= self.config["store_novelty_probs"]:
                self.novelty_archive.extend(result.observations)
                self.novelty_history.extend(result.observations)

        novelty = np.array(novelty)
        returns = np.array(returns)
        values = self._ga_fitness(returns, novelty)

        # NOTE(review): the *previous* generation's elite seeds this population,
        # and the new best is only carried into the next one -- confirm this
        # one-generation delay is intended.
        population = [self.elite]
        self.elite = individuals[np.argmax(returns)]

        evals = self._ga_evaluate_top(returns, individuals)
        print('eval:' + str(max(evals)))

        population.extend(self._ga_tournament(values, individuals, len(individuals) - 1))
        self.population = population

        info = {
            "episodes_so_far": self.episodes_so_far,
        }

        result = dict(
            episode_reward_mean=mean(returns),
            episode_reward_max=max(returns),
            episode_novelty_mean=mean(novelty),
            episode_elite_eval=max(evals),
            info=info,
        )

        return result

    def _ga_fitness(self, returns, novelty):
        """Return per-individual fitness.

        fitness = (1 - c) * novelty / max(novelty) + c * (R - min R) / (max R - min R),
        with c = ``reward_coefficient``. Each term is 0 when its normaliser is
        not positive (empty archive / all returns equal). Without that guard
        0/0 yields NaN and tournament selection breaks.
        """
        coef = self.config["reward_coefficient"]
        novelty = np.asarray(novelty, dtype=np.float64)
        returns = np.asarray(returns, dtype=np.float64)

        max_novelty = np.max(novelty)
        if max_novelty > 0:
            novelty_term = (novelty / max_novelty) * (1 - coef)
        else:
            novelty_term = np.zeros_like(returns)

        low = np.min(returns)
        spread = np.max(returns) - low
        if spread > 0:
            reward_term = ((returns - low) / spread) * coef
        else:
            reward_term = np.zeros_like(returns)
        return novelty_term + reward_term

    def _ga_evaluate_top(self, returns, individuals):
        """Evaluate the highest-return individuals, one per worker, in parallel."""
        # Was a hard-coded 30; cap by both the worker count and population size.
        num_top = min(len(self.workers), len(returns))
        top = np.argpartition(returns, -num_top)[-num_top:]
        eval_ids = [
            worker.do_evaluate.remote(individuals[idx])
            for worker, idx in zip(self.workers, top)
        ]
        return list(ray.get(eval_ids))

    def _ga_tournament(self, values, individuals, num_winners):
        """Pick ``num_winners`` individuals by repeated tournament selection."""
        order = np.arange(len(individuals))
        tourney_size = self.config["tourney_size"]
        winners = []
        for _ in range(num_winners):
            np.random.shuffle(order)
            contenders = order[:tourney_size]
            winners.append(individuals[contenders[np.argmax(values[contenders])]])
        return winners

    def calculate_model_weights(self, noise_indexes):
        """Rebuild flat weights from a list of noise indices (same as ``Worker``)."""
        weights = self.config["noise_stdev"] * self.noise.get(noise_indexes[0], self.policy.num_params)
        for i in range(1, len(noise_indexes)):
            weights += (self.config["noise_stdev"] * self.config["noise_decay"]**i) * self.noise.get(noise_indexes[i], self.policy.num_params)
        return weights

    @override(Trainer)
    def compute_single_action(self, observation, *args, **kwargs):
        action, _, _ = self.policy.compute_actions([observation], update=False)
        if kwargs.get("full_fetch"):
            return action[0], [], {}
        return action[0]

    @override(Trainer)
    def _sync_weights_to_workers(self, *, worker_set=None, workers=None):
        # Broadcast the new policy weights to all evaluation workers.
        assert worker_set is not None
        logger.info("Synchronizing weights to evaluation workers.")
        weights = ray.put(self.policy.get_flat_weights())
        worker_set.foreach_policy(lambda p, pid: p.set_flat_weights(ray.get(weights)))

    @override(Trainer)
    def cleanup(self):
        # workaround for https://github.com/ray-project/ray/issues/1516
        for w in self.workers:
            w.__ray_terminate__.remote()

    def _collect_results(self, population, novelty_archive):
        """Send each worker its slice of the population and gather the Results."""
        ind = self.config['individuals_per_worker']
        rollout_ids = [
            worker.do_rollouts.remote(population[int(i*ind):int((i+1)*ind)], novelty_archive) for i, worker in enumerate(self.workers)
        ]
        return list(ray.get(rollout_ids))

    def __getstate__(self):
        return {
            "weights": self.policy.get_flat_weights(),
            "filter": self.policy.observation_filter,
            "episodes_so_far": self.episodes_so_far,
        }

    def __setstate__(self, state):
        self.episodes_so_far = state["episodes_so_far"]
        self.policy.set_flat_weights(state["weights"])
        self.policy.observation_filter = state["filter"]
        FilterManager.synchronize(
            {DEFAULT_POLICY_ID: self.policy.observation_filter}, self.workers
        )


# Deprecated: build configs with `GAConfig()` instead of reading DEFAULT_CONFIG.
class _deprecated_default_config(dict):
    """Dict snapshot of the ``GAConfig`` defaults.

    Item access is routed through ``Deprecated(..., error=False)``.
    """

    def __init__(self):
        super().__init__(GAConfig().to_dict())

    @Deprecated(
        old="DEFAULT_CONFIG",
        new="GAConfig(...)",
        error=False,
    )
    def __getitem__(self, item):
        return super().__getitem__(item)


DEFAULT_CONFIG = _deprecated_default_config()

In [119]:
from ray.rllib.offline.json_writer import JsonWriter
from ray.rllib.offline.dataset_writer import DatasetWriter
from ray.rllib.offline.io_context import IOContext
from datetime import datetime
import shutil

def env_creator(env_config: dict):
    """Build CybORG Scenario2 wrapped for the Blue agent.

    Red is ``B_lineAgent`` and Green is ``GreenAgent``; the wrapper is created
    with ``max_steps=100``. ``env_config`` is currently unused.
    """
    package_file = str(inspect.getfile(CybORG))
    # NOTE(review): drops the last 10 characters, presumably "/CybORG.py".
    scenario_path = package_file[:-10] + '/Shared/Scenarios/Scenario2.yaml'
    cyborg = CybORG(
        scenario_file=scenario_path,
        environment='sim',
        agents={"Red": B_lineAgent, "Green": GreenAgent},
    )
    return RLlibWrapper(env=cyborg, agent_name="Blue", max_steps=100)

def print_results(results_dict):
    """Print a one-line summary of a training result dict.

    ``training_iteration`` is required. Reward statistics are optional: the
    dict built in ``GATrainer.step_attempt`` has no ``episode_reward_min``.
    Any missing reward key is therefore shown as ``n/a`` instead of raising
    ``KeyError``.
    """
    def fmt(key):
        value = results_dict.get(key)
        return "n/a" if value is None else f"{value:.1f}"

    train_iter = results_dict["training_iteration"]
    print(
        f"{train_iter:4d} \tr_mean: {fmt('episode_reward_mean')} "
        f"\tr_max: {fmt('episode_reward_max')} \tr_min: {fmt('episode_reward_min')}"
    )
    
# Register `env_creator` under the name "CybORG" so trainers can be built
# with `config.build(env="CybORG")`.
register_env(name="CybORG", env_creator=env_creator)

In [120]:
import json
import subprocess
from shutil import make_archive
# RLlib model config for the policy network: two hidden layers of 512 units, ReLU.
model = {
    "fcnet_hiddens": [512, 512],
    "fcnet_activation": "relu",
}

# One experiment name (and experience directory) per reward coefficient swept below.
names = ["85_data_a", "9_data_a"]
# names = ["50_store", "150_store", "250_store", "350_store"]

# Accumulators for the sweep (presumably filled by the loop below).
means_all, max_all, novel_all, archives, evals_all = [], [], [], [], []
for i, co in enumerate([.85, .9]):
    # This run's experience files live in Ray's temp dir under names[i];
    # wipe leftovers so the cleaning/archiving below only sees this run.
    store_dir = os.path.join(ray._private.utils.get_user_temp_dir(), names[i])
    if os.path.isdir(store_dir):
        shutil.rmtree(store_dir)

    config = (
        GAConfig()
        .training(
            episodes_per_batch=25,
            reward_coefficient=co,
            model=model,
            noise_stdev=0.035,
            noise_decay=0.997,
            store_novelty_probs=0.1,
            individuals_per_worker=4,
            novelty_max_size=10000,  # Go as high as you dare
            experience_sample_rate=0,
            tourney_size=12,
            novelty_k=30,
            novelty_type='action',  # 'action' or 'state'
            experience_store_dir=names[i],
        )
        .resources(num_gpus=1)
        .rollouts(num_rollout_workers=30)
        .debugging(log_level='ERROR')
    )
    trainer = config.build(env="CybORG")
    print(co)
    s = "{:3d} reward mean: {:6.2f}, reward max: {:6.2f}, novelty mean: {:6.2f}"
    means = []; maxs = []; nov = []; evall = []
    for j in range(200):
        result = trainer.train()
        means.append(result["episode_reward_mean"])
        maxs.append(result["episode_reward_max"])
        nov.append(result["episode_novelty_mean"])
        evall.append(result["episode_elite_eval"])
        print(s.format(j, result["episode_reward_mean"], result["episode_reward_max"], result["episode_novelty_mean"]))

    # Collected data needs to be cleaned: delete every stored file that does
    # not parse as JSON. List the directory directly rather than parsing the
    # repr of `ls` output (which escapes unusual characters), skipping
    # dotfiles as `ls` does.
    if os.path.isdir(store_dir):
        stored = sorted(n for n in os.listdir(store_dir) if not n.startswith('.'))
    else:
        stored = []
    removed = 0
    for name in stored:
        file_path = os.path.join(store_dir, name)
        # Close the handle before a possible delete instead of leaking it.
        with open(file_path) as fh:
            try:
                json.load(fh)
                valid = True
            except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
                valid = False
        if not valid:
            os.remove(file_path)
            removed += 1
    # Report the real number of files inspected.
    print(f'Removed {removed} files, of {len(stored)} files')
    make_archive(names[i], 'zip', store_dir)

    novelty_archive = np.stack(trainer.novelty_history)
    evals_all.append(evall); np.save(names[i] + '_evals.npy', np.array(evall))
    means_all.append(means); np.save(names[i] + '_means.npy', np.array(means))
    max_all.append(maxs); np.save(names[i] + '_maxs.npy', np.array(maxs))
    novel_all.append(nov); np.save(names[i] + '_nov.npy', np.array(nov))
    archives.append(novelty_archive); np.save(names[i] + '_archive.npy', novelty_archive)

2022-06-25 09:22:23,822	INFO trainable.py:159 -- Trainable.setup took 10.761 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


0.85




eval:-626.935
  0 reward mean: -738.05, reward max: -554.27, novelty mean:   0.00
eval:-534.838
  1 reward mean: -702.08, reward max: -504.71, novelty mean:   0.00
eval:-424.786
  2 reward mean: -658.76, reward max: -381.70, novelty mean:   0.00
eval:-289.38000000000005
  3 reward mean: -539.00, reward max: -297.84, novelty mean:   0.06
eval:-226.35100000000003
  4 reward mean: -403.49, reward max: -215.41, novelty mean:   0.07
eval:-216.58900000000003
  5 reward mean: -307.77, reward max: -205.18, novelty mean:   0.09
eval:-207.637
  6 reward mean: -256.28, reward max: -201.69, novelty mean:   0.13
eval:-197.285
  7 reward mean: -232.79, reward max: -194.84, novelty mean:   0.19
eval:-188.784
  8 reward mean: -225.05, reward max: -183.34, novelty mean:   0.19
eval:-182.543
  9 reward mean: -210.73, reward max: -165.76, novelty mean:   0.21
eval:-178.40300000000002
 10 reward mean: -210.02, reward max: -167.48, novelty mean:   0.22
eval:-175.657
 11 reward mean: -207.38, reward max: -1

eval:-54.879
 92 reward mean: -105.44, reward max: -45.34, novelty mean:   0.17
eval:-49.476
 93 reward mean: -95.13, reward max: -40.46, novelty mean:   0.18
eval:-48.371
 94 reward mean: -90.38, reward max: -42.34, novelty mean:   0.18
eval:-49.74399999999999
 95 reward mean: -87.77, reward max: -42.20, novelty mean:   0.18
eval:-44.416
 96 reward mean: -86.53, reward max: -38.90, novelty mean:   0.18
eval:-45.549
 97 reward mean: -87.20, reward max: -35.38, novelty mean:   0.18
eval:-43.36
 98 reward mean: -78.75, reward max: -38.04, novelty mean:   0.18
eval:-42.676
 99 reward mean: -76.55, reward max: -35.22, novelty mean:   0.18
eval:-43.282
100 reward mean: -73.86, reward max: -38.60, novelty mean:   0.18
eval:-39.431000000000004
101 reward mean: -75.02, reward max: -34.52, novelty mean:   0.17
eval:-43.528999999999996
102 reward mean: -69.49, reward max: -31.46, novelty mean:   0.17
eval:-36.77
103 reward mean: -65.62, reward max: -36.29, novelty mean:   0.17
eval:-41.657000000

KeyboardInterrupt: 

In [122]:
# Resume the interrupted run for the remaining iterations; trainer, s, names,
# i and the history lists are carried over from the previous cell.
for j in range(154, 200):
    result = trainer.train()
    means.append(result["episode_reward_mean"])
    maxs.append(result["episode_reward_max"])
    nov.append(result["episode_novelty_mean"])
    evall.append(result["episode_elite_eval"])
    print(s.format(j, result["episode_reward_mean"], result["episode_reward_max"], result["episode_novelty_mean"]))

# Collected data needs to be cleaned: delete every stored file that does not
# parse as JSON. List the directory directly (skipping dotfiles, as `ls` does)
# rather than parsing the repr of `ls` output.
store_dir = os.path.join(ray._private.utils.get_user_temp_dir(), names[i])
if os.path.isdir(store_dir):
    stored = sorted(n for n in os.listdir(store_dir) if not n.startswith('.'))
else:
    stored = []
removed = 0
for name in stored:
    file_path = os.path.join(store_dir, name)
    # Close the handle before a possible delete instead of leaking it.
    with open(file_path) as fh:
        try:
            json.load(fh)
            valid = True
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            valid = False
    if not valid:
        os.remove(file_path)
        removed += 1
# Report the real number of files inspected.
print(f'Removed {removed} files, of {len(stored)} files')
make_archive(names[i], 'zip', store_dir)

novelty_archive = np.stack(trainer.novelty_history)
evals_all.append(evall); np.save(names[i] + '_evals.npy', np.array(evall))
means_all.append(means); np.save(names[i] + '_means.npy', np.array(means))
max_all.append(maxs); np.save(names[i] + '_maxs.npy', np.array(maxs))
novel_all.append(nov); np.save(names[i] + '_nov.npy', np.array(nov))
archives.append(novelty_archive); np.save(names[i] + '_archive.npy', novelty_archive)

eval:-25.243000000000006
154 reward mean: -63.47, reward max: -23.13, novelty mean:   0.18
eval:-25.742000000000008
155 reward mean: -59.58, reward max: -21.60, novelty mean:   0.17
eval:-26.033000000000005
156 reward mean: -51.83, reward max: -22.67, novelty mean:   0.16
eval:-24.213000000000005
157 reward mean: -51.09, reward max: -21.78, novelty mean:   0.16
eval:-25.050000000000004
158 reward mean: -44.08, reward max: -22.42, novelty mean:   0.15
eval:-23.449000000000005
159 reward mean: -44.50, reward max: -20.79, novelty mean:   0.15
eval:-23.742000000000008
160 reward mean: -43.26, reward max: -21.74, novelty mean:   0.15
eval:-23.117000000000004
161 reward mean: -40.24, reward max: -20.31, novelty mean:   0.14
eval:-24.746000000000006
162 reward mean: -42.07, reward max: -22.05, novelty mean:   0.14
eval:-22.944000000000006
163 reward mean: -41.56, reward max: -21.14, novelty mean:   0.15
eval:-23.78300000000001
164 reward mean: -40.56, reward max: -22.82, novelty mean:   0.16


In [None]:
import json
import subprocess
from shutil import make_archive
# Fully connected network config handed to GAConfig.training(model=...).
model = {
    "fcnet_hiddens": [512, 512],
    "fcnet_activation": "relu",
}

# Experience-store directory name for the single state-novelty run below.
names = ["8_data_s"]
# Per-run histories accumulated across the sweep (five distinct lists).
means_all, max_all, novel_all, archives, evals_all = [], [], [], [], []
for i, co in enumerate([.8]):
    # This run's experience files live in Ray's temp dir under names[i];
    # wipe leftovers so the cleaning/archiving below only sees this run.
    store_dir = os.path.join(ray._private.utils.get_user_temp_dir(), names[i])
    if os.path.isdir(store_dir):
        shutil.rmtree(store_dir)

    config = (
        GAConfig()
        .training(
            episodes_per_batch=25,
            reward_coefficient=co,
            model=model,
            noise_stdev=0.035,
            noise_decay=0.997,
            store_novelty_probs=0.1,
            individuals_per_worker=4,
            novelty_max_size=10000,  # Go as high as you dare
            experience_sample_rate=0,
            tourney_size=12,
            novelty_k=30,
            novelty_type='state',  # 'action' or 'state'
            experience_store_dir=names[i],
        )
        .resources(num_gpus=1)
        .rollouts(num_rollout_workers=30)
        .debugging(log_level='ERROR')
    )
    trainer = config.build(env="CybORG")
    print(co)
    s = "{:3d} reward mean: {:6.2f}, reward max: {:6.2f}, novelty mean: {:6.2f}"
    means = []; maxs = []; nov = []; evall = []
    for j in range(200):
        result = trainer.train()
        means.append(result["episode_reward_mean"])
        maxs.append(result["episode_reward_max"])
        nov.append(result["episode_novelty_mean"])
        evall.append(result["episode_elite_eval"])
        print(s.format(j, result["episode_reward_mean"], result["episode_reward_max"], result["episode_novelty_mean"]))

    # Collected data needs to be cleaned: delete every stored file that does
    # not parse as JSON. List the directory directly rather than parsing the
    # repr of `ls` output (which escapes unusual characters), skipping
    # dotfiles as `ls` does.
    if os.path.isdir(store_dir):
        stored = sorted(n for n in os.listdir(store_dir) if not n.startswith('.'))
    else:
        stored = []
    removed = 0
    for name in stored:
        file_path = os.path.join(store_dir, name)
        # Close the handle before a possible delete instead of leaking it.
        with open(file_path) as fh:
            try:
                json.load(fh)
                valid = True
            except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
                valid = False
        if not valid:
            os.remove(file_path)
            removed += 1
    # Report the real number of files inspected.
    print(f'Removed {removed} files, of {len(stored)} files')
    make_archive(names[i], 'zip', store_dir)

    novelty_archive = np.stack(trainer.novelty_history)
    evals_all.append(evall); np.save(names[i] + '_evals.npy', np.array(evall))
    means_all.append(means); np.save(names[i] + '_means.npy', np.array(means))
    max_all.append(maxs); np.save(names[i] + '_maxs.npy', np.array(maxs))
    novel_all.append(nov); np.save(names[i] + '_nov.npy', np.array(nov))
    archives.append(novelty_archive); np.save(names[i] + '_archive.npy', novelty_archive)

2022-06-26 03:46:25,633	INFO trainable.py:159 -- Trainable.setup took 11.095 seconds. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.


0.8




eval:-604.106
  0 reward mean: -743.70, reward max: -567.11, novelty mean:   0.00
eval:-557.18
  1 reward mean: -707.55, reward max: -485.17, novelty mean:   0.00
eval:-433.129
  2 reward mean: -655.85, reward max: -414.99, novelty mean:   0.35
eval:-310.934
  3 reward mean: -523.53, reward max: -287.51, novelty mean:   0.35
eval:-234.82500000000005
  4 reward mean: -398.51, reward max: -228.35, novelty mean:   0.36
eval:-210.80100000000002
  5 reward mean: -290.99, reward max: -208.74, novelty mean:   0.38
eval:-199.086
  6 reward mean: -248.45, reward max: -193.57, novelty mean:   0.40
eval:-186.377
  7 reward mean: -237.41, reward max: -175.81, novelty mean:   0.39
eval:-178.978
  8 reward mean: -225.53, reward max: -172.55, novelty mean:   0.44
eval:-180.81900000000002
  9 reward mean: -217.36, reward max: -179.08, novelty mean:   0.41
eval:-175.125
 10 reward mean: -199.87, reward max: -164.30, novelty mean:   0.40
eval:-169.766
 11 reward mean: -199.10, reward max: -166.09, novel

In [None]:
import matplotlib.pyplot as plt
import umap

# Novelty archives of the three action-novelty GA runs plus the PPO baseline.
archives = [
    np.load('95_data_a_archive.npy'),
    np.load('9_data_a_archive.npy'),
    np.load('85_data_a_archive.npy'),
    np.load('ppo_novel_actions.npy'),
]

names = ['Reward CoEf: 0.95', 'Reward CoEf: 0.9', 'Reward CoEf: 0.85', 'PPO']

# Fit UMAP on all archives together so every run shares one 2-D space. The
# embedding is computed here because its rows must line up with `archives`
# in this exact order; an embedding left over from another cell does not.
reducer = umap.UMAP(n_components=2, n_neighbors=15, min_dist=0.1)
umap_embedding = reducer.fit_transform(np.concatenate(archives))

fig = plt.figure(figsize=(12, 12))
ax1 = fig.add_subplot(111)
runner = 0  # first embedding row belonging to the current archive
for i, a in enumerate(archives[:4]):
    end = runner + len(a)
    ax1.scatter(umap_embedding[runner:end, 0], umap_embedding[runner:end, 1], label=names[i], alpha=0.1)
    runner = end
plt.legend(loc='upper right')
plt.title('uMAP Plot of Action Based Novelty Experiment')
plt.show()

2022-06-25 09:08:13,362	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_evaluate()[39m (pid=24499, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f9f6b78e2e0>)
  File "<ipython-input-103-b83cfbe6498e>", line 256, in do_evaluate
TypeError: evaluation_rollout() takes 1 positional argument but 2 were given
2022-06-25 09:08:13,363	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_evaluate()[39m (pid=24495, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f71b2f9e2e0>)
  File "<ipython-input-103-b83cfbe6498e>", line 256, in do_evaluate
TypeError: evaluation_rollout() takes 1 positional argument but 2 were given
2022-06-25 09:08:13,364	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_evaluate()[39m (pid=24514, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f83d16c72e0>)
  File "<ipython-input-103-b83cfbe6

2022-06-25 09:08:13,383	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_evaluate()[39m (pid=24520, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f73e6d472e0>)
  File "<ipython-input-103-b83cfbe6498e>", line 256, in do_evaluate
TypeError: evaluation_rollout() takes 1 positional argument but 2 were given
2022-06-25 09:08:13,386	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_evaluate()[39m (pid=24504, ip=192.168.16.2, repr=<__main__.Worker object at 0x7ff109615220>)
  File "<ipython-input-103-b83cfbe6498e>", line 256, in do_evaluate
TypeError: evaluation_rollout() takes 1 positional argument but 2 were given
2022-06-25 09:08:13,387	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_evaluate()[39m (pid=24511, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f9da34fb2e0>)
  File "<ipython-input-103-b83cfbe6

In [None]:
# Pull the PPO evaluation rewards out of a saved log. Matching lines are
# presumably of the form "... steps 100 is: <mean> with ..." (confirm against
# the log). Take the whole token after the marker instead of the fixed slice
# line[59:64], which truncated values longer than 5 characters, failed on
# shorter ones and depended on the exact prefix length.
marker = "eps 100 is: "
with open('fml.txt') as f:
    lines = f.readlines()
reward = []
for line in lines:
    if marker in line:
        reward.append(float(line.split(marker, 1)[1].split()[0]))

In [None]:
# Max-reward curves of the "*_data_s" GA runs versus the PPO rewards in
# `reward`. The "_s" suffix matches the 8_data_s run, which was trained with
# novelty_type='state', so the title says state (not action) based novelty.
fig = plt.figure(figsize=(12, 12))
plt.plot(np.load('9_data_s_maxs.npy'), label="Reward CoEf: 0.9")
plt.plot(np.load('8_data_s_maxs.npy'), label="Reward CoEf: 0.8")
plt.plot(np.load('7_data_s_maxs.npy'), label="Reward CoEf: 0.7")
plt.plot(reward[0:200], label="PPO")
plt.title('Reward Accumulation Using State Based Novelty')
plt.xlabel('Iteration')
plt.ylabel('Reward')
plt.legend(loc='lower right')
plt.xlim([0, 200])
plt.ylim([-60, -15])
plt.show()

In [None]:
# Number of novelty archives currently loaded (displayed as the cell result).
n_archives = len(archives)
n_archives

In [None]:
import matplotlib.pyplot as plt
import umap
from matplotlib import animation, rc

rewards = np.load('ppo_reward.npy')
novel = np.load('ppo_novel.npy')

# Embed GA run 1's novelty archive together with the PPO novelty samples in
# one UMAP space: GA rows come first in the embedding, PPO rows follow.
whole = [archives[1], novel]

# Recompute the embedding here: its row layout must match `whole`, which a
# leftover embedding from another cell does not guarantee.
reducer = umap.UMAP(n_components=2, n_neighbors=15, min_dist=0.1)
umap_embedding = reducer.fit_transform(np.concatenate(whole))

animation_frames = 150
# Number of GA rows at the start of umap_embedding (= offset of the PPO rows).
# This must come from whole[0] (archives[1]), not archives[0].
samples = len(whole[0])

os.makedirs('gagif', exist_ok=True)
filenames = []
for i in range(animation_frames):
    fig = plt.figure(figsize=(10, 8), dpi=100)
    i_line = int(i*(150/animation_frames))
    i_scatter = int(i*(samples/animation_frames))
    ax = plt.subplot(1, 2, 1)
    ax.set_xlim([-4, 15])
    ax.set_ylim([-4, 11])
    ax.set_title('uMAP of State Embedding')
    ax1 = plt.subplot(1, 2, 2)
    ax1.set_xlim([0, 150])
    ax1.set_ylim([-100, -20])
    ax1.set_title('Reward Accumulation')
    ax1.set_xlabel('Iteration')
    ax1.set_ylabel('Reward')
    ax1.plot(max_all[1][:i_line+1], label="GA")
    ax1.plot(rewards[:i+1], label="PPO")
    ax1.legend(loc='lower right')
    ax.scatter(umap_embedding[0:i_scatter+1, 0], umap_embedding[0:i_scatter+1, 1])
    ax.scatter(umap_embedding[samples:samples+i+1, 0], umap_embedding[samples:samples+i+1, 1])
    frame_path = 'gagif/'+str(i)+'.png'
    plt.savefig(frame_path)
    # Release each frame's figure; otherwise all 150 stay open in memory.
    plt.close(fig)
    filenames.append(frame_path)

import imageio
images = []
for filename in filenames:
    images.append(imageio.imread(filename))
imageio.mimsave('movie.gif', images, duration=0.1)

In [None]:
# Re-write movie.gif from the frames already collected in `images` (the same
# call that ends the animation cell), passing duration=0.1 through to imageio.
imageio.mimsave('movie.gif', images, duration=0.1)

In [None]:
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(16, 12))
ax1 = fig.add_subplot(111)
# NOTE(review): assumes max_all[0..2] hold the 0.7 / 0.8 / 0.9 runs — confirm
# against whichever sweep filled max_all in this session.
for run_idx, coef in enumerate(("0.7", "0.8", "0.9")):
    ax1.plot(max_all[run_idx], label="Reward CoEf = " + coef)
plt.legend(loc='lower right')
plt.title('Novelty Experiment (B_lineAgent)')
plt.xlabel('Batch')
plt.ylabel('Max Reward')
plt.show()

In [None]:
from datetime import datetime
import json
import logging
import numpy as np
import os
from six.moves.urllib.parse import urlparse
import time

try:
    from smart_open import smart_open
except ImportError:
    smart_open = None

from ray.rllib.policy.sample_batch import MultiAgentBatch
from ray.rllib.offline.io_context import IOContext
from ray.rllib.offline.output_writer import OutputWriter
from ray.rllib.utils.annotations import override, PublicAPI
from ray.rllib.utils.compression import pack, compression_supported
from ray.rllib.utils.typing import FileType, SampleBatchType
from ray.util.ml_utils.json import SafeFallbackEncoder
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

# Single-letter "schemes" urlparse reports for Windows paths like "c:\\..."
# (drive letters c through z); such paths are treated as local, not URIs.
WINDOWS_DRIVES = list(map(chr, range(ord("c"), ord("z") + 1)))


# TODO(jungong) : use DatasetWriter to back JsonWriter, so we reduce
#     codebase complexity without losing existing functionality.
@PublicAPI
class JsonWriterEdit(OutputWriter):
    """Writer object that saves each sample batch to its own JSON file.

    Every :meth:`write` call opens a new file named
    ``output-<HH:MM:SS:mmm>_worker-<worker_index>_<n>.json`` (UTC time of
    day, per-writer counter ``n``), writes one JSON document and closes it.
    ``max_file_size`` is stored but no size-based rollover is performed.
    """

    @PublicAPI
    def __init__(
        self,
        path: str,
        ioctx: IOContext = None,
        max_file_size: int = 64 * 1024 * 1024,
        compress_columns: List[str] = frozenset([]),  # e.g. ["obs", "new_obs"]
    ):
        """Initializes a JsonWriterEdit instance.

        Args:
            path: a path/URI of the output directory to save files in. Local
                directories are created if they do not exist.
            ioctx: current IO context object; ``IOContext()`` when omitted.
            max_file_size: stored for interface compatibility; not used.
            compress_columns: sample batch columns passed through ``pack``
                (when compression is supported).
        """
        self.ioctx = ioctx or IOContext()
        self.max_file_size = max_file_size
        self.compress_columns = compress_columns
        if urlparse(path).scheme not in [""] + WINDOWS_DRIVES:
            # NOTE(review): URI paths are accepted here, but _get_file uses the
            # builtin open(), so non-local URIs will not actually work.
            self.path_is_uri = True
        else:
            path = os.path.abspath(os.path.expanduser(path))
            # Try to create local dirs if they don't exist
            try:
                os.makedirs(path)
            except OSError:
                pass  # already exists
            assert os.path.exists(path), "Failed to create {}".format(path)
            self.path_is_uri = False
        self.path = path
        self.file_index = 0
        self.bytes_written = 0
        self.cur_file = None

    @override(OutputWriter)
    def write(self, sample_batch: SampleBatchType):
        """Serialize *sample_batch* to JSON and write it to a new file."""
        start = time.time()
        data = _to_json(sample_batch, self.compress_columns)
        # The context manager flushes and closes the file even if write fails.
        with self._get_file() as f:
            f.write(data)
        self.bytes_written += len(data)
        logger.debug(
            "Wrote {} bytes to {} in {}s".format(len(data), f, time.time() - start)
        )

    def _get_file(self) -> FileType:
        """Open a new, uniquely named output file in ``self.path`` for writing."""
        # NOTE(review): the timestamp contains ':' characters, which are not
        # valid in Windows file names.
        timestr = datetime.utcnow().strftime('%H:%M:%S:%f')[:-3]
        path = os.path.join(
            self.path,
            "output-{}_worker-{}_{}.json".format(
                timestr, self.ioctx.worker_index, self.file_index
            ),
        )
        # Advance the counter so two batches written by this writer within the
        # same millisecond get distinct names; opening with mode "w" would
        # otherwise silently overwrite the earlier batch.
        self.file_index += 1
        return open(path, "w")


def _to_jsonable(v, compress: bool) -> Any:
    if compress and compression_supported():
        return str(pack(v))
    elif isinstance(v, np.ndarray):
        return v.tolist()
    return v


def _to_json_dict(batch: SampleBatchType, compress_columns: List[str]) -> Dict:
    """Convert a (multi-agent) sample batch into a JSON-serializable dict.

    Single-agent batches become ``{"type": "SampleBatch", <columns>...}``;
    ``MultiAgentBatch`` becomes ``{"type", "count", "policy_batches"}`` with one
    column dict per policy id. Each column goes through ``_to_jsonable`` and
    is packed when its name is in *compress_columns*.
    """
    def convert_columns(columns):
        return {
            key: _to_jsonable(value, compress=key in compress_columns)
            for key, value in columns.items()
        }

    if not isinstance(batch, MultiAgentBatch):
        return {"type": "SampleBatch", **convert_columns(batch)}
    return {
        "type": "MultiAgentBatch",
        "count": batch.count,
        "policy_batches": {
            policy_id: convert_columns(sub_batch)
            for policy_id, sub_batch in batch.policy_batches.items()
        },
    }


def _to_json(batch: SampleBatchType, compress_columns: List[str]) -> str:
    """Serialize *batch* to a JSON string using Ray's ``SafeFallbackEncoder``."""
    return json.dumps(
        _to_json_dict(batch, compress_columns), cls=SafeFallbackEncoder
    )

In [None]:
import ray
import os
import shutil
import subprocess
# Build a ~1/5 subsample of the 8_data_s experience files by copying every
# fifth file (sorted order, dotfiles skipped as `ls` does) into 8_data_s_smallish.
temp_dir = ray._private.utils.get_user_temp_dir()
src_dir = os.path.join(temp_dir, "8_data_s")
dst_dir = os.path.join(temp_dir, "8_data_s_smallish")
# Create the directory the files are actually copied into; copyfile does not
# create missing parent directories.
os.makedirs(dst_dir, exist_ok=True)
visible_files = sorted(n for n in os.listdir(src_dir) if not n.startswith('.'))
# Distinct loop names so the notebook's format string `s` is not clobbered.
for idx, file_name in enumerate(visible_files):
    if idx % 5 == 0:
        shutil.copyfile(os.path.join(src_dir, file_name), os.path.join(dst_dir, file_name))

In [None]:
import subprocess
# Number of (non-hidden) files in the subsample directory. Parsing the repr of
# `ls` output with a [0:] slice also counted the trailing "'" fragment, so it
# over-reported by one.
smallish_dir = os.path.join(ray._private.utils.get_user_temp_dir(), "8_data_s_smallish")
len([n for n in os.listdir(smallish_dir) if not n.startswith('.')])

In [None]:
from shutil import make_archive
# Zip the "8_data_s_small" experience directory into ./8_data_s_small.zip.
# NOTE(review): the subsampling cell above writes "8_data_s_smallish", not
# "8_data_s_small" — confirm which directory is meant to be archived.
make_archive(
    base_name="8_data_s_small",
    format='zip',
    root_dir=os.path.join(ray._private.utils.get_user_temp_dir(), "8_data_s_small"),
)

2022-06-24 14:55:08,408	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_rollouts()[39m (pid=8184, ip=192.168.16.2, repr=<__main__.Worker object at 0x7fddf218e940>)
  File "<ipython-input-14-ffd2de595bdb>", line 268, in do_rollouts
  File "<ipython-input-14-ffd2de595bdb>", line 211, in rollout
  File "<ipython-input-13-8e26cdbe66b8>", line 62, in rollout
IndexError: index 92 is out of bounds for axis 0 with size 54
2022-06-24 14:55:08,410	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_rollouts()[39m (pid=8178, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f705f06da00>)
  File "<ipython-input-14-ffd2de595bdb>", line 268, in do_rollouts
  File "<ipython-input-14-ffd2de595bdb>", line 211, in rollout
  File "<ipython-input-13-8e26cdbe66b8>", line 62, in rollout
IndexError: index 131 is out of bounds for axis 0 with size 54
2022-06-24 14:55:08,411	ERROR worker.py:94 

2022-06-24 14:55:08,414	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_rollouts()[39m (pid=8200, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f6ec43458e0>)
  File "<ipython-input-14-ffd2de595bdb>", line 268, in do_rollouts
  File "<ipython-input-14-ffd2de595bdb>", line 211, in rollout
  File "<ipython-input-13-8e26cdbe66b8>", line 62, in rollout
IndexError: index 124 is out of bounds for axis 0 with size 54
2022-06-24 14:55:09,408	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_rollouts()[39m (pid=8181, ip=192.168.16.2, repr=<__main__.Worker object at 0x7f1a8648b970>)
  File "<ipython-input-14-ffd2de595bdb>", line 268, in do_rollouts
  File "<ipython-input-14-ffd2de595bdb>", line 211, in rollout
  File "<ipython-input-13-8e26cdbe66b8>", line 62, in rollout
IndexError: index 81 is out of bounds for axis 0 with size 54
2022-06-24 14:55:09,409	ERROR worker.py:94 

2022-06-24 14:55:10,412	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_rollouts()[39m (pid=8203, ip=192.168.16.2, repr=<__main__.Worker object at 0x7feb2f493a30>)
  File "<ipython-input-14-ffd2de595bdb>", line 268, in do_rollouts
  File "<ipython-input-14-ffd2de595bdb>", line 211, in rollout
  File "<ipython-input-13-8e26cdbe66b8>", line 62, in rollout
IndexError: index 135 is out of bounds for axis 0 with size 54
2022-06-24 14:55:10,413	ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): [36mray::Worker.do_rollouts()[39m (pid=8188, ip=192.168.16.2, repr=<__main__.Worker object at 0x7ff0b83ac9d0>)
  File "<ipython-input-14-ffd2de595bdb>", line 268, in do_rollouts
  File "<ipython-input-14-ffd2de595bdb>", line 211, in rollout
  File "<ipython-input-13-8e26cdbe66b8>", line 62, in rollout
IndexError: index 117 is out of bounds for axis 0 with size 54
2022-06-24 14:55:10,414	ERROR worker.py:94