## Linear Q-Learning

In [None]:
import sys
import pygame as pg
import numpy.random as npr
from sklearn import linear_model
from sklearn import tree


class SwingyMonkey:
    """Pygame implementation of the Swingy Monkey side-scroller.

    Trees scroll towards the monkey from the right; each tree has a
    vertical gap the monkey must pass through.  The monkey falls under a
    gravity that is drawn once per game object (1 or 4) and jumps with a
    Poisson-distributed impulse.  An episode ends when the monkey touches
    a tree trunk or leaves the top/bottom of the screen.

    One game object represents one episode: call game_loop() repeatedly
    until it returns False, then read `score`.
    """

    def __init__(self, sound=True, text=None, action_callback=None, 
                 reward_callback=None, tick_length=100):
        """Constructor for the SwingyMonkey class.

        Possible Keyword Arguments:

        sound: Boolean variable on whether or not to play sounds.
               Defaults to True.

        text: Optional string to display in the upper right corner of
              the screen.

        action_callback: Function handle for determining actions.
                         Takes a dictionary as an argument.  The
                         dictionary contains the current state of the
                         game.

        reward_callback: Function handle for receiving rewards. Takes
                         a scalar argument which is the reward.

        tick_length: Time in milliseconds between game steps.
                     Defaults to 100ms, but you might want to make it
                     smaller for training."""

        # Don't change these!!!
        self.screen_width  = 600
        self.screen_height = 400
        self.horz_speed    = 25
        self.impulse       = 15
        self.gravity       = npr.choice([1,4])
        self.tree_mean     = 5
        self.tree_gap      = 200
        self.tree_offset   = -300
        self.edge_penalty  = -10.0
        self.tree_penalty  = -5.0
        self.tree_reward   = 1.0

        # Store arguments.
        self.sound         = sound
        self.action_fn     = action_callback
        self.reward_fn     = reward_callback
        self.tick_length   = tick_length
        self.text          = text

        # Initialize pygame.
        pg.init()
        try:
            pg.mixer.init()
        except Exception:
            # Sound is best-effort: any mixer failure just disables it.
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt / SystemExit still propagate.
            print("No sound.")
            self.sound = False

        # Set up the screen for rendering.
        self.screen = pg.display.set_mode((self.screen_width, self.screen_height), 0, 32)

        # Load external resources (paths are relative to the working directory).
        self.background_img = pg.image.load('res/jungle-pixel.bmp').convert()
        self.monkey_img     = pg.image.load('res/monkey.bmp').convert_alpha()
        self.tree_img       = pg.image.load('res/tree-pixel.bmp').convert_alpha()
        if self.sound:
            self.screech_snd    = pg.mixer.Sound('res/screech.wav')
            self.blop_snd       = pg.mixer.Sound('res/blop.wav')

        # Set up text rendering.
        self.font = pg.font.Font(None, 36)

        # Track locations of trees and gaps.  Each tree is a dict with
        # 'x' (left edge), 'y' (top of the gap, screen coordinates) and
        # 's' (whether the tree has already been scored).
        self.trees     = []
        self.next_tree = 0
        
        # Precompute some things about the monkey.
        self.monkey_left  = self.screen_width/2 - self.monkey_img.get_width()/2
        self.monkey_right = self.monkey_left + self.monkey_img.get_width()
        self.monkey_loc   = self.screen_height/2 - self.monkey_img.get_height()/2

        # Track game state.
        self.vel   = 0
        self.hook  = self.screen_width
        self.score = 0
        self.iter  = 0

    def get_state(self):
        '''Returns a snapshot of the current game state, computed
        relative to the next oncoming tree.  This is a dictionary
        with the following structure:
        { 'score': <current score>,
          'tree': { 'dist': <pixels to next tree trunk>,
                    'top':  <screen height of top of tree trunk gap>,
                    'bot':  <screen height of bottom of tree trunk gap> },
          'monkey': { 'vel': <current monkey y-axis speed in pixels per iteration>,
                      'top': <screen height of top of monkey>,
                      'bot': <screen height of bottom of monkey> }}

        Heights are measured upwards from the bottom of the screen.
        At least one tree must exist (game_loop creates the first one
        on its first call); otherwise the fallback lookup raises
        IndexError.'''

        # Find the next closest tree.
        # NOTE(review): 290 and 215 look like pixel offsets of the trunk
        # inside the tree image -- confirm against res/tree-pixel.bmp.
        next_tree = None
        for tree in self.trees:
            if tree['x']+290 >= self.monkey_left:
                next_tree = tree.copy()
                break

        # No tree ahead of the monkey: fall back to the oldest one.
        if not next_tree:
            next_tree = self.trees[0].copy()

        # Construct the state dictionary to return.
        return { 'score': self.score,
                 'tree': { 'dist': next_tree['x']+215-self.monkey_right,
                           'top': self.screen_height-next_tree['y'],
                           'bot': self.screen_height-next_tree['y']-self.tree_gap},
                 'monkey': { 'vel': self.vel,
                             'top': self.screen_height - self.monkey_loc + self.monkey_img.get_height()/2,
                             'bot': self.screen_height - self.monkey_loc - self.monkey_img.get_height()/2}}

    def _end_episode(self, penalty):
        '''Shared failure path for edge and tree collisions.

        Plays the screech (blocking until it finishes) when sound is
        enabled, reports `penalty` through the reward callback, gives
        the action callback one last look at the state, and returns
        False so game_loop can hand that straight back to its caller.'''

        if self.sound:
            ch = self.screech_snd.play()
            # Guard against play() handing back no channel (presumably
            # when none is free) so the wait loop cannot crash.
            while ch is not None and ch.get_busy():
                pg.time.delay(500)
        if self.reward_fn is not None:
            self.reward_fn(penalty)
        if self.action_fn is not None:
            # The return value is ignored: the episode is already over.
            self.action_fn(self.get_state())
        return False

    def game_loop(self):
        '''This is called every game tick.  You call this in a loop
        until it returns false, which means you hit a tree trunk, fell
        off the bottom of the screen, or jumped off the top of the
        screen.  It calls the action and reward callbacks.

        Per tick the action callback is invoked first (a truthy return
        means "jump"), then the physics are advanced, then the reward
        callback receives tree_reward when a tree was passed on this
        tick and 0.0 otherwise.  On failure the reward callback gets
        edge_penalty or tree_penalty (edge takes precedence) and the
        action callback is called once more with the final state.'''

        # Render the background.
        self.screen.blit(self.background_img, (self.iter,0))
        if self.iter < self.background_img.get_width() - self.screen_width:
            self.screen.blit(self.background_img, (self.iter+self.background_img.get_width(),0))

        # Perhaps generate a new tree.
        if self.next_tree <= 0:
            self.next_tree = self.tree_img.get_width() * 5 + int(npr.geometric(1.0/self.tree_mean))
            self.trees.append( { 'x': self.screen_width+1,
                                 'y': int((0.3 + npr.rand()*0.65)*(self.screen_height-self.tree_gap)),
                                 's': False })
        # Process input events.  Key presses only drive the monkey when
        # no action callback is installed (human play).
        for event in pg.event.get():
            if event.type == pg.QUIT:
                sys.exit()
            elif self.action_fn is None and event.type == pg.KEYDOWN:
                self.vel = npr.poisson(self.impulse)
                self.hook = self.screen_width

        # Perhaps take an action via the callback.
        if self.action_fn is not None and self.action_fn(self.get_state()):
            self.vel = npr.poisson(self.impulse)
            self.hook = self.screen_width

        # Eliminate trees that have moved off the screen.
        self.trees = [x for x in self.trees if x['x'] > -self.tree_img.get_width()]

        # Monkey dynamics (screen y grows downwards, so positive
        # velocity moves the monkey up).
        self.monkey_loc -= self.vel
        self.vel        -= self.gravity

        # Current monkey bounds.
        monkey_top = self.monkey_loc - self.monkey_img.get_height()/2
        monkey_bot = self.monkey_loc + self.monkey_img.get_height()/2

        # Move trees to the left, render and compute collision.
        self.next_tree -= self.horz_speed
        edge_hit = False
        tree_hit = False
        pass_tree = False
        for tree in self.trees:
            tree['x'] -= self.horz_speed

            # Render tree.
            self.screen.blit(self.tree_img, (tree['x'], self.tree_offset))

            # Render gap in tree by re-drawing the matching slice of
            # background over the trunk.
            self.screen.blit(self.background_img, (tree['x'], tree['y']),
                             (tree['x']-self.iter, tree['y'],
                              self.tree_img.get_width(), self.tree_gap))
            if self.iter < self.background_img.get_width() - self.screen_width:
                self.screen.blit(self.background_img, (tree['x'], tree['y']),
                                 (tree['x']-(self.iter+self.background_img.get_width()), tree['y'],
                                  self.tree_img.get_width(), self.tree_gap))
                
            trunk_left  = tree['x']
            trunk_right = tree['x'] + self.tree_img.get_width()
            trunk_top   = tree['y']
            trunk_bot   = tree['y'] + self.tree_gap

            # Compute collision: the monkey overlaps the trunk
            # horizontally and pokes outside the gap vertically.
            if (((trunk_left < (self.monkey_left+15)) and (trunk_right > (self.monkey_left+15))) or
                ((trunk_left < self.monkey_right) and (trunk_right > self.monkey_right))):
                #pg.draw.rect(self.screen, (255,0,0), (trunk_left, trunk_top, trunk_right-trunk_left, trunk_bot-trunk_top), 1)
                #pg.draw.rect(self.screen, (255,0,0), (self.monkey_left+15, monkey_top, self.monkey_img.get_width()-15, monkey_bot-monkey_top), 1)
                if (monkey_top < trunk_top) or (monkey_bot > trunk_bot):
                    tree_hit = True
            
            # Keep score: each tree counts once, when fully behind the monkey.
            if not tree['s'] and (self.monkey_left+15) > trunk_right:
                tree['s'] = True
                self.score += 1
                pass_tree = True
                if self.sound:
                    self.blop_snd.play()

        # Monkey swings down on a vine.
        if self.vel < 0:
            pg.draw.line(self.screen, (92,64,51), (self.screen_width/2+20, self.monkey_loc-25), (self.hook,0), 4)

        # Render the monkey.
        self.screen.blit(self.monkey_img, (self.monkey_left, monkey_top))

        # Fail on hitting top or bottom.
        if monkey_bot > self.screen_height or monkey_top < 0:
            edge_hit = True

        # Render the score
        score_text = self.font.render("Score: %d" % (self.score), 1, (230, 40, 40))
        self.screen.blit(score_text, score_text.get_rect())

        if self.text is not None:
            text = self.font.render(self.text, 1, (230, 40, 40))
            textpos = text.get_rect()
            self.screen.blit(text, (self.screen_width-textpos[2],0,textpos[2],textpos[3]))

        # Render the display.
        pg.display.update()

        # If failed, play sound and exit.  Also, assign rewards.
        # An edge hit takes precedence over a tree hit on the same tick.
        if edge_hit:
            return self._end_episode(self.edge_penalty)
        if tree_hit:
            return self._end_episode(self.tree_penalty)

        if self.reward_fn is not None:
            if pass_tree:
                self.reward_fn(self.tree_reward)
            else:
                self.reward_fn(0.0)            
        
        # Wait just a bit.
        pg.time.delay(self.tick_length)

        # Move things.
        self.hook -= self.horz_speed
        self.iter -= self.horz_speed
        if self.iter < -self.background_img.get_width():
            self.iter += self.background_img.get_width()

        return True

