In [4]:
from obstacle_tower_env import ObstacleTowerEnv
%matplotlib inline
from matplotlib import pyplot as plt


In [5]:
import torch
from torch import nn,optim
import numpy as np
import torch.nn.functional as F


In [6]:



class EncoderNetwork(nn.Module):
    """Convolutional encoder that turns an image batch into flat feature vectors.

    Four 3x3, stride-2, padding-1 convolutions (3 -> 32 -> 32 -> 32 -> 32
    channels), each followed by ELU. The final activation map is flattened
    per sample. The output width depends on the input resolution; for
    example a (B, 3, 42, 42) batch yields (B, 288).
    """

    def __init__(self):
        super(EncoderNetwork,self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3,out_channels=32,kernel_size=(3,3),stride=2,padding=1)
        self.conv2 = nn.Conv2d(32,32,kernel_size=(3,3),stride=2,padding=1)
        self.conv3 = nn.Conv2d(32,32,kernel_size=(3,3),stride=2,padding=1)
        self.conv4 = nn.Conv2d(32,32,kernel_size=(3,3),stride=2,padding=1)

    def forward(self,x):
        """Encode ``x`` of shape (B, 3, H, W) and return a (B, features) tensor."""
        # F.normalize defaults to L2 normalisation along dim=1 (the channel axis).
        x = F.normalize(x)
        y = F.elu(self.conv1(x))
        y = F.elu(self.conv2(y))
        y = F.elu(self.conv3(y))
        y = F.elu(self.conv4(y))
        # Previously the flattened tensor was computed but never returned,
        # so callers received None. The per-call debug print of the full
        # activation tensor was dropped as well.
        return y.flatten(start_dim=1)
    


class InverseNetowrk(nn.Module):
    """Two-layer MLP that scores actions from a pair of state vectors.

    ``state1`` and ``state2`` are concatenated along dim 1 and must together
    provide 576 features per sample (the first linear layer's input width).
    The result is a softmax distribution over ``no_actions`` entries.

    NOTE(review): the output is already softmax-normalised. If it is fed to
    ``nn.CrossEntropyLoss`` (which applies log-softmax itself) the
    normalisation is applied twice -- confirm that this is intended.
    """

    def __init__(self,no_actions):
        super().__init__()
        self.linear_layerOne = nn.Linear(576, 100)
        self.linear_layerTwo = nn.Linear(100, no_actions)

    def forward(self,state1,state2):
        """Return per-sample action probabilities of shape (B, no_actions)."""
        joint = torch.cat([state1, state2], dim=1)
        hidden = F.relu(self.linear_layerOne(joint))
        scores = self.linear_layerTwo(hidden)
        return F.softmax(scores, dim=1)

class ForwardNetwork(nn.Module):
    """Forward-dynamics MLP: predicts the next state vector from (state, action).

    The action is one-hot encoded, concatenated to ``state`` along dim 1,
    and passed through two linear layers with a ReLU in between.

    Args:
        no_actions: size of the discrete action set (width of the one-hot).
        state_dim: width of one state vector. Defaults to 288, i.e. half of
            the 576-wide input the inverse model in this notebook takes for
            two concatenated states -- TODO(review): confirm against the
            encoder's actual output width.
        hidden_dim: width of the hidden layer.

    The original layer sizes (576 in, ``no_actions`` out) could not accept
    ``state`` + one-hot action for 288-wide states and did not produce a
    state-shaped prediction; the layers are now sized from the arguments.
    """

    def __init__(self,no_actions,state_dim=288,hidden_dim=256):
        super(ForwardNetwork,self).__init__()
        self.no_actions = no_actions
        self.linear_layerOne = nn.Linear(state_dim + no_actions,hidden_dim)
        self.linear_layerTwo = nn.Linear(hidden_dim,state_dim)


    def forward(self,state,action):
        """Return the predicted next state, shape (B, state_dim).

        ``state`` is (B, state_dim). ``action`` holds B integer action ids
        and may be shaped (B,), (B, 1) or, for a single sample, 0-d.
        ``F.one_hot`` raises if an id is outside ``[0, no_actions)``.
        """
        # reshape(-1) keeps a batch of one as a 1-element vector, where
        # squeeze() would collapse it to a 0-d tensor.
        action_ = F.one_hot(action.reshape(-1).long(),num_classes=self.no_actions)
        # Match the state's dtype/device so the concatenation works off-CPU.
        action_ = action_.to(dtype=state.dtype,device=state.device)
        x = torch.cat((state,action_),dim=1)
        y = F.relu(self.linear_layerOne(x))
        y = self.linear_layerTwo(y)

        return y

class NeuralNetwork(nn.Module):
    """Convolutional network producing one output value per action.

    Four 3x3, stride-2, padding-1 convolutions with ReLU are followed by a
    288 -> 100 -> ``no_actions`` MLP. The fixed 288-wide linear input means
    the flattened conv output must have 288 features per sample
    (e.g. 32 channels x 3 x 3, which a 42x42 input produces).
    """

    def __init__(self,no_actions):
        super().__init__()
        conv_cfg = dict(kernel_size=(3, 3), stride=2, padding=1)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, **conv_cfg)
        self.conv2 = nn.Conv2d(32, 32, **conv_cfg)
        self.conv3 = nn.Conv2d(32, 32, **conv_cfg)
        self.conv4 = nn.Conv2d(32, 32, **conv_cfg)
        self.linear_layerOne = nn.Linear(288, 100)
        self.linear_layerTwo = nn.Linear(100, no_actions)

    def forward(self,x):
        """Map a (B, 3, H, W) batch to a (B, no_actions) tensor of raw outputs."""
        # F.normalize defaults to L2 normalisation along dim=1 (channels).
        features = F.normalize(x)
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            features = F.relu(conv(features))
        # Collapse the spatial axes, regroup into rows of 32, then flatten
        # everything after the batch axis.
        features = features.flatten(start_dim=2)
        features = features.view(features.shape[0], -1, 32)
        features = features.flatten(start_dim=1)
        hidden = F.relu(self.linear_layerOne(features))
        return self.linear_layerTwo(hidden)

