<a href="https://colab.research.google.com/github/TsienJin/SC3000-Group-Project/blob/main/TJ_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
import random
import sys
import matplotlib

from copy import deepcopy
import torch

import os

import matplotlib
from collections import namedtuple
import numpy as np
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F

import random
from collections import deque

from collections import namedtuple

import gym


is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display


In [3]:
Observation = namedtuple("observation", ("cartPos", "cartVel", "poleAngle", "poleVel"))

Environment = namedtuple("environment", ("observation", "reward", "isDone", "isTruncated"))

# Referenced as "experience" in the DQN paper
Record = namedtuple("record", ("state", "action", "nextState", "reward"))


In [19]:

class ParseEnvironment:
    def __init__(self, environment:[float], reward:float=None, isDone:bool=None, isTruncated:bool=None, *args):
        self.cartPos = environment[0]
        self.cartVel = environment[1]
        self.poleAngle = environment[2]
        self.poleVel = environment[3]

        self.reward = reward
        self.isDone = isDone
        self.isTruncated = isTruncated

    def __str__(self):
        return f"""cPOS: {self.cartPos}\ncVEL: {self.cartVel}\npANG: {self.poleAngle}\npVEL: {self.poleVel}\nreward: {self.reward}\nisDone: {self.isDone}\nisTruncated: {self.isTruncated}"""

    def __repr__(self):
        return self.__str__()


    def toObservation(self) -> Observation:
        return Observation(self.cartPos, self.cartVel, self.poleAngle, self.poleVel)

    def toTensor(self):
        return torch.FloatTensor((self.cartPos, self.cartVel, self.poleAngle, self.poleVel))

    def toFloat32(self) -> [np.float32]:
        return np.array([self.cartPos, self.cartVel, self.poleAngle, self.poleVel], type=np.float32)

    def toEnvironment(self) -> Environment:
        return Environment(self.toObservation(), self.reward, self.isDone, self.isTruncated)


class ParseRecord:
    def __init__(self, state: ParseEnvironment, action: int, nextState: ParseEnvironment, reward: float):
        assert action in [0, 1]
        self.state = state
        self.action = action
        self.nextState = nextState
        self.reward = reward

    def toRecord(self) -> Record:
        return Record(self.state, self.action, self.nextState, self.reward)


In [7]:

class Memory:
    def __init__(self, maxCapacity:int=10000):
        self.cap = maxCapacity
        self.memory = deque([], maxlen=maxCapacity)

    def __len__(self) -> int:
        return len(self.memory)

    def __str__(self) -> str:
        return f"""Memory() capacity [{self.__len__}/{self.cap}]"""

    def push(self, record:ParseRecord) -> None:
        self.memory.append(record)

    def sample(self, size:int) -> [ParseRecord]:
        assert size>0
        return random.sample(self.memory, size)


In [8]:

class DQN(nn.Module):

    def __init__(self,
                 n_obsv: int, n_actions: int, n_layer: int = 1, n_layerSize: int = 6,
                 learningRate: float = 0.0001, gamma: float = 0.95,
                 expDecay: float = 0.999, expMin: float = 0.001, expMax: float = 1.0,
                 _device: str = "cpu",
                 memory: Memory = Memory()):
        """

        :param n_obsv: size of observation space
        :param n_actions: size of action space
        :param n_layer: number of hidden layers
        :param n_layerSize: number of neurons per hidden layer
        :param learningRate:
        :param gamma: discount for future values
        :param expDecay:
        :param expMin:
        :param expMax:
        :param _device: defaults to "cpu"
        """

        super(DQN, self).__init__()

        # Ensuring that values are proper
        assert n_layer >= 0
        assert n_obsv > 0
        assert n_actions > 0
        assert n_layerSize > 0
        assert 0 < learningRate < 1
        assert 0 < gamma < 1
        assert 0 < expDecay < 1
        assert 0 < expMin < 1
        assert 0 < expMax <= 1

        self.learningRate = learningRate
        self.gamma = gamma
        self.expDecay = expDecay
        self.expMin = expMin
        self.expMax = expMax

        self.n_obsv = n_obsv
        self.n_actions = n_actions
        self.n_layer = n_layer
        self.n_layerSize = n_layerSize

        self.memory = memory

        self.layers = nn.ModuleList(self.__createLayers())
        self.optim = optim.Adam(self.parameters(), lr=self.learningRate)
        self.crit = torch.nn.SmoothL1Loss()  # Huber loss

        self.to(_device)

    def __createLayers(self):
        """
        Private method to generate neural network given the specified params in __init__()
        :return: [nn.Linear()]
        """
        # init layers starting with input shape to layer size
        layers = [nn.Linear(self.n_obsv, self.n_layerSize)]

        # creates more layers with specified layer size
        for _ in range(self.n_layer):
            layers.append(nn.Linear(self.n_layerSize, self.n_layerSize))

        # adds final output layer
        layers.append(nn.Linear(self.n_layerSize, self.n_actions))

        return layers

    def forward(self, x:torch.float32) -> torch.float32:
        """
        Processes the given state and returns a tensor with qValues for actions
        :param x: <torch.tensor> with shape (1,4) and type float32 | State of current observation as a tensor
        :return: <torch.tensor> with shape (1,2) | Tensor of qValues
        """
        assert (x.dim() == torch.randn(4).dim())
        for layer in self.layers:
            x = F.relu(layer(x))
        return x