#if __name__ == '__main__':
    
    # Create the game object.
#    game = SwingyMonkey()

    # Loop until you hit something.
#    while game.game_loop():
#        pass




In [None]:
# Shared, notebook-wide replay history.  The Learner callbacks append to
# these lists and the refit / plotting cells read from them.
states, rewards, actions = [], [], []

In [None]:
# One linear value model per action: fit0 scores "don't jump" (action 0),
# fit1 scores "jump" (action 1).  Both are seeded with a single
# 16-feature placeholder row and a target of 0 so that predict() is
# callable before any real transitions have been collected.
_seed_half = [0, 257.0, 485.0, 294, 1369.0, -0.0, 0, 0]
_seed_row = _seed_half + _seed_half

fit0 = linear_model.LinearRegression()
fit1 = linear_model.LinearRegression()
for _model in (fit0, fit1):
    _model.fit([_seed_row], [0])

In [None]:
# Imports.
import numpy as np
import numpy.random as npr
import pygame as pg

#from SwingyMonkey import SwingyMonkey


class Learner(object):
    '''
    Agent that learns one linear value model per action and acts
    (mostly) greedily with respect to them.

    The agent shares state with the rest of the notebook through the
    module-level names `states`, `rewards`, `actions` (replay history
    that persists across episodes) and `fit0` / `fit1` (the models for
    action 0 = don't jump and action 1 = jump).

    Tunable attributes:
      epsilon: probability of taking the greedy action; with probability
               1 - epsilon a random action is drawn (jump 20% of the time).
      rate:    per-tick probability of refitting each model on the history.

    NOTE(review): two modelling choices are kept as found and should be
    confirmed: (1) the bootstrap target for the last transition of an
    episode uses the first state of the *next* episode, because terminal
    states are never recorded; (2) each model bootstraps from its own
    prediction rather than the max over both actions.
    '''

    GAMMA = 0.8            # discount applied to the bootstrapped value
    MIN_HISTORY = 25       # transitions required before acting greedily
    EXPLORE_P = [0.8, 0.2] # P(no jump), P(jump) when exploring

    def __init__(self):
        self.epsilon = 1
        # Default only; the training driver overrides this per stage.
        self.rate = 0.1
        self.reset()

    def reset(self):
        '''Clear per-episode state.  epsilon and rate are deliberately
        left untouched so a training schedule survives across episodes.'''
        self.last_state  = []
        self.last_action = 0
        self.last_reward = 0
        self.states = []
        self.rewards = []
        self.actions = []
        self.velocities = []
        self.gravity = 0
        self.nstates = 0

    def _featurize(self, state):
        '''Turn the game's state dict into the 16-element feature vector:
        8 base features followed by the same 8 multiplied by gravity.'''
        monkey = state['monkey']
        tree = state['tree']
        height_diff = monkey['top'] - tree['top']
        base = np.array([monkey['vel'],
                         monkey['top'],
                         tree['dist'],
                         tree['top'],
                         height_diff,
                         height_diff * monkey['vel'],
                         tree['dist'] * monkey['vel'],
                         self.gravity])
        return np.append(base, base * self.gravity)

    def _maybe_refit(self, model, action):
        '''With probability `rate`, refit `model` on every recorded
        transition in which `action` was taken.

        The most recent matching transition is skipped so that a
        successor state is always available for the bootstrap target
        reward + GAMMA * model.predict(next_state).'''
        if np.random.uniform() >= self.rate:
            return
        idx = np.nonzero(np.array(actions) == action)[0][:-1]
        if len(idx) == 0:
            return
        history = np.array(states)
        targets = np.array(rewards)[idx] + self.GAMMA * model.predict(history[idx + 1])
        model.fit(history[idx], targets)

    def action_callback(self, state):
        '''
        Observe `state` (the dict produced by SwingyMonkey.get_state),
        possibly refit the value models, and choose an action.
        Returns 0 to keep swinging and 1 to jump.
        '''

        # Gravity is not part of the observation, so infer it on the
        # second tick of an episode from the change in velocity.  The
        # first tick never jumps (see below), which keeps this clean.
        if (self.gravity == 0) and (self.nstates == 1):
            self.gravity = state['monkey']['vel'] - states[-1][0]

        features = self._featurize(state)
        self.last_state = np.copy(features)

        # Refit order (action 0, then action 1) fixes the RNG draw order.
        self._maybe_refit(fit0, 0)
        self._maybe_refit(fit1, 1)

        if self.gravity == 0:
            # Gravity still unknown: stay passive so it can be measured.
            self.last_action = 0
        elif np.random.uniform() < self.epsilon and (len(states) > self.MIN_HISTORY):
            # Greedy choice, evaluated on the state we are actually in
            # (previously this scored the previous tick's state).
            q_jump = fit1.predict([features])[0]
            q_stay = fit0.predict([features])[0]
            self.last_action = int(q_jump > q_stay)
        else:
            self.last_action = int(np.random.choice([0, 1], p=self.EXPLORE_P))

        # After a crash the game calls us once more with the final
        # state; that call (last_reward < 0) must not be recorded, or
        # states/actions would fall out of step with rewards.
        if self.last_reward >= 0:
            actions.append(self.last_action)
            states.append(features)
            self.states.append(features)
            self.actions.append(self.last_action)

        self.nstates += 1

        return self.last_action

    def reward_callback(self, reward):
        '''Record the reward for the action just taken.

        Rewards are reshaped before being stored: penalties are divided
        by 5 and a zero reward becomes a 0.1 survival bonus.'''

        if reward < 0:
            reward = reward / 5
        elif reward == 0:
            reward = 0.1
        rewards.append(reward)

        self.last_reward = reward
        

