#NES Tetris DQN

###Introduction

###Status:
Implementing base gym environment

Agent will use different attributes of the  current board state to evaluate the EV of each possible move, and then executes said move.

Get possible moves -> evaluate the state of each move -> select the move with the best EV

###Implemented:
- Base board state
- Added new piece creation and handling of next pieces
- Added basic rendering of board and piece info
- Added moving and rotating of pieces
- Added validation of pieces and moves
- Added getting of valid moves in current state
- Added wrapper function for moving piece in board
- Added checking whether a piece should be set on the board
- Added above function into wrapper move function in board
- Added checking whether a piece should end the game / top off
- Added reverting of move when illegal move is made (used when player tops off)
- Added natural piece drop (available moves will reflect the natural drop if present)
- Added line clears and checking for line clears
- Added updating of levels
- Added scoring system
- Added returning of board state
- Added option to force single direction movement in get_available_moves
- Added openai gym env
- Added agent
- Added setup and learning of agent
- Tidied up training
- Attempt to tune the agent for better performance
- Add features from Dellacherie's Algorithm for agent
- Fixed board states not having a new copy of the original pieces_table

###Issues:

###To Implement:
Add checking in piece class for illegal moves (right now validation is only done at the board level), will need to reference the board by passing in the board in the function parameters

Add support for earlier levels (where it can take several moves before a piece falls down)

Add support for tucks (have to check the frames to see if it can be done)

###Agent Status:
- Contemporary tetris agent's features are being used, however, the result is not very good. Most features are being pulled from Dellacherie's Algorithm, which has proven results.
- This setup of NES Tetris differs from conventional Tetris simulations. These may explain why the agent struggles at the moment. Notable differences are:
  1. Conventional setups do not enforce a forced drop, and thus are able to move pieces perfectly. The current setup will automatically drop pieces depending on the frame count. This constrains the ability for the agent to get pieces into an optimal position, and I believe that this is the major issue that the agent faces. For instance, at level 18, a piece drops 2 cells for every move it makes.
  2. Conventional setups may sometimes allow holding of pieces, while NES does not. This prevents the agent from possibly getting out of unlucky piece sequence, although it is nowhere near that level of doing so.
  3. Certain setups use a random "7-bag" system, which is in place for most modern versions of Tetris. This system enforces the complete set of pieces to be used, albeit randomly arranged. As such, there is consistency in gameplay, whereas the setup used here is truly random, meaning that the agent can lose at some point due to a bad piece sequence.
- When considering a piece, the agent simulates the piece being hard dropped and evaluates it as such.
- Note: The aim of this project is NOT to create an agent that performs as many line clears as possible. The aim is to design an agent that can not only stack efficiently, but also score well with a fixed set of line clears (230 lines for NES Tetris). Ideally, we'd be able to have an agent that can play the entire game of NES Tetris, including the transition and kill screen.

* Personal best
* Total Games:  550
* Took total / per game (seconds): 633.1551685333252 / 12.663103404045104
* Total Steps:  132296
* Epsilon:  0.0001
*
* Average:  533.06 / 230.72
* Median:  303.0 / 0.0
* Mean:  533.06 / 230.72
* Min:  228 / 0
* Max:  2566 / 2163

###Goals:
- Get an agent to stack flat in the first place
- Get an agent to stack efficiently in 9 columns

###Features Explored:
Commonly used features such as landing height and transitions are used. Several more are introduced, which should allow the agent to perform better:
  1. 

- Possible improvements:
  1. Add future moves for the agent to use / consider. Right now, the agent only considers moving left or right once. This can be expanded to moving twice or even thrice. This requires more computation, but can possibly help the agent work better.
  2. Enforce general NES Tetris strategies. This includes adding features that correspond to building on the left, keeping a right well. 

###Emulating a real player
- Players tend to maintain a right well, and stack efficiently in the leftmost 9 columns. This can be done via a binary value, where the agent checks the presence of a right well (or if any well for that matter), and also to check the current piece ID if the right well is present. Typically, right wells are preferred, because it is perceived to be harder to play with a center well. Ideally, the agent just stacks on the 9 left columns, and specifically ignores the right well until it gets an I piece. So if a move will result in a right well being disrupted, then the feature should heavily discourage it. If an I piece is present with the presence of a right well (and possibly the ability to tetris), then the feature should heavily encourage it.

- Players tend to build on the left. The left side of the board is disadvantaged in terms of getting pieces to the desired side. However, players tend to throw pieces to the left at every opportunity, to minimise the risk of topping off because they can easily throw pieces to the right in an emergency. The most basic way to implement this is to just check for the x-coordinate of the resulting move, but this can be potentially dangerous if the agent keeps moving to the left. We can introduce the checking of board height / landing height too, in order to balance this out.

- Players might play low (MIGHT, this is a risk-reward balance that players have to follow). Ideally, players play low and score Tetrises, but they may not be able to do so all the time. A basic implementation of this can just involve the average board height and the landing height too. These features are used in the previous point, so there's kinda a connection between the two.

###Training
- Due to the exploratory nature of Q-learning, the agent generally performs terribly until it stops exploring, due to the random nature of moves. As documented in https://github.com/nuno-faria/tetris-ai, the agent tends to only perform decently towards the end of exploring. This corresponds to what we're seeing with the NES agent too, except that it performs way worse.
- After training, we will need to observe how the agent attempts to move and stack pieces, and then tune the features to try to encourage better gameplay.


###References:
https://codemyroad.wordpress.com/2013/04/14/tetris-ai-the-near-perfect-player/
https://github.com/michiel-cox/Tetris-DQN
https://imake.ninja/el-tetris-an-improvement-on-pierre-dellacheries-algorithm/
https://github.com/nuno-faria/tetris-ai

###Notes:
- get_available_moves_state_info uses a single deepcopy, may have use two separate deepcopies

