This notebook is adapted from:
https://github.com/JannesKlaas/sometimes_deep_sometimes_learning/blob/master/reinforcement.ipynb


In [137]:
import numpy as np
import random
import scipy

from file_utils import *

In [147]:
_SIZE = 7
_FRACTION = (0.5, 0.7)
HORIZ, VERT = 0, 1
_printfreq = 30
_BALLS_TO_LEVELUP = 20
_outfile = "gamestats.txt"

In [5]:
def apply_gravity_to_column(column):
    '''
    Let every non-zero entry of a column fall to the bottom.

    Zeros (empty cells) end up at the start of the column and the non-zero
    entries follow, keeping their relative order. Index 0 is the top of
    the column and the last index is the bottom.

    The input is never modified: both returned sequences are fresh copies.
    (Slicing a NumPy array with ``[:]`` yields a view, not a copy, so the
    copies are made explicitly.)

    Parameters
    ----------
    column : 1-D sequence (list or numpy.ndarray) of numbers

    Returns
    -------
    (flip_occurred, original, updated)
        flip_occurred : bool, True when at least one entry had to move
        original      : copy of the input as it was passed in
        updated       : the settled column; an ndarray (same dtype) when the
                        input was an ndarray, otherwise a list
    '''
    is_array = isinstance(column, np.ndarray)
    original = column.copy() if is_array else list(column)

    values = list(original)
    balls = [v for v in values if v]  # non-zero entries, top-to-bottom order
    settled = [0] * (len(values) - len(balls)) + balls

    # The settled arrangement is unique, so "something moved" is the same
    # as "the result differs from the input".
    flip_occurred = any(new != old for new, old in zip(settled, values))

    if is_array:
        updated = original.copy()
        updated[:] = settled
    else:
        updated = settled

    return (flip_occurred, original, updated)



In [6]:
def generate_init_grid(_SIZE):
    '''
    Build a random drop-7 starting grid of shape (_SIZE, _SIZE).

    Every cell is filled independently: a fill threshold is drawn from
    U(_FRACTION[0], _FRACTION[1]) and the cell receives a random ball in
    1.._SIZE when a U(0, 1) draw does not exceed that threshold; otherwise
    it stays 0 (empty). Gravity is then applied to each column so the
    balls rest on the bottom.

    No explosions are resolved here; call update_grid() on the result
    for that.
    '''
    # dtype=int replaces the np.int alias, which current NumPy no longer has.
    grid = np.zeros((_SIZE, _SIZE), dtype=int)

    for x in np.nditer(grid, op_flags=['readwrite']):
        if random.random() <= random.uniform(_FRACTION[0], _FRACTION[1]):
            x[...] = random.randint(1, _SIZE)  # ellipsis writes into the current element

    # apply gravity to each column
    for colnum in range(grid.shape[1]):
        _, _, new = apply_gravity_to_column(grid[:, colnum])
        grid[:, colnum] = new

    return grid



In [7]:
def row(grid, rnum, _string):
    '''
    Overwrite one row of the grid in place and return the grid.

    rnum counts rows from the bottom (0 is the bottom row). _string holds
    one digit per column, e.g. '4143836'. The row index is derived from
    the grid's own height, so the helper works for any grid size.
    '''
    grid[grid.shape[0] - 1 - rnum, :] = [int(ch) for ch in _string]
    return grid
    
def zerow(grid, rnum):
    '''
    Clear (set to 0) one row of the grid in place.

    rnum counts rows from the bottom (0 is the bottom row). The row index
    comes from the grid's own height, so any grid shape works.
    Returns None.
    '''
    grid[grid.shape[0] - 1 - rnum, :] = 0

def zecol(grid, cnum):
    '''
    Clear (set to 0) column cnum of the grid in place.

    Works for any grid height. Returns None.
    '''
    grid[:, cnum] = 0

def grid_of_zeros(size=_SIZE):
    '''Return a (size, size) integer grid filled with 0 (an empty board).'''
    # dtype=int replaces the np.int alias, which current NumPy no longer has.
    return np.zeros((size, size), dtype=int)

