# SAC with CustomADTEnvContinuous

In [None]:
import datetime,gym,time,os,psutil,ray
import numpy as np
import tensorflow as tf
from util import gpu_sess,suppress_tf_warning,tic,toc
from sac import ReplayBuffer,create_sac_model,create_sac_graph,\
    save_sac_model_and_buffers,restore_sac_model_and_buffers
np.set_printoptions(precision=2)
suppress_tf_warning() # suppress warning 
gym.logger.set_level(40) # gym logger 

from episci.environment_wrappers.tactical_action_adt_env_continuous import CustomADTEnvContinuous
from episci.agents.utils.constants import Agents, RewardType
print ("Packaged loaded. TF version is [%s]."%(tf.__version__))

### Rollout Worker

In [None]:
def get_env():
    from episci.environment_wrappers.tactical_action_adt_env_continuous import CustomADTEnvContinuous
    from episci.agents.utils.constants import Agents, RewardType
    
    red_distribution = {
        Agents.SPOT_4G: 0.15,
        Agents.SPOT_5G: 0.30,
        Agents.SPOT_RANDOM: 0.45,
        Agents.EXPERT_SYSTEM_TRIAL_2: 0.6,
        Agents.EXPERT_SYSTEM_TRIAL_3_SCRIMMAGE_4: 0.75,
        Agents.EXPERT_SYSTEM: 1.0
    }
    env_config = {
        "red_distribution": red_distribution,
        "reward_type": RewardType.SHAPED
    }
    return CustomADTEnvContinuous(env_config)

def get_eval_env():
    # from episci.environment_wrappers.tactical_action_adt_env_continuous import CustomADTEnvContinuous
    # from episci.agents.utils.constants import Agents, RewardType
    red_distribution = {
        Agents.SPOT_4G: 0.15,
        Agents.SPOT_5G: 0.30,
        Agents.SPOT_RANDOM: 0.45,
        Agents.EXPERT_SYSTEM_TRIAL_2: 0.6,
        Agents.EXPERT_SYSTEM_TRIAL_3_SCRIMMAGE_4: 0.75,
        Agents.EXPERT_SYSTEM: 1.0
    }
    env_config = {
        "red_distribution": red_distribution,
        "reward_type": RewardType.SHAPED
    }
    return CustomADTEnvContinuous(env_config)


In [None]:
class RolloutWorkerClass(object):
    """
    Worker without RAY (for update purposes)
    """
    def __init__(self,hdims=[256,256],actv=tf.nn.relu,
                 lr=1e-3,gamma=0.99,alpha=0.1,polyak=0.995,seed=1):
        self.seed = seed
        # Each worker should maintain its own environment
        import gym
        from util import suppress_tf_warning
        suppress_tf_warning() # suppress TF warnings
        gym.logger.set_level(40) 
        self.env = get_eval_env()
        odim,adim = self.env.observation_space.shape[0],self.env.action_space.shape[0]
        self.odim = odim
        self.adim = adim
        _ = self.env.reset()
        
        # Create SAC model and computational graph 
        self.model,self.sess = create_sac_model(
            odim=self.odim,adim=self.adim,hdims=hdims,actv=actv)
        self.step_ops,self.target_init = \
            create_sac_graph(self.model,lr=lr,gamma=gamma,alpha=alpha,polyak=polyak)
        
        # Initialize model 
        tf.set_random_seed(self.seed)
        np.random.seed(self.seed)
        self.sess.run(tf.global_variables_initializer())
        self.sess.run(self.target_init)
    
    def get_action(self,o,deterministic=False):
        act_op = self.model['mu'] if deterministic else self.model['pi']
        return self.sess.run(act_op, feed_dict={self.model['o_ph']:o.reshape(1,-1)})[0]

    def get_weights(self):
        """
        Get weights
        """
        weight_vals = self.sess.run(self.model['main_vars'])
        return weight_vals
    
