In [1]:
%%capture
!pip install gymnasium
!pip install gymnasium[classic-control]

In [2]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import torch
from torch import nn
import torch.nn.functional as F
import os
import datetime


import import_ipynb
from TrulyPlastic_allOpt_5 import plastic_nn
from TrulyPlastic_allOpt_5 import input_layer
from TrulyPlastic_allOpt_5 import layer

importing Jupyter notebook from TrulyPlastic_allOpt_5.ipynb


In [4]:

class replay_memory():
    """Bounded FIFO store of transitions used for experience replay.

    At most `maxlen` transitions are kept; appending to a full buffer drops the oldest one.
    """

    def __init__(s, maxlen):
        # A deque with maxlen evicts from the left by itself once it is full.
        s.memory = deque(maxlen=maxlen)

    def __len__(s):
        """Return how many transitions are currently stored."""
        return len(s.memory)

    def append(s, transition):
        """Add one transition to the right end of the buffer."""
        s.memory.append(transition)

    def sample(s, sample_size):
        """Return a list of `sample_size` stored transitions drawn without replacement.

        random.sample raises ValueError when `sample_size` exceeds the number stored.
        """
        drawn = random.sample(s.memory, sample_size)
        return drawn




class DQN():
   
    def __init__(s, ct = 0, tag=0, path = r'test',
                 game_name = 'MountainCar-v0',
                 mini_batch_size = 32,  num_divisions = 1,
                 replay_memory_size = 100000,
                 network_sync_rate = 50000, discount_factor_g = 0.9):
        """Store the hyper-parameters and create the run's output directories.

        Args:
            ct: name of the run directory (created under the current working directory).
                The default 0 means "use the current time", formatted as
                YYYY-MM-DD_HH-MM-SS.
            tag: sub-directory / file-name tag for one model. When it is not 0,
                `set_tag(tag)` is called; otherwise `s.tag` and `s.path` stay unset
                until `set_tag` is called by the user.
            path: accepted for backward compatibility; it is not used by this method.
            game_name: Gymnasium environment id passed to `gym.make`.
            mini_batch_size: number of transitions sampled per optimisation step.
            num_divisions: number of points in each per-dimension `np.linspace`.
            replay_memory_size: capacity of the replay buffer created in `train`.
            network_sync_rate: step count after which `train` refreshes the target network.
            discount_factor_g: discount factor used in `optimize`.
        """
        if ct == 0:
            # strftime always yields whole seconds. Slicing str(now())[:-7] to drop the
            # microseconds would cut real characters whenever the microsecond field is 0,
            # because str() omits the fractional part in that case.
            ct = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
        s.ct = ct

        # <cwd>/<ct>/pics collects the reward plots of every tag of this run.
        # exist_ok avoids the race between checking for and creating the directory.
        os.makedirs(os.path.join(os.getcwd(), s.ct, 'pics'), exist_ok=True)

        if tag != 0:
            s.set_tag(tag)

        s.game_name = game_name
        s.discount_factor_g = discount_factor_g

        s.mini_batch_size = mini_batch_size
        s.num_divisions = num_divisions

        # One evenly spaced grid per observation dimension, from the space's low bound to
        # its high bound. Not currently consumed by state_to_dqn_input.
        env = gym.make(s.game_name)
        try:
            obs_space = env.observation_space
            s.lin_spaces = [
                np.linspace(obs_space.low[i], obs_space.high[i], s.num_divisions)
                for i in range(obs_space.shape[0])
            ]
        finally:
            # Release the environment even if reading the observation space fails.
            env.close()

        s.replay_memory_size = replay_memory_size
        s.network_sync_rate = network_sync_rate
        
    def set_tag(s, tag):
        """Remember `tag` and make sure the directory '<ct>/<tag>' exists.

        Sets `s.tag` and `s.path` (the relative path '<ct>/<tag>'), then creates that
        directory under the current working directory when it is not there yet.
        """
        s.tag = tag
        s.path = '/'.join((s.ct, s.tag))

        target_dir = os.path.join(os.getcwd(), s.path)
        if os.path.exists(target_dir):
            return
        os.makedirs(target_dir)
        
   
    def plot_progress(self, rewards_per_episode_, epsilon_history_):
        """Save the reward and epsilon training curves as PNG files.

        Writes `info_rew_<tag>.png` to both `<path>/` and `<ct>/pics/`, and
        `info_eps_<tag>.png` to `<path>/` only. Requires `set_tag` to have been called,
        because `self.path` and `self.tag` are read here.

        Args:
            rewards_per_episode_: sequence of per-episode returns to plot.
            epsilon_history_: sequence of epsilon values to plot.
        """
        # The f-strings are already fully interpolated; calling .format() on the result
        # (as the previous code did) raises if the path or tag contains braces.
        reward_fig = plt.figure()
        try:
            plt.xlabel('epochs')
            plt.ylabel('reward')
            plt.plot(rewards_per_episode_)
            plt.savefig(f'{self.path}/info_rew_{self.tag}.png')
            plt.savefig(f'{self.ct}/pics/info_rew_{self.tag}.png')
        finally:
            # Close the figure even when saving fails, so repeated calls do not pile up figures.
            plt.close(reward_fig)

        epsilon_fig = plt.figure()
        try:
            plt.xlabel('epochs')
            plt.ylabel('epsilon')
            plt.plot(epsilon_history_)
            plt.savefig(f'{self.path}/info_eps_{self.tag}.png')
        finally:
            plt.close(epsilon_fig)

    
    

    

    def train(self, policy_dqn, episodes, render=False):
        """Train `policy_dqn` with epsilon-greedy DQN and write checkpoints under `self.path`.

        Runs `episodes + 1` episodes. Every transition is stored in a replay buffer. Once
        at least one episode has ended with `terminated` and the buffer holds more than
        one mini-batch, one optimisation step is run after each episode and epsilon is
        reduced by 1/episodes (never below 0.01).

        Checkpoints: `mc_policy_<i>` every 1000 episodes and on every new best return,
        `mc_policy_best` on every new best return, `mc_policy_last_<tag>` at the end.
        The per-episode returns are appended to the rewards file via `save_reward_data`.

        Args:
            policy_dqn: network to train; `forward`, `deep_copy` and `save` are called on
                it here, and `optimize` calls `learn_one`.
            episodes: number of training episodes (one extra episode is run).
            render: when True the environment is created with render_mode='human'.

        Requires `set_tag` to have been called, since `self.path` and `self.tag` are used.
        """
        reward_cap = 300      # an episode is cut short once its return reaches this value
        initial_best = -200   # a return must exceed this to count as a new best

        # The target network starts out as a copy of the policy network.
        target_dqn = policy_dqn.deep_copy()

        env = gym.make(self.game_name, render_mode='human' if render else None)
        try:
            epsilon = 1 # 1 = 100% random actions
            # Linear decay step; max() guards against a division by zero for episodes == 0.
            epsilon_step = 1 / max(episodes, 1)
            memory = replay_memory(self.replay_memory_size)

            rewards_per_episode = []
            epsilon_history = []

            # Track number of steps taken. Used for syncing policy => target network.
            step_count = 0
            goal_reached = False
            best_rewards = initial_best

            for i in range(episodes+1):
                state = env.reset()[0]
                terminated = False
                truncated = False
                rewards = 0

                # Stop on truncation as well: the Gymnasium API requires reset() after
                # either flag, so stepping on after a time-limit truncation is invalid.
                while not terminated and not truncated and rewards < reward_cap:
                    if random.random() < epsilon:
                        action = env.action_space.sample()
                    else:
                        res = policy_dqn.forward(self.state_to_dqn_input(state))
                        action = res.argmax().item()

                    new_state,reward,terminated,truncated,_ = env.step(action)
                    rewards += reward
                    memory.append((state, action, new_state, reward, terminated))
                    state = new_state

                    step_count+=1

                rewards_per_episode.append(rewards)

                # Optimisation only starts after the first episode that ended by termination.
                if terminated:
                    goal_reached = True

                # Periodic checkpoint, log line and progress plots.
                if i!=0 and i%1000==0:
                    print(f'Episode {i} Epsilon {epsilon}')
                    policy_dqn.save(f'{self.path}/mc_policy_{i}')
                    self.add_log_data(f'Episode {i} Epsilon {epsilon}')
                    self.plot_progress(rewards_per_episode, epsilon_history)

                if rewards>best_rewards:
                    best_rewards = rewards
                    print(f'Best rewards so far: {best_rewards}')
                    self.add_log_data(f'Best rewards so far: {best_rewards}')
                    policy_dqn.save(f'{self.path}/mc_policy_{i}')
                    policy_dqn.save(f'{self.path}/mc_policy_best')

                # Check if enough experience has been collected
                if len(memory)>self.mini_batch_size and goal_reached:
                    mini_batch = memory.sample(self.mini_batch_size)
                    self.optimize(mini_batch, policy_dqn, target_dqn)

                    # Decay epsilon
                    epsilon = max(epsilon - epsilon_step, 0.01)
                    epsilon_history.append(epsilon)

                    # Copy policy network to target network after a certain number of steps
                    if step_count > self.network_sync_rate:
                        target_dqn = policy_dqn.deep_copy()
                        step_count = 0
        finally:
            # Release the environment (and any render window) even if training fails.
            env.close()

        policy_dqn.save(f'{self.path}/mc_policy_last_{self.tag}')
        if best_rewards == initial_best:
            # No episode beat the baseline, so no "best" checkpoint exists yet; write one
            # so that later loading of mc_policy_best does not fail.
            policy_dqn.save(f'{self.path}/mc_policy_best')
        self.save_reward_data(rewards_per_episode)



    
    def optimize(self, mini_batch, policy_dqn, target_dqn):
        """Build Q-value targets for `mini_batch` and run one learning step on `policy_dqn`.

        For each transition the target for the taken action is the reward, plus the
        discounted maximum of the target network's output for the next state when the
        transition is not terminal. The remaining entries are the target network's own
        output for the current state.

        Args:
            mini_batch: iterable of (state, action, new_state, reward, terminated) tuples.
            policy_dqn: network updated through `learn_one(x, y)`.
            target_dqn: network queried through `forward`.

        NOTE(review): the `[:, :, 0]` slices below require the stacked inputs and outputs
        to be 3-D, i.e. `state_to_dqn_input` and `forward` presumably return column
        arrays of shape (n, 1) - confirm against plastic_nn.
        """
        net_inputs = []
        q_targets = []

        for state, action, new_state, reward, terminated in mini_batch:
            # A terminal transition has no future return to bootstrap from.
            if not terminated:
                next_q = target_dqn.forward(self.state_to_dqn_input(new_state))
                target = reward + self.discount_factor_g * next_q.max()
            else:
                target = reward

            column = np.asarray(self.state_to_dqn_input(state))
            net_inputs.append(column)

            # Start from the target network's estimates and overwrite only the taken action.
            q_row = target_dqn.forward(column)
            q_row[action] = target
            q_targets.append(q_row)

        # Drop the trailing axis and transpose so that each sample becomes one column.
        x = np.asarray(net_inputs)[:, :, 0].T
        y = np.asarray(q_targets)[:, :, 0].T

        policy_dqn.learn_one(x, y)



    def state_to_dqn_input(s, state):
        """Turn an observation into a column vector of shape (n, 1) for the network.

        Works for any observation length, so environments with other than four
        components (for example the two-component MountainCar default) no longer raise
        IndexError. Values are passed through unchanged; the grids in `s.lin_spaces`
        are not applied here.

        Args:
            state: 1-D observation (array or sequence of numbers).

        Returns:
            A new numpy array of shape (len(state), 1) with the same dtype numpy infers
            for `state`. It is a copy, so later changes to it do not affect `state`.
        """
        return np.array(state).reshape(-1, 1)


        

    def test(self, policy_dqn, episodes, render = False):
        env = gym.make(self.game_name, render_mode='human' if render else None)
        
        num_states = env.observation_space.shape[0]
        num_actions = env.action_space.n

        done_count = 0
        medium_reward = 0
        reward_list = []
        
        for i in range(episodes):
            state, info = env.reset() 
            done = False
            truncated = False 
            rewards = 0

            while(not done and not truncated):
                state = self.state_to_dqn_input(state)

                res = policy_dqn.forward(state)

                action = res.argmax().item()

                state, reward, done, truncated, _ = env.step(action)
                rewards+=reward
                if (truncated):
                    done_count += 1
                    break

            medium_reward += rewards
            reward_list.append(rewards)
                

        
        env.close()
        medium_reward = medium_reward / episodes
        return done_count*100.0/episodes, medium_reward, reward_list
        
    def save_info(s, info):
        """Append the run configuration and a free-form note to '<path>/info_<tag>.txt'.

        Args:
            info: extra text written on the final "info" line.

        Requires `set_tag` to have been called, because `s.path` and `s.tag` are read.
        """
        # The f-string is already interpolated; an extra .format() call would raise if
        # the path or tag contained braces.
        file_path = f'{s.path}/info_{s.tag}.txt'

        lines = [
            "data {}\n".format(s.ct),
            "tag {}\n".format(s.tag),
            "game_name {}\n".format(s.game_name),
            "reward discount factor {}\n".format(s.discount_factor_g),
            "minibatch size {}\n".format(s.mini_batch_size),
            "num divisions{}\n".format(s.num_divisions),
            "replay memory size {}\n".format(s.replay_memory_size),
            "network sync rate {}\n".format(s.network_sync_rate),
            "info {}\n".format(info),
        ]

        # The context manager closes the file even when a write fails.
        with open(file_path, "a") as f:
            f.writelines(lines)
    
    def add_log_data(s, data):
        """Append `data` followed by a newline to '<path>/info_<tag>.txt'.

        Requires `set_tag` to have been called, because `s.path` and `s.tag` are read.
        """
        # No .format() on the f-string: it is already interpolated, and a second
        # formatting pass would raise on braces in the path or tag.
        file_path = f'{s.path}/info_{s.tag}.txt'
        # The context manager closes the file even when the write fails.
        with open(file_path, "a") as f:
            f.write("{}\n".format(data))
        
    def save_reward_data(s, data):
        """Append `data` followed by a newline to '<path>/rewards_<tag>.txt'.

        Requires `set_tag` to have been called, because `s.path` and `s.tag` are read.
        """
        # No .format() on the f-string: it is already interpolated, and a second
        # formatting pass would raise on braces in the path or tag.
        file_path = f'{s.path}/rewards_{s.tag}.txt'
        # The context manager closes the file even when the write fails.
        with open(file_path, "a") as f:
            f.write("{}\n".format(data))


