
# Imports

In [1]:
import random
import time
import os

import tensorflow as tf
from tensorflow.python.client import device_lib
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Input
from keras.layers import Conv2D, MaxPooling2D
from keras import regularizers
from tensorflow.keras import backend as kb
from keras import optimizers
import gym

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Ask TensorFlow to log the device each operation is placed on, so the
# CPU/GPU placement requested later in the notebook can be checked in the logs.
tf.debugging.set_log_device_placement(True)

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
Using TensorFlow backend.


# Setting up env

In [54]:
# Instantiate the environment used by every later cell, then reset it;
# the value returned by reset() is what this cell displays.
ENV_ID = 'MountainCar-v0'
env = gym.make(ENV_ID)
env.reset()

array([-0.57776089,  0.        ])

# Playing a random game

In [3]:
def play_a_random_game_first(goal_steps=200, environment=None):
    """Play one episode with randomly sampled actions, printing every step.

    Args:
        goal_steps: Upper bound on the number of steps taken. The loop also
            stops early as soon as the environment reports ``done``.
            NOTE(review): the default of 200 is assumed to match the episode
            limit of MountainCar-v0 -- confirm if another env is used.
        environment: Object exposing ``action_space.sample()``,
            ``step(action)`` (returning ``observation, reward, done, info``)
            and ``reset()``. Defaults to the notebook-level ``env``.

    Returns:
        None. The environment is reset before returning so later cells start
        from a fresh episode.
    """
    # Resolve the default lazily so the function picks up whatever `env`
    # is bound to at call time, not at definition time.
    if environment is None:
        environment = env

    for step_index in range(goal_steps):
        #environment.render()
        action = environment.action_space.sample()
        observation, reward, done, info = environment.step(action)
        print("Step {}:".format(step_index))
        print("action: {}".format(action))
        print("observation: {}".format(observation))
        print("reward: {}".format(reward))
        print("done: {}".format(done))
        print("info: {}".format(info))
        if done:
            break
    environment.reset()

In [4]:
# Sanity check of the environment: play one random episode and print each transition.
play_a_random_game_first()


Step 0:
action: 1
observation: [-4.81833566e-01 -3.14762199e-04]
reward: -1.0
done: False
info: {}
Step 1:
action: 0
observation: [-0.48346075 -0.00162718]
reward: -1.0
done: False
info: {}
Step 2:
action: 1
observation: [-0.48538824 -0.00192749]
reward: -1.0
done: False
info: {}
Step 3:
action: 2
observation: [-0.48660168 -0.00121344]
reward: -1.0
done: False
info: {}
Step 4:
action: 2
observation: [-0.48709203 -0.00049035]
reward: -1.0
done: False
info: {}
Step 5:
action: 1
observation: [-0.48785564 -0.0007636 ]
reward: -1.0
done: False
info: {}
Step 6:
action: 0
observation: [-0.4898868  -0.00203116]
reward: -1.0
done: False
info: {}
Step 7:
action: 0
observation: [-0.49317038 -0.00328357]
reward: -1.0
done: False
info: {}
Step 8:
action: 2
observation: [-0.49568185 -0.00251147]
reward: -1.0
done: False
info: {}
Step 9:
action: 0
observation: [-0.49940245 -0.0037206 ]
reward: -1.0
done: False
info: {}
Step 10:
action: 1
observation: [-0.50330436 -0.00390192]
reward: -1.0
done: False

# Run with GPU/CPU

In [83]:
def run(device, function, repeats, **kwargs):
    """Run ``function(**kwargs)`` inside a ``tf.device(device)`` context.

    Args:
        device: Device string handed to ``tf.device`` (e.g. ``'/device:CPU:0'``).
        function: Callable to execute; it receives ``**kwargs`` unchanged.
        repeats: Number of times to execute ``function``. Values below 1 are
            treated as 1, so the function always runs at least once.
        **kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        Whatever the last invocation of ``function`` returned.

    The wall-clock duration of every invocation is printed so that runs on
    different devices can be compared.
    """
    n_runs = max(1, int(repeats))
    label = getattr(function, '__name__', repr(function))
    result = None

    with tf.device(device):
        for run_index in range(n_runs):
            t0 = time.perf_counter()

            # Run function with all additional keyword arguments provided
            result = function(**kwargs)

            elapsed = time.perf_counter() - t0
            print("{} on {}: run {}/{} took {:.2f}s".format(
                label, device, run_index + 1, n_runs, elapsed))
    return result


