In [8]:
# The goal here is to build a reinforcement learning skeleton project
# that can be adapted both to my own sims and openai gym/other stuff.
# Also, there are annotations by nifty bits of code so that I can learn it
# better and faster.
from collections import deque, namedtuple

import numpy as np
import tensorflow as tf
import random

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers.legacy import Adam

# import environment here, or build it in-notebook idc

# Hyperparameters shared by the cells below.
ALPHA = 1e-2                # step size handed to the Adam optimizer
GAMMA = 0.99                # discount factor applied to the bootstrapped next-state value
MEMORY_SIZE = 1_000         # maximum number of experiences kept in the replay deque
NUM_STEPS_FOR_UPDATE = 5    # environment steps between learning updates
#...

# When I customize this enough, maybe name my setup after Otaku things, because DQN. haha. So funny.
# With this, I am basically performing transfer learning on myself: I'm starting with someone else's model (Andrew Ng) and then gradually descending towards
# a solution that is more tuned towards me (while improving my knowledge)

In [9]:
## environment stuff
# Note to self: whatever environment I build should expose the same API as
# OpenAI Gym (reset/step), so that I can freely switch between my own
# sims and theirs depending on the project.
# ADDITIONAL NOTE: minesweeper is probably not a good initial game. Way too luck-based.

# Width of the network input; change based on what is being considered.
state_size = 100
# Width of the network output (one Q-value per action).
# Still need to figure out how to represent "actions" efficiently.
num_actions = 10


In [10]:
# Set up networks
def _build_q_network():
    """Build one Q-value estimator: state vector in, one Q-value per action out.

    Both the online and the target network are created through this single
    function so that their architectures cannot drift apart.
    """
    return Sequential([
        Input(state_size),
        Dense(units = 128, activation = "relu"),
        Dense(units = 128, activation = "relu"),
        Dense(units = num_actions)  # linear output: raw Q-values
    ])


# Online network: the one that is trained and used to pick actions.
q_network = _build_q_network()

# Target network: used only to compute the TD targets in compute_loss.
target_q_network = _build_q_network()
# Each Sequential gets its own random initialisation, so without this the
# first targets would come from a network unrelated to the one being trained.
target_q_network.set_weights(q_network.get_weights())

optimizer = Adam(ALPHA)

# Record type for one transition stored in the replay buffer.
# Fields are given as a single space-separated string, which namedtuple splits
# into the same five field names.
experience = namedtuple("Experience", "state action reward next_state done")

In [11]:
def compute_loss(experiences, gamma, q_network,):
    """Compute the mean-squared TD error of `q_network` on a mini-batch.

    Args:
        experiences: tuple `(states, actions, rewards, next_states, done_vals)`
            of tensors, as returned by `sample_experiences`.
        gamma: discount factor applied to the bootstrapped next-state value.
        q_network: the network being trained.

    Returns:
        The MSE between the TD targets and the Q-values of the taken actions.

    Note:
        The TD targets are computed with the module-level `target_q_network`.
    """
    # Unpack the tuple into its columns.
    states, actions, rewards, next_states, done_vals = experiences

    # max over actions of Q_target(s', a'): best value the target network
    # predicts for each next state.
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)

    # (1 - done) zeroes the bootstrap term for terminal transitions, so the
    # target there is just the reward.
    y_targets = rewards + gamma * max_qsa * (1 - done_vals)

    q_values = q_network(states)
    # q_values has one row per experience and one column per action. We only
    # want the Q-value of the action that was actually taken in each row.
    # tf.stack(..., axis=1) builds [row, action] index pairs and gather_nd
    # looks each pair up, i.e. result[i] = q_values[i, actions[i]].
    row_indices = tf.range(tf.shape(q_values)[0])
    action_indices = tf.cast(actions, tf.int32)
    q_values = tf.gather_nd(q_values, tf.stack([row_indices, action_indices], axis=1))

    loss = MSE(y_targets, q_values)
    return loss
    
    

In [12]:
# define helpers
# Exploration schedule used by update_epsilon.
EPSILON_DECAY = 0.99    # epsilon is multiplied by this on every update
EPSILON_FLOOR = 0.0     # epsilon is never allowed to drop below this

