In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import tensorflow as tf
import numpy as np
import pygame
from datetime import datetime

from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.specs import array_spec
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym
from tf_agents.trajectories import time_step as ts

tf.compat.v1.enable_v2_behavior()

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# class CardGameEnv(py_environment.PyEnvironment):

#   def __init__(self):
#     self._action_spec = array_spec.BoundedArraySpec(
#         shape=(), dtype=np.int32, minimum=0, maximum=1, name='action')
#     self._observation_spec = array_spec.BoundedArraySpec(
#         shape=(1,), dtype=np.int32, minimum=0, name='observation')
#     self._state = 0
#     self._episode_ended = False

#   def action_spec(self):
#     return self._action_spec

#   def observation_spec(self):
#     return self._observation_spec

#   def _reset(self):
#     self._state = 0
#     self._episode_ended = False
#     return ts.restart(np.array([self._state], dtype=np.int32))

#   def _step(self, action):

#     if self._episode_ended:
#       # The last action ended the episode. Ignore the current action and start
#       # a new episode.
#       return self.reset()

#     # Make sure episodes don't go on forever.
#     if action == 1:
#       self._episode_ended = True
#     elif action == 0:
#       new_card = np.random.randint(1, 11)
#       self._state += new_card
#     else:
#       raise ValueError('`action` should be 0 or 1.')

#     if self._episode_ended or self._state >= 21:
#       reward = self._state - 21 if self._state <= 21 else -21
#       return ts.termination(np.array([self._state], dtype=np.int32), reward)
#     else:
#       return ts.transition(
#           np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)

In [3]:
#set up game area layout
# By changing the block_size other aspects will scale
block_size = 30    # Size of side of square
blocks_w = 10      # game width (in blocks)
blocks_h = 20      # game height (in blocks)
border_w = 500
border_h = 100
end_wait = 2000

play_w = blocks_w * block_size  # Width is 10 blocks
play_h = blocks_h * block_size  # Height is 20 blocks
full_w = 2 * border_w + play_w
full_h = 2 * border_h + play_h + 50
top_left_x = border_w
top_left_y = 2 * border_h
start_x = int(blocks_w / 2)

NO_KEY = 0
LEFT_KEY = 1
RIGHT_KEY = 2
DOWN_KEY = 3
UP_KEY = 4


I = [['..0..',
      '..0..',
      '..0..',
      '..0..',
      '.....'],
     ['.....',
      '0000.',
      '.....',
      '.....',
      '.....']]
#J
J = [['.....',
      '.0...',
      '.000.',
      '.....',
      '.....'],
     ['.....',
      '..00.',
      '..0..',
      '..0..',
      '.....'],
     ['.....',
      '.....',
      '.000.',
      '...0.',
      '.....'],
     ['.....',
      '..0..',
      '..0..',
      '.00..',
      '.....']]
#L
L = [['.....',
      '.....',
      '.000.',
      '.0...',
      '.....'],
     ['.....',
      '.00..',
      '..0..',
      '..0..',
      '.....'],
     ['.....',
      '...0.',
      '.000.',
      '.....',
      '.....'],
     ['.....',
      '..0..',
      '..0..',
      '..00.',
      '.....']]
#O
O = [['.....',
      '.....',
      '.00..',
      '.00..',
      '.....']]
#S
S = [['.....',
      '..0..',
      '..00.',
      '...0.',
      '.....'],
     ['.....',
      '.....',
      '..00.',
      '.00..',
      '.....']]
#T
T = [['.....',
      '..0..',
      '.000.',
      '.....',
      '.....'],
     ['.....',
      '..0..',
      '..00.',
      '..0..',
      '.....'],
     ['.....',
      '.....',
      '.000.',
      '..0..',
      '.....'],
     ['.....',
      '..0..',
      '.00..',
      '..0..',
      '.....']]
#Z
Z = [['.....',
      '.....',
      '.00..',
      '..00.',
      '.....'],
     ['.....',
      '..0..',
      '.00..',
      '.0...',
      '.....']]
shapes = [I, J, L, O, S, T, Z]
shapes_color = [(255,0,0),(0,255,0),
                (0,0,255),(0,255,255),
                (255,0,255),(255,255,0),
                (128,0,128)]
pygame.font.init()

In [4]:
class Piece(object):
    def __init__ (self, x, y, shape):
        self.x = x
        self.y = y
        self.shape = shape
        self.color = shapes_color[shapes.index(shape)]
        self.rotation = int(np.random.rand()*len(shape))

def create_grid(locked_pos = {}):
    grid = [[(0,0,0) for _ in range(blocks_w)] for _ in range(blocks_h)]
    for hgt in range(blocks_h):
        for wid in range(blocks_w):
            if (wid, hgt) in locked_pos:
                clr = locked_pos[(wid,hgt)]
                grid[hgt][wid] = clr
    return grid