@ray.remote
class RayRolloutWorkerClass(object):
    """
    Rollout Worker with RAY
    """
    def __init__(self,worker_id=0,hdims=[256,256],actv=tf.nn.relu,
                 ep_len_rollout=1000):
        # Parse
        self.worker_id = worker_id
        self.ep_len_rollout = ep_len_rollout
        # Each worker should maintain its own environment
        import gym
        from util import suppress_tf_warning
        suppress_tf_warning() # suppress TF warnings
        gym.logger.set_level(40) 
        self.env = get_env()
        odim,adim = self.env.observation_space.shape[0],self.env.action_space.shape[0]
        self.odim = odim
        self.adim = adim
        _ = self.env.reset()
        
        # Replay buffers to pass
        self.o_buffer = np.zeros((self.ep_len_rollout,self.odim))
        self.a_buffer = np.zeros((self.ep_len_rollout,self.adim))
        self.r_buffer = np.zeros((self.ep_len_rollout))
        self.o2_buffer = np.zeros((self.ep_len_rollout,self.odim))
        self.d_buffer = np.zeros((self.ep_len_rollout))
        
        # Create SAC model
        self.model,self.sess = create_sac_model(
            odim=self.odim,adim=self.adim,hdims=hdims,actv=actv)
        self.sess.run(tf.global_variables_initializer())
        print ("Ray Worker [%d] Ready."%(self.worker_id))
        
        # Flag to initialize assign operations for 'set_weights()'
        self.FIRST_SET_FLAG = True
        
        # Flag to initialize rollout
        self.FIRST_ROLLOUT_FLAG = True
        
    def get_action(self,o,deterministic=False):
        act_op = self.model['mu'] if deterministic else self.model['pi']
        return self.sess.run(act_op, feed_dict={self.model['o_ph']:o.reshape(1,-1)})[0]
    
    def set_weights(self,weight_vals):
        """
        Set weights without memory leakage
        """
        if self.FIRST_SET_FLAG:
            self.FIRST_SET_FLAG = False
            self.assign_placeholders = []
            self.assign_ops = []
            for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
                a = weight_tf_var
                assign_placeholder = tf.placeholder(a.dtype, shape=a.get_shape())
                assign_op = a.assign(assign_placeholder)
                self.assign_placeholders.append(assign_placeholder)
                self.assign_ops.append(assign_op)
        for w_idx,weight_tf_var in enumerate(self.model['main_vars']):
            # Memory-leakage-free assign (hopefully)
            self.sess.run(self.assign_ops[w_idx],
                          {self.assign_placeholders[w_idx]:weight_vals[w_idx]})
            
    def rollout(self):
        """
        Rollout
        """
        if self.FIRST_ROLLOUT_FLAG:
            self.FIRST_ROLLOUT_FLAG = False
            self.o = self.env.reset() # reset environment
        # Loop
        for t in range(ep_len_rollout):
            self.a = self.get_action(self.o,deterministic=False) 
            self.o2,self.r,self.d,_ = self.env.step(self.a)
            # Append
            self.o_buffer[t,:] = self.o
            self.a_buffer[t,:] = self.a
            self.r_buffer[t] = self.r
            self.o2_buffer[t,:] = self.o2
            self.d_buffer[t] = self.d
            # Save next state 
            self.o = self.o2
            if self.d: self.o = self.env.reset() # reset when done 
        return self.o_buffer,self.a_buffer,self.r_buffer,self.o2_buffer,self.d_buffer
    
    def evaluate(self,red=None):
        """
        Evaluate
        """
        o,d,ep_ret,ep_len = self.env.reset(red=red),False,0,0
        while not(d or (ep_len == max_ep_len_eval)):
            a = self.get_action(o,deterministic=True)
            o,r,d,_ = self.env.step(a)
            ep_ret += r # compute return 
            ep_len += 1
        blue_health,red_health = self.env.blue_health,self.env.red_health
        eval_res = [ep_ret,ep_len,blue_health,red_health] # evaluation result 
        return eval_res
    
print ("Rollout worker classes (with and without RAY) ready.")

### Hyperparameters