###General Gameplay and Terminology for NES Tetris:
- NES Tetris is divided into 3 sections: Lvl 18 / Pre-transition, Lvl 19-27 / Post-transition, Lvl 28 / Kill screen
- For human players, there is a limited number of lines that can conceivably be scored even by the best players, due to pieces dropping too fast. This is particularly for lvl 28 play, where each piece drops every frame, making it extremely hard to get pieces into an optimal setup. This is dubbed the "kill screen" where the game essentially ends for most players at that time. 
- Due to this, players aim to score as many points by stacking efficiently and getting as many Tetrises as possible, and the goal is to maintain a board that accomodates as many different shapes for the next piece. As NES tetris uses truly random piece selection, there is no guarantee of getting a desired piece, and often times players end up in "droughts" where they are not given any I / Long Bar pieces which are used to score Tetrises. Hence, an ideal board is one that can accomodate as many shapes for the next piece.
- Due to the initial placement and rotation of pieces, players tend to stack their pieces to the left, keeping what is called a "well" or an empty column on the rightmost column. Players try to stack pieces in such a way that it results in either a relatively flat, or slanted (left high, right low), since it is easier to get a piece to the right than it is to the left. The player aims to only score from the well, and will occasionally block the well in order to not immediately lose the game (this will be explained in a while).
- As players aim to score as many Tetrises, they tend to try to build as high as possible, which is considered to be "aggressive" since it is easier for a player to lose due to an unlucky streak of pieces. This allows players to use I pieces to score Tetrises only.
- "Burning" is a situation where a player intentionally scores non-Tetrises so as to preserve a good board state and survive a recent set of unlucky pieces. This is non-ideal, and players only do this in order to not lose the game immediately. Players tend to block or cover their well in such circumstances 
- Players can employ techniques such as "tucks" and "spins", where the player inserts a piece into holes or problematic board states to fix them. Due to the small timeframe that these moves require to be executed, players generally stray away from them if they are not skilled enough to consistently execute them. Failing such techniques may result in an even worse board state. It is not expected that agents are able to naturally perform such techniques without explicitly specifying features that identify situations where these can be done. Nevertheless, skilled players are able to execute these techniques to get themselves out of terrible board states.
- Generally, NES Tetris requires the player to play in a safe manner, as compared to other versions of Tetris where players are able to execute elaborate moves like spins, with ease. NES Tetris rewards players for their decision making, whilst being punishing for being indecisive and not forward-looking. Players aim to create board states that accomodates bad luck and to also play aggressively to score efficiently. It can be hard for the agent to pick up the nuances that players develop in their decision making, but it would possibly be interesting to see whether an agent can learn these or possibly develop something different.

###Board State Info:
Using the afforementioned overivew of gameplay, several features will be used for the agent:
- Lines Cleared - How many lines will be cleared by the move. Ideally, the agent tries to clear as many lines with the move, or doesn't clear any (so that it doesn't waste lines by burning).
- Bumpiness - The difference between heights of each column. Ideally, this should be a low number so that the board can accomodate different incoming pieces. If a board is too flat, then S / Z pieces can be problematic as they will produce an overhanging cell.
- Holes - How many cells have an empty cell below. Ideally, this should be zero, as the board should be packed efficiently. While holes are fixable with techniques such as tucks, they are generally discouraged.
- Wells - Presence of an empty column for Tetrises. Ideally, this should be present. The agent should try not to cover the well, and hence this feature will be all-or-nothing, in order to try to encourage this behavior.
- Board height - How high the current board is - Ideally, this should be low, so that the agent has more leeway in its play and doesn't lose due to unlucky pieces.
- Row / Column Transitions - How many filled / empty cells that are adjacent to the opposite kind with respect to row / columns. Ideally, this should be low, suggesting that pieces are either packed together well or do not have too many potholes in them:
Example: "XXXXX_____", "XXX_____XX" are good, compared to "X_X_X_X__X", "XX__XX_X_X".

- Piece coordinates - Location of piece and its previous location - Ideally, the agent should move only in 1 direction (to not waste moves)

Possible features that can be looked into:
- Hill - Board state where the board has a gradual slope from the left down to the right. Ideally, this slope is gradual so that the board is not considered bumpy and for ease of setting new pieces. 
- 3: Landing height
- 4: Row transitions
- 5: Column transitions
- 6: Cumulative wells
- 7: Eroded piece cells
- 8: Aggregate height

In [1]:
# imports

# board.py
import random
import copy

# gym environment
import gym
import pandas
from gym import spaces

import numpy as np

# reinforcment learning
import tensorflow as tf
import os
import statistics
from pathlib import Path
import time

In [2]:
# settings
STARTING_LEVEL = 18
IS_DAS_ON = False # delayed auto-shift, not implemented yet

# remove moves that are in opposite directions (IE move left then move right)
IS_ONE_DIRECTION_MOVEMENT = True 

IS_TRUE_RANDOM_PIECES = True # generates pieces randomly

LOOK_FORWARD_SHIFT_MAX_VAL = 3 # considers moving left and right at most N tiles

TRAIN = True

In [3]:
# fixed parameters

# board sizes
BOARD_HEIGHT = 20
BOARD_WIDTH = 10

# frames required for a piece to move down 1 cell vertically
FRAMES_PER_GRIDCELL_Y = [[48, 
   43, 38, 33, 28, 23, 
   18, 13, 8, 6, 5, 
   5, 5, 4, 4, 4, 
   3, 3, 3, 2, 2, 
   2, 2, 2, 2, 2,
   2, 2, 2, 1]]

# delayed auto-shift, forces first move to be delayed to 16 frames
DAS_DELAY = 16

FRAME_DELAY = 6 # how many frames it takes per move

# initial lines needed for first level increase
LINES_FIRST_LEVEL_JUMP = [[10,
   20, 30, 40, 50, 60, 
   70, 80, 90, 100, 100,
   100, 100, 100, 100, 100, 
   110, 120, 130, 140, 150,
   160, 170, 180, 190, 200,
   200, 200, 200]]

LINES_PER_LEVEL = 10 # how many lines per subsequent level increase


# reinforcement learning
ACTION_SIZE = 5
STATE_SIZE = 8

# colab paths
WEIGHT_PATH = os.path.join(os.getcwd(), 'weights.h5')
IMAGE_PATH = os.path.join(os.getcwd(), 'model.png')
LOG_DIR = os.path.join(os.getcwd(), 'logs')

#WEIGHT_PATH = os.path.join(os.path.dirname(__file__), 'weights.h5')
#IMAGE_PATH = os.path.join(os.path.dirname(__file__), 'model.png')
#LOG_DIR = os.path.join(os.path.dirname(__file__), 'logs')

