# Original article
https://blog.valohai.com/reinforcement-learning-tutorial-part-1-q-learning

# Environment

In [1]:
# %load ./../../../games/dungeon-simulator/dungeon_simulator.py
import random


class DungeonSimulator:
	def __init__(self, length=5, slip=0.1, small=2, large=10):
		self.length = length
		self.slip = slip # Probability of flipping the action
		self.small = small
		self.large = large
		self.state = 0


	def take_action(self, action):
		''' Executes the action and returns the next state and the received reward.'''
		if random.random() < self.slip:
			action = not action
		reward = 0
		if action == BACKWARD:
			reward = self.small
			self.state = 0
		elif action == FORWARD:
			if self.state < self.length - 1:
				self.state += 1
				reward = 0
			else:
				reward = self.large

		return self.state, reward


	def reset(self):
		self.state = 0
		return self.state

# Agent

In [2]:
import random


class Drunkard:
    def __init__(self):
        self.q_table = None
        
    def get_next_action(self, state):
        # Random walk
        return FORWARD if random.random() < 0.5 else BACKWARD
    
    def update(self, old_state, new_state, action, reward):
        pass # I don't care! I'm drunk!!

In [3]:
import random


class Accountant:
    def __init__(self):
        # Spreadsheet (Q-table) for rewards accounting
        self.q_table = [
            [ 0, 0, 0, 0, 0 ], # FORWARD states
            [ 0, 0, 0, 0, 0 ] # BACKWARD states
        ]
        
    def get_next_action(self, state):
        # Is FORWARD reward bigger?
        if self.q_table[FORWARD][state] > self.q_table[BACKWARD][state]:
            return FORWARD
        elif self.q_table[BACKWARD][state] > self.q_table[FORWARD][state]:
            return BACKWARD
        return FORWARD if random.random() < 0.5 else BACKWARD
    
    def update(self, old_state, new_state, action, reward):
        self.q_table[action][old_state] += reward

In [4]:
import random


class Gambler:
    def __init__(self, learning_rate=0.1, discount=0.95, exploration_rate=1.0, iterations=10000):
        self.q_table = [
            [ 0, 0, 0, 0, 0 ], # FORWARD states
            [ 0, 0, 0, 0, 0 ] # BACKWARD states
        ]
        self.learning_rate = learning_rate
        self.discount = discount
        self.exploration_rate = exploration_rate
        self.exploration_delta = exploration_rate / iterations # Shift from exploration to exploitation
        
    def get_next_action(self, state):
        if random.random() > self.exploration_rate:
            return self.greedy_action(state)
        else:
            return self.random_action()
        
    def greedy_action(self, state):
        if self.q_table[FORWARD][state] > self.q_table[BACKWARD][state]:
            return FORWARD
        elif self.q_table[BACKWARD][state] > self.q_table[FORWARD][state]:
            return BACKWARD
        
        return self.random_action()
    
    def random_action(self):
        return FORWARD if random.random() < 0.5 else BACKWARD
    
    def update(self, old_state, new_state, action, reward):
        # Old Q-table value
        old_value = self.q_table[action][old_state]
        future_action = self.greedy_action(new_state)
        future_reward = self.q_table[future_action][new_state]
        
        # Main Q-table updating algorithm
        new_value = old_value + self.learning_rate * (reward + self.discount * future_reward - old_value)
        self.q_table[action][old_state] = new_value
        
        if self.exploration_rate > 0:
            self.exploration_rate -= self.exploration_delta

In [5]:
import random
import tensorflow as tf
import numpy as np