def run_games(learner, hist, iters = 100, t_len = 100):
    '''
    Play `iters` consecutive games with `learner` in control.

    Each game's final score is appended to `hist`, and the learner's
    per-episode state is reset between games.  pygame is shut down once
    all games are done.  Returns the module-level history as the tuple
    (states, rewards, actions).
    '''

    for epoch in range(iters):
        # A fresh game object per episode; sound is always off here.
        game = SwingyMonkey(
            sound=False,
            text="Epoch %d" % (epoch),
            tick_length=t_len,
            action_callback=learner.action_callback,
            reward_callback=learner.reward_callback,
        )

        # Tick the game until the monkey crashes.
        alive = True
        while alive:
            alive = game.game_loop()

        hist.append(game.score)
        learner.reset()

    pg.quit()
    return (states, rewards, actions)

In [None]:
# Select agent.
# Score history placeholder (each training stage builds its own list).
hist = list()

# The single agent instance used by the training driver.
agent = Learner()

In [None]:
# Run games.
# Staged training schedule: stage k plays 50 games with epsilons[k] as
# the probability of acting greedily and rates[k] as the per-tick
# probability of refitting each model.
scores = []
epsilons = [0, 0.5, 0.6, 0.7, 0.8, 0.9, 0.9, 0.9, 0.9, 0.9, 0.95, 0.95]
rates = [0.1, 0.1, 0.1, 0.01, 0.01, 0.01, 0.01, 0.01, 0.005, 0.005, 0.005, 0.005]

