<a href="https://colab.research.google.com/github/AssistMoli/deep_deducing/blob/main/CartPole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing requirements

In [None]:
!sudo apt-get install python3.10

In [None]:
!pip install pandas==2.0.3 numpy==1.25.2 scipy==1.11.4 swig==4.2.1 ufal.pybox2d==2.3.10.3 gym==0.25.2 pygame==2.5.2 tqdm torch==2.0.1

In [None]:
# For local
# cuda==11.8.0 cudnn==8.9.7.29
# pip install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cu118

# importing modules

In [None]:
import numpy as np
import pandas as pd
from scipy.special import expit
import gym
import copy
import os
import sys
from tqdm import tqdm
import random
import math

import pickle

import multiprocessing
import time
import csv
from collections import Counter

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.nn.utils.rnn as rnn_utils
from torch.utils.data import DataLoader, TensorDataset
import gc

# Checking cuda

In [None]:
# Pick the compute device once: the first CUDA GPU when one is visible, else the CPU.
if not torch.cuda.is_available():
    device = torch.device("cpu")
    print('using cpu...')
else:
    # List every visible GPU so the selected index can be checked against the hardware.
    for i in range(torch.cuda.device_count()):
        print(f"Device {i}: {torch.cuda.get_device_name(i)}")
    device_index = 0
    device = torch.device(f"cuda:{device_index}")
    print('using cuda...')

# Creating a class for building model

In [None]:
# Our multihead-attention code is adapted from the reference material at the following websites; however, we changed the masking mechanism:
# https://medium.com/the-dl/transformers-from-scratch-in-pytorch-8777e346ca51
# https://towardsdatascience.com/build-your-own-transformer-from-scratch-using-pytorch-84c850470dcb
# https://ai.plainenglish.io/building-and-training-a-transformer-from-scratch-fdbf3db00df4
# Multihead attention is not used in this notebook; the code below is kept (commented out) only for future research.

# class MultiheadAttention(nn.Module):
#     def __init__(self, d_model, num_heads):
#         super(MultiheadAttention, self).__init__()
#         assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
# 
#         self.bias = False
# 
#         self.d_model   = d_model
#         self.num_heads = num_heads
#         self.d_k       = d_model // num_heads
# 
#         self.W_q  = nn.Linear(d_model, d_model, bias=self.bias)
#         self.W_k  = nn.Linear(d_model, d_model, bias=self.bias)
#         self.W_v  = nn.Linear(d_model, d_model, bias=self.bias)
#         self.W_o  = nn.Linear(d_model, d_model, bias=self.bias)
# 
#     def scaled_dot_product_attention(self, Q, K, V, mask=None):
# 
#         attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
# 
#         if mask is not None:
# 
#             mask = mask.unsqueeze(1).repeat(1, attn_scores.size(1), 1)
#             mask = mask.unsqueeze(3).repeat(1, 1, 1, attn_scores.size(2))
#             attn_scores = attn_scores.masked_fill(mask == True, -1e9)
#             attn_scores = attn_scores.masked_fill(mask.transpose(-2, -1) == True, -1e9)
#         else:
#             attn_scores = attn_scores
# 
#         attn_probs = torch.softmax(attn_scores, dim=-1)
# 
#         if mask is not None:
#             inverted_mask = ~mask
#             attn_probs = attn_probs * inverted_mask.type_as(attn_probs)
# 
#         output = torch.matmul(attn_probs, V)
# 
#         return output
# 
#     def split_heads(self, x):
#         batch_size, seq_length, d_model = x.size()
#         return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
#         #  (batch_size, seq_length, d_model) - > (batch_size, seq_length, self.num_heads, self.d_k) -> (batch_size, self.num_heads, seq_length, self.d_k)
# 
#     def combine_heads(self, x):
#         batch_size, _, seq_length, d_k = x.size()
#         return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
# 
#     def forward(self, Q, K, V, mask=None):
#         Q = self.split_heads(self.W_q(Q))
#         K = self.split_heads(self.W_k(K))
#         V = self.split_heads(self.W_v(V))
#         attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
#         output      = self.W_o(self.combine_heads(attn_output))
#         return output