class DeepGambler:
    def __init__(self, learning_rate=0.1, discount=0.95, exploration_rate=1.0, iterations=10_000):
        self.learning_rate = learning_rate
        self.discount = discount
        self.exploration_rate = exploration_rate
        self.exploration_delta = exploration_rate / 10_000
        
        # Input has five neurons, each represents a single game state
        self.input_count = 5
        # Output is two neurons, each represents a Q-value for each action
        self.output_count = 2
        
        self.session = tf.Session()
        self.define_model()
        self.session.run(self.initializer)
        
    def define_model(self):
        '''Define tensorflow model graph.'''
        # Input is an array of 5 items (states one-hot encoded)
        # Input is 2-dimensional due to possibility of batched training data (why does this change the input?)
        # NOTE: In this example we assume no batching
        self.model_input = tf.placeholder(dtype=tf.float32, shape=[ None, self.input_count ])
        
        # Two hidden layers of 16 neurons with sigmoid activation initialized to zero for stability
        fc1 = tf.layers.dense(self.model_input, 16, activation=tf.sigmoid, kernel_initializer=tf.constant_initializer(np.zeros((self.input_count, 16))))
        fc2 = tf.layers.dense(fc1, 16, activation=tf.sigmoid, kernel_initializer=tf.constant_initializer(np.zeros((16, self.output_count))))
        
        # Output is two values, Q for both possible actions (FORWARD and BACKWARD)
        # Output is 2-dimensional, due to possibility of batched training data (again, why??)
        self.model_output = tf.layers.dense(fc2, self.output_count)
        
        # This is for feeding training output (a.k.a ideal target values)
        self.target_output = tf.placeholder(shape=[ None, self.output_count ], dtype=tf.float32)
        # Loss is mean squared difference between current output and ideal target values
        loss = tf.losses.mean_squared_error(self.target_output, self.model_output)
        # Optimizer adjusts weights to minimize loss, with the speed of the learning rate
        self.optimizer = tf.train.GradientDescentOptimizer(learning_rate=self.learning_rate).minimize(loss)
        # Initializer sets weights to initial values
        self.initializer = tf.global_variables_initializer()
        
    def get_Q(self, state):
        '''Ask model to estimate Q value for specific state (via inference).'''
        # Model input: Single state represented by array of 5 items (state one-hot encoded)
        # Model output: Array of Q values for single state
        return self.session.run(self.model_output, feed_dict={ self.model_input: self.to_one_hot(state) })[0]
    
    def to_one_hot(self, state):
        '''Turn state into 2d one_hot tensor (e.g. 3 -> [[ 0, 0, 0, 1, 0]]).'''
        one_hot = np.zeros((1, 5))
        one_hot[0, [ state ]] = 1
        return one_hot
    
    def get_next_action(self, state):
        if random.random() > self.exploration_rate: # Exploit
            return self.greedy_action(state)
        else: # Explore
            return self.random_action()
        
    def greedy_action(self, state):
        '''Returns the action with the bigger Q-value, as estimated by our model (via inference)'''
        return np.argmax(self.get_Q(state))
    
    def random_action(self):
        return FORWARD if random.random() < 0.5 else BACKWARD
    
    def train(self, old_state, action, reward, new_state):
        # Ask the model for the Q values of the old state
        old_state_Q_values = self.get_Q(old_state)
        # Ask the model for the Q values of the new state
        new_state_Q_values = self.get_Q(new_state)
        # Change the Q value of the action we took to what we expect (so we can train towards our expected Q values)
        old_state_Q_values[action] = reward + self.discount * np.amax(new_state_Q_values)
        
        # Set up training data
        training_input = self.to_one_hot(old_state)
        target_output = [ old_state_Q_values ]
        training_data = { self.model_input: training_input, self.target_output: target_output }
        
        # Train
        self.session.run(self.optimizer, feed_dict=training_data)
        
    def update(self, old_state, new_state, action, reward):
        # Train our model with new data
        self.train(old_state, action, reward, new_state)
            
        # Shift our exploration_rate toward zero
        if self.exploration_rate > 0:
            self.exploration_rate -= self.exploration_delta

In [6]:
import torch.nn as nn
import torch.nn.functional as F

