In [4]:
# DQN
import keras
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import gymnasium as gym

%matplotlib inline

In [5]:
'''
use the dense to generate the action
'''
class Network():
    '''
    func:
        Small fully connected network that maps a batch of flat states to one
        value per action, compiled with an MSE loss and the Adam optimizer
    '''
    def __init__(self, actionNum, stateShape, learningRate):
        '''
        param:
            actionNum:      number of output units (one value per action)
            stateShape:     length of the flat state vector fed to the first layer
            learningRate:   learning rate handed to the Adam optimizer
        '''

        # NOTE(review): Dense(32) has no activation, so together with Dense(64) it
        # forms a single linear map; the only non-linearity is the LeakyReLU.
        # Confirm whether an activation on the first layer was intended.
        self.model = keras.Sequential([
            keras.layers.Dense(32, input_shape = (stateShape, )),
            keras.layers.Dense(64),
            keras.layers.LeakyReLU(),
            keras.layers.Dense(actionNum),
        ])

        self.model.compile(
            loss = 'mse',
            optimizer = keras.optimizers.Adam(learningRate)
        )

    def Call(self, state):
        '''
        param:
            state:      one flat state of shape (stateShape, ) or a batch of
                        states of shape (batch, stateShape)
        func:
            Run the model in inference mode
        return:
            the model output, one row per state
        '''
        # The Dense stack needs a leading batch axis; add it for a single state
        # so callers can pass an environment observation directly.
        if np.ndim(state) == 1:
            state = np.expand_dims(state, axis = 0)

        return self.model(state, training = False)

    def CopyVariable(self, network):
        '''
        param:
            network:    the other network model
                type:   Sequential (a Network wrapper is accepted as well)
        func:
            Copy the variables of the other network model to self
        raise:
            ValueError when the two models do not have the same number of layers
        '''
        source = network.model if isinstance(network, Network) else network

        ownLayers = self.model.layers
        otherLayers = source.layers

        # zip() would silently stop at the shorter list and leave layers stale
        if len(ownLayers) != len(otherLayers):
            raise ValueError(
                'layer count mismatch: %d vs %d' % (len(ownLayers), len(otherLayers))
            )

        for selfLayer, otherLayer in zip(ownLayers, otherLayers):
            selfLayer.set_weights(otherLayer.get_weights())