# Device identifiers passed to run(); the exact strings can differ between machines.
cpu = '/device:CPU:0'
gpu = '/device:GPU:0'

# Print the names of all devices TensorFlow reports locally so the
# identifiers above can be compared against what is actually available.
local_device_protos = device_lib.list_local_devices()
device_names = [proto.name for proto in local_device_protos]
print(device_names)

['/device:CPU:0', '/device:XLA_GPU:0', '/device:XLA_CPU:0']


# Model

In [84]:
def build_model(input_size, output_size):
    """Create and compile the dense network that approximates the Q-function.

    Args:
        input_size: Number of input features (passed as ``input_dim`` of the
            first layer).
        output_size: Number of units in the linear output layer.

    Returns:
        A compiled ``Sequential`` model: Dense(64, relu) -> Dense(32, relu)
        -> Dense(output_size, linear), trained with Adam on MSE loss.
    """
    stack = [
        Dense(64, input_dim=input_size, activation='relu'),
        Dense(32, activation='relu'),
        Dense(output_size, activation='linear'),
    ]
    q_network = Sequential(stack)
    q_network.compile(optimizer=optimizers.Adam(), loss='mse')
    return q_network

In [85]:
# Size the network from the environment: one input per observation
# component, one output per discrete action.
state_dim = env.observation_space.shape[0]
n_actions = env.action_space.n
print(state_dim)
print(n_actions)
model = build_model(state_dim, n_actions)

2
3


# Replay

In [102]:
def replay(replay_model, replay_memory, batch_size=32, gamma=0.99):
    """Fit ``replay_model`` on one batch sampled from the replay memory.

    Args:
        replay_model: Model exposing ``predict(states)`` (returning one row of
            q-values per state) and ``fit(x, y, epochs=..., verbose=...)``.
        replay_memory: Sequence of transition dicts with the keys ``'state'``,
            ``'action'``, ``'reward'``, ``'next_state'`` and ``'done'``.
        batch_size: Number of transitions drawn, with replacement.
        gamma: Discount factor applied to the best next-state q-value.

    Returns:
        ``replay_model`` itself (fitted in place). If ``replay_memory`` is
        empty nothing is sampled or fitted and the model is returned as is.
    """
    if not replay_memory:
        return replay_model

    # Sample indices (with replacement) instead of handing the whole memory
    # to np.random.choice, which would convert the full list to an array on
    # every call.
    picks = np.random.randint(len(replay_memory), size=batch_size)
    batch = [replay_memory[i] for i in picks]

    # Separate arrays for state, action, reward, next state and done flag
    s_l = np.array([t['state'] for t in batch])
    a_l = np.array([t['action'] for t in batch], dtype=int)
    r_l = np.array([t['reward'] for t in batch], dtype=float)
    sprime_l = np.array([t['next_state'] for t in batch])
    done_l = np.array([t['done'] for t in batch], dtype=bool)

    # q(s', a') for all possible actions a'; the row maximum is used for the q-update
    qvals_sprime_l = replay_model.predict(sprime_l)

    # q(s, a) for all possible actions a; used as the starting point of the fit target
    target_f = replay_model.predict(s_l)

    # q-update target:
    #   terminal transition -> r
    #   otherwise           -> r + gamma * max_a' q(s', a')
    bootstrapped = r_l + gamma * np.max(qvals_sprime_l, axis=1)
    targets = np.where(done_l, r_l, bootstrapped)

    # Only the entry of the action actually taken is overwritten; the other
    # actions keep the network's own prediction, so their loss is 0.
    target_f[np.arange(len(batch)), a_l] = targets

    # Update weights of neural network with fit()
    replay_model.fit(s_l, target_f, epochs=1, verbose=0)
    return replay_model

# Train model