for stage_epsilon, stage_rate in zip(epsilons, rates):
    hist = []
    agent.epsilon = stage_epsilon
    agent.rate = stage_rate
    # 50 games per stage at a 10 ms tick.
    states, rewards, actions = run_games(agent, hist, 50, 10)
    scores.append(hist)


## Visualizations

In [None]:
import matplotlib.pyplot as plt
# Best score reached within each training stage (one row of `scores`
# per stage).
stage_best = np.max(scores, axis = 1)
plt.plot(stage_best)
plt.title('Max Score by Stage')
plt.xlabel('Stage')
plt.ylabel('Max Score')
plt.show()

In [None]:
# Value of every recorded state under each action's model, and the
# larger of the two per state.
preds1 = fit0.predict(states)
preds2 = fit1.predict(states)
preds = [np.max([q_stay, q_jump]) for q_stay, q_jump in zip(preds1, preds2)]

In [None]:
# Plot a 500-state window taken from near the end of the history.
qmax_window = preds[-1000:-500]
plt.plot(qmax_window)
plt.title("Q-max by State")
plt.xlabel("State Index")
plt.ylabel("Q-max")
plt.show()

In [None]:
# Split the recorded states by the gravity the agent inferred for them.
# In the feature vector built by Learner.action_callback the gravity
# feature is column 7 (column 6 is tree distance * velocity, which the
# previous code read by mistake).  The inferred value is the per-tick
# change in velocity, i.e. -1 or -4; the first state of each episode
# still has gravity 0 and falls into neither group.
GRAVITY_COL = 7
accs = np.array(states).T[GRAVITY_COL]
acc1 = (accs == -1)
acc2 = (accs == -4)