class DQN(keras.Model):
    '''
    func:
        DQN agent. An online network (chooseActionNet) picks actions and is
        trained, a target network (targetNetwork) supplies the bootstrap values,
        and transitions are replayed from a bounded memory pool.
    '''

    # columns of the memory pool; every column always has the same length
    _MEMORY_KEYS = ('state', 'action', 'reward', 'nextState', 'done')

    def __init__(self, actionNum, stateShape, epsilon, gamma, batchSize, poolSize, learningRate, environment, startCount, learningCut, updateTargetCut):
        '''
        param:
            actionNum:          the number of actions
            stateShape:         the length of the flat state vector
            epsilon:            the probability of taking the greedy action; a random
                                action is taken with probability 1 - epsilon
            gamma:              discount applied to the max target Q value
            batchSize:          the batch size
            poolSize:           the maximum number of transitions kept in the memory pool
            learningRate:       learning rate of both networks' optimizers
            environment:        the game which the network will learn to play
            startCount:         the step of an episode after which the network starts to update
            learningCut:        once an episode has passed startCount, the network is
                                updated every learningCut steps
            updateTargetCut:    once an episode has passed startCount, the weights of the
                                ChooseActionNetwork are copied to the TargetNetwork every
                                updateTargetCut steps
        '''
        super().__init__()

        self.environment = environment

        self.actionNum = actionNum
        self.stateShape = stateShape
        self.epsilon = epsilon
        self.gamma = gamma
        self.batchSize = batchSize
        self.poolSize = poolSize

        self.chooseActionNet = Network(actionNum, stateShape, learningRate)
        self.targetNetwork = Network(actionNum, stateShape, learningRate)
        # Both networks are initialised randomly; start them from the same weights
        # so the first updates do not bootstrap from an unrelated network.
        self.CopyNetwork()

        self.memoryPool = {key: [] for key in self._MEMORY_KEYS}

        # number of transitions currently stored (never exceeds poolSize)
        self.memoryLength = 0

        self.startCount = startCount
        self.learningCut = learningCut
        self.updateTargetCut = updateTargetCut

    def ChooseAction(self, state):
        '''
        param:
            state:      one observation of the environment
        func:
            Pick a random action with probability 1 - epsilon, otherwise the
            action with the highest predicted value
        return:
            the index of the chosen action
        '''
        if np.random.uniform() > self.epsilon:
            return np.random.randint(self.actionNum)

        # the network expects a batch axis, so feed the state as a batch of one
        batch = np.asarray(state, dtype = np.float32).reshape(1, -1)
        predictValue = np.asarray(self.chooseActionNet.Call(batch))
        return int(np.argmax(predictValue[0]))

    def StoreMemory(self, lastState, action, reward, nextState, done = False):
        '''
        param:
            lastState:  the state the action was taken in
            action:     the index of the action taken
            reward:     the reward received
            nextState:  the state reached
            done:       True when nextState is terminal, so that no future value
                        is bootstrapped from it
        func:
            Append one transition and drop the oldest one when the pool is full
        '''
        transition = (lastState, action, reward, nextState, done)
        for key, value in zip(self._MEMORY_KEYS, transition):
            self.memoryPool[key].append(value)

        if len(self.memoryPool['state']) > self.poolSize:
            for key in self._MEMORY_KEYS:
                self.memoryPool[key].pop(0)

        # keep the counter equal to the real number of stored transitions
        self.memoryLength = len(self.memoryPool['state'])

    def _SampleBatch(self):
        '''
        func:
            Draw up to batchSize distinct transitions from the memory pool
        return:
            lastState, action, reward, nextState, done as numpy arrays
        raise:
            ValueError when the memory pool is empty
        '''
        stored = len(self.memoryPool['state'])
        if stored == 0:
            raise ValueError('the memory pool is empty')

        # generate the required indices; only sample what is actually stored
        count = min(self.batchSize, stored)
        index = np.random.choice(stored, size = count, replace = False)

        def Gather(key, dtype):
            column = self.memoryPool[key]
            return np.array([column[idx] for idx in index], dtype = dtype)

        return (
            Gather('state', np.float32),
            Gather('action', np.int64),
            Gather('reward', np.float32),
            Gather('nextState', np.float32),
            Gather('done', np.float32),
        )

    def ChooseMemory(self):
        '''
        func:
            Select a batchSize of observations from the memoryPool
            (fewer when the pool does not hold that many yet)
        return:
            laststate
            action
            reward
            nextState
        raise:
            ValueError when the memory pool is empty
        '''
        laststate, action, reward, nextState, _ = self._SampleBatch()
        return laststate, action, reward, nextState

    def UpdateNetwork(self):
        '''
        param:

        func:
            use the memory to make the batch to feed the network:
            fit the ChooseActionNetwork on the sampled states so that the value
            of each taken action moves towards
            reward + gamma * max(TargetNetwork(nextState))
        '''
        if len(self.memoryPool['state']) == 0:
            return

        lastState, action, reward, nextState, done = self._SampleBatch()

        # np.array makes a writable copy of the current predictions; only the
        # column of the taken action is replaced, so the other actions
        # contribute zero error to the MSE loss
        evaluation = np.array(self.chooseActionNet.Call(lastState), dtype = np.float32)
        nextValue = np.asarray(self.targetNetwork.Call(nextState)).max(axis = 1)

        # no future value is added for terminal transitions
        target = reward + self.gamma * nextValue * (1.0 - done)
        evaluation[np.arange(len(action)), action] = target

        self.chooseActionNet.model.fit(
            lastState,
            evaluation,
            batch_size = len(lastState),
            epochs = 1,
            verbose = 0
        )

    def CopyNetwork(self):
        '''
        func:
            Copy the weights of the ChooseActionNetwork to the TargetNetwork
        '''
        self.targetNetwork.CopyVariable(self.chooseActionNet.model)

    def Train(self):
        '''
        func:
            Play one episode in the environment, store every transition and,
            once the episode has passed startCount steps, update the networks
            on the learningCut / updateTargetCut schedule
        '''
        isDone = False
        # reset() returns (observation, info); only the observation is the state
        lastState, _ = self.environment.reset()
        learningCount = 0
        afterCount = 0

        while not isDone:

            action = self.ChooseAction(lastState)

            nextState, reward, terminated, truncated, _ = self.environment.step(action)
            # the episode is over on termination or on truncation (time limit)
            isDone = terminated or truncated

            # only real termination marks the transition as terminal; a
            # truncated episode could have continued from nextState
            self.StoreMemory(lastState, action, reward, nextState, terminated)

            if learningCount > self.startCount:

                if afterCount % self.learningCut == 0:
                    self.UpdateNetwork()

                if afterCount % self.updateTargetCut == 0:
                    self.CopyNetwork()

                afterCount += 1

            lastState = nextState
            learningCount += 1

        


In [13]:
# Environment the agent is trained on.
env = gym.make('CartPole-v1')
# Network sizes are read from the environment's action and observation spaces.
ACTIONNUM = env.action_space.n
STATESHAPE = env.observation_space.shape[0]
# DQN.ChooseAction takes a random action when uniform() > epsilon, so this value
# is the probability of acting greedily, not the probability of exploring.
EPSILON = 0.7
# Number of transitions requested per sampled batch.
BATCHSIZE = 64
# Maximum number of transitions kept in the memory pool.
POOLSIZE = 2000
# NOTE(review): passed to keras.optimizers.Adam; 0.3 is far above Adam's usual
# 1e-3 default — confirm this is intended.
LEARNINGRATE = 0.3