In [10]:
#https://www.youtube.com/watch?v=ewRw996uevM&list=PLZbbT5o_s2xoWNVdDudn51XM8lOuZ_Njv&index=18
#https://www.youtube.com/watch?v=0bt0SjbS3xc&list=PLZbbT5o_s2xoWNVdDudn51XM8lOuZ_Njv&index=13

In [11]:
import numpy as np
from cartpole1 import QLearnCartPoleSolver
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import torch.nn.functional as F
import gym
import tensorflow as tf
import numpy as np
from tensorflow import keras
import gym

import torch
import torch.nn as nn
import random
import torchvision

def get_device():
    if torch.cuda.is_available(): 
        dev = "cuda:0" 
    else: 
        dev = "cpu" 
    return torch.device(dev)

device = get_device()


In [12]:
def get_model(state_shape, action_shape):
    learning_rate = 0.001
    init = tf.keras.initializers.HeUniform()
    model = keras.Sequential()
    model.add(keras.layers.Dense(24, input_shape=state_shape, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(12, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(action_shape, activation='linear', kernel_initializer=init))
    model.compile(loss=tf.keras.losses.Huber(), optimizer=tf.keras.optimizers.Adam(lr=learning_rate), metrics=['accuracy'])
    return model

In [13]:


class DQNSolver(QLearnCartPoleSolver):

    def __init__(self, action_space, state_space, epsilon_decay_rate=0.995):
        super().__init__(env, episodes=1000, min_epsilon=0.001)
        self.memory = deque([], maxlen=100000)
        self.epsilon_decay_rate = epsilon_decay_rate
        self.epsilon = 1
        self.batch_size = 32
        self.lr = 0.01
        self.model = get_model(state_space, action_space)

    def action(self, state, epsilon):
        if np.random.random() <= epsilon: 
            return self.env.action_space.sample()
        else:
            np.argmax(self.model.predict(state).flatten())

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))


    def updated_q_value(self, state, action, reward, next_state):
        return (reward + self.discount * np.argmax(self.model.predict(next_state)))

    def preprocess_state(self, state):
        return np.reshape(state, [1, 4])
    
    def set_weights(self, weight):
        self.model.set_weights(weight)

    def get_weights(self):
        return self.model.get_weights()

    def replay(self):
        if self.batch_size >= len(self.memory):
            return
        current = []
        target = []

        batch = random.sample(self.memory, self.batch_size)
        current_qs_list = self.model.f(torch.tensor(batch[0]).to(self.device))
        for index, state, action, reward, next_state, done in enumerate(batch):
            current_q = current_qs_list[index]
            current_q[action] = self.updated_q_value(state, action, reward, next_state)
            current.append(current_q)
            target.append(state)
        loss = F.mse_loss(current - target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    

env = gym.make('CartPole-v1')


model = DQNSolver(state_space = env.observation_space.shape, action_space= env.action_space.n)
target_model = DQNSolver(state_space = env.observation_space.shape, action_space= env.action_space.n)
target_model.set_weights(model.get_weights())





In [14]:
epsilon = 1 
max_epsilon = 1 
min_epsilon = 0.01 
decay = 0.01

memory = deque([], maxlen=100000)

In [15]:
def remember(memory, state, action, reward, next_state, done):
        memory.append((state, action, reward, next_state, done))


def get_action(env,model, state, epsilon):
    random_number = np.random.rand()
    if random_number <= epsilon:
        return random.randrange(env.action_space.n)
    else:
        encoded = state
        encoded_reshaped = encoded.reshape([1, encoded.shape[0]])
        predicted = model.model.predict(encoded_reshaped).flatten()
        return np.argmax(predicted)


def replay(env, replay_memory, model, target_model, done):
    learning_rate = 0.7 # Learning rate
    discount_factor = 0.618

    MIN_REPLAY_SIZE = 1000
    if len(replay_memory) < MIN_REPLAY_SIZE:
        return

    batch_size = 64 * 2
    mini_batch = random.sample(replay_memory, batch_size)
    current_states = np.array([transition[0] for transition in mini_batch])
    current_qs_list = model.model.predict(current_states)
    new_current_states = np.array([transition[3] for transition in mini_batch])
    future_qs_list = target_model.model.predict(new_current_states)

    X = []
    Y = []
    for index, (observation, action, reward, new_observation, done) in enumerate(mini_batch):
        if not done:
            max_future_q = reward + discount_factor * np.max(future_qs_list[index])
        else:
            max_future_q = reward

        current_qs = current_qs_list[index]
        current_qs[action] = (1 - learning_rate) * current_qs[action] + learning_rate * max_future_q

        X.append(observation)
        Y.append(current_qs)
    model.model.fit(np.array(X), np.array(Y), batch_size=batch_size, verbose=0, shuffle=True)

In [17]:
def train(episodes, env, memory, model, target_model, epsilon):
    for episode in range(episodes):
        done = False
        reward_current_ep = 0
        steps = 0
        state = env.reset()
        while not done:
            steps += 1
            env.render()

            action = get_action(env, model, state, epsilon)
            next_state, reward, done, _ = env.step(action) 
            remember(memory, state, action, reward, next_state, done)
            state = next_state
            reward_current_ep += reward
            replay(env, memory, model, target_model, done)
            if steps >= 100:
                print('Copying main network weights to the target network weights')
                target_model.set_weights(model.get_weights())
                steps = 0
                break
        print(f"score {reward_current_ep} for ep {episode+1}")
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay * episode)
    env.close()


def run (env, model, epsilon):
    done = False
    reward_current_ep = 0
    steps = 0
    state = env.reset()
    while not done:
        steps += 1
        env.render()
        action = get_action(env, model, state, epsilon)
        next_state, reward, done, _ = env.step(action) 
        state = next_state
        reward_current_ep += reward
    env.close()



train(100, env, memory, model, target_model, epsilon)
run(env, model, epsilon)


score 14.0 for ep 1
score 8.0 for ep 2
score 28.0 for ep 3
score 12.0 for ep 4
score 30.0 for ep 5
score 44.0 for ep 6
score 21.0 for ep 7
score 25.0 for ep 8
score 22.0 for ep 9
score 11.0 for ep 10
score 20.0 for ep 11
score 39.0 for ep 12
score 31.0 for ep 13
score 15.0 for ep 14
score 18.0 for ep 15
score 23.0 for ep 16
score 11.0 for ep 17
score 29.0 for ep 18
score 12.0 for ep 19
score 18.0 for ep 20
score 26.0 for ep 21
score 25.0 for ep 22
score 9.0 for ep 23
score 22.0 for ep 24
score 18.0 for ep 25
score 33.0 for ep 26
score 19.0 for ep 27
score 15.0 for ep 28
score 13.0 for ep 29
score 14.0 for ep 30
score 17.0 for ep 31
score 14.0 for ep 32
score 14.0 for ep 33
score 13.0 for ep 34
score 11.0 for ep 35
score 24.0 for ep 36
score 9.0 for ep 37
score 22.0 for ep 38
score 29.0 for ep 39
score 11.0 for ep 40
score 14.0 for ep 41
score 14.0 for ep 42
score 11.0 for ep 43
score 22.0 for ep 44
score 13.0 for ep 45
score 25.0 for ep 46
score 8.0 for ep 47
score 12.0 for ep 48
score

KeyboardInterrupt: 