In [None]:
# Q-max for the states selected by the acc1 mask.
states_1 = np.array(states)[acc1]
preds1_1 = fit0.predict(states_1)
preds2_1 = fit1.predict(states_1)
preds_1 = [np.max([q_stay, q_jump]) for q_stay, q_jump in zip(preds1_1, preds2_1)]

In [None]:
# Q-max for the states selected by the acc2 mask.
states_2 = np.array(states)[acc2]
preds1_2 = fit0.predict(states_2)
preds2_2 = fit1.predict(states_2)
preds_2 = [np.max([q_stay, q_jump]) for q_stay, q_jump in zip(preds1_2, preds2_2)]

In [None]:
# Overlay the Q-max histograms of both gravity groups on one figure.
for qmax_group in (preds_1, preds_2):
    plt.hist(qmax_group, bins = 100)
plt.show()

In [None]:
# Histogram of Q-max for the second gravity group (the acc2 mask) on
# its own, using 100 bins.
plt.hist(preds_2, bins = 100)
plt.show()

In [None]:
import seaborn
# Kernel density estimates of Q-max, one curve per gravity group.
for qmax_group in (preds_1, preds_2):
    seaborn.kdeplot(qmax_group)
plt.title("Q-max Distributions by Gravity")
plt.xlabel("Q-max")
plt.ylabel("Probability Density")
plt.show()