In [4]:
# the possible Shapes of a Piece, these are generated as a list of templates
# id - the class ID of the piece
# orientations - all the possible orientations
class Shape:
  def __init__(self, id, orientations):
      self.id = id
      self.max_rotations = len(orientations)
      self.orientations = orientations
      self.shape_coord = [] # contains the coordinates of spaces filed by the shape
      
      # width and height of the Piece
      self.width = len(orientations[0][0])
      self.height = len(orientations[0])

      self.generate_shape_coord()

  # returns the shape orientation of a piece, given a rotation number
  def get_orientation(self, rotation):
      return self.orientations[rotation % self.max_rotations]

  # get the coordinates of cells occupied by a shape
  def get_coord_occupied(self, rotation):
      return self.shape_coord[rotation % self.max_rotations]

  # generates shape coord all at once
  def generate_shape_coord(self):
      for rotation in range(self.max_rotations):
          self.shape_coord.append(list(self.generate_shape_coord_iterative(rotation)))

  # generate the occupied coordinates of a shape
  def generate_shape_coord_iterative(self, rotation):
      orientation = self.get_orientation(rotation)
      width = self.width
      height = self.height
      for offset_x in range(width):
        for offset_y in range(height):
          if orientation[offset_y][offset_x] != ' ':
            yield offset_y, offset_x

  def get_id(self):
    return self.id

# the different possible Shapes of Pieces and their orientations
# id - 0 / empty, 1 / I, 2 / O, 3 / T, 4 / S, 5 / Z, 6 / J, 7 / L
# first shape is empty

# shape coords
SHAPE_NULL_COORDS = [[' ']]

SHAPE_I_COORDS = [[
    '    ',
    '    ',
    '####',
    '    ',
], [
    '  # ',
    '  # ',
    '  # ',
    '  # ',
]]

SHAPE_O_COORDS = [[
    '##',
    '##',
]]

SHAPE_T_COORDS = [[
    '   ',
    '###',
    ' # ',
], [
    ' # ',
    '## ',
    ' # ',
], [
    ' # ',
    '###',
    '   ',
], [
    ' # ',
    ' ##',
    ' # ',
]]

SHAPE_S_COORDS = [[
    '   ',
    ' ##',
    '## ',
], [
    ' # ',
    ' ##',
    '  #',
]]

SHAPE_Z_COORDS = [[
    '   ',
    '## ',
    ' ##',
], [
    '  #',
    ' ##',
    ' # ',
]]

SHAPE_J_COORDS = [[
    '   ',
    '###',
    '  #',
], [
    ' # ',
    ' # ',
    '## ',
], [
    '#  ',
    '###',
    '   ',
], [
    ' ##',
    ' # ',
    ' # ',
]]

SHAPE_L_COORDS = [[
    '   ',
    '###',
    '#  ',
], [
    '## ',
    ' # ',
    ' # ',
], [
    '  #',
    '###',
    '   ',
], [
    ' # ',
    ' # ',
    ' ##',
]]

SHAPE_NULL = Shape(0, SHAPE_NULL_COORDS)

SHAPE_I = Shape(1, SHAPE_I_COORDS)

SHAPE_O = Shape(2, SHAPE_O_COORDS)

SHAPE_T = Shape(3, SHAPE_T_COORDS)

SHAPE_S = Shape(4, SHAPE_S_COORDS)

SHAPE_Z = Shape(5, SHAPE_Z_COORDS)

SHAPE_J = Shape(6, SHAPE_J_COORDS)

SHAPE_L = Shape(7, SHAPE_L_COORDS)

# the list of templated shapes
SHAPES = [SHAPE_I, SHAPE_O, SHAPE_T, SHAPE_S, SHAPE_Z, SHAPE_J, SHAPE_L]
SHAPES_ID = [SHAPE_NULL, SHAPE_I, SHAPE_O, SHAPE_T, SHAPE_S, SHAPE_Z, SHAPE_J, SHAPE_L]
SHAPES_COORDS = [SHAPE_NULL_COORDS, SHAPE_I_COORDS, SHAPE_O_COORDS, SHAPE_T_COORDS, 
                 SHAPE_S_COORDS, SHAPE_Z_COORDS, SHAPE_J_COORDS, SHAPE_L_COORDS]
SHAPES_NAMES = ["Empty", "Long bar", "O", "T", "S", "Z", "J", "L"]

In [5]:
# the piece class itself, contains data on its coordinates and rotations
class Piece:
  def __init__(self, id, x = 0, y = 0, rotation = 0):
    self.x = x # coordinates of the top left corner of piece
    self.y = y
    self.id = id
    self.rotation = rotation
    self.shape_coords = None # the coordinates of the shape of the Piece
    self.coords = []
    self.prev_coords = []

    # general direction that the piece has been moving
    self.is_left = False
    self.is_right = False

  # rotates the piece, +1 is clockwise
  def rotate(self, rotation_val):
    if rotation_val == -1 or rotation_val == 1:
      self.rotation += rotation_val
      self.rotation = self.rotation % len(SHAPES_COORDS[self.id])

  # update the coordinates after a move and returns both the previous and
  # the new coordinates
  def move(self, x, y, rotation_val):
    self.prev_coords = self.coords
    self.x += x
    self.y += y
    self.rotate(rotation_val)
    self.coords = self.update_coords()

    if x <= -1 and x >= -LOOK_FORWARD_SHIFT_MAX_VAL:
      self.is_left = True
      self.is_right = False
    elif x >= 1 and x <= LOOK_FORWARD_SHIFT_MAX_VAL:
      self.is_left = False
      self.is_right = True

    return self.prev_coords, self.coords

  # returns the coordinates of cells occupied by a piece
  def get_coords(self):
    return self.coords

  # returns the previous coordinates of cells occupied by a piece
  def get_prev_coords(self):
    return self.prev_coords

  def get_rotation(self):
    return self.rotation

  # updates the coordinates of a piece based on its current x / y coordinates
  def update_coords(self):
    self.coords = []
    self.shape_coords = SHAPES_ID[self.id].get_coord_occupied(self.rotation)
    for coord in self.shape_coords:
      self.coords.append([self.y + coord[0], self.x + coord[1]])

    return self.coords

  # simulates a move and returns updated coords without touching original data
  def simulate_move(self, x, y, rotation_val):
    curr_x = self.x + x
    curr_y = self.y + y
    curr_rotation = self.rotation + rotation_val
    curr_rotation = curr_rotation % len(SHAPES_COORDS[self.id])

    coords = []
    shape_coords = SHAPES_ID[self.id].get_coord_occupied(curr_rotation)
    for coord in shape_coords:
      coords.append([curr_y + coord[0], curr_x + coord[1]])

    return coords

  # validates that the current state is legal
  # note: not working, need to insert reference to board for var "board"
  def is_valid_state(self):
    for coord in self.coords:
      x = coord[1]
      y = coord[0]

      if x < 0 or x >= BOARD_WIDTH or y < 0 or y >= BOARD_HEIGHT:
        return False
      if board[y][x] != 0 and board[y][x] != 8:
        return False

    return True

  # calculates the amount that a piece will fall after a move
  def get_y_coord_shift(self):
    return 1 # return a single drop for the time being for simplicity
    #return FRAME_DELAY / FRAMES_PER_GRIDCELL_Y[STARTING_LEVEL]

  def get_id(self):
    return self.id

  # prints metadata of piece
  def print_info(self):
    print("current id is:", self.get_id(), "/", SHAPES_NAMES[self.id],
          "Rotation:", self.rotation)
    print("coords:", self.get_coords())

    print("current piece arrangement")
    for line in SHAPES_COORDS[self.get_id()][self.rotation]:
      print("|", line, "|")

  # prints metadata of a simulated move
  def print_simulated_move(self, x, y, rotation_val):
    rotation = self.rotation + rotation_val
    rotation = rotation % len(SHAPES_COORDS[self.id])

    print("current id is:", self.get_id(), "/", SHAPES_NAMES[self.id],
          "Rotation:", rotation)
    print("coords:", self.simulate_move(x, y, rotation_val))

    print("current piece arrangement")
    for line in SHAPES_COORDS[self.get_id()][rotation]:
      print("|", line, "|")

  # prints the current direction that the piece is heading
  def print_direction(self):
    if self.is_left:
      print("left")
    elif self.is_right:
      print("right")
    else:
      print("neutral")