def convert_shape_fmt(this_shape):
    posit = []
    fmt = this_shape.shape [this_shape.rotation % len(this_shape.shape)]
    for i, line in enumerate(fmt):
        row = list(line)
        for j, col in enumerate(row):
            if col == '0':
                posit.append((int(this_shape.x + j), int(this_shape.y + i)))
                             
    for i, pos in enumerate(posit):
        posit[i] = (int(pos[0]-2), int(pos[1]-4))
    return posit

def valid_space(grid, piece):
    accepted_pos = [[(j,i) for j in range (blocks_w) if grid[i][j] == (0,0,0)] for i in range (blocks_h)]
    accepted_pos = [j for sub in accepted_pos for j in sub]
    formatted = convert_shape_fmt(piece)
    for pos in formatted:
        if pos not in accepted_pos:
            if pos[1] > -1:
                return False
    return True

def check_lost(positions):
    for pos in positions:
        x, y = pos
        if y < -1:
            return True
    return False

def get_shape():
    return Piece(start_x, 0, np.random.choice(shapes))

def draw_text_middle(surface, text, size, color):
    font = pygame.font.SysFont('Arial',size, bold = True)
    bolded = font.render(text, 1, color)
    surface.blit(bolded,((full_w-bolded.get_width())/2, (full_h-bolded.get_height())/2))

def draw_grid(surface, grid):
    for hgt in range(blocks_h+1):
        pygame.draw.line(surface,(128,128,128), 
                         (top_left_x, top_left_y + hgt*block_size), 
                         (top_left_x + blocks_w*block_size, top_left_y + hgt*block_size))
    for wid in range(blocks_w+1):
        pygame.draw.line(surface,(128,128,128), 
                         (top_left_x + wid*block_size, top_left_y), 
                         (top_left_x + wid*block_size, top_left_y + blocks_h*block_size))


def clear_rows(grid, locked):

    inc = 0
    # backward (bottom to top scan)
    for i in range(len(grid)-1, -1, -1):
        row = grid[i]
        if (0, 0, 0) not in row:
            inc += 1
            for j in range(len(row)):
                del locked[(j,i)]
        else:
            if inc > 0:
                k = i+inc
                for j in range(len(row)):
                    if (j,i) in locked:
                        locked[(j,k)] = locked[(j,i)]
                        del locked[(j,i)]

    return inc, locked

def draw_next_shape(surface, shp):
    font = pygame.font.SysFont('Arial',60)
    next_s = font.render('Next Shape:',1, (255,255,255))
    next_x = top_left_x + play_w + 150
    next_y = top_left_y + play_h/2 - 150
    fmt = shp.shape[shp.rotation]
    
    for i, line in enumerate(fmt):
        row = list(line)
        for j,col in enumerate(row):
            if col == '0':
                pygame.draw.rect(surface, shp.color, (next_x + j*block_size, 
                                 next_y + i*block_size, 
                                 block_size, block_size), 0)
    surface.blit(next_s, (next_x - 50, next_y - 100))

def add_score(score):
    now = datetime.now()
    dt_string = now.strftime("%Y/%m/%d %H:%M:%S\n")
    with open('data/score_log','a') as f:
        f.writelines(str(int(score))+','+dt_string)

def draw_window(surface, grid, score):
    surface.fill((0,0,0))
    pygame.font.init()
    font = pygame.font.SysFont('Arial',60)
    title = font.render('Tetris',1, (255,255,255))
    surface.blit(title,(full_w/2 - title.get_width()/2, 20))
    
    font = pygame.font.SysFont('Arial',60)
    score_s = font.render('Score:',1, (255,255,255))
    score_x = 50
    score_y = top_left_y + 50
    surface.blit(score_s, (score_x, score_y))
    score_s = font.render(str(score),1, (255,255,255))
    score_x = 100
    score_y += 100
    surface.blit(score_s, (score_x, score_y))
    
    for hgt in range(blocks_h):
        for wid in range(blocks_w):
            pygame.draw.rect(surface, grid[hgt][wid],
                             (top_left_x + wid*block_size, 
                            top_left_y + hgt*block_size, 
                            block_size, block_size), 0)
    pygame.draw.rect(surface, (255,0,0), (top_left_x, top_left_y, play_w, play_h), 4)
    draw_grid(surface, grid)
#     pygame.display.update()

In [5]:
class Tetris(py_environment.PyEnvironment):

    def __init__(self):
        self._action_spec = array_spec.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=4, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(
            shape=(blocks_h,blocks_w), dtype=np.int32, minimum=0, name='observation')
        self._state = 0
        self._episode_ended = False
        win = pygame.display.set_mode((full_w, full_h))
        pygame.display.set_caption('Tetris')
        win.fill((0, 0, 0))
#         draw_text_middle(win,"Press any key to Start", 60, (255, 255, 255))
        self.locked_blocks = {}
        self.grid = create_grid(self.locked_blocks)

        self.change_piece = False
#     run = True
        self.next_piece = get_shape()
        self.curr_piece = get_shape()
        pygame.display.update()
    
#     clock = pygame.time.Clock()
#     fall_time = 0
#     fall_speed =.27
#     level_time = 0
#     score = 0

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self.grid
#         return self._observation_spec

    def _reset(self):
        self._state = 0
        self._episode_ended = False
        return ts.restart(np.array([self._state], dtype=np.int32))

    def _step(self, action):