# Number of experiences drawn from the replay buffer per learning step.
MINIBATCH_SIZE = 128

def update_if_conditions(t, steps_for_update, buffer, batch_size=None):
    """Decide whether a learning update should run on this time step.

    An update runs only every `steps_for_update` steps, and only once the
    buffer holds at least one full mini-batch.

    Args:
        t: zero-based index of the current step within the episode.
        steps_for_update: number of steps between updates.
        buffer: the replay buffer (anything supporting `len`).
        batch_size: mini-batch size; defaults to the module-level
            `MINIBATCH_SIZE` when omitted.

    Returns:
        True if an update should be performed, False otherwise.
    """
    if batch_size is None:
        batch_size = MINIBATCH_SIZE
    # (t + 1) because t starts at 0: with steps_for_update == 5 this fires on
    # the 5th, 10th, ... step.
    on_update_step = (t + 1) % steps_for_update == 0
    enough_samples = len(buffer) >= batch_size
    return on_update_step and enough_samples
    
def sample_experiences(buffer): # Adapted from Andrew Ng's coursera ML course
    """Draw a random mini-batch from the replay buffer and split it into columns.

    Args:
        buffer: sequence of `experience` tuples holding at least
            `MINIBATCH_SIZE` items.

    Returns:
        Tuple `(states, actions, rewards, next_states, done_vals)` of float32
        tensors, one entry per sampled experience.
    """
    # Keep the sampled batch: every column below is built from it.
    experiences = random.sample(buffer, k=MINIBATCH_SIZE)

    # Each `[e.field for e in experiences if e is not None]` is a list
    # comprehension: it loops over the batch, skips None entries, and collects
    # one field from every remaining tuple. The result is converted to a
    # tensor so that methods like tf.reduce_max work later.
    states = tf.convert_to_tensor(
        np.array([e.state for e in experiences if e is not None]), dtype=tf.float32
    )
    actions = tf.convert_to_tensor(
        np.array([e.action for e in experiences if e is not None]), dtype=tf.float32
    )
    rewards = tf.convert_to_tensor(
        np.array([e.reward for e in experiences if e is not None]), dtype=tf.float32
    )
    next_states = tf.convert_to_tensor(
        np.array([e.next_state for e in experiences if e is not None]), dtype=tf.float32
    )
    # Booleans go through uint8 first so they become 0/1 before the float cast.
    done_vals = tf.convert_to_tensor(
        np.array([e.done for e in experiences if e is not None]).astype(np.uint8),
        dtype=tf.float32,
    )
    # The tuple columns are returned as a group of separate tensors.
    return (states, actions, rewards, next_states, done_vals)
    
def update_epsilon(epsilon):
    """Return the next exploration rate.

    Epsilon shrinks by the factor `EPSILON_DECAY`, but the result is clamped
    so that it never falls below `EPSILON_FLOOR`.
    """
    decayed = epsilon*EPSILON_DECAY
    if decayed < EPSILON_FLOOR:
        return EPSILON_FLOOR
    return decayed

def get_action_epsilon(qvals,epsilon):
    """Choose an action with an epsilon-greedy policy.

    With probability `epsilon` a uniformly random action is returned;
    otherwise the action with the highest Q-value is returned.

    Args:
        qvals: Q-values with a leading batch axis of size 1, i.e. indexable as
            `qvals[0][action]`. May be a TensorFlow tensor (anything with a
            `.numpy()` method) or an array-like.
        epsilon: exploration probability in [0, 1].

    Returns:
        The chosen action index as a Python int.
    """
    # Tensors need converting before numpy can work on them; plain arrays
    # are accepted as-is.
    values = qvals.numpy() if hasattr(qvals, "numpy") else np.asarray(qvals)
    action_values = values[0]  # drop the batch axis -> one Q-value per action

    if random.random() < epsilon:
        # Explore. randrange excludes its upper bound, so the result is always
        # a valid index into action_values.
        return random.randrange(len(action_values))

    # Exploit: best action according to the DQN.
    return int(np.argmax(action_values))
        

    