In [6]:
# the board contains the playing field
PIECE_ID_CURRENT = 8 # used to identify the current piece
PIECE_ID_EMPTY = 0
PIECE_OFFSET_INITIAL = [0, 2, 0, 1, 1, 1, 1, 1] # y offset needed for initial placement

MOVES = []

class Board:
  def __init__(self, height, width):
    self.height = height
    self.width = width

    self.pieces_table = [[0 for i in range(width)] for j in range(height)]

    self.piece = None
    self.piece_next = None

    self.ticks = 0 # the current frame count
    self.level = STARTING_LEVEL
    self.move_count = 0
    self.piece_count = 0
    self.line_clears = 0
    self.line_clears_for_next_level = LINES_FIRST_LEVEL_JUMP[0][self.level]

    self.scoring_system = self.generate_scoring_system()
    self.score = 0
    self.game_over = False

    # unused as holding pieces are not allowed in NES tetris
    self.piece_holding = None
    self.piece_last = None
    self.can_hold = False

    # generate the random bag of pieces
    self.bag = self.generate_bag()
    self.piece = self.create_piece()
    self.piece_next = self.create_piece()

  # generates a full bag of 7 pieces 
  def generate_bag(self):
    random_shapes = list(SHAPES)
    random.shuffle(random_shapes)

    bag = []
    # generate bag based on whether true random or fixed
    if IS_TRUE_RANDOM_PIECES:
      for i in range(7):
        id = random_shapes[random.randint(0, 6)].get_id()
        bag.append(Piece(id))
    else:
      for shapes in random_shapes:
        bag.append(Piece(shapes.get_id()))
    return bag

  # generates a random piece
  def create_piece(self):
    if not self.bag:
      self.bag = self.generate_bag()
    return self.bag.pop()

  # put the new piece on the board and then update the next piece
  def place_new_piece(self):
    # swap piece and then fill the bag if needed
    self.piece = self.piece_next
    self.piece_next = self.create_piece()

    # place the piece in the middle of the board
    self.piece.move(int(self.width / 2), 0 - PIECE_OFFSET_INITIAL[self.piece.get_id()], 0)

    if not self.update_board():
      self.game_over = True
      return

    self.piece_count += 1
    self.ticks = 0

  # moves the piece, assumes that move is legal and expects a tuple of (x, y, rotation)
  def move_piece(self, move, verbose = False):
    self.get_current_piece().move(move[0], move[1], move[2])
    self.update_board()
    self.lines_cleared_recently = 0

    if self.is_piece_set():
      self.set_piece()
      self.lines_cleared_recently = self.update_line_clear()
      self.update_score(self.lines_cleared_recently)      
      self.place_new_piece()

    if verbose:
      self.print_current_piece_state()
      print("cleared lines", lines_cleared)
      self.render_board()

  # sets the piece on the board, assumes that piece clears check for being set
  def set_piece(self):
    for coord in self.piece.get_coords():
      self.pieces_table[coord[0]][coord[1]] = self.piece.get_id()

  # manually sets the next piece, only done when the next piece has no 
  # available moves, and cannot be set because of this
  def force_set_next_piece(self):
    if self.is_piece_set():
      self.set_piece()

  # updates the current piece's coords on the board, and returns status
  def update_board(self):
    for coord in self.piece.get_prev_coords():
      if self.pieces_table[coord[0]][coord[1]] == PIECE_ID_CURRENT:
        self.pieces_table[coord[0]][coord[1]] = PIECE_ID_EMPTY
      else:
        #self.reset_move(self.piece.get_coords(), self.piece.get_prev_coords())
        return False

    for coord in self.piece.get_coords():
      if self.pieces_table[coord[0]][coord[1]] > PIECE_ID_EMPTY and self.pieces_table[coord[0]][coord[1]] < PIECE_ID_CURRENT:
        self.reset_move(self.piece.get_coords(), self.piece.get_prev_coords())
        return False
      else:
        self.pieces_table[coord[0]][coord[1]] = PIECE_ID_CURRENT

    self.move_count += 1
    return True

  # runs through the board, clears any filled lines and returns how many that were cleared
  def update_line_clear(self):
    lines_to_clear = [] # keep track of previous lines to not double-count
    lines_cleared = []

    # run through all coords of the current piece to get the lines to clear
    for coord in self.piece.get_coords():
      if self.is_line_clear(coord[0]) and coord[0] not in lines_to_clear:
        lines_to_clear.append(coord[0])

    # clear the lines
    lines_to_clear.reverse()
    for line in lines_to_clear:
      lines_cleared.append(self.clear_line(line))

    # add back lines
    self.add_empty_lines(len(lines_to_clear))

    self.line_clears += len(lines_to_clear)

    return len(lines_to_clear)

  # clears the specified line
  def clear_line(self, line_number):
    return self.pieces_table.pop(line_number)

  # adds an empty line
  def add_empty_lines(self, lines_to_add):
    for i in range(lines_to_add):
      self.pieces_table.insert(0, [0 for i in range(self.width)])

  # checks and updates level according to line clears
  def update_level(self):
    if self.lines >= self.line_clears_for_next_level:
      self.level += 1
      self.line_clears_for_next_level += 10

  # resets the previous move made, because it was illegal and executed halfway
  # this does not revert any overwritten cells that were previously occupied by a piece
  def reset_move(self, coords, prev_coords):
    # revert new state
    for coord in coords:
      if self.pieces_table[coord[0]][coord[1]] == PIECE_ID_CURRENT:
        self.pieces_table[coord[0]][coord[1]] = PIECE_ID_EMPTY

    # restore previous state
    for coords in prev_coords:
      self.pieces_table[coord[0]][coord[1]] = PIECE_ID_CURRENT

  # update frame tick count and move the piece down
  def update_frame_count(self):
    if IS_DAS_ON:
      self.ticks += DAS_DELAY
    self.ticks += FRAME_DELAY

  # updates the score based on the level
  def update_score(self, lines_cleared):
    self.score += self.scoring_system[lines_cleared]

  # returns a list of points given for line clears for the current level
  def generate_scoring_system(self):
    return [0, (40 * self.level + 1), (100 * self.level + 1), (300 * self.level + 1), (1200 * self.level + 1)]

  # return new final board state for state info
  def generate_state_info_board(self, move, new_coords, pieces_table_copy):
    # remove the piece
    for coord in self.piece.get_coords():
      pieces_table_copy[coord[0]][coord[1]] = PIECE_ID_EMPTY

    # try to do a hard drop
    hard_drop_value = 0
    for i in range(self.height):
      is_valid = True
      for coord in new_coords:
        if coord[0] + hard_drop_value + 1 >= self.height or pieces_table_copy[coord[0] + hard_drop_value + 1][coord[1]] !=  PIECE_ID_EMPTY:
          is_valid = False
          break

      if is_valid:
        hard_drop_value += 1
    
    # update the coords
    landing_height = 0
    for coord in new_coords:
      pieces_table_copy[coord[0] + hard_drop_value][coord[1]] = PIECE_ID_CURRENT
      landing_height = max(landing_height, coord[0] + hard_drop_value)

    #print("move", move)
    #self.render_given_board(pieces_table_copy)

    return landing_height, pieces_table_copy

  # prints out the entire board
  def render_board(self):
    for i in range(self.height):
      line_to_print = "|"
      for j in range(self.width):
        id = self.pieces_table[i][j]
        if id == PIECE_ID_EMPTY:
          line_to_print += " |"
        elif id == PIECE_ID_CURRENT:
          line_to_print += "=|"
        else:
          line_to_print += "X|"
      
      print(line_to_print)

  # renders a given board (for simulated boards)
  def render_given_board(self, board):
    for i in range(self.height):
      line_to_print = "|"
      for j in range(self.width):
        id = board[i][j]
        if id == PIECE_ID_EMPTY:
          line_to_print += " |"
        elif id == PIECE_ID_CURRENT:
          line_to_print += "=|"
        else:
          line_to_print += "X|"
      
      print(line_to_print)

  def print_current_piece_state(self):
    self.piece.print_info()

  def print_next_piece_state(self):
    self.piece_next.print_info()

  # validate if the move is legal or not
  def is_valid_move(self, piece, move, board, verbose = False):
    # ignore the elusive O spin
    if piece.get_id == 2 and move[2] != 0:
      return False

    coords = piece.simulate_move(move[0], move[1], move[2])
    if verbose:
      piece.print_simulated_move(move[0], move[1], move[2])
    for coord in coords:
      x = coord[1]
      y = coord[0]

      if x < 0 or x >= self.width or y < 0 or y >= self.height:
        return False
      if board[y][x] != 0 and board[y][x] != 8:
        return False

    return True

  # returns if the piece should be set into the board
  # piece is considered set if it touches another piece on the board
  def is_piece_set(self):
    for coord in self.piece.get_coords():
      if coord[0] == (self.height - 1):
        return True
      elif self.pieces_table[coord[0] + 1][coord[1]] > PIECE_ID_EMPTY and self.pieces_table[coord[0] + 1][coord[1]] < PIECE_ID_CURRENT:
        return True

    return False

  # returns if the game is over
  def is_game_over(self):
    return self.game_over

  # returns whether the piece should drop on its own and reset tick count if true
  def is_natural_drop(self):
    if self.ticks > FRAMES_PER_GRIDCELL_Y[0][self.level]:
      self.ticks = self.ticks % FRAMES_PER_GRIDCELL_Y[0][self.level]
      return True

    return False

  # returns if a given line in the board is to be cleared
  def is_line_clear(self, line_number):
    for i in range(len(self.pieces_table[line_number])):
      if self.pieces_table[line_number][i] == PIECE_ID_EMPTY:
        return False 
    return True

  # returns a list of tuples of move and state info of each possible move
  # each move is a tuple of 3 elements (x offset, y offset, rotation)
  # state info is a tuple of elements corresponding to information of the resulting board
  def get_available_moves_state_info(self, verbose = False):
    self.update_frame_count() # will move piece down naturally if needed

    # left / right / down / clockwise / anti-clockwise
    moves = self.generate_moves(LOOK_FORWARD_SHIFT_MAX_VAL)
    
    #if self.is_natural_drop():
      #moves = [[-1, 1, 0], [1, 1, 0], [0, 1, 0], [0, 1, 1], [0, 1, -1]]
      #moves = [[-1, 1, 0], [-2, 2, 0], [-3, 3, 0], [1, 1, 0], [2, 2, 0], [3, 3, 0], [0, 1, 0], [0, 1, 1], [0, 1, -1]]
    #else:
      #moves = [[-1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1], [0, 0, -1]]
      #moves = [[-1, 0, 0], [-2, 0, 0], [-3, 0, 0], [1, 0, 0], [2, 0, 0], [3, 0 ,0], [0, 1, 0], [0, 0, 1], [0, 0, -1]]
    

    # remove moves that are in opposite direction of the piece direction
    if IS_ONE_DIRECTION_MOVEMENT:
      if self.piece.is_left:
        for i in range(LOOK_FORWARD_SHIFT_MAX_VAL):
          moves.pop(LOOK_FORWARD_SHIFT_MAX_VAL)
      elif self.piece.is_right:
        for i in range(LOOK_FORWARD_SHIFT_MAX_VAL):
          moves.pop(0)
      
    legal_moves = []

    for move in moves:
      test_pieces_table = copy.deepcopy(self.pieces_table)
      if self.is_valid_move(self.piece, move, test_pieces_table, verbose = verbose):
        legal_moves.append([move, self.get_board_info(move, test_pieces_table, verbose = verbose)])
    
    return legal_moves

  # returns the set of possible moves based on the level and how many shifts are considered
  def generate_moves(self, look_forward_shift_val):
    moves = []
    moves_left = []
    moves_right = []

    # add left / right
    for i in range(look_forward_shift_val):
      drop = (i + 1) * (3 / FRAMES_PER_GRIDCELL_Y[0][self.level])
      moves_left.append([(i + 1), int(drop), 0])
      moves_right.append([-(i + 1), int(drop), 0])

    # add down / rotates
    for move in moves_left:
      moves.append(move)
    for move in moves_right:
      moves.append(move)
    moves.append([0, 1, 0])
    moves.append([0, int(3 / FRAMES_PER_GRIDCELL_Y[0][self.level]), 1])
    moves.append([0, int(3 / FRAMES_PER_GRIDCELL_Y[0][self.level]), -1])

    return moves

  # returns the state info of the board, to be passed into the agent
  # more state info can be given to the agent if needed:
  # the piece is moved and then hard dropped to evaluate how good the move may be
  def get_board_info(self, move, pieces_table_copy, verbose = False):
    # get the new board state
    coords = self.piece.simulate_move(move[0], move[1], move[2]) # the new coords
    landing_height, simulated_pieces_table = self.generate_state_info_board(move, coords, pieces_table_copy)

    # Lines Cleared - How many lines will be cleared by the move
    lines_cleared = self.get_state_lines_cleared(simulated_pieces_table, coords)

    # Wells - Presence of an empty column for Tetrises
    wells = self.get_state_wells(simulated_pieces_table)

    # Board height - How high the current board is
    board_height = self.get_state_board_height(simulated_pieces_table)

    # Bumpiness - The difference between heights of each column
    bumpiness = self.get_state_bumpiness(simulated_pieces_table)

    # Holes - How many cells have an empty cell below
    holes = self.get_state_holes(simulated_pieces_table)

    # Row Transitions / Column Transitions - How many empty / filled cells are 
    # adjacent to the opposite on the same row
    row_transitions, col_transitions = self.get_state_transitions(simulated_pieces_table)

    # Piece coordinates - Location of piece and its previous location
    coord_x_new = coords[0][1]
    coord_y_new = coords[0][0]

    if verbose:
      print("lines cleared", lines_cleared, "wells", wells)
      print("board height", board_height, "bumpiness", bumpiness)
      print("holes", holes)
      print("coords new (x, y)" , coord_x_new, coord_y_new, "coords (x, y)", coord_x, coord_y)

    #return [lines_cleared, wells, board_height, bumpiness, holes, 
    #        coord_x_new, row_transitions, col_transitions]
    return [landing_height, lines_cleared, holes, row_transitions, col_transitions, 
            bumpiness, board_height, coord_x_new]

  def get_state_lines_cleared(self, board, coords):
    lines_to_clear = [] # keep track of previous lines to not double-count

    # run through all coords of the current piece to get the lines to clear
    for coord in coords:
      if coord[0] not in lines_to_clear:
        is_cleared = True
        for j in range(self.width): # by col
          if board[coord[0]][j] == PIECE_ID_EMPTY:
            is_cleared = False
            break

        if is_cleared:
          lines_to_clear.append(coord[0])

    return len(lines_to_clear)
  
  # returns how many wells there are in the board
  def get_state_wells(self, board):
    wells = 0
    for i in range(len(board[0])): # column
      is_well = True
      for j in range(len(board)): # row
        if board[j][i] != PIECE_ID_EMPTY:
          is_well = False
          break

      if is_well:
        wells += 1

    return wells

  # returns the average height of all non-empty columns
  def get_state_board_height(self, board):
    non_empty_columns = 0
    total_height = 0

    for i in range(len(board[0])): # column
      height = 0
      for j in range(len(board)): # row
        if board[j][i] != PIECE_ID_EMPTY:
          height += 1
      if height == 0:
        non_empty_columns += 1
      else:
        total_height += height

    return total_height / (self.width - non_empty_columns)

  # returns the sum total of differences between height of each column and its adjacent one
  def get_state_bumpiness(self, board):
    bumpiness = 0

    # iterate through each column
    for i in range(self.width):
      prev_height = 0
      height = 0
      for j in range(self.height):
        if board[j][i] != PIECE_ID_EMPTY: # get the first non-empty cell
          height = j
          if i != 0:
            bumpiness += abs(height - prev_height)
          prev_height = height

          break
      
    return bumpiness

  # returns how many cells that have an empty cell below
  def get_state_holes(self, board):
    holes = 0

    for i in range(self.width):
      is_overhang = False # presence of a cell above that is filled
      for j in range(self.height):
        if board[j][i] != PIECE_ID_EMPTY:
          is_overhang = True
        elif is_overhang and board[j][i] == PIECE_ID_EMPTY:
          holes += 1

    return holes

  # returns the row and col transitions
  # the aggregate number of different cells adjacent to one another in row / col
  def get_state_transitions(self, board):
    row_transitions = 0
    col_transitions = 0

    is_next_row_empty = True
    is_next_col_empty = True

    for i in range(self.height - 1):
      for j in range(self.width):
        # row transition
        if j != self.width - 1:
          if board[i][j] == PIECE_ID_EMPTY and board[i][j + 1] != PIECE_ID_EMPTY:
            row_transitions += 1
          elif board[i][j] != PIECE_ID_EMPTY and board[i][j + 1] == PIECE_ID_EMPTY:
            row_transitions += 1

        if board[i + 1][j] == PIECE_ID_EMPTY and board[i + 1][j] != PIECE_ID_EMPTY:
          col_transitions += 1
        elif board[i + 1][j] != PIECE_ID_EMPTY and board[i + 1][j] == PIECE_ID_EMPTY:
          col_transitions += 1

    return row_transitions, col_transitions


  # gets the current piece
  def get_current_piece(self):
    return self.piece

  def get_piece_count(self):
    return self.piece_count

  def get_move_count(self):
    return self.move_count

  def get_lines_cleared(self):
    return self.lines_cleared

  def get_level(self):
    return self.level

  def get_score(self):
    return self.score

  # return the number of lines that have been cleared in the most recent move
  def get_lines_cleared_recently(self):
    return self.lines_cleared_recently

  # returns increase in score from lines cleared
  def get_score_increase(self, lines_cleared):
    return self.scoring_system[lines_cleared]

  def get_deep_copy_pieces_table(self):
    return copy.deepcopy(self.pieces_table)