In [107]:
def train_model(model=model, episodes=1000, gamma=0.99, epsilon=1, batch_size=32, mem_max_size=100000,
                copy=49):
    """Train a q-network on the notebook-level ``env`` with epsilon-greedy play.

    Args:
        model: Network used to pick greedy actions; defaults to the
            notebook-level ``model`` bound when this cell is executed.
        episodes: Number of episodes to play.
        gamma: Discount factor.
            NOTE(review): this value is not forwarded to ``replay``, which
            resolves its own discount factor -- keep the two in sync.
        epsilon: Initial probability of taking a random action. It is lowered
            by 0.001 after every step while it is above 0.01.
        batch_size: Number of transitions ``replay`` samples per step.
        mem_max_size: Maximum number of transitions kept in the replay memory;
            the oldest transition is dropped first.
        copy: Every ``copy`` episodes ``model`` is re-bound to ``replay_model``.

    Returns:
        The trained model, so callers (e.g. ``run``) can use it for inference.
    """
    # NOTE(review): per-episode scores are collected but not returned or plotted.
    reward_sums = []
    replay_memory = []

    # NOTE(review): this binds the same object, not a copy, so `model` and
    # `replay_model` always share weights and the periodic sync below is a
    # no-op. Clone the network here if a lagged acting network is intended.
    replay_model = model

    # Report roughly ten times per training run; never 0, which would make
    # the modulo below fail for episodes < 10.
    log_every = max(1, episodes // 10)

    for ep in range(episodes):
        state = env.reset()
        done = False
        reward_sum = 0

        # Play until the environment signals the end of the episode
        while not done:
            # render game to see car learning
            # env.render()

            # Epsilon-greedy action selection
            if np.random.random() < epsilon:
                # Explore: random action
                action = env.action_space.sample()
            else:
                # Exploit: feedforward pass for the current state, then take
                # the index of the best predicted q-value. The prediction is
                # only computed when it is actually needed.
                qvals_state = model.predict(state.reshape(1, -1))
                action = int(np.argmax(qvals_state))

            # Take step, store results
            next_state, reward, done, info = env.step(action)
            reward_sum += reward

            # Add to memory, evicting the oldest entry first so the buffer
            # never holds more than mem_max_size transitions (at least one).
            if len(replay_memory) >= max(1, mem_max_size):
                replay_memory.pop(0)
            replay_memory.append({"state": state, "action": action, "reward": reward,
                                  "next_state": next_state, "done": done})

            # Update state
            state = next_state

            # Train the nnet that approximates q(s,a), using the replay memory.
            # replay() fits replay_model in place, so its return value is
            # deliberately not used to re-bind replay_model.
            replay(replay_model, replay_memory, batch_size=batch_size)

            # Decrease epsilon until we hit a target threshold
            if epsilon > 0.01:
                epsilon -= 0.001

        if ep != 0 and not ep % copy:
            model = replay_model

        if not ep % log_every:
            print("Iteration: {}, Total reward: {}".format(ep, reward_sum))
        reward_sums.append(reward_sum)

    return model

In [None]:
# Train with train_model's default arguments inside the device context that
# run() opens for `gpu`; trained_model receives whatever train_model returns.
trained_model = run(device=gpu, function=train_model, repeats=1)

Iteration: 0, Total reward: -200.0
Iteration: 100, Total reward: -106.0
Iteration: 200, Total reward: -116.0
Iteration: 300, Total reward: -106.0
Iteration: 400, Total reward: -122.0
Iteration: 500, Total reward: -164.0


In [None]:
# Evaluate the trained network: play 100 greedy episodes and report the
# scores and how often each action was chosen.
scores = []
choices = []
for each_game in range(100):
    state = env.reset()
    score = 0
    done = False
    # Play until the environment reports the end of the episode, the same
    # stopping rule the training loop uses.
    while not done:
        # Uncomment this line if you want to see how our bot playing
        # env.render()

        # predict() expects a batch axis, so the single state is reshaped to (1, n_features)
        action = int(np.argmax(trained_model.predict(state.reshape(1, -1))))
        # Record the action so the distribution printed below is meaningful
        choices.append(action)

        state, reward, done, info = env.step(action)

        score += reward

    scores.append(score)

print(scores)
print('Average Score:',sum(scores)/len(scores))
n_choices = len(choices)
print('choice 1:{}  choice 0:{} choice 2:{}'.format(choices.count(1)/n_choices,choices.count(0)/n_choices,choices.count(2)/n_choices))