class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.fc1 = nn.Linear(5, 16)
        #self.fc1.weight.data.fill_(0.0)
        #self.fc1.bias.data.fill_(0.0)
        self.fc2 = nn.Linear(16, 16)
        #self.fc2.weight.data.fill_(0.0)
        #self.fc2.bias.data.fill_(0.0)
        self.fc3 = nn.Linear(16, 2)
        #self.fc3.weight.data.fill_(0.0)
        #self.fc3.bias.data.fill_(0.0)
        
    def forward(self, x):
        x = torch.sigmoid(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        x = self.fc3(x)
        return x

In [7]:
import random

import torch
import torch.optim as optim

class DeepPytorchGambler:
    def __init__(self, learning_rate=0.0001, discount=0.95, exploration_rate=1.0, iterations=10_000, trained_model=None):
        self.learning_rate = learning_rate
        self.discount = discount
        self.exploration_rate = exploration_rate
        self.exploration_delta = exploration_rate / iterations
        
        self.input_count = 5
        self.output_count = 2
        
        self.define_model(trained_model)
    
    def define_model(self, trained_model):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        if trained_model:
            self.model = trained_model.to(self.device)
        else:
            self.model = Model().to(self.device)
        
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
    
    def get_Q(self, state):
        x = torch.tensor(self.to_one_hot(state)).to(self.device)
        return self.model(x)
        
    def to_one_hot(self, state):
        '''Turn state into 2d one_hot tensor (e.g. 3 -> [[ 0, 0, 0, 1, 0]]).'''
        one_hot = np.zeros((1, 5))
        one_hot[0, [ state ]] = 1
        return one_hot
    
    def get_next_action(self, state):
        if random.random() < self.exploration_rate:
            return self.random_action()
        else:
            return self.greedy_action(state)
        
    def random_action(self):
        return random.randrange(0, 5) # Maybe change the probability distribution?
    
    def greedy_action(self, state):
        #print("Greedy1:", torch.max(self.get_Q(state), 0)[0])
        #print("Greedy2:", torch.max(self.get_Q(state), 0)[1])
        return torch.max(self.get_Q(state), 0)[1]
    
    def update(self, old_state, new_state, action, reward):
        self.train(old_state, new_state, action, reward)
        # TODO: Maybe change algorithm?
        if self.exploration_rate > 0:
            self.exploration_rate = max(0.2, self.exploration_rate - self.exploration_delta)
        
    def train(self, old_state, new_state, action, reward):
        old_state_values = self.get_Q(old_state)
        new_state_values = self.get_Q(new_state)
        
        new_reward = reward + self.discount * torch.max(new_state_values)
        updated_state_values = torch.tensor(old_state_values)
        updated_state_values[action] = new_reward
        
        
        old_state_values = torch.tensor(old_state_values, device=self.device).float()
        updated_state_values = torch.tensor(updated_state_values, device=self.device).float()
        # in your training loop:
        self.optimizer.zero_grad()   # zero the gradient buffers
        loss = torch.autograd.Variable(F.smooth_l1_loss(old_state_values, updated_state_values), requires_grad=True)
        loss.backward()
        self.optimizer.step()    # Does the update

# Orchestration

In [8]:
import random
import json
import argparse
import time

In [9]:
FORWARD = 0
BACKWARD = 1

In [10]:
parser = argparse.ArgumentParser()
parser.add_argument("--agent", type=str, default="GAMBLER", help="Which agent to use")
parser.add_argument("--learning-rate", type=float, default=0.1, help="How quickly the algorithm tries to learn")
parser.add_argument("--discount", type=float, default=0.95, help="Discount for estimated future action") # Reward?
parser.add_argument("--iterations", type=int, default=2000, help="Iteration count")
FLAGS, unparsed = parser.parse_known_args()

In [11]:
learning_rate = 0.01
discount = 0.95
iterations = 10_000

In [12]:
agent = Drunkard() # 12754
agent = Accountant() # 17548
agent = Gambler() # 25890

# 21186 with learning rate 0.01 (not as good, because a deep neural net is overkill for this simple Q-table)
# Seems to vary quite a bit
agent = DeepGambler(learning_rate=learning_rate) 
agent = DeepPytorchGambler()

Instructions for updating:
Use keras.layers.dense instead.
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use tf.cast instead.


RuntimeError: CUDA error: out of memory

In [None]:
dungeon = DungeonSimulator()
dungeon.reset()
total_reward = 0

In [None]:
for step in range(iterations):
    old_state = torch.tensor(dungeon.state)
    action = agent.get_next_action(old_state)
    new_state, reward = dungeon.take_action(action)
    agent.update(old_state, new_state, action, reward)
    
    total_reward += reward
    if step % 250 == 0:
        print(json.dumps({"step": step, "total_reward": total_reward}))
        
    time.sleep(0.00001) # Avoid spamming stdout too fast (why?)
    
#print("Final Q-table:", agent.q_table)