In [7]:
# gym environment that follows gym interface"""
class NesTetrisEnv(gym.Env):
  metadata = {'render.modes': ['console']}

  def __init__(self):
    super(NesTetrisEnv, self).__init__()
    # Define action and observation space

    # left / right / down / rotate clockwise / anti-clockwise
    self.action_space = spaces.Discrete(ACTION_SIZE)

    # [lines_cleared, wells, board_height, bumpiness, holes, coord_x_new, coord_y_new, coord_x, coord_y, piece_id, piece_rotation]
    self.observation_space = spaces.Discrete(STATE_SIZE)

  # Execute one time step within the environment
  # action takes in move data (x, y, rotation)
  def step(self, action):
    # run the action
    self.board.move_piece(action, verbose = False)

    # return available moves with state info
    moves = self.board.get_available_moves_state_info()

    done = self.board.is_game_over()

    reward = 1
    reward += self.board.get_score_increase(self.board.get_lines_cleared_recently())
    if done:
      reward -= 5

    return np.array(moves), reward, done, {}
    
  # reset the state of the environment to an initial state and return initial observation
  def reset(self, verbose = False):
    self.board = Board(BOARD_HEIGHT, BOARD_WIDTH)
    self.board.place_new_piece()
    
    if verbose:
      self.board.render_board()
      self.board.print_current_piece_state()

    return np.array(self.board.get_available_moves_state_info())
    
  # Render the environment to the screen
  def render(self, mode='human', close=False):
    self.board.render_board()
    