class build_model(nn.Module):
    """Recurrent model that predicts a reward vector and a next-state vector.

    A forward pass takes the current state and a sequence of actions and returns:

    * a reward prediction, computed from the flattened per-step outputs of the
      recurrent layer, and
    * a next-state prediction, decoded from the final recurrent hidden state.

    The recurrent hidden state is initialised from the encoded current state.
    Every linear and recurrent layer is created without bias terms
    (``self.bias = False``).
    """

    def __init__(self,
                 input_neuron_size_,
                 hidden_neuron_size,
                 input_neuron_size,
                 input_sequence_size,
                 output_neuron_size,
                 neural_type,
                 num_layers,
                 num_heads,
                 hidden_activation,
                 output_activation,
                 initializer,
                 optimizer,
                 loss,
                 alpha,
                 mask_value):
        """Create the layers, the optimizer and the loss function.

        Args:
            input_neuron_size_: Width of the state vector given to the state
                encoder; also the width of the decoded next-state output.
            hidden_neuron_size: Hidden width shared by encoder, recurrent layer
                and decoder.
            input_neuron_size: Width of one time step of the recurrent input.
            input_sequence_size: Maximum sequence length; recurrent outputs are
                zero-padded to this length before being flattened.
            output_neuron_size: Width of the reward output.
            neural_type: ``'rnn'``, ``'gru'`` or ``'lstm'`` (case-insensitive).
            num_layers: Number of stacked recurrent layers.
            num_heads: Stored only; the attention layers are commented out.
            hidden_activation: ``'relu'``, ``'leaky_relu'``, ``'sigmoid'`` or ``'tanh'``.
            output_activation: Same choices as ``hidden_activation``.
            initializer: Weight initialiser name, see ``initialize_weights``.
            optimizer: ``'adam'``, ``'sgd'`` or ``'rmsprop'``.
            loss: ``'mean_squared_error'`` or ``'binary_crossentropy'``.
            alpha: Learning rate handed to the optimizer.
            mask_value: Sentinel value; a time step whose features all equal it
                is not counted towards the sequence length in ``forward``.

        Raises:
            KeyError: If any of the name arguments is not one of the listed choices.
        """
        super(build_model, self).__init__()

        self.input_neuron_size_   = int(input_neuron_size_)
        self.hidden_neuron_size   = int(hidden_neuron_size)
        self.input_neuron_size    = int(input_neuron_size)
        self.input_sequence_size  = int(input_sequence_size)
        self.output_neuron_size   = int(output_neuron_size)
        self.neural_type          = neural_type
        self.num_heads            = num_heads

        self.hidden_activation    = hidden_activation
        self.output_activation    = output_activation
        self.initializer          = initializer
        self.optimizer            = optimizer
        self.loss                 = loss
        self.alpha                = alpha
        self.mask_value           = mask_value

        self.bias = False

        self.num_layers = num_layers

        neural_types = {
            'rnn': nn.RNN,
            'gru': nn.GRU,
            'lstm': nn.LSTM
        }

        # State encoder (in_*) and next-state decoder (out_*).
        self.fully_connected_layer_in_0      = nn.Linear(self.input_neuron_size_, self.hidden_neuron_size, bias=self.bias)
        self.fully_connected_layer_in_1      = nn.Linear(self.hidden_neuron_size, self.hidden_neuron_size, bias=self.bias)
        self.fully_connected_layer_out_0     = nn.Linear(self.hidden_neuron_size, self.hidden_neuron_size, bias=self.bias)
        self.fully_connected_layer_out_1     = nn.Linear(self.hidden_neuron_size, self.input_neuron_size_, bias=self.bias)

        self.recurrent_layer_0               = neural_types[neural_type.lower()](self.input_neuron_size, self.hidden_neuron_size, num_layers=self.num_layers, batch_first=False, bias=self.bias)

        # self.attention_layer_0               = MultiheadAttention(self.hidden_neuron_size, num_heads= self.num_heads)
        # self.att_norm_0                      = nn.LayerNorm(self.hidden_neuron_size)
        # self.fully_connected_layer_0         = nn.Linear(self.hidden_neuron_size, self.hidden_neuron_size, bias=self.bias)
        # self.fc_norm_0                       = nn.LayerNorm(self.hidden_neuron_size)

        # Reward head: reads the recurrent outputs of all time steps at once.
        self.fully_connected_layer_7         = nn.Linear(self.hidden_neuron_size * self.input_sequence_size, self.output_neuron_size, bias=self.bias)

        # Activation functions (the name strings stored above are replaced by modules).
        self.hidden_activation = self.get_activation(self.hidden_activation)
        self.output_activation = self.get_activation(self.output_activation)

        # Initialize weights for fully connected layers
        self.initialize_weights(self.initializer)

        # Optimizer
        optimizers = {
            'adam': optim.Adam,
            'sgd': optim.SGD,
            'rmsprop': optim.RMSprop
        }
        self.selected_optimizer = optimizers[self.optimizer.lower()](self.parameters(), lr=self.alpha)

        # Loss function
        losses = {
            'mean_squared_error': torch.nn.MSELoss(),
            'binary_crossentropy': torch.nn.BCELoss()
        }
        self.loss_function = losses[self.loss.lower()]

    def forward(self, initial_hidden, x, padding_mask):
        """Predict the reward and the next state.

        Args:
            initial_hidden: State tensor of shape ``(batch, input_neuron_size_)``.
            x: Action sequence of shape ``(batch, sequence, input_neuron_size)``.
            padding_mask: Unused while the attention layers are disabled.

        Returns:
            Tuple ``(out, h)`` where ``out`` has shape
            ``(batch, output_neuron_size)`` and ``h`` has shape
            ``(num_layers, batch, input_neuron_size_)``.
        """
        # Encode the state and use it as the initial hidden state of every recurrent layer.
        h  = self.fully_connected_layer_in_0(initial_hidden)
        h  = self.hidden_activation(h)
        h  = self.fully_connected_layer_in_1(h)
        h  = self.hidden_activation(h)
        h  = torch.unsqueeze(h, dim=0).repeat(self.num_layers, 1, 1)

        out          = x.permute(1, 0, 2)
        # After the permute the tensor is (sequence, batch, features), so the
        # number of non-masked steps per sample is a sum over dim 0.
        # NOTE(review): this counts non-masked steps, which equals the true
        # length only if padding sits at the end of each sequence — confirm.
        lengths      = (out != self.mask_value).any(dim=2).sum(dim=0).cpu().long()
        out          = rnn_utils.pack_padded_sequence(out, lengths, batch_first=False, enforce_sorted=False)

        # Forward propagate RNN.  Dispatch on the layer that was actually built
        # rather than on the raw ``neural_type`` string: the constructor accepts
        # any letter case, so comparing the string to 'lstm' would send a bare
        # tensor to an nn.LSTM created from e.g. 'LSTM'.
        if isinstance(self.recurrent_layer_0, nn.LSTM):
            out, h     = self.recurrent_layer_0(out, (h, h))
            h          = h[0]  # keep the hidden state, drop the cell state
        else:
            out, h     = self.recurrent_layer_0(out, h)

        # Unpack and zero-pad back to the fixed sequence length expected by the reward head.
        out, _     = rnn_utils.pad_packed_sequence(out, batch_first=False)
        padding    = (0, 0, 0, 0, 0, self.input_sequence_size - out.size(0))
        out        = F.pad(out, padding, "constant", 0)
        out        = out.permute(1, 0, 2)

        # Decode the final hidden state into the next-state prediction.
        # h  = self.hidden_activation(h)
        h  = self.fully_connected_layer_out_0(h)
        h  = self.hidden_activation(h)
        h  = self.fully_connected_layer_out_1(h)
        h  = self.output_activation(h)

        # if padding_mask is not None:
        #     padding_mask = torch.any(padding_mask, dim=-1) # to (batch_size, sequence_length)
        # out_ = self.attention_layer_0(out, out, out, padding_mask)
        # out  = self.att_norm_0(out + out_)
        # out_ = self.fully_connected_layer_0(out)
        # out  = self.fc_norm_0(out + out_)

        out = torch.flatten(out, start_dim=1)

        out = self.fully_connected_layer_7(out)
        out = self.output_activation(out)

        return out, h


    def custom_activation(self, x):
        """Sigmoid shifted right by 1.5 (not referenced elsewhere in this class)."""
        return torch.sigmoid(x - 1.5)

    def get_activation(self,  activation):
        """Return the activation module registered under ``activation`` (case-insensitive).

        Raises:
            KeyError: If the name is not 'relu', 'leaky_relu', 'sigmoid' or 'tanh'.
        """
        activations = {
            'relu': nn.ReLU(),
            'leaky_relu': nn.LeakyReLU(),
            'sigmoid': nn.Sigmoid(),
            'tanh': nn.Tanh()
        }
        return activations[activation.lower()]

    def initialize_weights(self, initializer):
        """Re-initialise the weight of every direct ``nn.Linear`` child.

        Only ``nn.Linear`` children are touched; the recurrent layer keeps the
        initialisation PyTorch gave it at construction.

        Raises:
            KeyError: If ``initializer`` is not a known name.
        """
        initializers = {
            'random_uniform': nn.init.uniform_,
            'random_normal': nn.init.normal_,
            'glorot_uniform': nn.init.xavier_uniform_,
            'glorot_normal': nn.init.xavier_normal_,
            'xavier_uniform': nn.init.xavier_uniform_,
            'xavier_normal': nn.init.xavier_normal_
        }
        initializer = initializers[initializer.lower()]
        for layer in self.children():
            if isinstance(layer, nn.Linear):
                initializer(layer.weight)