In [None]:
n_cpu = 11
n_workers = 10
total_steps,evaluate_every,print_every = 50000,200,10
ep_len_rollout = 50*30 # 30sec rollout
hdims,actv = [256,256],tf.nn.relu
batch_size,update_count = 1024,500 # batchsize / number of updates
red_list = [Agents.SPOT_4G,Agents.SPOT_5G,Agents.SPOT_RANDOM,
            Agents.EXPERT_SYSTEM_TRIAL_2,Agents.EXPERT_SYSTEM_TRIAL_3_SCRIMMAGE_4,
            Agents.EXPERT_SYSTEM]
num_eval,max_ep_len_eval = len(red_list),15e3

### Initialize Workers

In [None]:
ray.init(num_cpus=n_cpu,
         memory = 5*1024*1024*1024,
         object_store_memory = 10*1024*1024*1024,
         driver_object_store_memory = 5*1024*1024*1024)
tf.reset_default_graph()
R = RolloutWorkerClass(hdims=hdims,actv=actv,
                       lr=1e-4,gamma=0.99,alpha=1.0,polyak=0.995,seed=0)
workers = [RayRolloutWorkerClass.remote(worker_id=i,hdims=hdims,actv=actv,
                                        ep_len_rollout=ep_len_rollout) 
           for i in range(n_workers)]
print ("RAY initialized with [%d] cpus and [%d] workers."%
       (n_cpu,n_workers))

In [None]:
time.sleep(1)

### Replay Buffers

In [None]:
replay_buffer_long = ReplayBuffer(odim=R.odim,adim=R.adim,size=int(1e6))
replay_buffer_short = ReplayBuffer(odim=R.odim,adim=R.adim,size=int(1e5))

### Loop