In [8]:
  # reinforcement learning
  class ExperienceBuffer:
    def __init__(self, buffer_size=20000):
      self.buffer = []
      self.buffer_size = buffer_size

    def add(self, experience):
      if len(self.buffer) > self.buffer_size:
        self.buffer.pop(-1)
      self.buffer.append(experience)

    def sample(self, size):
      return random.sample(self.buffer, size)

  class Network:
    def __init__(self, state_size=STATE_SIZE, discount=1, epsilon=1, epsilon_min=0.0001, epsilon_episode_limit=500):
      self.state_size = state_size
      self.model = self.create_model()
      self.discount = discount
      self.epsilon = epsilon
      self.epsilon_min = epsilon_min
      self.epsilon_episode_limit = epsilon_episode_limit
      self.epsilon_decay = (epsilon - epsilon_min) / epsilon_episode_limit
      self.experiences = ExperienceBuffer()
      self.tensorboard = tf.keras.callbacks.TensorBoard(log_dir=LOG_DIR,
                                                        histogram_freq=1000,
                                                        write_graph=True,
                                                        write_images=True)
      
    # setups and returns model
    def create_model(self, verbose = False):
      model =  tf.keras.models.Sequential([
          tf.keras.layers.Dense(32, input_dim=self.state_size, activation='relu'),
          tf.keras.layers.Dense(16, activation='relu'),
          tf.keras.layers.Dense(16, activation='relu'),
          tf.keras.layers.Dense(1, activation='linear'),
      ])

      '''
      
      model =  tf.keras.models.Sequential([
          tf.keras.layers.Dense(64, input_dim=self.state_size, activation='relu'),
          tf.keras.layers.Dense(64, activation='relu'),
          tf.keras.layers.Dense(32, activation='relu'),
          tf.keras.layers.Dense(1, activation='linear'),
      ])
      '''

      model.compile(optimizer='adam', loss='mse', metrics=['mean_squared_error'])

      if verbose:
        model.summary()

      tf.keras.utils.plot_model(model, IMAGE_PATH, show_shapes=True)

      return model

    # returns the best move, unless agent decides to explore
    def act(self, states):
      # no moves
      if len(states) == 0:
        return None, None
      # explore
      if random.uniform(0, 1) < self.epsilon:
        return np.array(states[random.randint(0, len(states) - 1)])

      best_rating = None
      best_state = None
      state_to_use = 0

      ratings = self.predict_ratings(states)
      for i in range(len(states)):
        if best_rating is None or (best_rating is not None and ratings[i] > best_rating):
          best_rating = ratings[i]
          state_to_use = i

      return states[state_to_use][0], states[state_to_use][1]

    # runs the state through the NN, and return the outputs
    def predict_ratings(self, states):
      if len(states[0]) == 1:
        inputs = np.array(states)
      else:
        inputs = np.array([state[1] for state in states])

      # run the prediction, catch ValueError when states contain List() objs because
      # the state contains moves / board info, which are of different dimensions
      # and are not convertible under np.array(), and end up as List()s
      try:
        predictions = self.model.predict(states)
      except ValueError:
        predictions = self.model.predict(np.array([state[1] for state in states]))

      return [predict[0] for predict in predictions]

    # trains for a given number of episodes, returns the amount of steps, reward and scores
    def train(self, env, episodes = 1):
      rewards = []
      scores = []
      steps = 0

      for episode in range(episodes):
        obs = env.reset()
        previous_state = env.board.get_board_info([0, 0, 0], env.board.get_deep_copy_pieces_table())

        done = False
        total_reward = 0

        while not done:
          action, state = self.act(obs)
          if action is None:
            done = True
            steps += 1
            total_reward -= 5
            continue

          obs, reward, done, info = env.step(action)
          self.experiences.add((previous_state, reward, state, done))
          previous_state = state
          steps += 1
          total_reward += reward

        rewards.append(total_reward)
        scores.append(env.board.get_score())

        self.learn()

      return [steps, rewards, scores]

    # load from local .h5
    def load(self):
      if Path(WEIGHT_PATH).is_file():
        self.model.load_weights(WEIGHT_PATH)

    # save to local .h5
    def save(self):
      if not os.path.exists(os.path.dirname(WEIGHT_PATH)):
        os.makedirs(os.path.dirname(WEIGHT_PATH))

      self.model.save_weights(WEIGHT_PATH)

    # model learns about reent experiences
    # batch_size corresponds to the random experiences from experience buffer
    def learn(self, batch_size = 512, epochs = 1):
      if len(self.experiences.buffer) < batch_size: # buffer too small, return
        return

      batch = self.experiences.sample(batch_size)
      train_x = []
      train_y = []

      states = np.array([[[0, 0, 0], x[2]] for x in batch])
      ratings = self.predict_ratings(states)

      for i, (previous_state, reward, next_state, done) in enumerate(batch):
        if not done:
          rating = ratings[i]
          q = reward + self.discount * rating
        else:
          q = reward
        train_x.append(previous_state)
        train_y.append(q)

      self.model.fit(np.array(train_x), np.array(train_y), batch_size=len(train_x), verbose=0,
                    epochs=epochs, callbacks=[self.tensorboard])
      self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decay)