# Function for updating input value using error backprop

In [None]:

def update_action_value(epoch_for_deducing,
            model_loader,
            desired_reward,
            state,
            action_value,
            beta):
    """Refine the action logits by back-propagating the reward error to the input.

    For every epoch the (copied) ensemble is visited in a freshly shuffled
    order.  For each member, ``sigmoid(action_value)`` is fed to the model, the
    loss between its predicted reward and ``desired_reward`` is back-propagated
    to the action, and ``action_value`` is moved against that gradient.

    Args:
        epoch_for_deducing: Number of passes over the ensemble.
        model_loader: List of models exposing ``loss_function`` and a
            ``(state, action, padding_mask=...)`` call returning ``(reward, _)``.
        desired_reward: Target tensor for the predicted reward.
        state: Current-state tensor passed unchanged to each model.
        action_value: Pre-sigmoid action tensor; it is updated in place.
        beta: Step size of the input update.

    Returns:
        The same ``action_value`` tensor after all updates.
    """
    # Work on private copies so that freezing the weights never touches the
    # models the caller keeps training.
    frozen_models = copy.deepcopy(model_loader)
    for frozen in frozen_models:
        for weight in frozen.parameters():
            weight.requires_grad = False

    for _ in range(epoch_for_deducing):

        random.shuffle(frozen_models)

        for frozen in frozen_models:
            # A fresh leaf tensor per step: its .grad starts out empty.
            action = torch.sigmoid(action_value).clone().detach().requires_grad_(True)

            prediction, _ = frozen(state, action, padding_mask=None)
            frozen.loss_function(prediction, desired_reward).backward()  # Error Backpropagation

            # (1 - action) * action is the sigmoid derivative, i.e. the chain
            # rule from the action back to the pre-sigmoid value.
            action_value -= action.grad * (1 - action) * action * beta  # Update Input Data

    return action_value




# Function for updating weight matrices using error backprop

Elastic weight consolidation
https://arxiv.org/pdf/1612.00796

In [None]:
def update_model(batch_size,
                 epoch_for_learning,
                 model,
                 train_loader,
                 dataset,
                 prev_model,
                 prev_gradient_matrix,
                 prev_train_loader_size,
                 EWC_lambda):
    """Train ``model`` with data steps interleaved with hand-written consolidation steps.

    NOTE(review): a later cell of this notebook defines another ``update_model``
    with the same signature; whichever cell ran last is the one callers get.

    Each epoch runs ``len(train_loader)`` data steps (one sample of ``dataset``
    per step) and ``past_slice`` consolidation steps in random order.  A
    consolidation step does not back-propagate a loss: it writes
    ``(prev_gradient - (prev_param - param)) * EWC_lambda * prev_train_loader_size / past_slice``
    straight into ``param.grad`` and lets the optimizer step on it.

    Args:
        batch_size: Only used to scale the learning rate (``alpha * batch_size``).
        epoch_for_learning: Number of training epochs.
        model: Model to train; must expose ``selected_optimizer``, ``alpha``,
            ``loss_function`` and ``num_layers``.
        train_loader: Only its length is used here.
        dataset: Indexable collection yielding
            ``(state, action, reward, next_state, padding_mask)``.
            Assumes ``len(dataset) >= len(train_loader)`` — TODO confirm.
        prev_model: Model whose ``state_dict`` supplies the anchor parameters.
        prev_gradient_matrix: Dict ``name -> tensor`` keyed like
            ``model.named_parameters()``.
        prev_train_loader_size: Weight given to the previous data.
        EWC_lambda: Strength of the consolidation term.

    Returns:
        ``(model, present_gradient_matrix, present_train_loader_size)``.
    """

    prev_model_param = prev_model.state_dict()




    # Number of consolidation steps mixed into every epoch.
    past_slice = 1000
    # Step schedule: 1 = data step, 0 = consolidation step (shuffled per epoch).
    combined_list = [1] * len(train_loader) + [0] * past_slice
    # Sample order for the data steps (shuffled per epoch).
    index_list = list(range(len(train_loader) ))




    model.train()

    selected_optimizer = model.selected_optimizer
    for param_group in selected_optimizer.param_groups:
        param_group['lr'] = model.alpha * batch_size

    for epoch in range(epoch_for_learning):

        random.shuffle(combined_list)
        random.shuffle(index_list)
        i = 0  # position of the next unused entry of index_list

        for j in range(len(train_loader) + past_slice ):

            if combined_list[j] == 1:

              # Data step: ordinary supervised update on one sample.
              state, action, reward, next_state, padding_mask = dataset[index_list[i]]
              i+=1

              # Add the batch dimension (samples are used one at a time).
              state  = state.unsqueeze(0)
              action = action.unsqueeze(0)
              reward = reward.unsqueeze(0)
              next_state = next_state.unsqueeze(0)
              padding_mask = padding_mask.unsqueeze(0)

              # The model returns one state prediction per recurrent layer,
              # so the target is replicated along a leading layer axis.
              next_state = torch.unsqueeze(next_state, dim=0).repeat(model.num_layers, 1, 1)

              selected_optimizer.zero_grad()
              loss_function = model.loss_function

              output, output_state = model(state, action, padding_mask)
              total_loss = loss_function(output, reward) + loss_function(output_state, next_state)

              total_loss.backward()

              selected_optimizer.step() # Update Model Weight

            else:

              # Consolidation step: the gradient is written by hand instead of
              # being produced by backward().
              selected_optimizer.zero_grad()

              for name, param in model.named_parameters():
                    param.grad = (prev_gradient_matrix[name] - (prev_model_param[name] - param) ) * EWC_lambda * ( prev_train_loader_size /past_slice )

              selected_optimizer.step() # Update Model Weight




    # Sum of per-sample gradients at the trained weights (no optimizer step in this pass).
    present_gradient_matrix = {name: torch.zeros_like(param) for name, param in model.named_parameters()}

    for epoch in range(1):

        # combined_list is shuffled here but not read again in this pass.
        random.shuffle(combined_list)
        random.shuffle(index_list)
        i = 0

        for j in range(len(train_loader)):

              state, action, reward, next_state, padding_mask = dataset[index_list[i]]
              i+=1

              state  = state.unsqueeze(0)
              action = action.unsqueeze(0)
              reward = reward.unsqueeze(0)
              next_state = next_state.unsqueeze(0)
              padding_mask = padding_mask.unsqueeze(0)

              next_state  = torch.unsqueeze(next_state, dim=0).repeat(model.num_layers, 1, 1)

              selected_optimizer.zero_grad()
              loss_function = model.loss_function

              output, output_state = model(state, action, padding_mask)
              total_loss = loss_function(output, reward) + loss_function(output_state, next_state)

              total_loss.backward()

              for name, param in model.named_parameters():
                    present_gradient_matrix[name] += param.grad

        for j in range(1):

              # NOTE(review): param.grad is overwritten here but neither read nor
              # followed by an optimizer step inside this function.
              selected_optimizer.zero_grad()

              for name, param in model.named_parameters():
                    param.grad = (prev_gradient_matrix[name] - (prev_model_param[name] - param) ) * EWC_lambda * prev_train_loader_size


    # Normalise by the combined size of the new and the previous data.
    present_gradient_matrix = {name: param / ( (len(train_loader) + prev_train_loader_size) ) for name, param in present_gradient_matrix.items()}

    present_train_loader_size = len(train_loader) + prev_train_loader_size

    return model, present_gradient_matrix, present_train_loader_size