def grid_of_ones(size=_SIZE):
    '''Return a (size, size) integer grid filled with 1 (used as a keep-mask).'''
    # dtype=int replaces the np.int alias, which current NumPy no longer has.
    return np.ones((size, size), dtype=int)

In [9]:
recreate_grid(7)

array([[0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [6, 2, 6, 0, 0, 0, 0],
       [7, 1, 3, 1, 3, 5, 2],
       [4, 1, 4, 3, 0, 3, 6]])

In [11]:
generate_init_grid(7)

array([[0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 2, 0, 0, 0, 0, 0],
       [0, 5, 2, 0, 7, 6, 6],
       [5, 1, 7, 0, 1, 6, 2],
       [2, 6, 2, 0, 6, 5, 3],
       [7, 4, 6, 2, 4, 1, 3]])

In [12]:
from itertools import groupby

def mask(vec):
    '''Return a list with one flag per entry of vec: True where the entry is > 0.'''
    flags = []
    for value in vec:
        flags.append(value > 0)
    return flags


def get_mask_lengths(_vec):
    '''
    Run-length encode the occupancy mask of _vec.

    Returns a list of (flag, length) tuples, in order, where flag is the
    mask value shared by a run of consecutive entries (True for entries
    > 0, False otherwise) and length is the number of entries in that run.
    '''
    occupied = mask(_vec)
    runs = []
    # Group consecutive positions by their mask value.
    for is_ball, members in groupby(range(len(occupied)), key=lambda pos: occupied[pos]):
        runs.append((is_ball, sum(1 for _ in members)))
    return runs


def blank_out(_num, vec):
    '''Return a new list copying vec, with every entry equal to _num replaced by 0.'''
    result = []
    for value in vec:
        if value == _num:
            result.append(0)
        else:
            result.append(value)
    return result


def inplace_explosions(vec):
    '''
    Work out which entries of a single line (row or column) explode.

    A ball explodes when its value equals the length of the run of
    consecutive non-zero entries it sits in. Explosions are applied to a
    private copy and the scan repeats until a pass removes nothing, so a
    shortened run can trigger further explosions. vec itself is not
    modified.

    Returns (exp_occurred, original, unchanged):
        exp_occurred : True when at least one entry exploded
        original     : list copy of vec
        unchanged    : list of 1/0 flags, 1 where the entry survived
                       (still equals the original), 0 where it exploded
    '''
    original = list(vec)
    updated = list(vec)
    exp_occurred = False

    keep_scanning = True
    while keep_scanning:
        keep_scanning = False
        cursor = 0
        # Run lengths are computed once per pass, before any blanking.
        for occupied, run_len in get_mask_lengths(updated):
            lo, hi = cursor, cursor + run_len
            cursor = hi
            if not occupied:
                continue
            before = updated[lo:hi]
            after = blank_out(run_len, before)
            if before != after:
                keep_scanning = True  # runs changed, so look again
                exp_occurred = True
                updated[lo:hi] = after[:]

    unchanged = []
    for old, new in zip(original, updated):
        unchanged.append(1 if old == new else 0)

    return (exp_occurred, original, unchanged)

### Logic used in Updating Grid

    init_grid()
    for each row, calculate explosions (but don't execute them)
    for each col, calculate explosions (but don't execute them)
    EXECUTE ALL THE EXPLOSIONS AT ONCE.
    Grid has been updated.
    Now, Gravity to each of the columns.
    Rinse and repeat!

In [14]:
def apply_explosions_to_grid(grid):
    '''
    Execute one simultaneous round of explosions on the grid, in place.

    Every row and every column is analysed independently with
    inplace_explosions(), which yields a 1/0 keep-mask per line. All masks
    are computed from the grid as it was on entry and only then applied,
    so a ball disappears when either its row or its column says so.

    The grid's own shape is used rather than the global _SIZE, so any
    2-D grid works. Returns the same (mutated) grid object.
    '''
    n_rows, n_cols = grid.shape
    row_keep = np.ones(grid.shape, dtype=int)
    col_keep = np.ones(grid.shape, dtype=int)

    for r in range(n_rows):
        _, _, row_keep[r, :] = inplace_explosions(grid[r, :])
    for c in range(n_cols):
        _, _, col_keep[:, c] = inplace_explosions(grid[:, c])

    # A cell survives only when both its row mask and its column mask keep it.
    grid[...] = grid * (row_keep * col_keep)

    return grid


def apply_gravity_to_grid(grid):
    '''
    Apply gravity to every column of the grid, in place.

    The column count is read from the grid itself rather than from the
    global _SIZE.

    Returns (grid, unchanged): the same grid object, and True when gravity
    did not move anything (the grid equals its state on entry).
    '''
    original = grid.copy()
    for c in range(grid.shape[1]):
        _, _, grid[:, c] = apply_gravity_to_column(grid[:, c])

    return (grid, np.array_equal(grid, original))


In [15]:
def update_grid(grid):
    '''
    Resolve the grid: alternate explosions and gravity until it is stable.

    A pass is one round of explosions followed by gravity. The loop stops
    only when a complete pass leaves the grid unchanged. Stopping as soon
    as gravity moves nothing is not enough: a ball removed from the top of
    a column moves nothing, yet it shortens that column's run and can
    make another ball eligible to explode.

    Returns the resolved grid.
    '''
    while True:
        before = grid.copy()
        grid = apply_explosions_to_grid(grid)
        grid, _ = apply_gravity_to_grid(grid)
        if np.array_equal(before, grid):
            break

    return grid

In [119]:
def recreate_grid(size=_SIZE):
    '''
    Build a fixed test grid: an empty board whose three bottom rows are
    filled with hard-coded digits (the first string is the bottom row).
    '''
    bottom_up_rows = ('4143836', '7131352', '6260800')
    #list('52134335')
    grid = grid_of_zeros(size)
    for rnum, digits in enumerate(bottom_up_rows):
        grid = row(grid, rnum, digits)
    return grid
    #zecol(3)

In [124]:
grid = recreate_grid()
#grid = generate_init_grid(7)
print(grid) 
grid = update_grid(grid)
print(grid)

[[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [6 2 6 0 8 0 0]
 [7 1 3 1 3 5 2]
 [4 1 4 3 8 3 6]]
[[0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0]
 [0 0 0 1 8 5 0]
 [4 0 4 3 8 3 6]]


In [17]:
update_grid(grid)

array([[0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 3, 6, 0, 0],
       [5, 5, 4, 5, 1, 2, 0]])

### Logic used in the Game Loop


    init_grid()
    
    while not game_over:
        update_grid()
        generate_next_ball()
        get_column()
        drop_ball_in_column()
    


In [197]:
def generate_next_ball(size = _SIZE):
    '''Draw the next ball: a uniformly random integer in 1..size, both ends included.'''
    ball = random.randrange(1, size + 1)
    return ball

def get_column_to_drop_ball(grid, ball):
    '''
    Choose the column into which the ball will be dropped.

    Currently a uniformly random column index of the given grid; the ball
    value is ignored. The width is read from the grid rather than from
    the global _SIZE, so the choice is always a valid column.
    '''
    #need better logic soon
    return random.randrange(grid.shape[1])
    
     
def nz(grid):
    '''Return the number of non-zero cells (balls) in the grid.'''
    occupied = np.count_nonzero(grid)
    return occupied

def is_grid_full(grid):
    '''
    Return True when every cell of the grid holds a ball (is non-zero).

    The cell count comes from the grid itself rather than from the
    global _SIZE.
    '''
    return np.count_nonzero(grid) == grid.size
    
def drop_ball_in_column(grid, col, ball):
    '''
    Try to drop a ball into column col of the grid (in place).

    The ball lands in the lowest empty (zero) cell of the column.

    Returns (grid, game_over, need_another_col):
        game_over        : True when the grid was already completely full
        need_another_col : True when the grid is not full but this column
                           has no empty cell, so the caller must pick
                           another column; always False when game_over
    '''
    game_over = is_grid_full(grid)

    empty_rows = np.where(grid[:, col] == 0)[0]
    need_another_col = empty_rows.size == 0

    if game_over:
        need_another_col = False
    elif not need_another_col:
        # The highest row index among the empty cells is the lowest free slot.
        grid[empty_rows[-1], col] = ball

    return (grid, game_over, need_another_col)




In [178]:
class _stats(object):
    

    def reset():
        _stats.ball_count = 0
        _stats.levelup_count = 0
        print("", file=open(_outfile, "w"))
        
    def ball_drop():
        _stats.ball_count += 1
        
    def level_up():
        _stats.levelup_count += 1


    

In [209]:
def top_row_occupied(grid, _SIZE):
    '''
    Return the number of non-zero cells in the top row (row 0) of the grid.

    The result is truthy when the top row holds at least one ball.
    The _SIZE argument is accepted but not used.
    '''
    top = grid[0]
    return np.count_nonzero(top)

def level_up(grid):
    '''
    Add a row of balls to the bottom of the grid.
    If the top row has any ball, Game Over.

    Every existing row is pushed up by one and the bottom row is refilled
    with an independent random ball per column, drawn from 1..number of
    columns. The grid is modified in place.

    Returns (grid, game_over) where game_over is 1 when the top row was
    already occupied (the grid is then returned untouched) and 0 otherwise.
    '''
    # if top row has something, return grid and gameover
    if np.count_nonzero(grid[0, :]):
        return grid, 1 #game over

    # Shift all rows up by one; copy the source so the overlapping
    # assignment never reads cells that were already overwritten.
    grid[:-1, :] = grid[1:, :].copy()

    # One fresh ball per column. Index the single cell [-1, j]: the slice
    # [-1, :j] would overwrite columns 0..j-1 and never touch the last one.
    n_cols = grid.shape[1]
    for j in range(n_cols):
        grid[-1, j] = random.randint(1, n_cols)

    return grid, 0

In [221]:
def play():
    '''
    Play one complete game with random column choices and print a summary.

    Each turn: count the ball, resolve the grid, level up every
    _BALLS_TO_LEVELUP balls (which may end the game), then draw a ball and
    drop it into a randomly chosen column that still has room. Progress is
    appended to the stats file (_outfile); the final grid and counters are
    printed to stdout.

    The stats file is opened once for the whole game and closed on exit,
    instead of opening a new, never-closed handle for every log line.
    '''
    s = _stats
    s.reset()  # also truncates the stats file
    game_over = False
    col = 0
    #grid = recreate_grid()
    grid = generate_init_grid(_SIZE)

    with open(_outfile, "a") as log:
        while not game_over:
            s.ball_drop() #stats
            need_another_col = True
            grid = update_grid(grid)
            if s.ball_count % _BALLS_TO_LEVELUP == 0:
                s.level_up()
                print(grid, s.ball_count, file=log)
                print("LEVEL UP", file=log)
                grid, game_over = level_up(grid)
                print(grid, s.ball_count, file=log)
                if game_over:
                    break
                grid = update_grid(grid) #update grid after Leveling up

            ball = generate_next_ball(_SIZE)
            # NOTE: col logged here is the column used on the previous turn;
            # this turn's column is only chosen in the loop below.
            print("Ball is ", ball, "column is", col, "nz", nz(grid), file=log)

            # Keep picking columns until the ball fits or the grid is full.
            while need_another_col:
                col = get_column_to_drop_ball(grid, ball)
                grid, game_over, need_another_col = drop_ball_in_column(grid, col, ball)

    print("GAME OVER")
    print(grid)
    print(np.count_nonzero(grid))
    print("DONE")
    print(s.ball_count, s.levelup_count)
          

In [223]:
%%time
play()

GAME OVER
[[0 0 4 0 0 0 0]
 [0 0 5 7 0 0 0]
 [0 0 4 4 7 0 0]
 [0 0 2 2 7 0 0]
 [0 0 1 2 1 0 6]
 [0 0 1 1 1 0 4]
 [5 5 5 1 1 5 4]]
24
DONE
1040 52
Wall time: 1.91 s


In [175]:
s.ball_count

1

In [None]:
vec = [7,2,3,4,4,5,6]
inplace_explosions(vec)

[531...
[

In [None]:
vec = [0, 0, 0, 4, 7, 4, 7]
vec = [4, 1, 4, 3, 7, 7, 2]

vec = [int(x) for x in list('4327147')]
#apply_explosions_to_line(vec, HORIZ)
update_line(vec, 1)

In [None]:
generate_init_grid(4)


In [None]:
class Drop7(object):
    """
    Environment skeleton for Drop7.

    NOTE: the mechanics implemented here are still those of the Catch
    game (a falling fruit and a 3-cell basket on the bottom row); the
    code mirrors the Catch class in this notebook.

    The state is an integer array of shape (1, 3):
    [fruit_row, fruit_col, basket_center].
    """
    def __init__(self, grid_size=7):
        self.grid_size = grid_size
        self.reset()

    def _update_state(self, action):
        """
        Advance the game by one step.

        Input: action (0 = move basket left, 1 = stay, anything else = right)
        Effect: the fruit drops one row, the basket centre moves by at most
        one column (clamped to 1..grid_size-1), and self.state is replaced.
        """
        state = self.state
        if action == 0:  # left
            action = -1
        elif action == 1:  # stay
            action = 0
        else:
            action = 1  # right
        f0, f1, basket = state[0]
        new_basket = min(max(1, basket + action), self.grid_size-1)
        f0 += 1  # fruit falls one row
        out = np.asarray([f0, f1, new_basket])
        out = out[np.newaxis]

        assert len(out.shape) == 2
        self.state = out

    def _draw_state(self):
        """Render the state as a (grid_size, grid_size) canvas of 0/1 floats."""
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1  # draw fruit
        canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
        return canvas

    def _get_reward(self):
        """
        Return 0 while the fruit is falling; once it reaches the bottom row
        return 1 if it is within one column of the basket centre, else -1.
        """
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return -1
        else:
            return 0

    def _is_over(self):
        """Return True once the fruit has reached the bottom row."""
        return bool(self.state[0, 0] == self.grid_size-1)

    def observe(self):
        """Return the rendered canvas flattened to shape (1, grid_size**2)."""
        canvas = self._draw_state()
        return canvas.reshape((1, -1))

    def act(self, action):
        """Apply an action and return (observation, reward, game_over)."""
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        """Start a new episode: fruit in row 0 at a random column, basket at a random centre."""
        # Draw plain scalars: mixing the int 0 with length-1 arrays makes a
        # ragged list that current NumPy refuses to turn into an array.
        n = int(np.random.randint(0, self.grid_size-1))
        m = int(np.random.randint(1, self.grid_size-2))
        self.state = np.asarray([0, n, m])[np.newaxis]

    def get_state(self):
        """Return the current (1, 3) state array."""
        return self.state

In [None]:
class Catch(object):
    """
    The Catch game: a fruit falls one row per step from the top of a
    grid_size x grid_size board and a 3-cell basket on the bottom row
    tries to be under it when it lands.

    The state is an integer array of shape (1, 3):
    [fruit_row, fruit_col, basket_center].
    """
    def __init__(self, grid_size=7):
        self.grid_size = grid_size
        self.reset() # redraws the grid...

    def _update_state(self, action):
        """
        Advance the game by one step.

        Input: action (0 = move basket left, 1 = stay, anything else = right)
        Effect: the fruit drops one row, the basket centre moves by at most
        one column (clamped to 1..grid_size-1), and self.state is replaced.

        This is typically called by act().
        """
        state = self.state
        print("state before action", self.state)
        if action == 0:  # left
            action = -1
        elif action == 1:  # stay
            action = 0
        else:
            action = 1  # right
        fr, fc, basket = state[0]
        new_basket = min(max(1, basket + action), self.grid_size-1)
        fr += 1 #ball drops down
        out = np.asarray([fr, fc, new_basket])
        out = out[np.newaxis]

        assert len(out.shape) == 2
        self.state = out

    def _draw_state(self):
        """Render the state as a (grid_size, grid_size) canvas of 0/1 floats."""
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1  # draw fruit
        canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
        return canvas

    def _get_reward(self):
        """
        Return 0 while the fruit is falling; once it reaches the bottom row
        return 1 if it is within one column of the basket centre, else -1.
        """
        fruit_row, fruit_col, basket = self.state[0]
        print(self.state[0])
        # only the final action (catch or drop the ball) gets a nonzero reward
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return -1
        else:
            return 0

    def _is_over(self):
        """Return True once the fruit has reached the bottom row."""
        return bool(self.state[0, 0] == self.grid_size-1)

    def observe(self):
        """Return the rendered canvas flattened to shape (1, grid_size**2)."""
        canvas = self._draw_state()
        return canvas.reshape((1, -1))

    def act(self, action):
        """Apply an action and return (observation, reward, game_over)."""
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        """Start a new episode: fruit in row 0 at a random column, basket at a random centre."""
        # Draw plain scalars: mixing the int 0 with length-1 arrays makes a
        # ragged list that current NumPy refuses to turn into an array.
        n = int(np.random.randint(0, self.grid_size-1)) #column of the Ball
        m = int(np.random.randint(1, self.grid_size-2)) # column of Basket center
        self.state = np.asarray([0, n, m])[np.newaxis]

    def get_state(self):
        """Return the current (1, 3) state array."""
        return self.state

In [None]:
env = Drop7()
env.get_state()

In [None]:
# Define environment/game
grid_size = 7
env = Drop7(grid_size)

In [None]:
env.reset()

In [None]:
env.observe()

In [None]:
env._draw_state()

In [None]:
env.reset()

In [None]:
env.act(-1)
env._draw_state()
env.get_state(), env._get_reward()


In [None]:
env.observe()

In [None]:
env.get_state()[0,1]

In [None]:
c

In [None]:
state =  np.asarray([0, 1, 2])[np.newaxis]

In [None]:
np.asarray([1,1,1]).shape, np.asarray([1,1,1])[np.newaxis][np.newaxis].shape

In [None]:
state[0,1]

In [None]:
grid_size=7
im_size = (grid_size,)*2
grid = np.zeros(im_size)
grid

In [None]:
grid[:,2]

In [None]:
grid[2]  = 3

In [None]:
grid[:,2] = 4

In [None]:
grid[6,6] = 6

In [None]:
#temp =  grid[:,1][:-1]
im = (7,7)
a = np.zeros(im)
temp = a[1:, 3]
a

In [None]:
temp

In [None]:
temp = grid[:,3][:-1].copy()

In [None]:
temp

In [None]:
grid[:, 3] = np.zeros(7)

In [None]:
grid[1:, 1] = temp

In [None]:
temp

In [None]:
temp = np.array([1,0,3, 0, 1])
list(zip(temp[:-1], temp[1:]))

In [None]:
grid.shape[1]

In [None]:
# Settle every column of g. (g must be a 2-D grid defined in an earlier cell.)
# g.shape[1] is an int, so it is wrapped in range(); apply_gravity_to_column
# returns (flag, original, updated) and only the updated column is kept.
for col in range(g.shape[1]):
    #print(g[:,col])
    _, _, g[:, col] = apply_gravity_to_column(g[:, col])

print(g)

In [None]:
temp[0], temp[1] = temp[1], temp[0]

In [None]:
temp

In [13]:
# this is not being used
def update_line(vec, colflag=True):
    '''
    Repeatedly apply explosions (and, for columns, gravity) to one line
    until nothing changes, and return the resulting line.

    vec     : 1-D sequence of ball values, 0 meaning empty
    colflag : truthy when the line is a column, in which case gravity is
              applied after every round of explosions

    vec itself is not modified. The loop gives up after 100 rounds and
    prints a warning. (No points are calculated here.)
    '''
    something_changed = True
    safety_brkr = 0
    updated = list(vec)  # real copy; vec[:] would be a view for ndarrays
    while something_changed:
        something_changed = False
        safety_brkr += 1
        # print(safety_brkr)

        # inplace_explosions returns a 1/0 keep-mask as its third value,
        # not the exploded line, so the mask has to be applied here.
        change, current, keep = inplace_explosions(updated)
        if(change):
            updated = [value * flag for value, flag in zip(current, keep)]
            something_changed = True

        if(colflag):
            change, _, updated = apply_gravity_to_column(updated)
            if(change):
                something_changed = True

        if safety_brkr == 100:
            print("Safety Circuit Breaker. Something is not right")
            break

    return updated
    