In [None]:
# test code
'''
# non-env testing
# create the board
test_board = Board(BOARD_HEIGHT, BOARD_WIDTH)

# test the rendering
test_board.place_new_piece()
test_board.render_board()
test_board.print_current_piece_state()

# runs the game
while not test_board.is_game_over():

  moves = test_board.get_available_moves_state_info(verbose=False)
  if len(moves) is not 0:
    move_num = random.randint(0, len(moves) - 1)
    #print(moves[move_num])
    #print(test_board.piece.print_direction())
    test_board.move_piece(moves[move_num][0], verbose = False)
  else:
    print("topoff")
    break

print("final")
test_board.render_board()
test_board.print_current_piece_state()
print("pieces used", test_board.get_piece_count())
print("moves used", test_board.get_move_count())
'''

# test env
'''
env = NesTetrisEnv()
obs = env.reset()
network = Network()

done = False
total_games = 0
total_steps = 0
#print("obs", obs)

while not done:
  # get the next action
  if len(obs) != 0:
    move_num = random.randint(0, len(obs) - 1)
  else:
    print("no more moves")
    break

  #action, state = network.act(obs) # when the DQN is finished
  obs, reward, done, info = env.step(obs[move_num][0])
  env.render()
'''
if TRAIN:
  total_episodes = 1000
  epsilon_episode_finish_rate = 0.5
  epsilon_episode_limit = total_episodes * epsilon_episode_finish_rate
    
  env = NesTetrisEnv()
  obs = env.reset()
  network = Network(epsilon=0.95, epsilon_episode_limit=epsilon_episode_limit)
  #network.load()

  done = False
  total_games = 0
  total_steps = 0
  episodes = 50

  while not done:
    time_start = time.time()
    steps, rewards, scores = network.train(env, episodes=episodes)
    total_games += len(scores)
    total_steps += steps
    network.save()


    print("==================")
    print("* Total Games: ", total_games)
    print("* Took total / per game (seconds):", time.time() - time_start, "/", (time.time() - time_start) / episodes)
    print("* Total Steps: ", total_steps)
    print("* Epsilon: ", network.epsilon)
    print("*")
    print("* Average: ", sum(rewards) / len(rewards), "/", sum(scores) / len(scores))
    print("* Median: ", statistics.median(rewards), "/", statistics.median(scores))
    print("* Mean: ", statistics.mean(rewards), "/", statistics.mean(scores))
    print("* Min: ", min(rewards), "/", min(scores))
    print("* Max: ", max(rewards), "/", max(scores))
    print("==================")

    if total_games >=  total_episodes:
      done = True 