In [None]:
# The three penalty formulas below used to be three successive definitions of
# EWC_loss, each silently replacing the previous one.  They now live in one
# function; the default variant is the one that was effectively active
# (the last definition, "traditional").
_EWC_VARIANTS = ("directional", "gradient_based", "traditional")


def EWC_loss(EWC_lambda, model, present_model, present_gradient_matrix, prev_model, prev_gradient_matrix, prev_train_loader_size, variant="traditional"):
    """Return the elastic-weight-consolidation penalty for ``model``.

    Args:
        EWC_lambda: Scalar multiplier applied to the summed penalty.
        model: Model being trained; its ``named_parameters()`` are penalised.
        present_model: Snapshot of the model before the current training round.
            Only read by the ``"gradient_based"`` variant.
        present_gradient_matrix: Dict ``name -> tensor`` of gradients for the
            present data.  Not read by the ``"traditional"`` variant.
        prev_model: Model whose ``state_dict()`` supplies the anchor parameters.
        prev_gradient_matrix: Dict ``name -> tensor`` of gradients for the
            previous data.
        prev_train_loader_size: Accepted for call compatibility; not used.
        variant: Which penalty to compute:

            * ``"traditional"`` (default):
              ``prev_grad**2 * (param - prev_param)**2``
            * ``"directional"``:
              ``((present_grad - prev_grad) * (param - prev_param))**2``
            * ``"gradient_based"``: squared difference of the two gradients,
              each shifted by the parameter's offset from its own snapshot,
              times the squared distances to both snapshots.

    Returns:
        ``EWC_lambda`` times the sum of the element-wise penalty over all
        parameters.

    Raises:
        ValueError: If ``variant`` is not one of the three names above.
    """
    if variant not in _EWC_VARIANTS:
        raise ValueError(f"unknown EWC variant {variant!r}; expected one of {_EWC_VARIANTS}")

    prev_model_param = prev_model.state_dict()
    # Only the gradient-based formula needs the pre-training snapshot, so the
    # other variants can be called with present_model=None.
    present_model_param = present_model.state_dict() if variant == "gradient_based" else None

    loss = 0
    for name, param in model.named_parameters():
        drift_from_prev = param - prev_model_param[name]

        if variant == "traditional":
            term = (prev_gradient_matrix[name] ** 2) * (drift_from_prev ** 2)
        elif variant == "directional":
            term = ((present_gradient_matrix[name] - prev_gradient_matrix[name]) * drift_from_prev) ** 2
        else:  # "gradient_based"
            shifted_present = present_gradient_matrix[name] - (present_model_param[name] - param)
            shifted_prev    = prev_gradient_matrix[name]    - (prev_model_param[name]    - param)
            term = (((shifted_present - shifted_prev) ** 2)
                    * (drift_from_prev ** 2)
                    * ((param - present_model_param[name]) ** 2))

        loss += term.sum()
    return EWC_lambda * loss

def _prediction_loss(model, batch):
    """Return reward loss plus next-state loss of ``model`` on one batch."""
    state, action, reward, next_state, padding_mask = batch
    # The model returns one state prediction per recurrent layer, so the target
    # is replicated along a leading layer axis.
    next_state = torch.unsqueeze(next_state, dim=0).repeat(model.num_layers, 1, 1)
    output, output_state = model(state, action, padding_mask)
    return model.loss_function(output, reward) + model.loss_function(output_state, next_state)


def _mean_batch_gradient(model, optimizer, train_loader):
    """Average the per-batch loss gradient of every parameter; weights are not updated."""
    totals = {name: torch.zeros_like(param) for name, param in model.named_parameters()}
    for batch in train_loader:
        optimizer.zero_grad()
        _prediction_loss(model, batch).backward()     # Error Backpropagation
        for name, param in model.named_parameters():
            totals[name] += param.grad
    return {name: total / len(train_loader) for name, total in totals.items()}