## Network-shape search algorithm

In [5]:
class DQN_search():
    def __init__(s, dqn_model = 0, policy_dqn = 0):
        """Bind the searcher to a DQN trainer and a policy network.

        Args:
            dqn_model: trainer object to use; the default 0 means "create a `DQN()`".
            policy_dqn: network to train; the default 0 means "create a
                `plastic_nn(optimizer="Adam")`".
        """
        # 0 acts as the "not supplied" marker for both arguments.
        s.policy_dqn = plastic_nn(optimizer="Adam") if policy_dqn == 0 else policy_dqn
        s.dqn_model = DQN() if dqn_model == 0 else dqn_model

    def set_DQN(s, dqn_model):
        """Replace the trainer object used by `train`, `test` and `update_info_tag`."""
        s.dqn_model = dqn_model
        
    def set_NN(s, policy_dqn):
        """Replace the policy network that is rebuilt, trained and tested by the search."""
        s.policy_dqn = policy_dqn

    def set_a_type_array(s, a_type):
        """Store the per-layer activation types; `create_net` reads entry i for layer i >= 1."""
        s.atype = a_type

    def set_lr(s, lr):
        """Store the learning rate that `create_net` passes to every layer it builds."""
        s.lr = lr

    def set_nn_topology(s, layers_net):
        """Swap the policy network's layers for `layers_net` and reset its optimizer.

        Calls, in this order, `delete_layers()`, `append_layers(layers_net)` and
        `optimizer_reset()` on `s.policy_dqn`.
        """
        net = s.policy_dqn
        net.delete_layers()
        net.append_layers(layers_net)
        net.optimizer_reset()
                
    

    def create_net(s, shape):
        """Build the list of layer objects for a network with the given layer sizes.

        Args:
            shape: sequence of layer sizes; `shape[0]` is the input size.

        Returns:
            A list starting with `input_layer(shape[0])`, followed by one `layer` per
            remaining entry. Layer i (i >= 1) gets `prev_size=shape[i-1]`,
            `my_size=shape[i]`, the stored learning rate `s.lr` and the activation type
            `s.atype[i]`; `s.atype[0]` is not read.
        """
        # The input layer is constructed first, then the remaining layers in order.
        net_layers = [input_layer(shape[0])]
        net_layers.extend(
            layer(lr = s.lr, prev_size = shape[idx-1], my_size=shape[idx], activation_type=s.atype[idx])
            for idx in range(1, len(shape))
        )
        return net_layers

    def train(s, epochs):
        """Train the current policy network through the trainer for `epochs`, rendering off."""
        trainer = s.dqn_model
        trainer.train(s.policy_dqn, epochs, False)

    def update_info_tag(s, m, shape):
        """Point the trainer at a fresh tag for this model and record its settings.

        The tag is the entries of `shape` concatenated without separator, an underscore,
        and `m`. After `set_tag`, the learning rate, shape and activation types are
        written through the trainer's `save_info`.
        """
        shape_tag = ''.join(map(str, shape))
        s.dqn_model.set_tag(tag = '_'.join((shape_tag, str(m))))

        summary = f'lr: {s.lr} \nshape:{shape} \na1:{s.atype}\n'
        s.dqn_model.save_info(summary)

    def test(s, n_tests_for_model):
        path = s.dqn_model.path

        s.policy_dqn.load(f"{path}/mc_policy_best")

        return s.dqn_model.test(s.policy_dqn, n_tests_for_model, render = False)
        
                
                
        
    def search(s, n_models, TSR, start_shape, shape_steps, final_shape,
              epochs = 2000, n_tests_for_model = 100,
               adaptive_n_models = False, min_best_result = 0
              ):
        """Grow the network shape step by step until a trained model reaches `TSR`.

        For each shape, up to `n_models` networks are built, trained and tested. The
        search returns as soon as one of them scores at least `TSR`. Otherwise the next
        row of `shape_steps` (used cyclically) is added to the shape, until the shape
        equals `final_shape`. `final_shape` itself is not evaluated.

        Args:
            n_models: number of models tried per shape.
            TSR: success threshold compared against the first value returned by `s.test`.
            start_shape: initial layer sizes; it is copied, so the caller's array is
                left untouched.
            shape_steps: 2-D array; each row is an increment added to the shape.
            final_shape: shape at which the search stops.
            epochs: value passed to `s.train` for every model.
            n_tests_for_model: value passed to `s.test` for every model.
            adaptive_n_models: when True, a model that beats the best score seen so far
                for the current shape does not use up one of the `n_models` attempts.
            min_best_result: starting value of the per-shape best score.

        Returns:
            (test_res, shape, s.policy_dqn) on success, (False, 0, 0) otherwise.

        NOTE(review): the loop only ends when the shape equals `final_shape` exactly;
        steps that never land on it make the search run forever.
        """
        # Private copy: the in-place "+=" below must not modify the caller's start_shape.
        shape = np.array(start_shape)
        shape_steps = np.asarray(shape_steps)

        steps_dif = shape_steps.shape[0]
        step_i = 0

        flag = False       # True while the previous model improved on the best score
        adapt_cnt = 0      # number of consecutive improvements, used in the tag
        while not np.array_equal(shape, final_shape):
            m = 0
            best_result = min_best_result
            while m < n_models:
                print(m, shape)
                s.set_nn_topology(s.create_net(shape))
                # Retries of the same index get a distinct tag so they do not overwrite files.
                numb = str(m) + '_' + str(adapt_cnt) if flag else m
                s.update_info_tag(numb, shape)

                #train
                s.train(epochs)

                # test model
                test_res, medium_reward, _reward_list = s.test(n_tests_for_model)

                print('test_res ', test_res)
                print('medium_reward ', medium_reward)

                if test_res >= TSR:
                    print('success shape: ', shape)
                    return test_res, shape, s.policy_dqn

                if adaptive_n_models:
                    if test_res > best_result:
                        # Cancel out the increment below: this attempt is not counted.
                        m -= 1
                        if flag:
                            adapt_cnt += 1
                        flag = True
                        best_result = test_res
                    else:
                        flag = False
                        adapt_cnt = 0
                m += 1

            #shape step
            shape += shape_steps[step_i]
            step_i = (step_i + 1) % steps_dif

        print('search falied')
        return False, 0, 0