In [11]:

class Agent:
    # Number of episodes
    MAX_EP = 1_000_000

    # Q Value vals
    DISCOUNT = 0.9
    LEARNING_RATE = 0.001

    # Epsilon GREEDY vals
    EPS = 0.9999
    EPS_DECAY = 0.999
    EPS_MIN = 0.05
    EPS_MAX = 1.0

    # Memory vals
    MEM_SIZE = 50_000
    MIN_MEM_SIZE = 1_000
    MEM_BATCH = 200
    TARGET_UPDATE_FREQ = 75

    def __init__(self, maxEp:int=10_000, env=gym.make("CartPole-v1")):

        # Bootstrapping to maintain stability of prediction
        self.memory = Memory(maxCapacity=self.MEM_SIZE)
        self.model = DQN(n_obsv=4, n_actions=2, n_layer=10, n_layerSize=10,learningRate=self.LEARNING_RATE, memory=self.memory)  # updates every iteration
        self.targetModel = deepcopy(self.model)  # updates only once threshold has been reached

        # Setting individual stats for the environment to run
        self.maxEpisode = maxEp
        self.env = env
        self.episodeCounter = 0
        self.totalReward = 0

    def __printStats(self):
        print(f"EP: {self.EPS:.3f} | MEM: {len(self.memory)} | EP: {self.episodeCounter} | AVG: {self.totalReward/self.episodeCounter:.5f}")

    def predict(self, environment:ParseEnvironment) -> int:
        if self.EPS < self.EPS_MIN:
            res = self.targetModel.forward(environment.toTensor())
            return torch.argmax(res).detach().numpy()
        else:
            # print("USING RANDOM")
            self.EPS = self.EPS * self.EPS_DECAY
            return random.randint(0,1)


    def getMaxQ(self, environment:ParseEnvironment) -> torch.tensor:
        res = self.targetModel.forward(environment.toTensor())
        return res.clone().detach().numpy()


    def train(self):
        if len(self.memory) < self.MIN_MEM_SIZE:
            return

        batch = self.memory.sample(size=self.MEM_BATCH)

        allStates = np.array([record.state for record in batch])  # need to check if this works; no intellisense
        predicted = [self.getMaxQ(record.state) for record in batch]
        predictedNew = [self.getMaxQ(record.nextState) for record in batch]

        oldValsToFit = []
        valsToFit = []

        for index, env in enumerate(batch):
            maxFutureQ = np.max(self.getMaxQ(env.nextState))

            if not env.state.isDone:
                newQ = env.reward + self.DISCOUNT * maxFutureQ
            else:
                newQ = env.reward

            oldFit = self.getMaxQ(env.state)
            toFit = deepcopy(oldFit)
            toFit[env.action] = (1-self.LEARNING_RATE)*oldFit[env.action] + self.LEARNING_RATE * newQ


            oldValsToFit.append(oldFit)
            valsToFit.append(toFit)

        loss = self.model.crit(torch.tensor(np.array(oldValsToFit), requires_grad=True), torch.tensor(np.array(valsToFit), requires_grad=True))
        self.model.optim.zero_grad()
        loss.backward()
        self.model.optim.step()

        if self.episodeCounter % self.TARGET_UPDATE_FREQ == 0:
            self.targetModel.load_state_dict(self.model.state_dict())


    def run(self):
        while self.episodeCounter < self.maxEpisode:
            self.episodeCounter += 1
            cReward = 0.0
            curEnv = ParseEnvironment(self.env.reset()[0], reward=1.0, isDone=False, isTruncated=False)

            while curEnv.isDone is not True:

                # interact with env
                action = self.predict(curEnv)
                prevEnv = curEnv
                curEnv = ParseEnvironment(*self.env.step(action))

                # save record of what just happened
                thisRecord = ParseRecord(prevEnv, action, curEnv, curEnv.reward)
                self.memory.push(thisRecord)

                # # train model
                self.train()

                # # update local variables
                cReward += curEnv.reward
                self.totalReward += curEnv.reward
                self.__printStats()


  deprecation(
  deprecation(


In [20]:
agent = Agent(maxEp=1000)
agent.run()

IndexError: ignored