def update_model(batch_size,
                 epoch_for_learning,
                 model,
                 train_loader,
                 dataset,
                 prev_model,
                 prev_gradient_matrix,
                 prev_train_loader_size,
                 EWC_lambda):
    """Train ``model`` on ``train_loader`` with an EWC penalty towards ``prev_model``.

    The function runs three phases:

    1. average the loss gradient over ``train_loader`` at the incoming weights,
    2. train for ``epoch_for_learning`` epochs on prediction loss + ``EWC_loss``,
    3. average the loss gradient again at the trained weights.

    Args:
        batch_size: Only used to scale the learning rate (``alpha * batch_size``).
        epoch_for_learning: Number of training epochs in phase 2.
        model: Model to train; must expose ``selected_optimizer``, ``alpha``,
            ``loss_function`` and ``num_layers``.
        train_loader: Iterable of
            ``(state, action, reward, next_state, padding_mask)`` batches.
        dataset: Not used by this implementation.
        prev_model: Anchor model forwarded to ``EWC_loss``.
        prev_gradient_matrix: Dict ``name -> tensor`` forwarded to ``EWC_loss``.
        prev_train_loader_size: Added to ``len(train_loader)`` for the returned size.
        EWC_lambda: Strength of the EWC penalty.

    Returns:
        ``(model, present_gradient_matrix, present_train_loader_size)`` where the
        gradient matrix is the one measured in phase 1.
    """
    # Snapshot taken before anything else so it reflects the untouched model.
    present_model = copy.deepcopy(model)

    model.train()
    selected_optimizer = model.selected_optimizer
    for param_group in selected_optimizer.param_groups:
        param_group['lr'] = model.alpha * batch_size

    # Phase 1: retrieving present_gradient_matrix (weights stay unchanged).
    present_gradient_matrix = _mean_batch_gradient(model, selected_optimizer, train_loader)

    # Phase 2: supervised training regularised by the EWC penalty.
    for _ in range(epoch_for_learning):
        for batch in train_loader:
            selected_optimizer.zero_grad()

            total_loss = _prediction_loss(model, batch)
            total_loss += EWC_loss(EWC_lambda, model, present_model, present_gradient_matrix, prev_model, prev_gradient_matrix, prev_train_loader_size)

            total_loss.backward()     # Error Backpropagation
            selected_optimizer.step() # Update Model Weight

    # Phase 3: gradient matrix at the trained weights.
    # NOTE(review): this result is computed but the phase-1 matrix is what gets
    # returned below — confirm which of the two the caller is meant to receive.
    updated_present_gradient_matrix = _mean_batch_gradient(model, selected_optimizer, train_loader)

    present_train_loader_size = len(train_loader) + prev_train_loader_size

    return model, present_gradient_matrix, present_train_loader_size

# Function for re-initializing the action value at each step

In [None]:
def initialize_input(init, noise_t, noise_r, shape):
    """Draw the random starting value of the action input.

    ``noise_t`` independent samples of the chosen distribution are drawn, each
    scaled by ``noise_r``, and summed.

    Args:
        init: Distribution name: ``"random_uniform"`` (U[0, 1)),
            ``"random_normal"`` (N(0, 1)), ``"glorot_uniform"`` /
            ``"xavier_uniform"`` or ``"glorot_normal"`` / ``"xavier_normal"``.
        noise_t: Number of samples to sum.
        noise_r: Scale applied to every sample.
        shape: Shape of one sample; the glorot/xavier variants read ``shape[1]``.

    Returns:
        Array of shape ``(1, *shape)``, or the integer ``0`` when ``noise_t`` is 0.

    Raises:
        ValueError: If ``init`` is not a known distribution name (previously
            this silently returned ``0``).
    """
    # NOTE(review): both glorot formulas use shape[1] + shape[1] where the usual
    # definition is fan_in + fan_out; kept as written — confirm intent.
    def _glorot_uniform():
        limit = np.sqrt(6 / (shape[1] + shape[1]))
        return np.random.uniform(low=-limit, high=limit, size=shape)

    def _glorot_normal():
        return np.random.normal(loc=0.0, scale=np.sqrt(2 / (shape[1] + shape[1])), size=shape)

    samplers = {
        "random_uniform": lambda: np.random.uniform(low=0, high=1, size=shape),
        "random_normal":  lambda: np.random.normal(loc=0.0, scale=1, size=shape),
        "glorot_uniform": _glorot_uniform,
        "xavier_uniform": _glorot_uniform,
        "glorot_normal":  _glorot_normal,
        "xavier_normal":  _glorot_normal,
    }
    if init not in samplers:
        raise ValueError(f"unknown init {init!r}; expected one of {sorted(samplers)}")
    draw = samplers[init]

    total = 0
    for _ in range(noise_t):
        # The extra list level adds the leading batch axis of size 1.
        total += np.array([draw()]) * noise_r
    return total

# Function for sequentializing states, actions and rewards

In [None]:
def sequentialize(state_list, action_list, reward_list, time_size):
    """Cut one episode into overlapping training samples.

    For every start index ``i`` a sample is built from the state at ``i``, the
    next (at most ``time_size``) actions, the reward of the last action in that
    window and the state reached right after it.  Windows near the end of the
    episode are shorter.

    Args:
        state_list: Episode states; one entry more than ``action_list``.
        action_list: Actions taken, one per step.
        reward_list: Rewards received, one per step.
        time_size: Maximum window length; capped at ``len(state_list) - 1``.

    Returns:
        Four lists of equal length: start states, action windows, rewards and
        next states.
    """
    # A window can never be longer than the number of transitions recorded.
    window = min(time_size, len(state_list[:-1]))

    start_states, action_windows, window_rewards, end_states = [], [], [], []

    for start in range(len(action_list)):
        chunk = action_list[start:start + window]
        stop = start + len(chunk)  # index of the state reached after the window

        start_states.append(state_list[start])
        action_windows.append(chunk)
        window_rewards.append(reward_list[stop - 1])
        end_states.append(state_list[stop])

    return start_states, action_windows, window_rewards, end_states