In [None]:
start_time = time.time()
n_env_step = 0 # number of environment steps
for t in range(int(total_steps)):
    esec = time.time()-start_time
    
    # Synchronize worker weights
    weights = R.get_weights()
    set_weights_list = [worker.set_weights.remote(weights) for worker in workers] 
    
    # Make rollout and accumulate to Buffers
    t_start = time.time()
    ops = [worker.rollout.remote() for worker in workers]
    rollout_vals = ray.get(ops)
    for rollout_val in rollout_vals:
        o_buffer,a_buffer,r_buffer,o2_buffer,d_buffer = rollout_val
        for i in range(ep_len_rollout):
            o,a,r,o2,d = o_buffer[i,:],a_buffer[i,:],r_buffer[i],o2_buffer[i,:],d_buffer[i]
            replay_buffer_long.store(o, a, r, o2, d) 
            replay_buffer_short.store(o, a, r, o2, d) 
            n_env_step += 1
    sec_rollout = time.time() - t_start
    
    # Update
    t_start = time.time()
    for _ in range(int(update_count)):
        batch_long = replay_buffer_long.sample_batch(batch_size//2) 
        batch_short = replay_buffer_short.sample_batch(batch_size//2) 
        feed_dict = {R.model['o_ph']: np.concatenate((batch_long['obs1'],batch_short['obs1'])),
                     R.model['o2_ph']: np.concatenate((batch_long['obs2'],batch_short['obs2'])),
                     R.model['a_ph']: np.concatenate((batch_long['acts'],batch_short['acts'])),
                     R.model['r_ph']: np.concatenate((batch_long['rews'],batch_short['rews'])),
                     R.model['d_ph']: np.concatenate((batch_long['done'],batch_short['done']))
                    }
        outs = R.sess.run(R.step_ops, feed_dict)
        q1_vals,q2_vals = outs[3],outs[4]
    sec_update = time.time() - t_start
    
    # Synchronize worker weights (after update)
    weights = R.get_weights()
    set_weights_list = [worker.set_weights.remote(weights) for worker in workers] 
    
    # Print
    if (t == 0) or (((t+1)%print_every) == 0): 
        print ("[%d/%d] rollout:[%.1f]s update:[%.1f]s."%
               (t+1,total_steps,sec_rollout,sec_update))
    
    # Evaluate
    if (t == 0) or (((t+1)%evaluate_every) == 0): 
        ram_percent = psutil.virtual_memory().percent # memory usage
        print ("[Eval. start] step:[%d/%d][%.1f%%] #step:[%.1e] time:[%s] ram:[%.1f%%]."%
               (t+1,total_steps,t/total_steps*100,
                n_env_step,
                time.strftime("day:[%d] %H:%M:%S", time.gmtime(time.time()-start_time)),
                ram_percent)
              )
        
        LOCAL_EVAL = 0
        if LOCAL_EVAL:
            ep_ret_sum = 0
            for eval_idx in range(num_eval): 
                red = red_list[eval_idx]
                o,d,ep_ret,ep_len = R.env.reset(red=red),False,0,0
                while not(d or (ep_len == max_ep_len_eval)):
                    a = R.get_action(o,deterministic=True)
                    o,r,d,_ = R.env.step(a)
                    ep_ret += r # compute return
                    ep_len += 1
                ep_ret_sum += ep_ret
                blue_health,red_health = R.env.blue_health,R.env.red_health
                print (" [%d/%d] [%s] ep_ret:[%.4f] ep_len:[%d]. blue health:[%.2f] red health:[%.2f]"
                    %(eval_idx,num_eval,red,ep_ret,ep_len, blue_health,red_health))
            ep_ret_avg = ep_ret_sum / num_eval
            print ("[Eval. done] time:[%s] ep_ret_avg:[%.3f]."%
                   (time.strftime("day:[%d] %H:%M:%S", time.gmtime(time.time()-start_time)),
                    ep_ret_avg)
                  )
        else: # parallel evaluation with Ray
            ops = []
            for i_idx in range(num_eval):
                worker,red = workers[i_idx],red_list[i_idx]
                ops.append(worker.evaluate.remote(red=red))
            eval_vals = ray.get(ops)
            ep_ret_sum = 0
            for i_idx in range(num_eval):
                red,eval_val = red_list[i_idx],eval_vals[i_idx]
                ep_ret,ep_len,blue_health,red_health = eval_val[0],eval_val[1],eval_val[2],eval_val[3]
                ep_ret_sum += ep_ret
                print (" [%d/%d] [%s] ep_ret:[%.4f] ep_len:[%d]. blue health:[%.2f] red health:[%.2f]"
                    %(i_idx,len(eval_vals),red,ep_ret,ep_len,blue_health,red_health))
            ep_ret_avg = ep_ret_sum / num_eval
            print ("[Eval. done] time:[%s] ep_ret_avg:[%.3f]."%
                   (time.strftime("day:[%d] %H:%M:%S", time.gmtime(time.time()-start_time)),
                    ep_ret_avg)
                  )
        
        # Save current SAC model and replay buffers 
        npz_path = '../data/net/adt_cont_tactic/model_and_buffers.npz'
        save_sac_model_and_buffers(npz_path,R,replay_buffer_long,replay_buffer_short,
                                   VERBOSE=False,IGNORE_BUFFERS=True)

print ("Done.")

### Close

In [None]:
ray.shutdown()

### Save model weights and replay buffers

In [None]:
# Path to save the npz file 
npz_path = '../data/net/adt_cont_tactic/model_and_buffers_final.npz'
save_sac_model_and_buffers(npz_path,R,replay_buffer_long,replay_buffer_short,
                           VERBOSE=False,IGNORE_BUFFERS=True)

### Restore 

In [None]:
npz_path = '../data/net/adt_cont_tactic/model_and_buffers_final.npz'
restore_sac_model_and_buffers(npz_path,R,replay_buffer_long,replay_buffer_short,VERBOSE=True)

### Test Run

In [None]:
eval_env = get_eval_env()
red_list = [Agents.SPOT_4G,Agents.SPOT_5G,Agents.SPOT_RANDOM,
            Agents.EXPERT_SYSTEM_TRIAL_2,Agents.EXPERT_SYSTEM_TRIAL_3_SCRIMMAGE_4,
            Agents.EXPERT_SYSTEM]
red = red_list[0]
o,d,ep_ret,ep_len = eval_env.reset(red=red),False,0,0
while not(d or (ep_len == max_ep_len_eval)):
    a = R.get_action(o,deterministic=True)
    o,r,d,_ = eval_env.step(a)
    ep_ret += r # compute return 
    ep_len += 1
print ("[Evaluate] ep_ret:[%.4f] ep_len:[%d]"
    %(eval_idx,ep_len))
eval_env.close() # close env