In [13]:
# Compute and apply gradients function
# mostly taken from coursera ml spec course
@tf.function
def agent_learn(experiences,gamma,tau=1e-3):
    """Run one learning step on a mini-batch.

    Performs a gradient step on `q_network` using the loss from
    `compute_loss`, then moves `target_q_network` a small step towards the
    updated `q_network`.

    Args:
        experiences: tuple of tensors as returned by `sample_experiences`.
        gamma: discount factor forwarded to `compute_loss`.
        tau: soft-update rate for the target network; each target weight
            becomes `tau * online + (1 - tau) * target`.
    """
    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        loss = compute_loss(experiences,gamma,q_network)

    # Gradients of the loss with respect to the online network's weights.
    gradients = tape.gradient(loss, q_network.trainable_variables)

    optimizer.apply_gradients(zip(gradients,q_network.trainable_variables))

    # Without this the target network would keep its initial weights and the
    # TD targets would never reflect anything the online network has learned.
    for target_weight, online_weight in zip(target_q_network.weights, q_network.weights):
        target_weight.assign(tau * online_weight + (1.0 - tau) * target_weight)
    

In [14]:
# without any of the history stuff to start

memory_buffer = deque(maxlen=MEMORY_SIZE) # Deques with a maxlen remove their first elements
                                          # (they're "pushed out") if at their max length when new
                                          # elements are added to the end. This makes it a natural structure
                                          # for a fixed buffer we're drawing from.
num_episodes = 2000
max_turns = 1000 # Will vary heavily based on what the sim is
                 # max_turns here lines up nicely with MEMORY_SIZE
stop_at_score = 200

total_point_history = []

num_p_av = 200 # number of most recent episodes to average over
epsilon = 1.0  # start fully exploratory

for i in range(num_episodes):
    # reset must be *called*; a bare `env.reset` is just the bound method.
    state = env.reset()
    total_points = 0 # history stuff
    for t in range(max_turns):
        # Choose action A (with epsilon-greedy policy) based on current state
        state_qn = np.expand_dims(state, axis=0) # State will be same shape as input, need to do this to get it into 2d array
        q_values = q_network(state_qn)

        action = get_action_epsilon(q_values, epsilon) #!EA make function

        next_state, reward, done, _ = env.step(action)

        memory_buffer.append(experience(state,action,reward,next_state,done))

        update = update_if_conditions(t,NUM_STEPS_FOR_UPDATE, memory_buffer) #!EA

        if update:
            # Sample random mini-batch of experience tuples from memory buffer
            experiences = sample_experiences(memory_buffer) #!EA

            # ...And learn from 'em. Set y-targets based on estimates and update
            # DQN weights
            agent_learn(experiences,GAMMA)

        state = next_state.copy() # Since the state is a list, this should be resistant to api changes
        total_points += reward

        # Keeps iterating continuously in the same situation, unless sim is resolved
        if done:
            break
    total_point_history.append(total_points) # note no assignment here, because .append changes the object which is stored in memory, not some temporary value. No assignment is needed.
    av_latest_points = np.mean(total_point_history[-num_p_av:]) # [-n:] means "the last n items" (or all of them if there are fewer than n)

    epsilon = update_epsilon(epsilon) #!EA make function

    # "\r" returns to the start of the line and end="" suppresses the newline,
    # so this progress line is overwritten in place on every episode.
    print(f"\rEpisode {i+1}: total point average over last {num_p_av} episodes: {av_latest_points:.2f}",end="")

    # Every num_p_av episodes (keyed on the episode counter, not on the score)...
    if ((i + 1) % num_p_av == 0):
        # Print permanently; things after this will be on a new line. This gives some sort of history/context as the algorithm runs.
        print(f"\rEpisode {i+1}: total point average over last {num_p_av} episodes: {av_latest_points:.2f}")

    # Stop early and save the network once the running average is good enough.
    if av_latest_points >= stop_at_score:
        q_network.save('reinforcement_learning.h5')
        break
    

        
    
        
        
            
            
        


NameError: name 'env' is not defined

In [None]:
# To get network to make decisions, call network on state and pick best action
def decide(state):
    """Return the index of the action the trained network rates highest.

    The state gets a leading batch axis so the network can consume it, the
    network lays out one Q-value per action, and the position of the largest
    Q-value is returned.
    """
    batch = np.expand_dims(state, axis=0)
    q_row = q_network(batch).numpy()[0]
    return np.argmax(q_row)