In [None]:
def save_performance_to_csv(performance_log, filename='performance_log.csv'):
    """Overwrite ``filename`` with a header line followed by the rows of ``performance_log``.

    Args:
        performance_log: Iterable of rows (``[episode, summed_reward]`` in this notebook).
        filename: Destination path of the CSV file.
    """
    header = ['Episode', 'Summed_Reward']
    with open(filename, mode='w', newline='') as csv_file:
        csv.writer(csv_file).writerows([header, *performance_log])

# Function for converting state into concatenated one-hot vector

In [None]:

def quantifying(array_size, init, interval, input):
    """Encode a scalar as a cumulative (thermometer-style) binary vector.

    The value is binned relative to ``init`` with bin width ``interval``; the
    first ``floor((input - init) / interval) + 1`` entries of the result are 1
    and the rest are 0.  Values below the range give all zeros, values above
    it give all ones.

    Args:
        array_size: Length of the returned vector.
        init: Lower edge of the first bin.
        interval: Width of one bin.
        input: Value to encode.

    Returns:
        1-D float array of length ``array_size``.
    """
    encoded = np.zeros(array_size)
    level = int((input - init) // interval + 1)
    if level < 0:
        # Guard: a negative slice end would fill from the front instead of leaving zeros.
        return encoded
    encoded[:level] = 1
    return encoded

def retreive_state(state, main_engine_fire_t, side_engine_fire_t):  # Reminder: change this for your specific task ⚠️⚠️⚠️
    """Encode an observation plus two engine-fire counters as one binary row vector.

    Each of ``state[0]`` .. ``state[7]`` and each counter is passed through
    ``quantifying`` with 100 bins; the ten codes are concatenated.

    Args:
        state: Indexable observation with at least 8 entries.
        main_engine_fire_t: Counter encoded with bin width 1 starting at 0.
        side_engine_fire_t: Counter encoded with bin width 1 starting at 0.

    Returns:
        Array of shape ``(1, 1000)``.
    """
    bins = 100
    # (lower edge, bin width) for state[0] .. state[7], in order.
    observation_ranges = (
        [(-1.5, 0.03 * 1)] * 4
        + [(-1, 0.02 * 1)] * 2
        + [(0, 0.01 * 1)] * 2
    )

    pieces = [
        quantifying(bins, low, width, state[k])
        for k, (low, width) in enumerate(observation_ranges)
    ]
    for counter in (main_engine_fire_t, side_engine_fire_t):
        pieces.append(quantifying(bins, 0, 1, counter))
    # state_10 = np.mean(np.array(env.render(mode='rgb_array')), axis=2, keepdims=True).flatten() / 255

    return np.atleast_2d(np.concatenate(pieces))


# Control board

Note of experience:

In some environments, it is crucial to increase "max_steps_for_each_episode" so that your agent can "live long enough" to obtain better rewards and thus gradually and heuristically learn a better strategy.

Also, it's essential to choose between immediate rewards and summed rewards for training your agent. If the current state doesn't encapsulate all crucial past information, using immediate rewards is advisable. This approach prevents confusion caused by varying summed rewards for the same state.

As for reward shaping, it is recommended to widen the reward range: raise the upper bound of your reward and lower its lower bound.


In [None]:
# --- Environment ---
game_name = "LunarLander-v2"             # Gym environment id. Reminder: change this for your specific task ⚠️⚠️⚠️
max_steps_for_each_episode = 200         # Step cap written to the environment. Reminder: change this for your specific task ⚠️⚠️⚠️

# --- Model sizes ---
state_size = 1000                        # Width of the encoded state (retreive_state concatenates 10 codes of 100 bins). Reminder: change this for your specific task ⚠️⚠️⚠️
hidden_size = 250                        # Reminder: change this for your specific task ⚠️⚠️⚠️ (should be divisible by num_heads below)
action_size = 4                          # Width of one one-hot action. Reminder: change this for your specific task ⚠️⚠️⚠️
time_size = 25                           # Length of the action sequence the model reads and the agent plans. Reminder: change this for your specific task ⚠️⚠️⚠️
chunk_size = 25                          # Window length handed to sequentialize. Reminder: change this for your specific task ⚠️⚠️⚠️
reward_size = 250                        # Number of bins of the encoded reward. Reminder: change this for your specific task ⚠️⚠️⚠️

# --- Architecture and training ---
ensemble_size = 10                       # Reminder: change this value to see the impact of MWM-SGD ◀️◀️◀️
neural_type = 'gru'                      # rnn gru lstm
num_layers = 2                           # Reminder: change this for your specific task ⚠️⚠️⚠️
num_heads  = 10                          # should be able to divide hidden_size
hidden_activation = 'tanh'               # relu leaky_relu sigmoid tanh
output_activation = 'sigmoid'            # relu leaky_relu sigmoid tanh
init = "random_normal"                   # random_normal random_uniform xavier_normal xavier_uniform  glorot_normal  glorot_uniform
loss = 'mean_squared_error'              # mean_squared_error  binary_crossentropy
opti = 'sgd'                             # adam sgd rmsprop
# Base learning rate; update_model multiplies it by batch_size.
alpha = 0.1
# Training epochs per learning phase.
epoch_for_learning = 10
batch_size = 1


# --- Deducing (input optimisation) ---
# initialize_input sums noise_t random draws, each scaled by noise_r.
noise_t = 1
noise_r = 0.1
# Step size of the action update in update_action_value.
beta = 0.1
# Passes over the ensemble per decision; scaled so passes x members stays near 100.
epoch_for_deducing =  int(100/ensemble_size)


episode_for_training             = 100000
replay_range                     = 2                    # Replay keeps the last replay_range * interval_for_initiating_learning episodes. Reminder: change this for your specific task ⚠️⚠️⚠️
interval_for_initiating_learning = 50                   # Learn once every this many episodes. Reminder: change this for your specific task ⚠️⚠️⚠️
EWC_lambda = 1                                          # Multiplier of the EWC penalty. Reminder: change this for your specific task ⚠️⚠️⚠️



episode_for_testing = 100                # Reminder: change this for your specific task ⚠️⚠️⚠️
render_for_human = False                 # Reminder: change this for your specific task ⚠️⚠️⚠️






# Sentinel that build_model.forward compares against to find padded time steps.
mask_value = sys.maxsize
# When True, the model cell loads saved weights from model_directory.
load_pre_model = False
suffix                      = f"ensemble={ensemble_size:05d}_learn={epoch_for_learning:05d}_interval={interval_for_initiating_learning:05d}_deduce={epoch_for_deducing:05d}"
directory                   = f'/content/deep_deducing/{game_name}/'
# The trailing %s is filled with the ensemble member index.
model_directory             = f'/content/deep_deducing/{game_name}/model_{suffix}'+'_%s.h5'
performance_log_directory   = f'/content/deep_deducing/{game_name}/performace_log_{suffix}.csv'

# Deducing > Learning > Testing


creating or loading models

In [None]:

# Make sure the folder for checkpoints and logs exists.
os.makedirs(directory, exist_ok=True)

# Every ensemble member is built with the same hyper-parameters; members differ
# only through their random initialisation (or the checkpoint loaded below).
model_loader = []
for _ in range(ensemble_size):
    model = build_model(state_size,
                        hidden_size,
                        action_size,
                        time_size,
                        reward_size,
                        neural_type,
                        num_layers,
                        num_heads,
                        hidden_activation,
                        output_activation,
                        init,
                        opti,
                        loss,
                        alpha,
                        mask_value)
    model.to(device)
    model_loader.append(model)

if load_pre_model:
    for i, model in enumerate(model_loader):
        # map_location lets a checkpoint saved on a GPU runtime be restored on
        # whatever device this session selected (e.g. a CPU-only runtime).
        model.load_state_dict(torch.load(model_directory % i, map_location=device))


creating initial gradient matrices

In [None]:
# Storing previous models: an independent snapshot of the ensemble that the
# EWC penalty is anchored to.
prev_model_loader = copy.deepcopy(model_loader)
prev_train_loader_size = 1

# Calculating gradient matrix: nothing has been trained yet, so every
# parameter of every ensemble member starts with an all-zero matrix.
prev_gradient_matrix_loader = [
    {name: torch.zeros_like(param) for name, param in member.named_parameters()}
    for member in model_loader
]


creating desired reward

In [None]:
# Target for deducing: a single row with every reward bin switched on,
# i.e. the encoding of the highest representable reward.
desired_reward = torch.ones((1, reward_size), dtype=torch.float).to(device)

putting all the previous works into play

In [None]:

# Rows of [episode number, summed reward]; seeded with a zero row for episode 0.
performance_log = []
performance_log.append([0, 0])

# Replay buffers.  All five lists are kept index-aligned: entry k of
# episode_list is the episode that produced sample k of the other four.
episode_list = []
sequentialized_state_list = []
sequentialized_action_list = []
sequentialized_reward_list = []
sequentialized_next_state_list = []

env = gym.make(game_name)  
# Overrides a private attribute of the environment object.
# NOTE(review): presumably the step limit of gym's time-limit wrapper — confirm.
env._max_episode_steps = max_steps_for_each_episode

for training_episode in tqdm(range(episode_for_training)):

    state_list  = []
    action_list = []
    reward_list = []

    summed_reward = 0

    state = env.reset()

    main_engine_fire_t = 0 
    side_engine_fire_t = 0 
    state_all = retreive_state(state, main_engine_fire_t, side_engine_fire_t)
    state_list.append(state_all[0])
    state_all = torch.tensor(state_all, dtype=torch.float).to(device)

    for _ in range(sys.maxsize):

        action_value = initialize_input(init, noise_t, noise_r,(time_size, action_size) )
        action_value = torch.tensor(action_value, dtype=torch.float).to(device)
        action_value = update_action_value(epoch_for_deducing,
                          model_loader,
                          desired_reward,
                          state_all,
                          action_value,
                          beta)

        action = int(torch.argmax(action_value[0, 0]))
        state, reward, done,  info = env.step(action)
        summed_reward += reward




        reward = quantifying(reward_size, -400, (350 - (-400))/reward_size, reward)       # Reminder: change this for your specific task ⚠️⚠️⚠️




        action_list.append(np.eye(action_size)[action])
        reward_list.append(reward)




        if action == 2:                                     
            main_engine_fire_t += 1                         
        elif (action == 1) or (action == 3):                
            side_engine_fire_t += 1                         
        state_all = retreive_state(state, main_engine_fire_t, side_engine_fire_t)
        state_list.append(state_all[0])
        state_all = torch.tensor(state_all, dtype=torch.float).to(device)

        if done:
            print(f'Episode {training_episode+1}: Summed_Reward = {summed_reward}')
            performance_log.append([training_episode+1, summed_reward])
            # Save performance log to CSV
            save_performance_to_csv(performance_log, performance_log_directory)
            break




    env.close()




    """
    sequentializing and setting replay range
    """
    sequentialized_state_list_slice, sequentialized_action_list_slice, sequentialized_reward_list_slice, sequentialized_next_state_list_slice = sequentialize(state_list, action_list, reward_list, chunk_size )

    episode_list .extend( [ training_episode ] * len(sequentialized_state_list_slice))
    sequentialized_state_list       .extend( sequentialized_state_list_slice)
    sequentialized_action_list      .extend( sequentialized_action_list_slice)
    sequentialized_reward_list      .extend( sequentialized_reward_list_slice)
    sequentialized_next_state_list  .extend( sequentialized_next_state_list_slice)




    if (training_episode+1) % interval_for_initiating_learning == 0:




        episode_list                   = [i for i, a, b, c, d in zip(episode_list, sequentialized_state_list, sequentialized_action_list, sequentialized_reward_list, sequentialized_next_state_list) if ((training_episode - replay_range * interval_for_initiating_learning + 1) <= i <= training_episode)     ]
        sequentialized_state_list      = [a for i, a, b, c, d in zip(episode_list, sequentialized_state_list, sequentialized_action_list, sequentialized_reward_list, sequentialized_next_state_list) if ((training_episode - replay_range * interval_for_initiating_learning + 1) <= i <= training_episode)     ]
        sequentialized_action_list     = [b for i, a, b, c, d in zip(episode_list, sequentialized_state_list, sequentialized_action_list, sequentialized_reward_list, sequentialized_next_state_list) if ((training_episode - replay_range * interval_for_initiating_learning + 1) <= i <= training_episode)     ]
        sequentialized_reward_list     = [c for i, a, b, c, d in zip(episode_list, sequentialized_state_list, sequentialized_action_list, sequentialized_reward_list, sequentialized_next_state_list) if ((training_episode - replay_range * interval_for_initiating_learning + 1) <= i <= training_episode)     ]
        sequentialized_next_state_list = [d for i, a, b, c, d in zip(episode_list, sequentialized_state_list, sequentialized_action_list, sequentialized_reward_list, sequentialized_next_state_list) if ((training_episode - replay_range * interval_for_initiating_learning + 1) <= i <= training_episode)     ]




        """
        masking and uploading data
        """

        state_tensor      = torch.stack( [torch.tensor(arr) for arr in sequentialized_state_list]               ).float().to(device)
        action_tensor     = torch.stack( [F.pad(torch.tensor(arr), 
                                                pad=(0, 0, 0, time_size - torch.tensor(arr).size(0)), 
                                                mode='constant', 
                                                value= mask_value) for arr in sequentialized_action_list]       ).float().to(device)
        reward_tensor     = torch.stack( [torch.tensor(arr) for arr in sequentialized_reward_list]              ).float().to(device)
        next_state_tensor = torch.stack( [torch.tensor(arr) for arr in sequentialized_next_state_list]          ).float().to(device)

        row_mask     = torch.all(action_tensor == mask_value, dim = -1)
        padding_mask = torch.zeros_like(action_tensor, dtype = torch.bool)
        padding_mask[row_mask] = True
        padding_mask = padding_mask.to(device)

        dataset     = TensorDataset(state_tensor, action_tensor, reward_tensor, next_state_tensor, padding_mask)
        data_loader = DataLoader(dataset, batch_size = batch_size, shuffle=True)






        """
        learning
        """
        gradient_matrix_loader_ = []
        for i, model in enumerate(model_loader):
            model, gradient_matrix_, train_loader_size_= update_model(batch_size, epoch_for_learning, model, data_loader, dataset, prev_model_loader[i], prev_gradient_matrix_loader[i], prev_train_loader_size, EWC_lambda)
            gradient_matrix_loader_.append(gradient_matrix_)
            model_loader[i] = model
        prev_gradient_matrix_loader = gradient_matrix_loader_
        prev_model_loader           = copy.deepcopy(model_loader)
        prev_train_loader_size      = train_loader_size_




        """
        saving
        """
        for i in range(len(model_loader)):
            torch.save(model_loader[i].state_dict(), model_directory % i)




        gc.collect()
        torch.cuda.empty_cache()



# Deducing (final)

loading models

In [None]:
# Rebuild the ensemble with the same hyperparameters used for training, then
# restore each member's saved weights.
model_loader = []
for _ in range(ensemble_size):
    model = build_model(state_size,
                        hidden_size,
                        action_size,
                        time_size,
                        reward_size,
                        neural_type,
                        num_layers,
                        num_heads,
                        hidden_activation,
                        output_activation,
                        init,
                        opti,
                        loss,
                        alpha,
                        mask_value)
    model.to(device)
    model_loader.append(model)

# model_directory is a %-format template filled with the model index.
# map_location remaps the stored tensors onto the active device, so weights
# saved during a CUDA run can also be loaded on a CPU-only machine.
for i, model in enumerate(model_loader):
    model.load_state_dict(torch.load(model_directory % i, map_location=device))

creating desired reward ... again

In [None]:
# Rebuild the desired reward for the testing phase: all ones, at least 2-D,
# stored as a float tensor on the active device.
desired_reward = np.ones(reward_size)
desired_reward = torch.tensor(np.atleast_2d(desired_reward), dtype=torch.float)
desired_reward = desired_reward.to(device)

putting all the previous work into play ... again

but this time the agent does not learn

In [None]:
# Evaluation driver: play `episode_for_testing` episodes with the loaded
# ensemble, deducing every action, without any learning step. After each
# episode the episode reward and the running average are printed.
summed_reward_sum = 0

for testing_episode in range(episode_for_testing):

    summed_reward = 0

    # A fresh environment is created for every test episode.
    if render_for_human:
        env = gym.make(game_name, render_mode="human")
    else:
        env = gym.make(game_name)
    env._max_episode_steps = max_steps_for_each_episode

    # try/finally guarantees the environment (and its window, when rendering)
    # is closed even if deducing or stepping raises.
    try:
        state = env.reset()
        if render_for_human:
            env.render()

        # Running counters of chosen actions; they are passed to
        # retreive_state together with the raw observation.
        # NOTE(review): the action ids tested below (2, and 1 or 3) look
        # specific to an environment with main/side engines - confirm they
        # match game_name.
        main_engine_fire_t = 0
        side_engine_fire_t = 0
        state_all = retreive_state(state, main_engine_fire_t, side_engine_fire_t)
        state_all = torch.tensor(state_all, dtype=torch.float).to(device)

        for _ in tqdm(range(max_steps_for_each_episode)):

            # Deducing: refine a freshly initialized action sequence against
            # the desired reward.
            action_value = initialize_input(init, noise_t, noise_r, (time_size, action_size))
            action_value = torch.tensor(action_value, dtype=torch.float).to(device)
            action_value = update_action_value(epoch_for_deducing,
                                               model_loader,
                                               desired_reward,
                                               state_all,
                                               action_value,
                                               beta)

            # Only the first time step of the deduced sequence is executed.
            action = int(torch.argmax(action_value[0, 0]))
            state, reward, done, info = env.step(action)
            if render_for_human:
                env.render()
            summed_reward += reward

            if done:
                break

            if action == 2:
                main_engine_fire_t += 1
            elif (action == 1) or (action == 3):
                side_engine_fire_t += 1
            state_all = retreive_state(state, main_engine_fire_t, side_engine_fire_t)
            state_all = torch.tensor(state_all, dtype=torch.float).to(device)
    finally:
        env.close()

    print("Summed reward:", summed_reward)
    print(f'Episode: {testing_episode + 1}')
    print('Averaged reward:')
    summed_reward_sum += summed_reward
    # Running mean over the test episodes played so far.
    print(summed_reward_sum/(testing_episode + 1))