* Total Games:  50
* Took total / per game (seconds): 43.692585945129395 / 0.873851728439331
* Total Steps:  6527
* Epsilon:  0.8626091999999996
*
* Average:  125.38 / 0.0
* Median:  122.0 / 0.0
* Mean:  125.38 / 0
* Min:  86 / 0
* Max:  187 / 0
* Total Games:  100
* Took total / per game (seconds): 71.72825694084167 / 1.4345651483535766
* Total Steps:  13472
* Epsilon:  0.7676191999999993
*
* Average:  133.72 / 0.0
* Median:  132.0 / 0.0
* Mean:  133.72 / 0
* Min:  96 / 0
* Max:  175 / 0
* Total Games:  150
* Took total / per game (seconds): 113.31314420700073 / 2.2662629222869874
* Total Steps:  21517
* Epsilon:  0.6726291999999989
*
* Average:  170.14 / 14.42
* Median:  159.0 / 0.0
* Mean:  170.14 / 14.42
* Min:  107 / 0
* Max:  933 / 721
* Total Games:  200
* Took total / per game (seconds): 151.47299551963806 / 3.0294599199295043
* Total Steps:  29932
* Epsilon:  0.5776391999999986
*
* Average:  192.04 / 28.84
* Median:  162.0 / 0.0
* Mean:  192.04 / 28.84
* Min:  90 / 0
* Max:  9

In [None]:

env = NesTetrisEnv()
env.reset()
print(env.board.generate_moves(3))

In [None]:
# load and test
env = NesTetrisEnv()
obs = env.reset()
network = Network()
network.load()

done = False
total_games = 0
total_steps = 0

print("=====obs=====")
print(obs)
env.render()

while not done and total_steps <= 10000:
  # get the next action
  if len(obs) != 0:
    #move_num = random.randint(0, len(obs) - 1)
    action, state = network.act(obs)
  else:
    print("no more moves")
    break

  #action, state = network.act(obs) # when the DQN is finished
  obs, reward, done, info = env.step(action)

  # obs = [landing_height, lines_cleared, holes, row_transitions, col_transitions]
  print("=====obs=====")
  print(obs)
  env.render()
  print("=====================")
  total_steps += 1

print("=====finished=====")
env.render()