#     if self._episode_ended:
#       # The last action ended the episode. Ignore the current action and start
#       # a new episode.
#       TETRIS BOARD CLOGGED
#       return self.reset()

#     # Make sure episodes don't go on forever.     
#     TETRIS GAME WILL END
#     if action == 1:
#       self._episode_ended = True
#     elif action == 0:
#       new_card = np.random.randint(1, 11)
#       self._state += new_card
#     else:
        if action < 0 or action > 4:
            raise ValueError('`action` should be between 0 and 4.')
        self.curr_piece.y += 1
        self.grid = create_grid(self.locked_blocks)
        if not (valid_space(self.grid, self.curr_piece)) and (self.curr_piece.y > 0):
            self.curr_piece.y -= 1
            self.change_piece = True
        
        self.grid = create_grid(self.locked_blocks)
        key_choice = action #(self.grid, self.curr_piece)
        if key_choice == LEFT_KEY:
            self.curr_piece.x -= 1
            if not (valid_space(self.grid, self.curr_piece)):
                self.curr_piece.x += 1
        if key_choice == RIGHT_KEY:
            curr_piece.x += 1
            if not (valid_space(self.grid, self.curr_piece)):
                self.curr_piece.x -= 1
        if key_choice == DOWN_KEY:
            while valid_space(self.grid, self.curr_piece):
                self.curr_piece.y += 1
            self.curr_piece.y -= 1
        if key_choice == UP_KEY:
            self.curr_piece.rotation += 1 
            self.curr_piece.rotation %= len(self.curr_piece.shape)
            if not (valid_space(self.grid, self.curr_piece)):
                if self.curr_piece.rotation == 0:
                    self.curr_piece.rotation = len(self.curr_piece.shape) - 1
                else:
                    self.curr_piece.rotation -= 1
        # if none of the above, piece drops one row

        shape_pos = convert_shape_fmt(self.curr_piece)
        for i in range(len(shape_pos)):
            x, y = shape_pos[i]
            if y > -1:
                self.grid[y][x] = self.curr_piece.color
        if self.change_piece:
            for pos in shape_pos:
                p = (pos[0],pos[1])
                locked_blocks[p] = curr_piece.color
            self.curr_piece = next_piece
            self.next_piece = get_shape()
            self.change_piece = False
            increment, new_block_set = clear_rows(self.grid, self.locked_blocks)
            score += increment * 10
            self.locked_blocks = new_block_set

        draw_window(win, self.grid, score)
        draw_next_shape(win, next_piece)
        pygame.display.update()

        self._state = 1
        if check_lost(self.locked_blocks):
#             draw_text_middle(win, "You Lost!", 80, (255, 255, 255))
            self._state = 2
            pygame.display.update()
#             pygame.time.delay(end_wait)
#             run = False
            add_score(score)
            self._episode_ended = False
        if self._episode_ended:
            return ts.termination(np.array([self._state], dtype=np.int32), score)
        else:
            return ts.transition(
                np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)
#     return win, score, grid, locked_blocks, curr_piece, next_piece, check_lost
#     if self._episode_ended or self._state >= 21:
#       reward = self._state - 21 if self._state <= 21 else -21
#       return ts.termination(np.array([self._state], dtype=np.int32), reward)
#     else:
#       return ts.transition(
#           np.array([self._state], dtype=np.int32), reward=0.0, discount=1.0)

In [6]:
# action = np.array(0, dtype=np.int32)  # test with do nothing
# time_step = environment.reset()
# print(time_step)
# while not time_step.is_last():
#   time_step = environment.step(action)
#   print(time_step)


In [7]:
environment = Tetris()
utils.validate_py_environment(environment, episodes=5)

ValueError: Given `time_step`: TimeStep(step_type=array(0, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([0], dtype=int32)) does not match expected `time_step_spec`: TimeStep(step_type=ArraySpec(shape=(), dtype=dtype('int32'), name='step_type'), reward=ArraySpec(shape=(), dtype=dtype('float32'), name='reward'), discount=BoundedArraySpec(shape=(), dtype=dtype('float32'), name='discount', minimum=0.0, maximum=1.0), observation=[[(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)], [(0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0), (0, 0, 0)]])

In [15]:
get_new_card_action = np.array(0, dtype=np.int32)
end_round_action = np.array(1, dtype=np.int32)

environment = Tetris()
time_step = environment.reset()
print(time_step)
cumulative_reward = time_step.reward

for _ in range(3):
  time_step = environment.step(get_new_card_action)
  print(time_step)
  cumulative_reward += time_step.reward

time_step = environment.step(end_round_action)
print(time_step)
cumulative_reward += time_step.reward
print('Final Reward = ', cumulative_reward)

TimeStep(step_type=array(0, dtype=int32), reward=array(0., dtype=float32), discount=array(1., dtype=float32), observation=array([0], dtype=int32))


NameError: name 'LEFT_KEY' is not defined

In [11]:
pygame.display.quit()