In [None]:
from random import shuffle
from secrets import choice

class ExperienceRelayMemory:
    """Fixed-capacity buffer of (state1, action, reward, state2) transitions.

    Args:
        N: maximum number of transitions kept.
        batch_size: number of transitions returned by ``get_batch`` (fewer
            if the buffer holds less than that).
    """

    def __init__(self,N=500,batch_size=100):
        self.N = N
        self.batch_size =batch_size
        self.memory = []
        # Total number of add_memory calls; drives the periodic shuffle.
        self.counter = 0

    def add_memory(self,state1,action,reward,state2):
        """Store one transition, overwriting a random slot once the buffer is full."""
        self.counter += 1
        # Every 500th insertion the stored order is shuffled in place.
        if self.counter % 500 == 0:
            self.shuffle_memory()

        if len(self.memory) < self.N:
            self.memory.append((state1,action,reward,state2))
        else:
            # randint's upper bound is exclusive: using N (not N-1) lets the
            # last slot be replaced too and keeps N=1 from raising.
            rand_idx  = np.random.randint(0,self.N)
            self.memory[rand_idx] = (state1,action,reward,state2)

    def shuffle_memory(self):
        """Shuffle the stored transitions in place."""
        shuffle(self.memory)

    def get_batch(self):
        """Sample a batch without replacement.

        Returns:
            ``(state1_batch, action_batch, reward_batch, state2_batch)`` or
            ``None`` (after printing a message) when the buffer is empty.
            States are stacked after ``squeeze(dim=0)``; actions are
            converted to a long tensor and rewards to a float tensor.
        """
        if len(self.memory)< 1:
            print("Errror Memory is empty")
            return None

        batch_size = min(self.batch_size,len(self.memory))
        index =  np.random.choice(np.arange(len(self.memory)),batch_size,replace=False)
        batch = [self.memory[i] for i in index ]
        state1_batch = torch.stack([x[0].squeeze(dim=0) for x in batch],dim=0)
        action_batch = torch.Tensor([x[1] for x in batch]).long()
        reward_batch = torch.Tensor([x[2] for x in batch])
        # Index 3 is the next state; this previously read index 0 and so
        # returned a copy of state1_batch.
        state2_batch = torch.stack([x[3].squeeze(dim=0) for x in batch],dim=0)

        return state1_batch,action_batch,reward_batch,state2_batch

        
        
        
        


        

In [None]:
# Hyperparameters read by the cells below. Keys whose use is not visible in
# this part of the notebook are annotated as assumptions.
params = {
    "batch_size": 150,           # batch size handed to the replay memory
    "beta": 0.2,                 # loss_function: weight of forward loss; inverse loss gets (1 - beta)
    "lambda": 0.1,               # loss_function: weight of the Q loss
    "eta": 1.0,                  # presumably a scale on the intrinsic reward -- TODO confirm
    "gamma": 0.2,                # presumably the discount factor -- TODO confirm
    "max_episode_length": 100,   # presumably a step cap per episode -- TODO confirm
    "min_progress": 15,          # usage not visible here -- TODO confirm
    "action_repeats": 6,         # presumably how often each chosen action is repeated -- TODO confirm
    "frames_per_second": 3,      # usage not visible here -- TODO confirm
}

In [None]:
# Size of the discrete action set passed to every network that needs it.
# NOTE(review): 12 is assumed here -- confirm it matches the environment's
# action space before training.
NO_ACTIONS = 12

replay = ExperienceRelayMemory(N=1000,batch_size=params['batch_size'])
# The three action-aware networks require `no_actions`; calling them without
# it raised TypeError.
Qmodel = NeuralNetwork(NO_ACTIONS)
encoder = EncoderNetwork()
forward_model = ForwardNetwork(NO_ACTIONS)
inverse_model = InverseNetowrk(NO_ACTIONS)

# Per-sample losses (reduction='none') for the forward/inverse models;
# the Q loss uses the default mean reduction.
forward_loss = nn.MSELoss(reduction='none')
inverse_loss = nn.CrossEntropyLoss(reduction='none')
qloss = nn.MSELoss()

# One optimizer over all four models. Each parameters() generator is turned
# into a list before concatenation (generator + list is a TypeError).
all_model_params = list(Qmodel.parameters()) + list(encoder.parameters())
all_model_params += list(forward_model.parameters()) + list(inverse_model.parameters())


opt = optim.Adam(lr=0.001,params=all_model_params)



In [None]:
def loss_function(q_loss,inverse_loss,forward_loss):
    """Combine the three training losses into one scalar.

    The inverse and forward losses are mixed element-wise with weights
    ``1 - params['beta']`` and ``params['beta']``, averaged over all
    elements, and added to ``params['lambda'] * q_loss``.
    """
    beta = params['beta']
    mixed = (1 - beta) * inverse_loss
    mixed += beta * forward_loss
    # Mean over every element of the mixed loss tensor.
    mixed_mean = mixed.sum() / mixed.numel()
    return mixed_mean + params['lambda'] * q_loss